# calibresync/uploader.py
# Repository-viewer header captured with the file (not part of the module):
# 2026-05-12 16:23:13 +02:00 · 414 lines · 17 KiB · Python

import hashlib
import logging
import re
import time
import unicodedata
from pathlib import Path
from urllib.parse import quote
import requests
import db
from config import CalibreConfig
log = logging.getLogger(__name__)
# Content-Type sent with an upload, keyed by lower-cased file suffix.
# CalibreClient.upload() falls back to "application/octet-stream" for any
# suffix that is not listed here.
MIME_TYPES = {
".epub": "application/epub+zip",
".pdf": "application/pdf",
}
# Words stripped before comparing titles — release-group tags, language codes, format names, etc.
# The last row holds articles/conjunctions in several languages. Entries are
# compared against lower-cased, accent-folded tokens by _keywords_from_filename()
# and _normalize_words(), so every entry must itself be lower-case and unaccented.
_JUNK_WORDS = {
"retail", "epub", "ebook", "pdf", "mobi", "azw3", "decipher",
"swedish", "english", "danish", "norwegian", "finnish", "german", "french",
"the", "a", "an", "och", "und", "les", "der", "die", "das",
}
class CalibreUnavailableError(RuntimeError):
    """Calibre-Web kept answering 502/503/504; the caller should abort the sync run."""
class CalibreClient:
    """Session-based Calibre-Web client: login, duplicate detection and uploads.

    The upload retry policy lives in class attributes so it can be tuned
    (per subclass or per instance) without editing ``upload``.
    """

    MAX_UPLOAD_ATTEMPTS = 3           # POST attempts per book
    RETRY_DELAY_SECONDS = 180         # pause before retrying after a gateway error
    MAX_CONSECUTIVE_FAILURES = 3      # books lost to gateway errors in a row before aborting
    RETRYABLE_STATUSES = (502, 503, 504)

    def __init__(self, cfg: "CalibreConfig"):
        self._cfg = cfg
        self._session = requests.Session()
        self._authenticated = False
        self._upload_csrf: str | None = None
        self._consecutive_failures = 0
        # Pre-loaded title word-sets for fast duplicate detection (set by preload_existing_titles)
        self._existing_title_sets: list[frozenset[str]] | None = None

    def preload_existing_titles(self, books: list[dict]) -> None:
        """Build an in-memory index of normalised title keywords from a pre-fetched book list.

        Books without a (truthy) ``"title"`` entry are ignored.
        """
        self._existing_title_sets = [
            frozenset(_normalize_words(b.get("title", "")))
            for b in books
            if b.get("title")
        ]
        log.info("Pre-loaded %d existing book titles for duplicate detection", len(self._existing_title_sets))

    def _ensure_auth(self) -> None:
        """Log in through the HTML login form unless this client already did.

        Raises:
            requests.HTTPError: the login page or the login POST returned an error status.
            RuntimeError: the POST ended up on the login page again (bad credentials).
        """
        if self._authenticated:
            return
        login_url = f"{self._cfg.url}/login"
        page = self._session.get(login_url, timeout=30)
        page.raise_for_status()
        csrf = _extract_csrf(page.text)
        data = {"username": self._cfg.user, "password": self._cfg.password}
        if csrf:
            data["csrf_token"] = csrf
        resp = self._session.post(login_url, data=data, allow_redirects=True, timeout=30)
        resp.raise_for_status()
        # A failed login lands back on /login after redirects.
        if resp.url.rstrip("/").endswith("/login"):
            raise RuntimeError("Calibre-Web authentication failed — check credentials")
        self._authenticated = True
        # Prefer a token from the post-login page; fall back to the login-page token.
        self._upload_csrf = _extract_csrf(resp.text) or csrf
        log.info("Authenticated to Calibre-Web at %s", self._cfg.url)

    @staticmethod
    def _is_title_match(our_words, their_words) -> bool:
        """Return True when two keyword sets look like the same book.

        Match if: 3+ words in common, OR 60%+ of the filename keywords appear in
        the title, OR 60%+ of the stored title's words (for titles of 2+ words)
        appear in the filename keywords. The third rule catches short titles
        drowned out by filename noise. Empty sets never match.
        """
        if not our_words or not their_words:
            return False
        overlap = len(our_words & their_words)
        if overlap >= 3:
            return True
        if overlap / len(our_words) >= 0.6:
            return True
        return len(their_words) >= 2 and overlap / len(their_words) >= 0.6

    def _exists_in_calibre(self, filename: str) -> bool:
        """Check whether a book already exists in Calibre-Web. Returns True if likely duplicate.

        Filenames yielding fewer than two keywords are never treated as duplicates.
        """
        keywords = _keywords_from_filename(filename)
        if len(keywords) < 2:
            return False
        our_words = set(keywords)

        # Fast path: check pre-loaded title index (available when sync pre-fetches all books)
        if self._existing_title_sets is not None:
            if any(self._is_title_match(our_words, theirs) for theirs in self._existing_title_sets):
                log.info("Duplicate (preloaded index): '%s'", filename)
                return True
            return False

        # Slow path fallback: OPDS search (used when no index is available)
        query = " ".join(keywords[:6])
        try:
            resp = self._session.get(
                f"{self._cfg.url}/opds/search/{quote(query, safe='')}",
                auth=(self._cfg.user, self._cfg.password),
                timeout=15,
            )
            if resp.status_code == 404:
                return False
            # Do not try to parse an error page as a feed.
            resp.raise_for_status()
            for title in _parse_opds_titles(resp.text):
                if self._is_title_match(our_words, set(_normalize_words(title))):
                    log.info("Duplicate (OPDS search): '%s'", filename)
                    return True
        except Exception as e:
            # Deliberately broad: the search is best-effort and must never block an upload.
            log.warning("OPDS search failed for '%s': %s — proceeding with upload", filename, e)
        return False

    def _upload_with_retries(self, book_path: Path) -> bool:
        """POST the file to ``/upload``; return True on success, False on a final failure.

        Gateway errors (RETRYABLE_STATUSES) are retried after RETRY_DELAY_SECONDS.
        A single HTTP 400 per call triggers re-authentication (stale CSRF token)
        and a retry, as long as attempts remain.

        Raises:
            CalibreUnavailableError: MAX_CONSECUTIVE_FAILURES books in a row
                exhausted their retries on gateway errors.
        """
        mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream")
        max_attempts = self.MAX_UPLOAD_ATTEMPTS
        reauthenticated = False

        for attempt in range(1, max_attempts + 1):
            # Re-open per attempt so every retry streams the file from the start.
            with book_path.open("rb") as fh:
                resp = self._session.post(
                    f"{self._cfg.url}/upload",
                    files={"btn-upload": (book_path.name, fh, mime)},
                    data={"csrf_token": self._upload_csrf} if self._upload_csrf else {},
                    timeout=120,
                )
            if resp.ok:
                log.info("Uploaded: %s", book_path.name)
                self._consecutive_failures = 0
                return True

            status = resp.status_code
            log.error(
                "Upload HTTP %s (attempt %d/%d) — body: %s",
                status, attempt, max_attempts, resp.text[:300],
            )

            if status in self.RETRYABLE_STATUSES:
                if attempt < max_attempts:
                    log.warning(
                        "HTTP %s on attempt %d/%d — retrying in %ds ...",
                        status, attempt, max_attempts, self.RETRY_DELAY_SECONDS,
                    )
                    time.sleep(self.RETRY_DELAY_SECONDS)
                    continue
                # All retries exhausted
                self._consecutive_failures += 1
                if self._consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
                    raise CalibreUnavailableError(
                        f"Calibre-Web returned {status} on {self._consecutive_failures} "
                        "consecutive books — aborting sync run"
                    )
                return False

            if status == 400 and not reauthenticated and attempt < max_attempts:
                log.warning("HTTP 400 — CSRF token likely expired, re-authenticating ...")
                self._authenticated = False
                self._upload_csrf = None
                self._ensure_auth()
                reauthenticated = True
                continue

            return False
        return False

    def upload(self, book_path: Path, zip_source: str) -> str:
        """Upload a book file. Returns status: 'uploaded' | 'skipped_duplicate' | 'error'.

        Every outcome is recorded through ``db.record_book``.

        Raises:
            CalibreUnavailableError: Calibre-Web looks down (after recording 'error').
        """
        file_hash = _sha256(book_path)

        # Primary guard: hash already in our DB
        if db.is_book_uploaded(file_hash):
            log.info("Skipping (already uploaded): %s", book_path.name)
            db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate")
            return "skipped_duplicate"

        try:
            self._ensure_auth()

            # Secondary guard: title search in Calibre-Web (catches pre-existing books)
            if self._exists_in_calibre(book_path.name):
                log.info("Skipping (exists in Calibre-Web): %s", book_path.name)
                db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate")
                return "skipped_duplicate"

            if self._upload_with_retries(book_path):
                db.record_book(book_path.name, file_hash, zip_source, "uploaded")
                # Add to in-session index so a later zip with the same title is skipped
                if self._existing_title_sets is not None:
                    kw = frozenset(_keywords_from_filename(book_path.name))
                    if kw:
                        self._existing_title_sets.append(kw)
                return "uploaded"

            db.record_book(book_path.name, file_hash, zip_source, "error")
            return "error"
        except CalibreUnavailableError:
            db.record_book(book_path.name, file_hash, zip_source, "error")
            raise
        except Exception as e:
            log.error("Upload failed for %s: %s", book_path.name, e)
            db.record_book(book_path.name, file_hash, zip_source, "error")
            return "error"
def fetch_all_books(cfg: CalibreConfig) -> list[dict]:
    """Fetch every book from Calibre-Web. Tries /ajax/listbooks first; falls back to OPDS if pagination is broken.

    Rows are de-duplicated on their ``"id"`` value. Paging stops on an empty
    page, on a page that adds nothing new (the server ignored the offset), or
    once the server-reported total has been reached. When the server reports
    no total, paging continues until one of the first two conditions holds.

    Raises:
        requests.HTTPError: a listbooks request returned an error status.
    """
    client = CalibreClient(cfg)
    client._ensure_auth()

    all_books: list[dict] = []
    seen_ids: set = set()
    page_size = 1000
    start = 0
    reported_total = 0

    while True:
        resp = client._session.get(
            f"{cfg.url}/ajax/listbooks",
            params={
                "draw": 1,
                # Paging is sent under several naming conventions because the
                # endpoint's dialect is not pinned here: DataTables
                # (start/length, iDisplay*) and bootstrap-table (offset/limit).
                # NOTE(review): the response keys read below (rows/total/
                # totalNotFiltered) are bootstrap-table's — confirm against the server.
                "start": start, "length": page_size,
                "iDisplayStart": start, "iDisplayLength": page_size,
                "offset": start, "limit": page_size,
            },
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()

        if start == 0:
            non_list = {k: v for k, v in data.items() if not isinstance(v, list)}
            log.info("listbooks page-0 meta fields: %s", non_list)

        rows = data.get("rows") or data.get("data") or []
        reported_total = (
            data.get("recordsTotal") or data.get("total_count") or
            data.get("total") or data.get("totalNotFiltered") or 0
        )

        new_in_page = 0
        for b in rows:
            bid = b.get("id")
            if bid not in seen_ids:
                seen_ids.add(bid)
                all_books.append(b)
                new_in_page += 1

        log.info("Books fetched: %d / %d (page gave %d new)", len(all_books), reported_total, new_in_page)

        if not rows or new_in_page == 0:
            break
        # Only trust the total as a stop condition when the server sent one;
        # a missing total (0) must not end paging after the first page.
        if reported_total > 0 and len(all_books) >= reported_total:
            break
        start += len(rows)

    # If we got far fewer books than reported, listbooks pagination is broken — use OPDS instead
    if reported_total > 0 and len(all_books) < reported_total // 2:
        log.warning(
            "listbooks pagination broken (%d/%d books retrieved). Falling back to OPDS.",
            len(all_books), reported_total,
        )
        return _fetch_all_books_opds(cfg)

    return all_books
def _fetch_all_books_opds(cfg: CalibreConfig) -> list[dict]:
    """Fetch all books via OPDS catalog, following next-page links.

    Starts at ``/opds/new`` and returns ``{"id", "title", "authors"}`` dicts.
    The id is taken from the first ``/download/<id>/`` link of an entry;
    entries without such a link are skipped, and repeated ids are ignored.
    Stops on an HTTP error, unparsable XML, a page with no entries, a missing
    next link, or a next link that points at a page already visited.
    """
    import xml.etree.ElementTree as ET
    from urllib.parse import urljoin, urlsplit

    def _local(tag) -> str:
        """Return an element tag without its ``{namespace}`` prefix."""
        return tag.rsplit("}", 1)[-1] if isinstance(tag, str) else ""

    def _resolve(href: str, page_url: str) -> str:
        """Turn a feed href into an absolute URL."""
        if href.startswith("http"):
            return href
        if not href.startswith("/"):
            # Path-relative link: resolve against the page it came from.
            return urljoin(page_url, href)
        prefix = urlsplit(cfg.url).path.rstrip("/")
        if prefix and href.startswith(prefix + "/"):
            # href already carries the deployment sub-path; don't double it.
            return urljoin(cfg.url, href)
        return f"{cfg.url}{href}"

    books: list[dict] = []
    seen_ids: set = set()
    visited: set[str] = set()
    url: str | None = f"{cfg.url}/opds/new"
    auth = (cfg.user, cfg.password)

    with requests.Session() as session:
        # The visited check stops a feed whose "next" link loops back on itself.
        while url and url not in visited:
            visited.add(url)
            resp = session.get(url, auth=auth, timeout=30)
            if not resp.ok:
                log.warning("OPDS fetch failed HTTP %s — %s", resp.status_code, url)
                break
            try:
                root = ET.fromstring(resp.content)
            except ET.ParseError as exc:
                log.warning("OPDS XML parse error: %s", exc)
                break

            next_url: str | None = None
            entries_this_page = 0

            for elem in root:
                local = _local(elem.tag)
                if local == "link" and elem.get("rel") == "next":
                    href = elem.get("href", "")
                    if href:
                        next_url = _resolve(href, url)
                elif local == "entry":
                    entries_this_page += 1
                    title = ""
                    author_parts: list[str] = []
                    book_id: int | None = None
                    for child in elem:
                        ctag = _local(child.tag)
                        if ctag == "title":
                            title = child.text or ""
                        elif ctag == "author":
                            author_parts.extend(
                                gc.text or "" for gc in child if _local(gc.tag) == "name"
                            )
                        elif ctag == "link":
                            m = re.search(r"/download/(\d+)/", child.get("href", ""))
                            if m and book_id is None:
                                book_id = int(m.group(1))
                    if book_id and book_id not in seen_ids:
                        seen_ids.add(book_id)
                        books.append({"id": book_id, "title": title, "authors": " & ".join(author_parts)})

            log.info("OPDS fetched: %d books total (page had %d entries)", len(books), entries_this_page)
            if not entries_this_page:
                break
            url = next_url

    return books
def delete_book(cfg: CalibreConfig, book_id: int, client: "CalibreClient | None" = None) -> tuple[bool, str]:
    """Delete a book from Calibre-Web by ID. Pass a pre-authenticated client to avoid re-auth overhead.

    Returns:
        ``(True, "Deleted")`` on success, otherwise ``(False, reason)`` where
        reason is ``"HTTP <status>"``, or ``"HTTP 400 after re-auth retry"``
        when the request was still rejected with 400 after refreshing the
        session and CSRF token.

    Network errors from the delete request or from (re-)authentication propagate.
    """
    if client is None:
        client = CalibreClient(cfg)
        client._ensure_auth()

    csrf = client._upload_csrf
    if not csrf:
        # Try to fetch a CSRF token from the book detail page
        try:
            page = client._session.get(f"{cfg.url}/book/{book_id}", timeout=15)
            csrf = _extract_csrf(page.text)
            client._upload_csrf = csrf
        except requests.RequestException as exc:
            # Best effort: still attempt the delete without a token, but leave a trace.
            log.warning("delete_book: could not fetch CSRF token for book %d: %s", book_id, exc)

    for attempt in range(2):
        resp = client._session.post(
            f"{cfg.url}/delete/{book_id}",
            data={"csrf_token": csrf} if csrf else {},
            timeout=30,
        )
        if resp.ok:
            return True, "Deleted"
        if resp.status_code != 400:
            return False, f"HTTP {resp.status_code}"
        if attempt == 0:
            # CSRF token likely expired; re-authenticate and retry once
            log.info("delete_book: 400 on book %d — refreshing CSRF and retrying", book_id)
            client._authenticated = False
            client._upload_csrf = None
            client._ensure_auth()
            csrf = client._upload_csrf

    # Both attempts were answered with HTTP 400.
    return False, "HTTP 400 after re-auth retry"
def find_duplicate_groups(books: list[dict]) -> list[list[dict]]:
    """Group books by normalised title+author; return only groups with 2+ entries.

    Normalisation lower-cases, turns punctuation into spaces and collapses
    whitespace. A missing or ``None`` title/authors value counts as empty, and
    a list/tuple of authors is joined with spaces. Books whose normalised
    title is empty are ignored. Groups are sorted by the lower-cased title of
    their first book; books keep their input order inside a group.
    """
    from collections import defaultdict

    def _norm(value) -> str:
        """Normalise one title/authors field into a comparison key fragment."""
        if value is None:
            return ""
        if isinstance(value, (list, tuple)):
            value = " ".join(str(v) for v in value)
        text = re.sub(r"[^\w\s]", " ", str(value).lower())
        return re.sub(r"\s+", " ", text).strip()

    groups: dict[str, list[dict]] = defaultdict(list)
    for book in books:
        title = _norm(book.get("title"))
        if not title:
            continue
        groups[f"{title}||{_norm(book.get('authors'))}"].append(book)

    duplicates = [g for g in groups.values() if len(g) > 1]
    duplicates.sort(key=lambda g: str(g[0].get("title") or "").lower())
    return duplicates
def test_connection(cfg: CalibreConfig) -> tuple[bool, str]:
    """Try to log in with *cfg* and report the outcome as ``(ok, message)``.

    Never raises: any exception is turned into ``(False, str(exception))``.
    """
    try:
        probe = CalibreClient(cfg)
        probe._ensure_auth()
        message = f"Authenticated to {cfg.url} as '{cfg.user}'."
        return True, message
    except Exception as exc:
        return False, str(exc)
# --- Helpers ---
def _ascii_fold(s: str) -> str:
    """Strip accents: 'världens' → 'varldens', 'väg' → 'vag'.

    The text is NFKD-decomposed and every non-spacing mark (category ``Mn``)
    is dropped; characters without a decomposition are left untouched.
    """
    decomposed = unicodedata.normalize("NFKD", s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
    return "".join(kept)
def _keywords_from_filename(filename: str) -> list[str]:
    """Extract meaningful words from a release-style filename for OPDS search.

    Works on the lower-cased, accent-folded stem: dots, underscores and
    hyphens become word breaks and other punctuation is removed. Junk words,
    purely numeric tokens and single-character tokens are dropped; order and
    repeats are preserved.
    """
    text = _ascii_fold(Path(filename).stem.lower())
    text = re.sub(r"[._\-]", " ", text)
    text = re.sub(r"[^\w\s]", "", text)

    keywords: list[str] = []
    for token in text.split():
        if len(token) <= 1 or token in _JUNK_WORDS:
            continue
        # Years and any other all-digit tokens carry no title information.
        if re.match(r"^\d{4}$", token) or re.match(r"^\d+$", token):
            continue
        keywords.append(token)
    return keywords
def _normalize_words(title: str) -> list[str]:
    """Normalize a Calibre-Web title for comparison.

    Lower-cases and accent-folds the title, then tokenises it the same way
    ``_keywords_from_filename`` does: dots, underscores and hyphens are word
    breaks and other punctuation is removed. Without that, a title such as
    "Spider-Man" would collapse to "spiderman" and never overlap with the
    "spider", "man" keywords taken from a filename. Junk words and
    single-character tokens are dropped; ``None`` or an empty title gives ``[]``.
    """
    text = _ascii_fold((title or "").lower())
    text = re.sub(r"[._\-]", " ", text)
    text = re.sub(r"[^\w\s]", "", text)
    return [w for w in text.split() if w not in _JUNK_WORDS and len(w) > 1]
def _parse_opds_titles(xml: str) -> list[str]:
    """Extract book titles from an OPDS Atom feed, skipping the feed title itself.

    The feed is parsed as XML and only the ``<title>`` of each ``<entry>`` is
    returned, so entities are decoded ("&amp;" → "&") and titles carrying
    attributes (``<title type="text">``) are found. Blank titles are skipped.

    If the text is not well-formed XML, a regex fallback collects every
    ``<title>`` element, drops the first one (assumed to be the feed title)
    and decodes entities.
    """
    import html
    from xml.etree import ElementTree as ET

    def _local(tag) -> str:
        """Return an element tag without its ``{namespace}`` prefix."""
        return tag.rsplit("}", 1)[-1] if isinstance(tag, str) else ""

    try:
        root = ET.fromstring(xml)
    except (ET.ParseError, ValueError):
        found = re.findall(r"<title[^>]*>([^<]+)</title>", xml)
        return [html.unescape(t) for t in found[1:]]

    titles: list[str] = []
    for node in root.iter():
        if _local(node.tag) != "entry":
            continue
        for child in node:
            if _local(child.tag) == "title":
                text = "".join(child.itertext()).strip()
                if text:
                    titles.append(text)
                break  # one title per entry
    return titles
def _extract_csrf(html: str) -> str | None:
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
if not m:
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html)
return m.group(1) if m else None
def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()