import hashlib import logging import re from pathlib import Path import requests import db from config import CalibreConfig log = logging.getLogger(__name__) MIME_TYPES = { ".epub": "application/epub+zip", ".pdf": "application/pdf", } class CalibreClient: def __init__(self, cfg: CalibreConfig): self._cfg = cfg self._session = requests.Session() self._authenticated = False self._upload_csrf: str | None = None def _ensure_auth(self) -> None: if self._authenticated: return # Fetch login page first to get the CSRF token (Flask-WTF requirement) login_url = f"{self._cfg.url}/login" page = self._session.get(login_url, timeout=30) page.raise_for_status() csrf = _extract_csrf(page.text) data = {"username": self._cfg.user, "password": self._cfg.password} if csrf: data["csrf_token"] = csrf resp = self._session.post( login_url, data=data, allow_redirects=True, timeout=30, ) resp.raise_for_status() # Calibre-Web redirects to / on success; landing back on /login means bad creds if resp.url.rstrip("/").endswith("/login"): raise RuntimeError("Calibre-Web authentication failed — check credentials") self._authenticated = True # The CSRF token is session-scoped in Flask-WTF — reuse the login token for uploads. # Also try to extract a fresh one from the landing page (/). self._upload_csrf = _extract_csrf(resp.text) or csrf log.debug("Upload CSRF token from login: %s", self._upload_csrf[:12] + "…" if self._upload_csrf else "NOT FOUND") log.info("Authenticated to Calibre-Web at %s", self._cfg.url) def upload(self, book_path: Path, zip_source: str) -> str: """Upload a book file. Returns status: 'uploaded' | 'skipped_duplicate' | 'error'.""" file_hash = _sha256(book_path) if db.is_book_uploaded(file_hash): log.info("Skipping duplicate: %s (hash %s)", book_path.name, file_hash[:8]) db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate") return "skipped_duplicate" try: self._ensure_auth() mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream") with book_path.open("rb") as fh: resp = self._session.post( f"{self._cfg.url}/upload", files={"btn-upload": (book_path.name, fh, mime)}, data={"csrf_token": self._upload_csrf} if self._upload_csrf else {}, timeout=120, ) if not resp.ok: log.error("Upload HTTP %s — response body: %s", resp.status_code, resp.text[:500]) resp.raise_for_status() log.info("Uploaded: %s", book_path.name) db.record_book(book_path.name, file_hash, zip_source, "uploaded") return "uploaded" except requests.HTTPError: db.record_book(book_path.name, file_hash, zip_source, "error") return "error" except Exception as e: log.error("Upload failed for %s: %s", book_path.name, e) db.record_book(book_path.name, file_hash, zip_source, "error") return "error" def test_connection(cfg: CalibreConfig) -> tuple[bool, str]: try: client = CalibreClient(cfg) client._ensure_auth() return True, f"Authenticated to {cfg.url} as '{cfg.user}'." except Exception as e: return False, str(e) def _extract_csrf(html: str) -> str | None: m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html) if not m: m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html) return m.group(1) if m else None def _sha256(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): h.update(chunk) return h.hexdigest()