Files
calibresync/db.py
T
2026-05-10 20:21:23 +02:00

269 lines
8.8 KiB
Python

import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
# Directory holding the SQLite file: $DATA_DIR when it is set, otherwise this module's directory.
_data_dir = Path(os.environ["DATA_DIR"] if "DATA_DIR" in os.environ else Path(__file__).parent)
# Created eagerly at import time so that sqlite3.connect() never fails on a missing directory.
Path.mkdir(_data_dir, parents=True, exist_ok=True)
DB_PATH = _data_dir.joinpath("calibresync.db")
def _connect() -> sqlite3.Connection:
    """Open a fresh connection to DB_PATH.

    Rows come back as sqlite3.Row (so columns can be read by name). The connection
    is switched to WAL journaling, and foreign-key enforcement is turned on.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
@contextmanager
def get_db():
    """Yield a connection scoped to one transaction.

    The transaction is committed when the ``with`` body finishes normally. It is
    rolled back and the exception re-raised if the body (or the commit itself)
    raises an Exception. The connection is closed in every case.
    """
    connection = _connect()
    try:
        try:
            yield connection
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
def init_db() -> None:
    """Create every table this module uses, if missing.

    Each statement is ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly
    (e.g. on every start-up) leaves existing tables and data untouched.
    """
    schema = """
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS processed_zips (
id INTEGER PRIMARY KEY,
remote_path TEXT UNIQUE NOT NULL,
file_size INTEGER,
processed_at TEXT,
status TEXT,
error_msg TEXT
);
CREATE TABLE IF NOT EXISTS uploaded_books (
id INTEGER PRIMARY KEY,
filename TEXT NOT NULL,
file_hash TEXT UNIQUE NOT NULL,
zip_source TEXT,
uploaded_at TEXT,
status TEXT
);
CREATE TABLE IF NOT EXISTS sync_runs (
id INTEGER PRIMARY KEY,
started_at TEXT NOT NULL,
finished_at TEXT,
zips_found INTEGER DEFAULT 0,
zips_new INTEGER DEFAULT 0,
books_uploaded INTEGER DEFAULT 0,
books_skipped INTEGER DEFAULT 0,
books_errored INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
error_msg TEXT
);
CREATE TABLE IF NOT EXISTS remote_zip_cache (
remote_path TEXT PRIMARY KEY,
file_size INTEGER NOT NULL,
cached_at TEXT NOT NULL
);
"""
    with get_db() as conn:
        conn.executescript(schema)
# --- Settings ---
def get_setting(key: str, default: str | None = None) -> str | None:
    """Return the stored value for *key*, or *default* when no such key exists.

    A key that exists with a NULL value yields None, not *default*.
    """
    with get_db() as conn:
        cursor = conn.execute("SELECT value FROM settings WHERE key = ?", (key,))
        found = cursor.fetchone()
    if found is None:
        return default
    return found["value"]
def set_setting(key: str, value: str) -> None:
    """Store *value* under *key*, overwriting any previous value (SQLite upsert)."""
    upsert_sql = (
        "INSERT INTO settings (key, value) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value"
    )
    params = (key, value)
    with get_db() as conn:
        conn.execute(upsert_sql, params)
def get_all_settings() -> dict[str, str]:
    """Return every row of the settings table as a ``{key: value}`` dict."""
    with get_db() as conn:
        pairs = conn.execute("SELECT key, value FROM settings").fetchall()
    # sqlite3.Row iterates over its column values, so each row unpacks as (key, value).
    return {setting_key: setting_value for setting_key, setting_value in pairs}
# --- Processed zips ---
def is_zip_processed(remote_path: str) -> bool:
    """Return True if the zip at *remote_path* has been processed successfully.

    Only rows with status 'success' count. This matches get_all_processed_paths(),
    which deliberately excludes errored zips so they get retried. A zip recorded
    with any other status is therefore reported as not processed.
    """
    with get_db() as conn:
        row = conn.execute(
            "SELECT id FROM processed_zips WHERE remote_path = ? AND status = 'success'",
            (remote_path,),
        ).fetchone()
    return row is not None
def get_all_processed_paths() -> set[str]:
    """Return the remote paths of zips whose processing finished with status 'success'.

    Zips recorded with any other status (e.g. errors) are left out, so a sync that
    filters on this set will try them again.
    """
    query = "SELECT remote_path FROM processed_zips WHERE status = 'success'"
    with get_db() as conn:
        return {path for (path,) in conn.execute(query).fetchall()}
# --- Remote zip cache ---
def get_remote_zip_cache() -> list[tuple[str, int]]:
    """Return every cached remote zip as a ``(remote_path, file_size)`` tuple."""
    with get_db() as conn:
        cached = conn.execute("SELECT remote_path, file_size FROM remote_zip_cache").fetchall()
    return [(path, size) for path, size in cached]
def upsert_remote_zip_cache(zips: list[tuple[str, int]]) -> None:
    """Insert or refresh cache rows for ``(remote_path, file_size)`` pairs in *zips*.

    Every row written in one call shares a single cached_at timestamp. Existing
    paths have their file_size and cached_at overwritten.
    """
    stamp = _now()
    upsert_sql = (
        "INSERT INTO remote_zip_cache (remote_path, file_size, cached_at) VALUES (?,?,?)"
        " ON CONFLICT(remote_path) DO UPDATE SET file_size=excluded.file_size, cached_at=excluded.cached_at"
    )
    with get_db() as conn:
        rows = [(remote_path, file_size, stamp) for remote_path, file_size in zips]
        conn.executemany(upsert_sql, rows)
def get_remote_cache_info() -> dict:
    """Summarise the remote zip cache.

    Returns a dict with two keys: ``count`` (number of cached rows) and
    ``last_scan`` (the newest cached_at value, or None when the cache is empty).
    """
    with get_db() as conn:
        count, last_scan = conn.execute(
            "SELECT COUNT(*) as count, MAX(cached_at) as last_scan FROM remote_zip_cache"
        ).fetchone()
    return {"count": count, "last_scan": last_scan}
def clear_remote_zip_cache() -> int:
    """Remove every row from remote_zip_cache and return how many were deleted."""
    with get_db() as conn:
        cursor = conn.execute("DELETE FROM remote_zip_cache")
        deleted = cursor.rowcount
    return deleted
def mark_zip_processed(remote_path: str, file_size: int, status: str, error_msg: str | None = None) -> None:
    """Record the outcome of processing the zip at *remote_path*.

    Inserts a new processed_zips row. If the path was recorded before, the
    previous attempt's file_size, processed_at, status and error_msg are all
    overwritten, so the row always describes the most recent attempt.

    Args:
        remote_path: Remote location of the zip (unique key of processed_zips).
        file_size: Size of the zip for this attempt, as supplied by the caller.
        status: Outcome label. get_all_processed_paths() treats only 'success'
            as done.
        error_msg: Optional error text. Passing None clears any earlier error.
    """
    with get_db() as conn:
        conn.execute(
            """INSERT INTO processed_zips (remote_path, file_size, processed_at, status, error_msg)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(remote_path) DO UPDATE SET
                file_size = excluded.file_size,
                processed_at = excluded.processed_at,
                status = excluded.status,
                error_msg = excluded.error_msg""",
            (remote_path, file_size, _now(), status, error_msg),
        )
def get_recent_zips(limit: int = 50) -> list[sqlite3.Row]:
    """Return up to *limit* processed_zips rows, most recently processed first."""
    sql = "SELECT * FROM processed_zips ORDER BY processed_at DESC LIMIT ?"
    with get_db() as conn:
        recent = conn.execute(sql, (limit,)).fetchall()
    return recent
# --- Uploaded books ---
def is_book_uploaded(file_hash: str) -> bool:
    """Return True if a book with *file_hash* is recorded as 'uploaded' or 'skipped_duplicate'."""
    sql = "SELECT id FROM uploaded_books WHERE file_hash = ? AND status IN ('uploaded', 'skipped_duplicate')"
    with get_db() as conn:
        return conn.execute(sql, (file_hash,)).fetchone() is not None
def record_book(filename: str, file_hash: str, zip_source: str, status: str) -> None:
    """Insert an uploaded_books row keyed by *file_hash*.

    If the hash is already present, only its status is replaced. The original
    filename, zip_source and uploaded_at are kept.
    """
    upsert_sql = (
        "INSERT INTO uploaded_books (filename, file_hash, zip_source, uploaded_at, status)\n"
        "VALUES (?, ?, ?, ?, ?)\n"
        "ON CONFLICT(file_hash) DO UPDATE SET status = excluded.status"
    )
    with get_db() as conn:
        params = (filename, file_hash, zip_source, _now(), status)
        conn.execute(upsert_sql, params)
def get_books(limit: int = 200, offset: int = 0) -> list[sqlite3.Row]:
    """Return one page of uploaded_books rows, newest uploaded_at first.

    *limit* is the page size and *offset* is the number of rows to skip.
    """
    page_sql = "SELECT * FROM uploaded_books ORDER BY uploaded_at DESC LIMIT ? OFFSET ?"
    with get_db() as conn:
        page = conn.execute(page_sql, (limit, offset)).fetchall()
    return page
def get_books_count() -> int:
    """Return the total number of rows in uploaded_books, regardless of status."""
    with get_db() as conn:
        (total,) = conn.execute("SELECT COUNT(*) FROM uploaded_books").fetchone()
    return total
# --- Sync runs ---
def start_sync_run() -> int:
    """Insert a new sync_runs row stamped with the current time and return its id.

    The other columns take their schema defaults (status 'running', counters 0).
    """
    with get_db() as conn:
        new_run_id = conn.execute(
            "INSERT INTO sync_runs (started_at) VALUES (?)", (_now(),)
        ).lastrowid
    return new_run_id
def finish_sync_run(run_id: int, **kwargs) -> None:
    """Mark sync run *run_id* as finished, optionally updating other columns.

    Each keyword argument maps a sync_runs column name to its new value, e.g.
    ``finish_sync_run(run_id, status="success", books_uploaded=3)``. finished_at
    is always set to the current UTC time, overriding any finished_at given in
    *kwargs*. With no keyword arguments, only finished_at is written.

    Raises:
        ValueError: if a keyword is not a plain identifier. Column names are
            interpolated into the SQL text, so anything else is rejected rather
            than executed. Unknown-but-valid names still fail in SQLite with
            sqlite3.OperationalError, as before.
    """
    # finished_at is merged after kwargs so it always wins, as it did before.
    # Building the SET clause from one mapping also avoids the old
    # "SET , finished_at = ?" syntax error when kwargs was empty.
    assignments = {**kwargs, "finished_at": _now()}
    for column in assignments:
        if not column.isidentifier():
            raise ValueError(f"invalid sync_runs column name: {column!r}")
    set_clause = ", ".join(f"{column} = ?" for column in assignments)
    with get_db() as conn:
        conn.execute(
            f"UPDATE sync_runs SET {set_clause} WHERE id = ?",
            [*assignments.values(), run_id],
        )
def get_recent_runs(limit: int = 10) -> list[sqlite3.Row]:
    """Return up to *limit* sync_runs rows, most recently started first."""
    sql = "SELECT * FROM sync_runs ORDER BY started_at DESC LIMIT ?"
    with get_db() as conn:
        runs = conn.execute(sql, (limit,)).fetchall()
    return runs
def get_stats() -> dict:
    """Collect dashboard counters in a single connection.

    The returned dict has these keys:
    - ``total_books``: all uploaded_books rows.
    - ``uploaded`` / ``skipped``: rows with status 'uploaded' / 'skipped_duplicate'.
    - ``total_zips``: all processed_zips rows.
    - ``last_run``: the latest sync run's started_at and status as a dict, or
      None when no run exists.
    """
    with get_db() as conn:

        def scalar(sql: str) -> int:
            # Each query is a bare COUNT(*), so the first column of the only row is the answer.
            return conn.execute(sql).fetchone()[0]

        stats = {
            "total_books": scalar("SELECT COUNT(*) FROM uploaded_books"),
            "uploaded": scalar("SELECT COUNT(*) FROM uploaded_books WHERE status = 'uploaded'"),
            "skipped": scalar("SELECT COUNT(*) FROM uploaded_books WHERE status = 'skipped_duplicate'"),
            "total_zips": scalar("SELECT COUNT(*) FROM processed_zips"),
        }
        latest = conn.execute(
            "SELECT started_at, status FROM sync_runs ORDER BY started_at DESC LIMIT 1"
        ).fetchone()
    stats["last_run"] = dict(latest) if latest else None
    return stats
def clear_sync_data() -> dict:
    """Wipe all sync history while keeping configuration.

    Deletes every row of processed_zips, uploaded_books and sync_runs. Settings
    are kept, except 'remote_cache_last_scan', which is removed so that the next
    sync does a full rescan. The remote_zip_cache table is not touched.

    Returns:
        A dict with the number of deleted rows per table under the keys
        ``zips``, ``books`` and ``runs``.
    """
    with get_db() as conn:
        deleted = {
            "zips": conn.execute("DELETE FROM processed_zips").rowcount,
            "books": conn.execute("DELETE FROM uploaded_books").rowcount,
            "runs": conn.execute("DELETE FROM sync_runs").rowcount,
        }
        conn.execute("DELETE FROM settings WHERE key = 'remote_cache_last_scan'")
    return deleted
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string ending in '+00:00'.

    datetime.isoformat() omits the fractional part when microseconds are zero.
    """
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()