switch to grimmory as book web app and a more modern dashboard

This commit is contained in:
2026-05-14 21:15:46 +02:00
parent 27fa22eee1
commit e029da895b
8 changed files with 785 additions and 100 deletions
+63 -7
View File
@@ -64,6 +64,16 @@ def init_db() -> None:
file_size INTEGER NOT NULL,
cached_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS processed_books (
id INTEGER PRIMARY KEY,
zip_id INTEGER REFERENCES processed_zips(id),
filename TEXT NOT NULL,
sha256 TEXT,
placed_at TEXT,
status TEXT DEFAULT 'success',
error_msg TEXT
);
""")
_migrate(conn)
@@ -72,7 +82,9 @@ def _migrate(conn: sqlite3.Connection) -> None:
existing = {row[1] for row in conn.execute("PRAGMA table_info(sync_runs)")}
if "books_imported" not in existing:
conn.execute("ALTER TABLE sync_runs ADD COLUMN books_imported INTEGER DEFAULT 0")
for old_col in ("books_uploaded", "books_skipped"):
if "books_skipped" not in existing:
conn.execute("ALTER TABLE sync_runs ADD COLUMN books_skipped INTEGER DEFAULT 0")
for old_col in ("books_uploaded",):
if old_col in existing:
try:
conn.execute(f"ALTER TABLE sync_runs DROP COLUMN {old_col}")
@@ -167,6 +179,14 @@ def mark_zip_processed(remote_path: str, file_size: int, status: str, error_msg:
)
def get_zip_id_by_path(remote_path: str) -> int | None:
with get_db() as conn:
row = conn.execute(
"SELECT id FROM processed_zips WHERE remote_path = ?", (remote_path,)
).fetchone()
return row["id"] if row else None
def get_recent_zips(limit: int = 50) -> list[sqlite3.Row]:
with get_db() as conn:
return conn.execute(
@@ -200,30 +220,66 @@ def get_recent_runs(limit: int = 10) -> list[sqlite3.Row]:
).fetchall()
def is_book_processed(sha256: str) -> bool:
with get_db() as conn:
row = conn.execute(
"SELECT id FROM processed_books WHERE sha256 = ? AND status = 'success'", (sha256,)
).fetchone()
return row is not None
def record_book(zip_id: int | None, filename: str, sha256: str,
status: str = "success", error_msg: str | None = None) -> None:
with get_db() as conn:
conn.execute(
"INSERT INTO processed_books (zip_id, filename, sha256, placed_at, status, error_msg) VALUES (?,?,?,?,?,?)",
(zip_id, filename, sha256, _now(), status, error_msg),
)
def get_recent_books(limit: int = 50) -> list[sqlite3.Row]:
with get_db() as conn:
return conn.execute(
"""SELECT pb.*, pz.remote_path as zip_remote_path
FROM processed_books pb
LEFT JOIN processed_zips pz ON pz.id = pb.zip_id
ORDER BY pb.placed_at DESC LIMIT ?""",
(limit,),
).fetchall()
def get_stats() -> dict:
with get_db() as conn:
total_zips = conn.execute("SELECT COUNT(*) FROM processed_zips").fetchone()[0]
total_imported = conn.execute(
"SELECT COALESCE(SUM(books_imported), 0) FROM sync_runs"
total_books = conn.execute(
"SELECT COUNT(*) FROM processed_books WHERE status = 'success'"
).fetchone()[0]
total_skipped = conn.execute(
"SELECT COUNT(*) FROM processed_books WHERE status = 'skipped'"
).fetchone()[0]
total_errored = conn.execute(
"SELECT COUNT(*) FROM processed_books WHERE status = 'error'"
).fetchone()[0]
last_run = conn.execute(
"SELECT started_at, status FROM sync_runs ORDER BY started_at DESC LIMIT 1"
).fetchone()
return {
"total_zips": total_zips,
"total_imported": total_imported,
"total_books": total_books,
"total_skipped": total_skipped,
"total_errored": total_errored,
"last_run": dict(last_run) if last_run else None,
}
def clear_sync_data() -> dict:
"""Delete all processed_zips and sync_runs rows. Settings are kept.
Also resets the remote scan timestamp so the next sync does a full rescan."""
"""Delete all processed_books, processed_zips, and sync_runs rows. Settings are kept."""
with get_db() as conn:
books = conn.execute("DELETE FROM processed_books").rowcount
zips = conn.execute("DELETE FROM processed_zips").rowcount
runs = conn.execute("DELETE FROM sync_runs").rowcount
conn.execute("DELETE FROM settings WHERE key = 'remote_cache_last_scan'")
return {"zips": zips, "runs": runs}
return {"zips": zips, "runs": runs, "books": books}
def _now() -> str: