import hashlib
import html
import logging
import re
import time
from collections import defaultdict
from pathlib import Path
from urllib.parse import quote

import requests

import db
from config import CalibreConfig

log = logging.getLogger(__name__)

MIME_TYPES = {
    ".epub": "application/epub+zip",
    ".pdf": "application/pdf",
}

# Words stripped before comparing titles — release-group tags, language codes, format names, etc.
_JUNK_WORDS = {
    "retail", "epub", "ebook", "pdf", "mobi", "azw3", "decipher",
    "swedish", "english", "danish", "norwegian", "finnish", "german", "french",
    "the", "a", "an", "och", "und", "les", "der", "die", "das",
}


class CalibreUnavailableError(RuntimeError):
    """Raised when Calibre-Web returns repeated 502/503/504 — sync run should abort."""


class CalibreClient:
    """Session-based client for logging in to Calibre-Web and uploading books.

    Holds one ``requests.Session``, the CSRF token scraped during login, and a
    counter of consecutive books whose upload ended in a 502/503/504.
    """

    def __init__(self, cfg: CalibreConfig):
        """Store the config and create an unauthenticated session."""
        self._cfg = cfg
        self._session = requests.Session()
        self._authenticated = False
        self._upload_csrf: str | None = None
        # Number of books in a row whose upload exhausted its 5xx retries.
        self._consecutive_failures = 0

    def _ensure_auth(self) -> None:
        """Log in through the ``/login`` form unless already authenticated.

        Raises:
            requests.HTTPError: if the login page or the login POST returns an
                error status.
            RuntimeError: if the final URL after the POST still ends in
                ``/login``, which is treated as rejected credentials.
        """
        if self._authenticated:
            return
        login_url = f"{self._cfg.url}/login"
        page = self._session.get(login_url, timeout=30)
        page.raise_for_status()
        csrf = _extract_csrf(page.text)
        data = {"username": self._cfg.user, "password": self._cfg.password}
        if csrf:
            data["csrf_token"] = csrf
        resp = self._session.post(login_url, data=data, allow_redirects=True, timeout=30)
        resp.raise_for_status()
        if resp.url.rstrip("/").endswith("/login"):
            raise RuntimeError("Calibre-Web authentication failed — check credentials")
        self._authenticated = True
        # Prefer a token from the post-login page; fall back to the login-page token.
        self._upload_csrf = _extract_csrf(resp.text) or csrf
        log.info("Authenticated to Calibre-Web at %s", self._cfg.url)

    def _exists_in_calibre(self, filename: str) -> bool:
        """Search Calibre-Web OPDS for a title matching this filename.

        Up to six keywords extracted from the filename are sent to
        ``/opds/search``; each returned title is compared by word overlap.
        Any failure of the search is logged and treated as "not found" so the
        upload proceeds.

        Returns:
            True if a returned title is a likely duplicate, else False.
        """
        keywords = _keywords_from_filename(filename)
        if len(keywords) < 2:
            # Too little signal to search on.
            return False
        query = " ".join(keywords[:6])
        try:
            resp = self._session.get(
                f"{self._cfg.url}/opds/search/{quote(query, safe='')}",
                auth=(self._cfg.user, self._cfg.password),
                timeout=15,
            )
            if resp.status_code == 404:
                return False
            calibre_titles = _parse_opds_titles(resp.text)
            if not calibre_titles:
                return False
            our_words = set(keywords)
            for title in calibre_titles:
                their_words = set(_normalize_words(title))
                overlap = len(our_words & their_words)
                # Match if 3+ words in common, or 60%+ of our keywords match
                if overlap >= 3 or (our_words and overlap / len(our_words) >= 0.6):
                    log.info("Duplicate found in Calibre-Web: '%s' ~ '%s'", filename, title)
                    return True
        except Exception as e:
            # Deliberately best-effort: a broken search must not block uploads.
            log.warning("OPDS search failed for '%s': %s — proceeding with upload", filename, e)
        return False

    def upload(self, book_path: Path, zip_source: str) -> str:
        """Upload a book file and record the outcome via ``db.record_book``.

        Args:
            book_path: Path of the book file to upload.
            zip_source: Passed through unchanged to ``db.record_book``.

        Returns:
            'uploaded' | 'skipped_duplicate' | 'error'.

        Raises:
            CalibreUnavailableError: when three consecutive books have each
                exhausted their 502/503/504 retries.
            OSError: if the file cannot be read while hashing (this happens
                before any error handling is in place).
        """
        file_hash = _sha256(book_path)

        # Primary guard: hash already in our DB
        if db.is_book_uploaded(file_hash):
            log.info("Skipping (already uploaded): %s", book_path.name)
            db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate")
            return "skipped_duplicate"

        try:
            self._ensure_auth()

            # Secondary guard: title search in Calibre-Web (catches pre-existing books)
            if self._exists_in_calibre(book_path.name):
                log.info("Skipping (exists in Calibre-Web): %s", book_path.name)
                db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate")
                return "skipped_duplicate"

            mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream")
            for attempt in range(1, 4):
                try:
                    # Re-open the file on every attempt so the body is sent from the start.
                    with book_path.open("rb") as fh:
                        resp = self._session.post(
                            f"{self._cfg.url}/upload",
                            files={"btn-upload": (book_path.name, fh, mime)},
                            data={"csrf_token": self._upload_csrf} if self._upload_csrf else {},
                            timeout=120,
                        )
                    if not resp.ok:
                        log.error(
                            "Upload HTTP %s (attempt %d/3) — body: %s",
                            resp.status_code, attempt, resp.text[:300],
                        )
                    resp.raise_for_status()
                    log.info("Uploaded: %s", book_path.name)
                    self._consecutive_failures = 0
                    db.record_book(book_path.name, file_hash, zip_source, "uploaded")
                    return "uploaded"
                except requests.HTTPError:
                    if resp.status_code in (502, 503, 504):
                        if attempt < 3:
                            log.warning(
                                "HTTP %s on attempt %d/3 — retrying in 60s ...",
                                resp.status_code, attempt,
                            )
                            time.sleep(60)
                            continue
                        # All retries exhausted
                        self._consecutive_failures += 1
                        if self._consecutive_failures >= 3:
                            raise CalibreUnavailableError(
                                f"Calibre-Web returned {resp.status_code} on {self._consecutive_failures} "
                                "consecutive books — aborting sync run"
                            )
                        break
                    if resp.status_code == 400 and attempt == 1:
                        log.warning("HTTP 400 — CSRF token likely expired, re-authenticating ...")
                        self._authenticated = False
                        self._upload_csrf = None
                        self._ensure_auth()
                        continue
                    # Any other HTTP error is not retried.
                    break

            db.record_book(book_path.name, file_hash, zip_source, "error")
            return "error"
        except CalibreUnavailableError:
            db.record_book(book_path.name, file_hash, zip_source, "error")
            raise
        except Exception as e:
            log.error("Upload failed for %s: %s", book_path.name, e)
            db.record_book(book_path.name, file_hash, zip_source, "error")
            return "error"


def fetch_all_books(cfg: CalibreConfig) -> list[dict]:
    """Fetch every book from Calibre-Web via /ajax/listbooks.

    Pages through the listing 1000 rows at a time, de-duplicating rows by
    their ``id`` field.

    Returns:
        The raw row dicts, in the order first seen.

    Raises:
        requests.HTTPError: if a listing request returns an error status.
        RuntimeError: if authentication fails.
    """
    client = CalibreClient(cfg)
    client._ensure_auth()
    all_books: list[dict] = []
    seen_ids: set = set()
    page_size = 1000
    start = 0
    while True:
        resp = client._session.get(
            f"{cfg.url}/ajax/listbooks",
            params={
                "draw": 1,
                # DataTables 1.10+ names
                "start": start,
                "length": page_size,
                # DataTables 1.9.x names (older Calibre-Web)
                "iDisplayStart": start,
                "iDisplayLength": page_size,
            },
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        if start == 0:
            non_list = {k: v for k, v in data.items() if not isinstance(v, list)}
            log.info("listbooks page-0 meta fields: %s", non_list)
        # Calibre-Web uses DataTables format: "data"/"recordsTotal", older versions use "rows"/"total_count"
        rows = data.get("rows") or data.get("data") or []
        total = (
            data.get("recordsTotal")
            or data.get("total_count")
            or data.get("total")
            or data.get("totalNotFiltered")
            or 0
        )
        new_in_page = 0
        for b in rows:
            bid = b.get("id")
            if bid not in seen_ids:
                seen_ids.add(bid)
                all_books.append(b)
                new_in_page += 1
        log.info("Books fetched: %d / %d (page gave %d new)", len(all_books), total, new_in_page)
        # A falsy total means the server did not report one. In that case the
        # total cannot be used as a stop condition (0 would always compare as
        # "done" and truncate the result to the first page); rely on the
        # empty-page / no-new-rows checks instead.
        reached_total = bool(total) and len(all_books) >= total
        # Stop when: empty page, no new books (start is being ignored), or we've seen everything
        if not rows or new_in_page == 0 or reached_total:
            break
        start += len(rows)
    return all_books


def delete_book(cfg: CalibreConfig, book_id: int, client: "CalibreClient | None" = None) -> tuple[bool, str]:
    """Delete a book from Calibre-Web by ID.

    Pass a pre-authenticated client to avoid re-auth overhead.

    Args:
        cfg: Connection settings; used for the base URL and to build a client
            when none is supplied.
        book_id: ID placed in the ``/delete/<id>`` URL.
        client: Optional existing client whose session and CSRF token are reused.

    Returns:
        ``(True, "Deleted")`` on success, otherwise ``(False, "HTTP <status>")``.
        A first 400 response triggers one re-authentication and retry.
    """
    if client is None:
        client = CalibreClient(cfg)
    client._ensure_auth()
    csrf = client._upload_csrf
    if not csrf:
        # Try to fetch a CSRF token from the book detail page
        try:
            page = client._session.get(f"{cfg.url}/book/{book_id}", timeout=15)
            csrf = _extract_csrf(page.text)
            client._upload_csrf = csrf
        except requests.RequestException as e:
            # Best-effort: carry on without a token, but leave a trace of why.
            log.warning("delete_book: could not fetch CSRF token for book %s: %s", book_id, e)
    for attempt in range(2):
        resp = client._session.post(
            f"{cfg.url}/delete/{book_id}",
            data={"csrf_token": csrf} if csrf else {},
            timeout=30,
        )
        if resp.ok:
            return True, "Deleted"
        if resp.status_code == 400 and attempt == 0:
            # CSRF token likely expired; re-authenticate and retry once
            log.info("delete_book: 400 on book %d — refreshing CSRF and retrying", book_id)
            client._authenticated = False
            client._upload_csrf = None
            client._ensure_auth()
            csrf = client._upload_csrf
            continue
        return False, f"HTTP {resp.status_code}"
    # Not reached in practice (the second attempt always returns above); kept
    # so the function has an explicit final return.
    return False, "HTTP 400 after re-auth retry"


def _group_key_part(value: "str | None") -> str:
    """Lower-case *value*, turn punctuation into spaces and collapse whitespace.

    ``None`` is treated as an empty string.
    """
    text = re.sub(r"[^\w\s]", " ", (value or "").lower())
    return re.sub(r"\s+", " ", text).strip()


def find_duplicate_groups(books: list[dict]) -> list[list[dict]]:
    """Group books by normalised title+author; return only groups with 2+ entries.

    Books whose normalised title is empty (missing, ``None`` or punctuation
    only) are ignored. Groups are sorted by the lower-cased title of their
    first member.
    """
    groups: dict[str, list[dict]] = defaultdict(list)
    for book in books:
        title = _group_key_part(book.get("title"))
        authors = _group_key_part(book.get("authors"))
        if title:
            groups[f"{title}||{authors}"].append(book)
    return sorted(
        [g for g in groups.values() if len(g) > 1],
        key=lambda g: g[0].get("title", "").lower(),
    )


def test_connection(cfg: CalibreConfig) -> tuple[bool, str]:
    """Try to authenticate with *cfg*.

    Returns:
        ``(True, message)`` on success, or ``(False, str(error))`` for any
        exception raised while logging in.
    """
    try:
        client = CalibreClient(cfg)
        client._ensure_auth()
        return True, f"Authenticated to {cfg.url} as '{cfg.user}'."
    except Exception as e:
        return False, str(e)


# --- Helpers ---

def _keywords_from_filename(filename: str) -> list[str]:
    """Extract meaningful words from a release-style filename for OPDS search.

    The extension is dropped, ``.``, ``_`` and ``-`` become spaces, remaining
    punctuation is removed, and junk words, pure numbers and one-character
    words are filtered out.
    """
    stem = Path(filename).stem.lower()
    stem = re.sub(r"[._\-]", " ", stem)
    stem = re.sub(r"[^\w\s]", "", stem)
    return [
        w
        for w in stem.split()
        if w not in _JUNK_WORDS
        and not re.fullmatch(r"\d+", w)  # strip pure numbers (years included)
        and len(w) > 1
    ]


def _normalize_words(title: str) -> list[str]:
    """Normalize a Calibre-Web title for comparison.

    Lower-cases the title, removes punctuation and drops junk and
    one-character words.
    """
    title = title.lower()
    title = re.sub(r"[^\w\s]", "", title)
    return [w for w in title.split() if w not in _JUNK_WORDS and len(w) > 1]


def _parse_opds_titles(xml: str) -> list[str]:
    """Extract book titles from an OPDS Atom feed, skipping the feed title itself.

    XML character entities in the titles (e.g. ``&amp;``) are decoded so they
    do not leak into the word comparison as bogus words.
    """
    # Grab all <title> elements; the first is the feed title ("Search results"), rest are books
    titles = re.findall(r"<title>([^<]+)", xml)
    return [html.unescape(t) for t in titles[1:]]


def _extract_csrf(html: str) -> str | None:
    """Return the value of the ``csrf_token`` form input in *html*, or None.

    Both attribute orders (``name`` before ``value`` and the reverse) are tried.
    """
    m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
    if not m:
        m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html)
    return m.group(1) if m else None


def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 64 KiB chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()