# 392 lines · 16 KiB · Python
import hashlib
import html
import logging
import re
import time
from pathlib import Path
from urllib.parse import quote

import requests

import db
from config import CalibreConfig
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Content type sent with an upload, keyed by lower-case file suffix.
MIME_TYPES = {
    ".epub": "application/epub+zip",
    ".pdf": "application/pdf",
}

# Words stripped before comparing titles — release-group tags, language codes, format names, etc.
_JUNK_WORDS = (
    # release tags and format names
    {"retail", "epub", "ebook", "pdf", "mobi", "azw3", "decipher"}
    # language names
    | {"swedish", "english", "danish", "norwegian", "finnish", "german", "french"}
    # articles and conjunctions
    | {"the", "a", "an", "och", "und", "les", "der", "die", "das"}
)
|
|
|
|
|
|
class CalibreUnavailableError(RuntimeError):
    """Raised when Calibre-Web returns repeated 502/503/504 — sync run should abort.

    CalibreClient.upload raises this once its consecutive-failure counter
    reaches three, i.e. after three books have each exhausted their upload
    retries on a 502/503/504 response without a successful upload in between.
    """
|
|
|
|
|
|
class CalibreClient:
    """Stateful Calibre-Web client: form login, duplicate checks and uploads.

    A single ``requests.Session`` is kept per instance so the authenticated
    session and the scraped CSRF token are reused across calls.
    """

    def __init__(self, cfg: CalibreConfig):
        """Store the configuration and create a not-yet-authenticated session."""
        self._cfg = cfg
        self._session = requests.Session()
        self._authenticated = False
        # CSRF token sent along with uploads; captured while logging in.
        self._upload_csrf: str | None = None
        # Number of books whose upload ran out of retries on 502/503/504;
        # reset to 0 by the next successful upload.
        self._consecutive_failures = 0
        # Normalised title word-sets for fast duplicate detection; stays None
        # until preload_existing_titles() has been called.
        self._existing_title_sets: list[frozenset[str]] | None = None

    def preload_existing_titles(self, books: list[dict]) -> None:
        """Index the titles of an already fetched book list for duplicate checks.

        Books without a (truthy) "title" value are ignored. Once the index is
        set, _exists_in_calibre() consults it instead of querying OPDS.
        """
        index: list[frozenset[str]] = []
        for book in books:
            if not book.get("title"):
                continue
            index.append(frozenset(_normalize_words(book.get("title", ""))))
        self._existing_title_sets = index
        log.info("Pre-loaded %d existing book titles for duplicate detection", len(index))

    def _ensure_auth(self) -> None:
        """Log in through the /login form unless this client already did.

        Raises:
            requests.HTTPError: if the login page or the login POST answers
                with an error status.
            RuntimeError: if the POST ends up back on the login page, which is
                taken to mean the credentials were rejected.
        """
        if self._authenticated:
            return

        login_url = f"{self._cfg.url}/login"
        login_page = self._session.get(login_url, timeout=30)
        login_page.raise_for_status()
        page_token = _extract_csrf(login_page.text)

        # The CSRF field is only included when the login page exposed one.
        form = {
            "username": self._cfg.user,
            "password": self._cfg.password,
            **({"csrf_token": page_token} if page_token else {}),
        }
        resp = self._session.post(login_url, data=form, allow_redirects=True, timeout=30)
        resp.raise_for_status()

        landed_on = resp.url.rstrip("/")
        if landed_on.endswith("/login"):
            raise RuntimeError("Calibre-Web authentication failed — check credentials")

        self._authenticated = True
        # Prefer a token from the post-login page; otherwise keep the login-page one.
        self._upload_csrf = _extract_csrf(resp.text) or page_token
        log.info("Authenticated to Calibre-Web at %s", self._cfg.url)

    @staticmethod
    def _words_match(ours: set[str], theirs) -> bool:
        """Return True if two title word-sets overlap enough to count as the same book.

        A match needs at least three shared words, or shared words covering
        at least 60% of ``ours``.
        """
        shared = len(ours & theirs)
        if shared >= 3:
            return True
        return bool(ours) and shared / len(ours) >= 0.6

    def _exists_in_calibre(self, filename: str) -> bool:
        """Check whether a book already exists in Calibre-Web. Returns True if likely duplicate.

        Uses the preloaded title index when there is one; otherwise runs an
        OPDS search with up to six filename keywords. A failing search is
        logged and reported as "not a duplicate".
        """
        keywords = _keywords_from_filename(filename)
        if len(keywords) < 2:
            # Fewer than two keywords is never treated as a duplicate.
            return False
        our_words = set(keywords)

        # Fast path: check pre-loaded title index (available when sync pre-fetches all books)
        index = self._existing_title_sets
        if index is not None:
            if any(self._words_match(our_words, theirs) for theirs in index):
                log.info("Duplicate (preloaded index): '%s'", filename)
                return True
            return False

        # Slow path fallback: OPDS search (used when no index is available)
        query = " ".join(keywords[:6])
        try:
            resp = self._session.get(
                f"{self._cfg.url}/opds/search/{quote(query, safe='')}",
                auth=(self._cfg.user, self._cfg.password),
                timeout=15,
            )
            # A 404 is treated the same as an empty result list.
            titles = [] if resp.status_code == 404 else (_parse_opds_titles(resp.text) or [])
            for title in titles:
                if self._words_match(our_words, set(_normalize_words(title))):
                    log.info("Duplicate (OPDS search): '%s' ~ '%s'", filename, title)
                    return True
        except Exception as e:
            log.warning("OPDS search failed for '%s': %s — proceeding with upload", filename, e)
        return False

    def upload(self, book_path: Path, zip_source: str) -> str:
        """Upload a book file. Returns status: 'uploaded' | 'skipped_duplicate' | 'error'.

        The outcome is recorded through db.record_book() before it is
        returned. A 502/503/504 response is retried up to three times with a
        180 second pause; a 400 on the first attempt triggers one
        re-authentication and retry.

        Raises:
            CalibreUnavailableError: when the consecutive-failure counter
                reaches three after this book ran out of 502/503/504 retries.
        """
        file_hash = _sha256(book_path)

        def finish(status: str) -> str:
            # Record the outcome, then hand the same status back to the caller.
            db.record_book(book_path.name, file_hash, zip_source, status)
            return status

        # Primary guard: hash already in our DB
        if db.is_book_uploaded(file_hash):
            log.info("Skipping (already uploaded): %s", book_path.name)
            return finish("skipped_duplicate")

        try:
            self._ensure_auth()

            # Secondary guard: title search in Calibre-Web (catches pre-existing books)
            if self._exists_in_calibre(book_path.name):
                log.info("Skipping (exists in Calibre-Web): %s", book_path.name)
                return finish("skipped_duplicate")

            mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream")
            for attempt in (1, 2, 3):
                try:
                    with book_path.open("rb") as fh:
                        resp = self._session.post(
                            f"{self._cfg.url}/upload",
                            files={"btn-upload": (book_path.name, fh, mime)},
                            data={"csrf_token": self._upload_csrf} if self._upload_csrf else {},
                            timeout=120,
                        )
                    if not resp.ok:
                        log.error("Upload HTTP %s (attempt %d/3) — body: %s", resp.status_code, attempt, resp.text[:300])
                    resp.raise_for_status()
                    log.info("Uploaded: %s", book_path.name)
                    self._consecutive_failures = 0
                    return finish("uploaded")
                except requests.HTTPError:
                    code = resp.status_code
                    server_unavailable = code in (502, 503, 504)

                    if server_unavailable and attempt < 3:
                        log.warning("HTTP %s on attempt %d/3 — retrying in 180s ...", resp.status_code, attempt)
                        time.sleep(180)
                        continue

                    if server_unavailable:
                        # All retries exhausted
                        self._consecutive_failures += 1
                        if self._consecutive_failures >= 3:
                            raise CalibreUnavailableError(
                                f"Calibre-Web returned {resp.status_code} on {self._consecutive_failures} "
                                "consecutive books — aborting sync run"
                            )
                    elif code == 400 and attempt == 1:
                        log.warning("HTTP 400 — CSRF token likely expired, re-authenticating ...")
                        self._authenticated = False
                        self._upload_csrf = None
                        self._ensure_auth()
                        continue

                    # Any other failure is not retried.
                    break

            return finish("error")
        except CalibreUnavailableError:
            finish("error")
            raise
        except Exception as e:
            log.error("Upload failed for %s: %s", book_path.name, e)
            return finish("error")
|
|
|
|
|
|
def fetch_all_books(cfg: CalibreConfig) -> list[dict]:
    """Fetch every book from Calibre-Web. Tries /ajax/listbooks first; falls back to OPDS if pagination is broken.

    Pages through ``/ajax/listbooks`` with an authenticated client, keeping
    each book id once. Paging stops on an empty page, on a page that adds
    nothing new, or when the total reported by the server has been reached.
    If the server reports a total and fewer than half of it was retrieved,
    the whole list is fetched again through OPDS instead.

    Args:
        cfg: Connection settings (``url``, ``user``, ``password``).

    Returns:
        The book dicts as delivered by the endpoint that was finally used.

    Raises:
        requests.HTTPError: if a listbooks request answers with an error status.
    """
    client = CalibreClient(cfg)
    client._ensure_auth()
    all_books: list[dict] = []
    seen_ids: set = set()
    page_size = 1000
    start = 0
    reported_total = 0
    while True:
        resp = client._session.get(
            f"{cfg.url}/ajax/listbooks",
            params={
                "draw": 1,
                # The paging window is sent under several aliases because the
                # parameter names the server honours are not known here.
                "start": start, "length": page_size,
                "iDisplayStart": start, "iDisplayLength": page_size,
                # NOTE(review): Calibre-Web's listbooks view appears to read
                # offset/limit — confirm against the deployed version.
                "offset": start, "limit": page_size,
            },
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        if start == 0:
            # Log the scalar fields once to show which one carries the total.
            non_list = {k: v for k, v in data.items() if not isinstance(v, list)}
            log.info("listbooks page-0 meta fields: %s", non_list)
        rows = data.get("rows") or data.get("data") or []
        reported_total = (
            data.get("recordsTotal") or data.get("total_count") or
            data.get("total") or data.get("totalNotFiltered") or 0
        )
        new_in_page = 0
        for b in rows:
            bid = b.get("id")
            if bid not in seen_ids:
                seen_ids.add(bid)
                all_books.append(b)
                new_in_page += 1
        log.info("Books fetched: %d / %d (page gave %d new)", len(all_books), reported_total, new_in_page)
        if not rows or new_in_page == 0:
            break
        # A total of 0 means the server reported none; keep paging in that
        # case until a page adds nothing new, rather than stopping after the
        # first page with a silently truncated list.
        if reported_total and len(all_books) >= reported_total:
            break
        start += len(rows)

    # If we got far fewer books than reported, listbooks pagination is broken — use OPDS instead
    if reported_total > 0 and len(all_books) < reported_total // 2:
        log.warning(
            "listbooks pagination broken (%d/%d books retrieved). Falling back to OPDS.",
            len(all_books), reported_total,
        )
        return _fetch_all_books_opds(cfg)
    return all_books
|
|
|
|
|
|
def _fetch_all_books_opds(cfg: CalibreConfig) -> list[dict]:
    """Fetch all books via OPDS catalog, following next-page links.

    Starts at ``/opds/new`` and follows each ``rel="next"`` link, using HTTP
    basic auth. The walk is best-effort: an error status, unparsable XML, a
    page without entries, or a next link pointing at an already fetched page
    ends it, and whatever was collected so far is returned.

    Args:
        cfg: Connection settings (``url``, ``user``, ``password``).

    Returns:
        One ``{"id", "title", "authors"}`` dict per distinct book id, with
        several authors joined by " & ". Entries that have no
        ``/download/<id>/`` link are skipped because no id can be derived.
    """
    import xml.etree.ElementTree as ET

    def local_name(tag: str) -> str:
        # ElementTree spells namespaced tags as "{uri}name"; keep only "name".
        return tag.rsplit("}", 1)[-1]

    books: list[dict] = []
    seen_ids: set = set()
    visited_urls: set = set()
    url: str | None = f"{cfg.url}/opds/new"
    auth = (cfg.user, cfg.password)

    # The context manager closes the session once the walk is over.
    with requests.Session() as session:
        while url:
            if url in visited_urls:
                # A feed whose next link points backwards would loop forever.
                log.warning("OPDS next link loops back to %s — stopping", url)
                break
            visited_urls.add(url)

            resp = session.get(url, auth=auth, timeout=30)
            if not resp.ok:
                log.warning("OPDS fetch failed HTTP %s — %s", resp.status_code, url)
                break
            try:
                root = ET.fromstring(resp.content)
            except ET.ParseError as exc:
                log.warning("OPDS XML parse error: %s", exc)
                break

            next_url: str | None = None
            entries_this_page = 0
            for elem in root:
                local = local_name(elem.tag)
                if local == "link" and elem.get("rel") == "next":
                    href = elem.get("href", "")
                    # Anything not starting with "http" is appended to the configured base URL.
                    next_url = href if href.startswith("http") else f"{cfg.url}{href}"
                elif local == "entry":
                    entries_this_page += 1
                    title = ""
                    author_parts: list[str] = []
                    book_id: int | None = None
                    for child in elem:
                        ctag = local_name(child.tag)
                        if ctag == "title":
                            title = child.text or ""
                        elif ctag == "author":
                            author_parts.extend(
                                gc.text or "" for gc in child if local_name(gc.tag) == "name"
                            )
                        elif ctag == "link":
                            # The first download link of the entry supplies the book id.
                            m = re.search(r"/download/(\d+)/", child.get("href", ""))
                            if m and book_id is None:
                                book_id = int(m.group(1))
                    if book_id and book_id not in seen_ids:
                        seen_ids.add(book_id)
                        books.append({"id": book_id, "title": title, "authors": " & ".join(author_parts)})

            log.info("OPDS fetched: %d books total (page had %d entries)", len(books), entries_this_page)
            if not entries_this_page:
                break
            url = next_url

    return books
|
|
|
|
|
|
def delete_book(cfg: CalibreConfig, book_id: int, client: "CalibreClient | None" = None) -> tuple[bool, str]:
    """Delete a book from Calibre-Web by ID. Pass a pre-authenticated client to avoid re-auth overhead.

    Args:
        cfg: Connection settings; ``cfg.url`` is the base of every request.
        book_id: Calibre-Web id of the book to delete.
        client: Optional client to reuse. A new one is created when omitted.

    Returns:
        ``(True, "Deleted")`` on success; otherwise ``(False, reason)`` where
        the reason is ``"HTTP <status>"``, or ``"HTTP 400 after re-auth retry"``
        when the request was still rejected with 400 after the CSRF token had
        been refreshed.
    """
    if client is None:
        client = CalibreClient(cfg)
    # Returns immediately for a client that is already logged in, so a
    # caller-supplied client is never used unauthenticated.
    client._ensure_auth()

    csrf = client._upload_csrf
    if not csrf:
        # Try to fetch a CSRF token from the book detail page
        try:
            page = client._session.get(f"{cfg.url}/book/{book_id}", timeout=15)
            csrf = _extract_csrf(page.text)
            client._upload_csrf = csrf
        except Exception as exc:
            # Best effort only: the delete is still attempted without a token.
            log.warning("delete_book: could not fetch CSRF token for book %d: %s", book_id, exc)

    for attempt in range(2):
        resp = client._session.post(
            f"{cfg.url}/delete/{book_id}",
            data={"csrf_token": csrf} if csrf else {},
            timeout=30,
        )
        if resp.ok:
            return True, "Deleted"
        if resp.status_code != 400:
            return False, f"HTTP {resp.status_code}"
        if attempt == 0:
            # CSRF token likely expired; re-authenticate and retry once
            log.info("delete_book: 400 on book %d — refreshing CSRF and retrying", book_id)
            client._authenticated = False
            client._upload_csrf = None
            client._ensure_auth()
            csrf = client._upload_csrf
    return False, "HTTP 400 after re-auth retry"
|
|
|
|
|
|
def find_duplicate_groups(books: list[dict]) -> list[list[dict]]:
    """Group books by normalised title+author; return only groups with 2+ entries.

    Normalisation lower-cases the text, turns punctuation into spaces and
    collapses runs of whitespace. Books whose title is missing, ``None`` or
    empty after normalisation are left out. A missing or ``None`` "authors"
    value counts as an empty author string.

    Args:
        books: Book dicts; only the "title" and "authors" keys are read.

    Returns:
        The duplicate groups, sorted by the lower-cased title of each group's
        first book. Books keep their input order within a group.
    """
    from collections import defaultdict

    def normalise(value) -> str:
        # ``or ""`` also covers keys that are present but hold None.
        text = re.sub(r"[^\w\s]", " ", (value or "").lower())
        return re.sub(r"\s+", " ", text).strip()

    groups: dict[str, list[dict]] = defaultdict(list)
    for book in books:
        title = normalise(book.get("title"))
        if not title:
            continue
        authors = normalise(book.get("authors"))
        groups[f"{title}||{authors}"].append(book)

    return sorted(
        (g for g in groups.values() if len(g) > 1),
        key=lambda g: (g[0].get("title") or "").lower(),
    )
|
|
|
|
|
|
def test_connection(cfg: CalibreConfig) -> tuple[bool, str]:
    """Try to log in with ``cfg`` and report the outcome instead of raising.

    Returns:
        ``(True, message)`` when the login succeeds, otherwise ``(False, text)``
        where ``text`` is the string form of the exception that was raised.
    """
    try:
        CalibreClient(cfg)._ensure_auth()
        outcome = (True, f"Authenticated to {cfg.url} as '{cfg.user}'.")
    except Exception as e:
        outcome = (False, str(e))
    return outcome
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
def _keywords_from_filename(filename: str) -> list[str]:
|
|
"""Extract meaningful words from a release-style filename for OPDS search."""
|
|
stem = Path(filename).stem.lower()
|
|
stem = re.sub(r"[._\-]", " ", stem)
|
|
stem = re.sub(r"[^\w\s]", "", stem)
|
|
words = stem.split()
|
|
return [
|
|
w for w in words
|
|
if w not in _JUNK_WORDS
|
|
and not re.match(r"^\d{4}$", w) # strip year
|
|
and not re.match(r"^\d+$", w) # strip pure numbers
|
|
and len(w) > 1
|
|
]
|
|
|
|
|
|
def _normalize_words(title: str) -> list[str]:
|
|
"""Normalize a Calibre-Web title for comparison."""
|
|
title = title.lower()
|
|
title = re.sub(r"[^\w\s]", "", title)
|
|
return [w for w in title.split() if w not in _JUNK_WORDS and len(w) > 1]
|
|
|
|
|
|
def _parse_opds_titles(xml: str) -> list[str]:
|
|
"""Extract book titles from an OPDS Atom feed, skipping the feed title itself."""
|
|
# Grab all <title> elements; the first is the feed title ("Search results"), rest are books
|
|
titles = re.findall(r"<title>([^<]+)</title>", xml)
|
|
return titles[1:] if len(titles) > 1 else []
|
|
|
|
|
|
def _extract_csrf(html: str) -> str | None:
|
|
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
|
|
if not m:
|
|
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def _sha256(path: Path) -> str:
|
|
h = hashlib.sha256()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(65536), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|