"""Sync orchestration: fetch new remote zips over SFTP, extract them, and
hand each book to Grimmory, recording progress in the local database."""

import logging
import threading
import time
from pathlib import Path

import config
import db
import extractor
import grimmory as grimmory_module
import sftp as sftp_module

log = logging.getLogger(__name__)

# Guards against overlapping sync runs; acquired non-blocking in run_sync().
_lock = threading.Lock()
# Mirrors the lock state for cheap polling via is_running().
_running = False


def is_running() -> bool:
    """Return True while a sync run started by run_sync() is in progress."""
    return _running


def run_sync(limit: int | None = None) -> None:
    """
    Process all unprocessed remote zips in chunks.

    limit: if set, process at most this many zips (used by test mode).
           If None, processes all unprocessed zips in batches of sync_batch_size.

    If another sync is already running, a warning is logged and the call
    returns immediately. Failures inside the run are logged and recorded on
    the sync-run row rather than raised. An exception raised while creating
    the sync-run row itself still propagates to the caller, but the lock and
    the running flag are always reset.
    """
    global _running
    if not _lock.acquire(blocking=False):
        log.warning("Sync already running, skipping")
        return

    _running = True
    try:
        # Everything after the acquire lives inside this try so that a failure
        # anywhere (including db.start_sync_run) cannot leave the lock held
        # and block all future syncs.
        _run_locked(limit)
    finally:
        _running = False
        _lock.release()


def _run_locked(limit: int | None) -> None:
    """Execute one sync run end to end; the caller must hold ``_lock``."""
    run_id = db.start_sync_run()
    counters = dict(zips_found=0, zips_new=0, books_imported=0, books_skipped=0, books_errored=0)
    try:
        log.info("Sync started (limit=%s)", limit)
        cfg = config.load()
        _validate_config(cfg)
        log.info("Config OK — work dir: %s, bookdrop: %s", cfg.work_dir, cfg.grimmory.bookdrop_path)

        work_dir = Path(cfg.work_dir)
        work_dir.mkdir(parents=True, exist_ok=True)

        log.info("Connecting to SFTP %s@%s:%s ...", cfg.sftp.user, cfg.sftp.host, cfg.sftp.port)
        new_zips = sftp_module.list_new_zips(cfg.sftp, max_results=limit)
        counters["zips_found"] = len(new_zips)
        counters["zips_new"] = len(new_zips)

        if not new_zips:
            log.info("No new zips to process")
            db.finish_sync_run(run_id, status="success", **counters)
            return

        # A missing, empty or non-positive setting means "one batch with everything".
        batch_size = int(db.get_setting("sync_batch_size", "0") or "0")
        if batch_size <= 0:
            batch_size = len(new_zips)
        # Ceiling division without floats.
        total_batches = -(-len(new_zips) // batch_size)

        for batch_num, start in enumerate(range(0, len(new_zips), batch_size), start=1):
            chunk = new_zips[start : start + batch_size]
            log.info("Batch %d/%d — processing %d zip(s)", batch_num, total_batches, len(chunk))
            for remote_zip in chunk:
                _process_zip(cfg, work_dir, remote_zip, counters)
            log.info("Batch %d/%d done", batch_num, total_batches)

        db.finish_sync_run(run_id, status="success", **counters)
        log.info(
            "Sync complete. Total zips: %d, Imported: %d, Errors: %d",
            counters["zips_new"],
            counters["books_imported"],
            counters["books_errored"],
        )
    except Exception as e:
        log.exception("Sync run failed: %s", e)
        db.finish_sync_run(run_id, status="error", error_msg=str(e), **counters)


def _process_zip(cfg, work_dir: Path, remote_zip, counters: dict[str, int]) -> None:
    """Download, extract and import one remote zip, then record its final status.

    Any exception raised while handling the zip is logged, stored on the zip
    row and counted in ``counters["books_errored"]``; it does not stop the
    remaining zips from being processed.
    """
    zip_status = "success"
    zip_error = None
    local_zip = None

    # Insert zip row early so we have a zip_id for per-book records
    db.mark_zip_processed(remote_zip.remote_path, remote_zip.file_size, "running")
    zip_id = db.get_zip_id_by_path(remote_zip.remote_path)

    try:
        t0 = time.monotonic()
        local_zip = sftp_module.download(cfg.sftp, remote_zip, str(work_dir / "downloads"))
        log.info(
            "Download done in %.1fs (%.1f MB)",
            time.monotonic() - t0,
            local_zip.stat().st_size / 1_048_576,
        )

        t1 = time.monotonic()
        books = extractor.extract(local_zip, work_dir / "extracted")
        log.info("Extract done in %.1fs — %d book(s)", time.monotonic() - t1, len(books))

        for book in books:
            _process_book(cfg, zip_id, book, counters)

        extractor.cleanup(work_dir / "extracted" / local_zip.stem)
    except Exception as e:
        log.error("Error processing %s: %s", remote_zip.remote_path, e)
        zip_status = "error"
        zip_error = str(e)
        counters["books_errored"] += 1
    finally:
        # Always drop the downloaded archive and overwrite the provisional
        # "running" status with the real outcome.
        if local_zip and local_zip.exists():
            extractor.cleanup(local_zip)
        db.mark_zip_processed(remote_zip.remote_path, remote_zip.file_size, zip_status, zip_error)


def _process_book(cfg, zip_id, book, counters: dict[str, int]) -> None:
    """Import a single extracted book unless its sha256 was already processed."""
    sha256 = grimmory_module.compute_sha256(book)
    if db.is_book_processed(sha256):
        log.info("Skipping '%s' — sha256 already processed", book.name)
        counters["books_skipped"] += 1
        db.record_book(zip_id, book.name, sha256, status="skipped")
        return

    result = grimmory_module.place_book(
        book,
        cfg.grimmory.bookdrop_path,
        cfg.grimmory.url,
        cfg.grimmory.user,
        cfg.grimmory.password,
        sha256=sha256,
    )
    if result.status == "success":
        counters["books_imported"] += 1
    elif result.status == "skipped":
        counters["books_skipped"] += 1
    elif result.status == "error":
        # Previously book-level failures were recorded but never counted.
        # NOTE(review): assumes place_book reports failures as status "error"
        # — confirm against grimmory.place_book.
        counters["books_errored"] += 1
    db.record_book(zip_id, book.name, result.sha256, status=result.status, error_msg=result.error_msg)


def _validate_config(cfg) -> None:
    """Raise ValueError naming every required setting that is empty or unset.

    The SSH key is required only when the SFTP auth method is "key", and the
    SSH password only when it is "password".
    """
    sftp_cfg = cfg.sftp
    grimmory_cfg = cfg.grimmory
    # (value, label) pairs; a falsy value means the setting is missing.
    checks = [
        (sftp_cfg.host, "SFTP host"),
        (sftp_cfg.user, "SFTP user"),
        (sftp_cfg.remote_path, "SFTP remote path"),
        (sftp_cfg.auth_method != "key" or sftp_cfg.key, "SSH private key"),
        (sftp_cfg.auth_method != "password" or sftp_cfg.password, "SSH password"),
        (grimmory_cfg.url, "Grimmory URL"),
        (grimmory_cfg.user, "Grimmory username"),
        (grimmory_cfg.password, "Grimmory password"),
        (grimmory_cfg.bookdrop_path, "Grimmory bookdrop path"),
    ]
    missing = [label for value, label in checks if not value]
    if missing:
        raise ValueError(f"Missing configuration: {', '.join(missing)}")