import os import sqlite3 from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path _data_dir = Path(os.environ.get("DATA_DIR", Path(__file__).parent)) _data_dir.mkdir(parents=True, exist_ok=True) DB_PATH = _data_dir / "calibresync.db" def _connect() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn @contextmanager def get_db(): conn = _connect() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db() -> None: with get_db() as conn: conn.executescript(""" CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT ); CREATE TABLE IF NOT EXISTS processed_zips ( id INTEGER PRIMARY KEY, remote_path TEXT UNIQUE NOT NULL, file_size INTEGER, processed_at TEXT, status TEXT, error_msg TEXT ); CREATE TABLE IF NOT EXISTS sync_runs ( id INTEGER PRIMARY KEY, started_at TEXT NOT NULL, finished_at TEXT, zips_found INTEGER DEFAULT 0, zips_new INTEGER DEFAULT 0, books_imported INTEGER DEFAULT 0, books_errored INTEGER DEFAULT 0, status TEXT DEFAULT 'running', error_msg TEXT ); CREATE TABLE IF NOT EXISTS remote_zip_cache ( remote_path TEXT PRIMARY KEY, file_size INTEGER NOT NULL, cached_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS processed_books ( id INTEGER PRIMARY KEY, zip_id INTEGER REFERENCES processed_zips(id), filename TEXT NOT NULL, sha256 TEXT, placed_at TEXT, status TEXT DEFAULT 'success', error_msg TEXT ); """) _migrate(conn) def _migrate(conn: sqlite3.Connection) -> None: existing = {row[1] for row in conn.execute("PRAGMA table_info(sync_runs)")} if "books_imported" not in existing: conn.execute("ALTER TABLE sync_runs ADD COLUMN books_imported INTEGER DEFAULT 0") if "books_skipped" not in existing: conn.execute("ALTER TABLE sync_runs ADD COLUMN books_skipped INTEGER DEFAULT 0") for old_col in ("books_uploaded",): if old_col in existing: try: conn.execute(f"ALTER TABLE sync_runs DROP COLUMN {old_col}") except Exception: pass # --- Settings --- def get_setting(key: str, default: str | None = None) -> str | None: with get_db() as conn: row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone() return row["value"] if row else default def set_setting(key: str, value: str) -> None: with get_db() as conn: conn.execute( "INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), ) def get_all_settings() -> dict[str, str]: with get_db() as conn: rows = conn.execute("SELECT key, value FROM settings").fetchall() return {row["key"]: row["value"] for row in rows} # --- Processed zips --- def is_zip_processed(remote_path: str) -> bool: with get_db() as conn: row = conn.execute( "SELECT id FROM processed_zips WHERE remote_path = ?", (remote_path,) ).fetchone() return row is not None def get_all_processed_paths() -> set[str]: """Return successfully-processed remote paths. Errored zips are excluded so they get retried.""" with get_db() as conn: rows = conn.execute( "SELECT remote_path FROM processed_zips WHERE status = 'success'" ).fetchall() return {row["remote_path"] for row in rows} # --- Remote zip cache --- def get_remote_zip_cache() -> list[tuple[str, int]]: """Return cached (remote_path, file_size) tuples.""" with get_db() as conn: rows = conn.execute("SELECT remote_path, file_size FROM remote_zip_cache").fetchall() return [(row["remote_path"], row["file_size"]) for row in rows] def upsert_remote_zip_cache(zips: list[tuple[str, int]]) -> None: """Bulk-insert or replace cache entries. zips is a list of (remote_path, file_size).""" now = _now() with get_db() as conn: conn.executemany( "INSERT INTO remote_zip_cache (remote_path, file_size, cached_at) VALUES (?,?,?)" " ON CONFLICT(remote_path) DO UPDATE SET file_size=excluded.file_size, cached_at=excluded.cached_at", [(path, size, now) for path, size in zips], ) def get_remote_cache_info() -> dict: with get_db() as conn: row = conn.execute( "SELECT COUNT(*) as count, MAX(cached_at) as last_scan FROM remote_zip_cache" ).fetchone() return {"count": row["count"], "last_scan": row["last_scan"]} def clear_remote_zip_cache() -> int: with get_db() as conn: return conn.execute("DELETE FROM remote_zip_cache").rowcount def mark_zip_processed(remote_path: str, file_size: int, status: str, error_msg: str | None = None) -> None: with get_db() as conn: conn.execute( """INSERT INTO processed_zips (remote_path, file_size, processed_at, status, error_msg) VALUES (?, ?, ?, ?, ?) ON CONFLICT(remote_path) DO UPDATE SET processed_at = excluded.processed_at, status = excluded.status, error_msg = excluded.error_msg""", (remote_path, file_size, _now(), status, error_msg), ) def get_zip_id_by_path(remote_path: str) -> int | None: with get_db() as conn: row = conn.execute( "SELECT id FROM processed_zips WHERE remote_path = ?", (remote_path,) ).fetchone() return row["id"] if row else None def get_recent_zips(limit: int = 50) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( "SELECT * FROM processed_zips ORDER BY processed_at DESC LIMIT ?", (limit,) ).fetchall() # --- Sync runs --- def start_sync_run() -> int: with get_db() as conn: cur = conn.execute( "INSERT INTO sync_runs (started_at) VALUES (?)", (_now(),) ) return cur.lastrowid def finish_sync_run(run_id: int, **kwargs) -> None: fields = ", ".join(f"{k} = ?" for k in kwargs) values = list(kwargs.values()) + [_now(), run_id] with get_db() as conn: conn.execute( f"UPDATE sync_runs SET {fields}, finished_at = ? WHERE id = ?", values ) def get_recent_runs(limit: int = 10) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( "SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT ?", (limit,) ).fetchall() def is_book_processed(sha256: str) -> bool: with get_db() as conn: row = conn.execute( "SELECT id FROM processed_books WHERE sha256 = ? AND status = 'success'", (sha256,) ).fetchone() return row is not None def record_book(zip_id: int | None, filename: str, sha256: str, status: str = "success", error_msg: str | None = None) -> None: with get_db() as conn: conn.execute( "INSERT INTO processed_books (zip_id, filename, sha256, placed_at, status, error_msg) VALUES (?,?,?,?,?,?)", (zip_id, filename, sha256, _now(), status, error_msg), ) def get_recent_books(limit: int = 50) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( """SELECT pb.*, pz.remote_path as zip_remote_path FROM processed_books pb LEFT JOIN processed_zips pz ON pz.id = pb.zip_id ORDER BY pb.placed_at DESC LIMIT ?""", (limit,), ).fetchall() def get_stats() -> dict: with get_db() as conn: total_zips = conn.execute("SELECT COUNT(*) FROM processed_zips").fetchone()[0] total_books = conn.execute( "SELECT COUNT(*) FROM processed_books WHERE status = 'success'" ).fetchone()[0] total_skipped = conn.execute( "SELECT COUNT(*) FROM processed_books WHERE status = 'skipped'" ).fetchone()[0] total_errored = conn.execute( "SELECT COUNT(*) FROM processed_books WHERE status = 'error'" ).fetchone()[0] last_run = conn.execute( "SELECT started_at, status FROM sync_runs ORDER BY started_at DESC LIMIT 1" ).fetchone() return { "total_zips": total_zips, "total_books": total_books, "total_skipped": total_skipped, "total_errored": total_errored, "last_run": dict(last_run) if last_run else None, } def clear_sync_data() -> dict: """Delete all processed_books, processed_zips, and sync_runs rows. Settings are kept.""" with get_db() as conn: books = conn.execute("DELETE FROM processed_books").rowcount zips = conn.execute("DELETE FROM processed_zips").rowcount runs = conn.execute("DELETE FROM sync_runs").rowcount conn.execute("DELETE FROM settings WHERE key = 'remote_cache_last_scan'") return {"zips": zips, "runs": runs, "books": books} def _now() -> str: return datetime.now(timezone.utc).isoformat()