101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
import hashlib
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
import db
|
|
from config import CalibreConfig
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Content types sent with an upload, keyed by lower-case file suffix
# (including the leading dot). Suffixes not listed here are handled by
# whatever default the caller passes to ``MIME_TYPES.get``.
MIME_TYPES: dict[str, str] = {
    ".epub": "application/epub+zip",
    ".pdf": "application/pdf",
}
|
|
|
|
|
|
class CalibreClient:
    """Session-based client that uploads book files to a Calibre-Web server.

    Login happens lazily, on the first call that needs it, through the HTML
    login form. The resulting cookies live on one ``requests.Session`` that
    is reused for every later request made by this instance.
    """

    def __init__(self, cfg: CalibreConfig):
        """Store the configuration and create an unauthenticated session.

        Args:
            cfg: Connection settings; ``url``, ``user`` and ``password`` are
                the attributes read by this class.
        """
        self._cfg = cfg
        self._session = requests.Session()
        self._authenticated = False

    def _url(self, path: str) -> str:
        """Join *path* onto the configured base URL without doubling slashes."""
        return f"{self._cfg.url.rstrip('/')}{path}"

    @staticmethod
    def _is_login_url(url: str) -> bool:
        """Return True if *url* addresses the login page.

        The query string and fragment are ignored so that redirects such as
        ``/login?next=%2Fupload`` are still recognised.
        """
        path = url.split("#", 1)[0].split("?", 1)[0]
        return path.rstrip("/").endswith("/login")

    def _post_book(self, book_path: Path, mime: str) -> requests.Response:
        """POST the file at *book_path* to the upload endpoint and return the response."""
        with book_path.open("rb") as fh:
            return self._session.post(
                self._url("/upload"),
                files={"btn-upload": (book_path.name, fh, mime)},
                timeout=120,
            )

    def _ensure_auth(self) -> None:
        """Log in through the web form unless this client already did so.

        Raises:
            RuntimeError: The server left us on the login page after the
                credentials were posted.
            requests.RequestException: Transport failure or an HTTP error
                status from either request.
        """
        if self._authenticated:
            return
        # Fetch login page first to get the CSRF token (Flask-WTF requirement)
        login_url = self._url("/login")
        page = self._session.get(login_url, timeout=30)
        page.raise_for_status()
        csrf = _extract_csrf(page.text)

        data = {"username": self._cfg.user, "password": self._cfg.password}
        if csrf:
            data["csrf_token"] = csrf

        resp = self._session.post(
            login_url,
            data=data,
            allow_redirects=True,
            timeout=30,
        )
        resp.raise_for_status()
        # Calibre-Web redirects to / on success; landing back on /login means bad creds
        if self._is_login_url(resp.url):
            raise RuntimeError("Calibre-Web authentication failed — check credentials")
        self._authenticated = True
        log.info("Authenticated to Calibre-Web at %s", self._cfg.url)

    def upload(self, book_path: Path, zip_source: str) -> str:
        """Upload a book file. Returns status: 'uploaded' | 'skipped_duplicate' | 'error'.

        The file is hashed first; if ``db.is_book_uploaded`` already knows the
        hash, nothing is sent. Every outcome is recorded via ``db.record_book``.

        Args:
            book_path: Path of the file to send.
            zip_source: Passed through unchanged to ``db.record_book``.

        Returns:
            One of ``'uploaded'``, ``'skipped_duplicate'`` or ``'error'``.
            Failures during authentication or upload are logged and reported
            as ``'error'`` rather than raised; hashing and the duplicate
            lookup run before that handler and may still raise.
        """
        file_hash = _sha256(book_path)

        if db.is_book_uploaded(file_hash):
            log.info("Skipping duplicate: %s (hash %s)", book_path.name, file_hash[:8])
            db.record_book(book_path.name, file_hash, zip_source, "skipped_duplicate")
            return "skipped_duplicate"

        try:
            self._ensure_auth()
            mime = MIME_TYPES.get(book_path.suffix.lower(), "application/octet-stream")
            # NOTE(review): no csrf_token field is sent with the upload; if the
            # server answers 400 here, confirm whether it requires one.
            resp = self._post_book(book_path, mime)
            if self._is_login_url(resp.url):
                # The redirect was followed to the login page, which answers
                # 200, so raise_for_status() alone would report a false
                # success. Treat it as an expired session: log in again and
                # retry exactly once.
                self._authenticated = False
                self._ensure_auth()
                resp = self._post_book(book_path, mime)
                if self._is_login_url(resp.url):
                    raise RuntimeError(
                        "Calibre-Web redirected the upload to the login page"
                    )
            resp.raise_for_status()
            log.info("Uploaded: %s", book_path.name)
            db.record_book(book_path.name, file_hash, zip_source, "uploaded")
            return "uploaded"
        except Exception as e:
            # Deliberate catch-all: callers rely on a status string, not an exception.
            log.error("Upload failed for %s: %s", book_path.name, e)
            db.record_book(book_path.name, file_hash, zip_source, "error")
            return "error"
|
|
|
|
|
|
def test_connection(cfg: CalibreConfig) -> tuple[bool, str]:
    """Check that *cfg* can authenticate against the server.

    A temporary ``CalibreClient`` is created, asked to log in, and then its
    HTTP session is closed so the probe does not leave pooled connections open.

    Args:
        cfg: Connection settings to try.

    Returns:
        ``(True, message)`` when login succeeds, otherwise ``(False, error)``
        where *error* is the text of the exception that was raised.
    """
    client = None
    try:
        client = CalibreClient(cfg)
        client._ensure_auth()
        return True, f"Authenticated to {cfg.url} as '{cfg.user}'."
    except Exception as e:
        # Deliberate catch-all: any failure is reported to the caller as text.
        return False, str(e)
    finally:
        if client is not None:
            client._session.close()
|
|
|
|
|
|
def _extract_csrf(html: str) -> str | None:
|
|
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
|
|
if not m:
|
|
m = re.search(r'value="([^"]+)"\s+name="csrf_token"', html)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def _sha256(path: Path) -> str:
|
|
h = hashlib.sha256()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(65536), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|