Files
calibresync/uploader.py
T
2026-05-12 11:27:52 +02:00

302 lines
12 KiB
Python

import hashlib
import logging
import re
import time
from pathlib import Path
from urllib.parse import quote
import requests
import db
from config import CalibreConfig
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Upload MIME types keyed by lowercase file suffix (dot included). Suffixes
# missing from this table are uploaded as "application/octet-stream".
MIME_TYPES = {
    ".epub": "application/epub+zip",
    ".pdf": "application/pdf",
}
# Words stripped before comparing titles — release-group tags, language codes,
# format names, and common articles/conjunctions. Entries are lowercase because
# both filenames and Calibre-Web titles are lowercased before the lookup.
_JUNK_WORDS = {
    "retail", "epub", "ebook", "pdf", "mobi", "azw3", "decipher",
    "swedish", "english", "danish", "norwegian", "finnish", "german", "french",
    "the", "a", "an", "och", "und", "les", "der", "die", "das",
}
class CalibreUnavailableError(RuntimeError):
    """Signal that the sync run should abort.

    Raised once Calibre-Web has kept answering 502/503/504 for several
    consecutive books, even after the per-book retries.
    """
class CalibreClient:
    """Session-based Calibre-Web client: logs in once, then uploads book files.

    Keeps the HTTP session, the CSRF token captured at login, and a counter of
    books whose upload exhausted its retries on 502/503/504.
    """

    def __init__(self, cfg: CalibreConfig):
        self._cfg = cfg
        self._session = requests.Session()
        self._authenticated = False
        self._upload_csrf: str | None = None
        self._consecutive_failures = 0

    def _ensure_auth(self) -> None:
        """Log in through the /login form unless this client already has.

        Raises RuntimeError when the login POST ends up back on the login page.
        """
        if self._authenticated:
            return

        endpoint = f"{self._cfg.url}/login"
        login_page = self._session.get(endpoint, timeout=30)
        login_page.raise_for_status()

        token = _extract_csrf(login_page.text)
        form = {"username": self._cfg.user, "password": self._cfg.password}
        if token:
            form["csrf_token"] = token

        landing = self._session.post(endpoint, data=form, allow_redirects=True, timeout=30)
        landing.raise_for_status()
        # Still sitting on /login after following redirects means the login was rejected.
        if landing.url.rstrip("/").endswith("/login"):
            raise RuntimeError("Calibre-Web authentication failed — check credentials")

        self._authenticated = True
        # Prefer a token from the post-login page; fall back to the login-form one.
        self._upload_csrf = _extract_csrf(landing.text) or token
        log.info("Authenticated to Calibre-Web at %s", self._cfg.url)

    def _exists_in_calibre(self, filename: str) -> bool:
        """Return True when an OPDS title search suggests this file is already present.

        Best-effort: any failure during the search is logged and treated as
        "not found" so the upload still goes ahead.
        """
        keywords = _keywords_from_filename(filename)
        if len(keywords) < 2:
            # Too little to search on without risking false positives.
            return False

        search_terms = " ".join(keywords[:6])
        try:
            resp = self._session.get(
                f"{self._cfg.url}/opds/search/{quote(search_terms, safe='')}",
                auth=(self._cfg.user, self._cfg.password),
                timeout=15,
            )
            if resp.status_code == 404:
                return False

            candidates = _parse_opds_titles(resp.text)
            if not candidates:
                return False

            wanted = set(keywords)
            for candidate in candidates:
                shared = len(wanted.intersection(_normalize_words(candidate)))
                # Match if 3+ words in common, or 60%+ of our keywords match
                if shared >= 3 or (wanted and shared / len(wanted) >= 0.6):
                    log.info("Duplicate found in Calibre-Web: '%s' ~ '%s'", filename, candidate)
                    return True
        except Exception as e:
            log.warning("OPDS search failed for '%s': %s — proceeding with upload", filename, e)
        return False

    def upload(self, book_path: Path, zip_source: str) -> str:
        """Upload one book file and record the outcome in the local DB.

        Returns 'uploaded', 'skipped_duplicate' or 'error'. Raises
        CalibreUnavailableError after three consecutive books have exhausted
        their retries on 502/503/504.
        """
        file_hash = _sha256(book_path)

        def record(status: str) -> str:
            # Persist the outcome and hand the status back for `return record(...)`.
            db.record_book(book_path.name, file_hash, zip_source, status)
            return status

        # Primary guard: hash already in our DB
        if db.is_book_uploaded(file_hash):
            log.info("Skipping (already uploaded): %s", book_path.name)
            return record("skipped_duplicate")

        try:
            self._ensure_auth()

            # Secondary guard: title search in Calibre-Web (catches pre-existing books)
            if self._exists_in_calibre(book_path.name):
                log.info("Skipping (exists in Calibre-Web): %s", book_path.name)
                return record("skipped_duplicate")

            mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream")
            for attempt in range(1, 4):
                try:
                    with book_path.open("rb") as fh:
                        resp = self._session.post(
                            f"{self._cfg.url}/upload",
                            files={"btn-upload": (book_path.name, fh, mime)},
                            data={"csrf_token": self._upload_csrf} if self._upload_csrf else {},
                            timeout=120,
                        )
                    if not resp.ok:
                        log.error("Upload HTTP %s (attempt %d/3) — body: %s", resp.status_code, attempt, resp.text[:300])
                        resp.raise_for_status()
                    log.info("Uploaded: %s", book_path.name)
                    self._consecutive_failures = 0
                    return record("uploaded")
                except requests.HTTPError:
                    status = resp.status_code
                    gateway_error = status in (502, 503, 504)

                    if gateway_error and attempt < 3:
                        log.warning("HTTP %s on attempt %d/3 — retrying in 60s ...", status, attempt)
                        time.sleep(60)
                        continue

                    if gateway_error:
                        # All retries exhausted
                        self._consecutive_failures += 1
                        if self._consecutive_failures >= 3:
                            raise CalibreUnavailableError(
                                f"Calibre-Web returned {status} on {self._consecutive_failures} "
                                "consecutive books — aborting sync run"
                            )
                        break

                    if status == 400 and attempt == 1:
                        log.warning("HTTP 400 — CSRF token likely expired, re-authenticating ...")
                        self._authenticated = False
                        self._upload_csrf = None
                        self._ensure_auth()
                        continue

                    # Any other HTTP error is not retried.
                    break

            return record("error")
        except CalibreUnavailableError:
            record("error")
            raise
        except Exception as e:
            log.error("Upload failed for %s: %s", book_path.name, e)
            return record("error")
def fetch_all_books(cfg: CalibreConfig) -> list[dict]:
    """Fetch every book from Calibre-Web via /ajax/listbooks. Returns raw row dicts.

    Pages through the listing 100 rows at a time with a freshly authenticated
    client. Rows are de-duplicated by their "id" as they arrive; rows without
    an "id" are always kept. Raises whatever the HTTP layer raises (including
    on non-2xx responses).
    """
    client = CalibreClient(cfg)
    client._ensure_auth()

    page_size = 100
    start = 0
    seen: set = set()
    unique: list[dict] = []
    previous_rows = None

    while True:
        resp = client._session.get(
            f"{cfg.url}/ajax/listbooks",
            params={
                "draw": 1,
                # DataTables-style paging (what this client has always sent) ...
                "start": start,
                "length": page_size,
                # ... plus bootstrap-table-style paging. The "rows"/"total"/
                # "totalNotFiltered" response keys read below are that format,
                # and a server that only reads offset/limit would otherwise
                # return the first page on every request.
                "offset": start,
                "limit": page_size,
                "sort": "title",
                "order": "asc",
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()

        # Accept both response shapes: "rows"/"total..." and "data"/"recordsTotal".
        rows = data.get("rows") or data.get("data") or []
        total = (
            data.get("recordsTotal") or data.get("total_count") or
            data.get("total") or data.get("totalNotFiltered") or 0
        )

        # A page identical to the previous one means the server ignored the
        # paging parameters; stop rather than collect it again.
        if not rows or rows == previous_rows:
            log.info("Books fetched: %d / %d", len(unique), total)
            break
        previous_rows = rows

        added = 0
        for row in rows:
            bid = row.get("id")
            if bid is not None:
                if bid in seen:
                    continue
                seen.add(bid)
            unique.append(row)
            added += 1
        log.info("Books fetched: %d / %d", len(unique), total)

        # Stop when the page brought nothing new, or when the reported total
        # has been reached. An unknown total (0) keeps paging until one of the
        # no-progress conditions fires instead of silently truncating.
        if added == 0 or (total and len(unique) >= total):
            break
        start += len(rows)

    return unique
def delete_book(cfg: CalibreConfig, book_id: int, client: "CalibreClient | None" = None) -> tuple[bool, str]:
    """Delete a book from Calibre-Web by ID. Pass a pre-authenticated client to avoid re-auth overhead.

    Returns (True, "Deleted") on a 2xx response, otherwise (False, reason).
    A 400 on the first attempt is treated as an expired CSRF token: the client
    re-authenticates and the delete is retried once. Network-level exceptions
    from the login or delete requests are not caught here.
    """
    if client is None:
        client = CalibreClient(cfg)
    # _ensure_auth() returns immediately for an already-authenticated client,
    # so calling it unconditionally costs nothing and also covers a client
    # that was passed in before it had logged in.
    client._ensure_auth()

    csrf = client._upload_csrf
    if not csrf:
        # Try to fetch a CSRF token from the book detail page
        try:
            page = client._session.get(f"{cfg.url}/book/{book_id}", timeout=15)
            csrf = _extract_csrf(page.text)
            client._upload_csrf = csrf
        except Exception as exc:
            # Best-effort: carry on without a token, but leave a trace of why.
            log.debug("delete_book: could not fetch CSRF token for book %s: %s", book_id, exc)

    for attempt in range(2):
        resp = client._session.post(
            f"{cfg.url}/delete/{book_id}",
            data={"csrf_token": csrf} if csrf else {},
            timeout=30,
        )
        if resp.ok:
            return True, "Deleted"
        if resp.status_code == 400 and attempt == 0:
            # CSRF token likely expired; re-authenticate and retry once
            log.info("delete_book: 400 on book %d — refreshing CSRF and retrying", book_id)
            client._authenticated = False
            client._upload_csrf = None
            client._ensure_auth()
            csrf = client._upload_csrf
            continue
        return False, f"HTTP {resp.status_code}"
    return False, "HTTP 400 after re-auth retry"
def find_duplicate_groups(books: list[dict]) -> list[list[dict]]:
    """Group books by normalised title+author; return only groups with 2+ entries.

    Normalisation lowercases the text, turns punctuation into spaces and
    collapses whitespace. Books whose normalised title is empty are ignored.
    Missing or None titles/authors are treated as empty, and a list or tuple
    of authors is joined with spaces, so incomplete rows do not raise.
    Groups are ordered by the lowercased title of their first book.
    """
    from collections import defaultdict

    def _normalise(value: object) -> str:
        # None (key present but unset) must not reach .lower().
        if value is None:
            return ""
        if isinstance(value, (list, tuple)):
            value = " ".join(str(item) for item in value if item is not None)
        text = re.sub(r"[^\w\s]", " ", str(value).lower())
        return re.sub(r"\s+", " ", text).strip()

    groups: dict[str, list[dict]] = defaultdict(list)
    for book in books:
        title = _normalise(book.get("title"))
        if not title:
            continue
        authors = _normalise(book.get("authors"))
        groups[f"{title}||{authors}"].append(book)

    return sorted(
        (group for group in groups.values() if len(group) > 1),
        key=lambda group: str(group[0].get("title") or "").lower(),
    )
def test_connection(cfg: CalibreConfig) -> tuple[bool, str]:
    """Try to log in with the given config and report (ok, message).

    Never raises: any exception from the login attempt is returned as the
    message with ok=False.
    """
    try:
        CalibreClient(cfg)._ensure_auth()
        outcome = (True, f"Authenticated to {cfg.url} as '{cfg.user}'.")
    except Exception as exc:
        outcome = (False, str(exc))
    return outcome
# --- Helpers ---
def _keywords_from_filename(filename: str) -> list[str]:
    """Extract meaningful words from a release-style filename for OPDS search.

    The extension is dropped, the stem is lowercased, the separators ".", "_"
    and "-" become spaces and remaining punctuation is removed. Junk words,
    pure numbers (years included) and single-character words are discarded.
    """
    stem = Path(filename).stem.lower()
    spaced = re.sub(r"[._\-]", " ", stem)
    cleaned = re.sub(r"[^\w\s]", "", spaced)

    keywords: list[str] = []
    for word in cleaned.split():
        if word in _JUNK_WORDS:
            continue
        if re.match(r"^\d{4}$", word):  # strip year
            continue
        if re.match(r"^\d+$", word):  # strip pure numbers
            continue
        if len(word) <= 1:
            continue
        keywords.append(word)
    return keywords
def _normalize_words(title: str) -> list[str]:
    """Split a Calibre-Web title into lowercase comparison words.

    Punctuation is removed; junk words and single-character words are dropped.
    """
    stripped = re.sub(r"[^\w\s]", "", title.lower())
    words: list[str] = []
    for word in stripped.split():
        if len(word) > 1 and word not in _JUNK_WORDS:
            words.append(word)
    return words
def _parse_opds_titles(xml: str) -> list[str]:
    """Extract book titles from an OPDS Atom feed, skipping the feed title itself.

    The first <title> element is taken to be the feed's own title and is
    dropped; every later one is returned as a book title. Titles are
    entity-decoded ("&amp;" -> "&") so escaped characters do not turn into
    stray words during comparison, and <title> tags carrying attributes
    (e.g. type="text") are matched as well.
    """
    from html import unescape

    # Grab all <title> elements; the first is the feed title ("Search results"), rest are books
    raw_titles = re.findall(r"<title(?:\s[^>]*)?>([^<]+)</title>", xml)
    return [unescape(title) for title in raw_titles[1:]]
def _extract_csrf(html: str) -> str | None:
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
if not m:
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html)
return m.group(1) if m else None
def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()