134 lines
4.8 KiB
Python
134 lines
4.8 KiB
Python
import logging
|
|
import shutil
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import config
|
|
import db
|
|
import extractor
|
|
import sftp as sftp_module
|
|
|
|
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)

# Flag reported by is_running(); run_sync sets it to True after taking the
# lock and resets it to False in its finally clause.
_running = False
# Acquired non-blockingly by run_sync so that a second concurrent call is
# skipped instead of starting an overlapping run.
_lock = threading.Lock()
|
|
|
|
|
|
def is_running() -> bool:
    """Return the current value of the module's "sync in progress" flag.

    The flag is read directly, without acquiring the sync lock.
    """
    currently_running = _running
    return currently_running
|
|
|
|
|
|
def run_sync(limit: int | None = None) -> None:
    """
    Process all unprocessed remote zips in chunks.

    limit: if set, process at most this many zips (used by test mode).
    If None, processes all unprocessed zips in batches of sync_batch_size.

    Only one sync runs at a time: if the module lock is already held, this
    call logs a warning and returns without doing anything. Exceptions raised
    by db.start_sync_run() propagate to the caller; any later failure is
    logged and recorded on the sync-run row instead of being raised.
    """
    global _running
    if not _lock.acquire(blocking=False):
        log.warning("Sync already running, skipping")
        return

    # Everything after a successful acquire lives inside try/finally so the
    # lock and the _running flag are always reset -- including when
    # db.start_sync_run() itself fails, which previously left the lock held
    # forever and made every later sync skip.
    try:
        _running = True
        _run_locked(limit)
    finally:
        _running = False
        _lock.release()


def _run_locked(limit: int | None) -> None:
    """Execute one sync run and record its outcome; the caller must hold _lock."""
    run_id = db.start_sync_run()
    counters: dict[str, int] = dict(zips_found=0, zips_new=0, books_imported=0, books_errored=0)

    try:
        log.info("Sync started (limit=%s)", limit)
        cfg = config.load()
        _validate_config(cfg)
        log.info("Config OK — work dir: %s, import dir: %s", cfg.work_dir, cfg.import_dir)

        work_dir = Path(cfg.work_dir)
        work_dir.mkdir(parents=True, exist_ok=True)

        import_dir = Path(cfg.import_dir)
        import_dir.mkdir(parents=True, exist_ok=True)

        log.info("Connecting to SFTP %s@%s:%s ...", cfg.sftp.user, cfg.sftp.host, cfg.sftp.port)
        new_zips = sftp_module.list_new_zips(cfg.sftp, max_results=limit)
        counters["zips_found"] = len(new_zips)
        counters["zips_new"] = len(new_zips)

        if not new_zips:
            log.info("No new zips to process")
            db.finish_sync_run(run_id, status="success", **counters)
            return

        batch_size = _resolve_batch_size(len(new_zips))
        total_batches = -(-len(new_zips) // batch_size)  # ceiling division

        for batch_num, start in enumerate(range(0, len(new_zips), batch_size), start=1):
            chunk = new_zips[start : start + batch_size]
            log.info("Batch %d/%d — processing %d zip(s)", batch_num, total_batches, len(chunk))
            for remote_zip in chunk:
                _process_zip(cfg, remote_zip, work_dir, import_dir, counters)
            log.info("Batch %d/%d done", batch_num, total_batches)

        db.finish_sync_run(run_id, status="success", **counters)
        log.info(
            "Sync complete. Total zips: %d, Imported: %d, Errors: %d",
            counters["zips_new"], counters["books_imported"], counters["books_errored"],
        )

    except Exception as e:
        log.exception("Sync run failed: %s", e)
        db.finish_sync_run(run_id, status="error", error_msg=str(e), **counters)


def _resolve_batch_size(total: int) -> int:
    """Return the sync_batch_size setting, or *total* when it is unset, <= 0 or not an integer."""
    raw = db.get_setting("sync_batch_size", "0") or "0"
    try:
        size = int(raw)
    except (TypeError, ValueError):
        # A malformed setting used to abort the whole run; fall back to a
        # single batch and surface the problem in the log instead.
        log.warning("Invalid sync_batch_size %r — processing all zips in one batch", raw)
        size = 0
    return size if size > 0 else total


def _process_zip(cfg, remote_zip, work_dir: Path, import_dir: Path, counters: dict[str, int]) -> None:
    """Download, extract and import one remote zip, then record its outcome via db.mark_zip_processed.

    Errors while processing the zip are logged and recorded against that zip
    (status "error") rather than raised, so one bad zip does not stop the run.
    Increments counters["books_imported"] per moved book and
    counters["books_errored"] once per failed zip.
    """
    zip_status = "success"
    zip_error = None
    local_zip = None
    try:
        t0 = time.monotonic()
        local_zip = sftp_module.download(cfg.sftp, remote_zip, str(work_dir / "downloads"))
        log.info("Download done in %.1fs (%.1f MB)", time.monotonic() - t0, local_zip.stat().st_size / 1_048_576)

        t1 = time.monotonic()
        books = extractor.extract(local_zip, work_dir / "extracted")
        log.info("Extract done in %.1fs — %d book(s)", time.monotonic() - t1, len(books))

        for book in books:
            dest = import_dir / book.name
            if dest.exists():
                # Never overwrite a file already waiting in the import folder.
                log.info("Skipping '%s' — already exists in import dir", book.name)
            else:
                shutil.move(str(book), str(dest))
                log.info("Moved '%s' → %s", book.name, import_dir)
                counters["books_imported"] += 1

        extractor.cleanup(work_dir / "extracted" / local_zip.stem)

    except Exception as e:
        log.exception("Error processing %s: %s", remote_zip.remote_path, e)
        zip_status = "error"
        zip_error = str(e)
        # NOTE: despite its name this counts failed zips, not individual books.
        counters["books_errored"] += 1

    finally:
        if local_zip is not None:
            # The success path already removed the extraction directory; this
            # only fires after a failure, so partial extractions don't pile up.
            _cleanup_quietly(work_dir / "extracted" / local_zip.stem)
            _cleanup_quietly(local_zip)
        # Always record the zip, even if cleanup had trouble.
        db.mark_zip_processed(remote_zip.remote_path, remote_zip.file_size, zip_status, zip_error)


def _cleanup_quietly(path: Path) -> None:
    """Run extractor.cleanup on *path* if it exists, logging (not raising) any failure."""
    if not path.exists():
        return
    try:
        extractor.cleanup(path)
    except Exception:
        log.warning("Could not clean up %s", path, exc_info=True)
|
|
|
|
|
|
def _validate_config(cfg) -> None:
|
|
missing = []
|
|
if not cfg.sftp.host:
|
|
missing.append("SFTP host")
|
|
if not cfg.sftp.user:
|
|
missing.append("SFTP user")
|
|
if not cfg.sftp.remote_path:
|
|
missing.append("SFTP remote path")
|
|
if cfg.sftp.auth_method == "key" and not cfg.sftp.key:
|
|
missing.append("SSH private key")
|
|
if cfg.sftp.auth_method == "password" and not cfg.sftp.password:
|
|
missing.append("SSH password")
|
|
if not cfg.import_dir:
|
|
missing.append("CWA import folder")
|
|
if missing:
|
|
raise ValueError(f"Missing configuration: {', '.join(missing)}")
|