Added batch process and test
This commit is contained in:
@@ -18,7 +18,13 @@ def is_running() -> bool:
|
||||
return _running
|
||||
|
||||
|
||||
def run_sync() -> None:
|
||||
def run_sync(limit: int | None = None) -> None:
|
||||
"""
|
||||
Process all unprocessed remote zips in chunks.
|
||||
|
||||
limit: if set, process at most this many zips (used by test mode).
|
||||
If None, processes all unprocessed zips in batches of sync_batch_size.
|
||||
"""
|
||||
global _running
|
||||
if not _lock.acquire(blocking=False):
|
||||
log.warning("Sync already running, skipping")
|
||||
@@ -38,40 +44,62 @@ def run_sync() -> None:
|
||||
log.info("Listing remote zips at %s@%s:%s", cfg.sftp.user, cfg.sftp.host, cfg.sftp.remote_path)
|
||||
new_zips = sftp_module.list_new_zips(cfg.sftp)
|
||||
counters["zips_found"] = len(new_zips)
|
||||
|
||||
# Test mode: cap at the explicit limit
|
||||
if limit is not None:
|
||||
new_zips = new_zips[:limit]
|
||||
|
||||
counters["zips_new"] = len(new_zips)
|
||||
|
||||
if not new_zips:
|
||||
log.info("No new zips to process")
|
||||
db.finish_sync_run(run_id, status="success", **counters)
|
||||
return
|
||||
|
||||
# Determine chunk size; 0 means process everything in one chunk
|
||||
batch_size = int(db.get_setting("sync_batch_size", "0") or "0")
|
||||
if batch_size <= 0:
|
||||
batch_size = len(new_zips)
|
||||
|
||||
total_batches = -(-len(new_zips) // batch_size) # ceiling division
|
||||
client = CalibreClient(cfg.calibre)
|
||||
|
||||
for remote_zip in new_zips:
|
||||
zip_status = "success"
|
||||
zip_error = None
|
||||
local_zip = None
|
||||
try:
|
||||
local_zip = sftp_module.download(cfg.sftp, remote_zip, str(work_dir / "downloads"))
|
||||
books = extractor.extract(local_zip, work_dir / "extracted")
|
||||
for batch_num, i in enumerate(range(0, len(new_zips), batch_size), start=1):
|
||||
chunk = new_zips[i : i + batch_size]
|
||||
log.info("Batch %d/%d — processing %d zip(s)", batch_num, total_batches, len(chunk))
|
||||
|
||||
for book in books:
|
||||
status = client.upload(book, zip_source=remote_zip.remote_path)
|
||||
if status == "uploaded":
|
||||
counters["books_uploaded"] += 1
|
||||
elif status == "skipped_duplicate":
|
||||
counters["books_skipped"] += 1
|
||||
else:
|
||||
counters["books_errored"] += 1
|
||||
for remote_zip in chunk:
|
||||
zip_status = "success"
|
||||
zip_error = None
|
||||
local_zip = None
|
||||
try:
|
||||
local_zip = sftp_module.download(cfg.sftp, remote_zip, str(work_dir / "downloads"))
|
||||
books = extractor.extract(local_zip, work_dir / "extracted")
|
||||
|
||||
extractor.cleanup(work_dir / "extracted" / local_zip.stem)
|
||||
except Exception as e:
|
||||
log.error("Error processing %s: %s", remote_zip.remote_path, e)
|
||||
zip_status = "error"
|
||||
zip_error = str(e)
|
||||
finally:
|
||||
if local_zip and local_zip.exists():
|
||||
extractor.cleanup(local_zip)
|
||||
db.mark_zip_processed(remote_zip.remote_path, remote_zip.file_size, zip_status, zip_error)
|
||||
for book in books:
|
||||
status = client.upload(book, zip_source=remote_zip.remote_path)
|
||||
if status == "uploaded":
|
||||
counters["books_uploaded"] += 1
|
||||
elif status == "skipped_duplicate":
|
||||
counters["books_skipped"] += 1
|
||||
else:
|
||||
counters["books_errored"] += 1
|
||||
|
||||
extractor.cleanup(work_dir / "extracted" / local_zip.stem)
|
||||
except Exception as e:
|
||||
log.error("Error processing %s: %s", remote_zip.remote_path, e)
|
||||
zip_status = "error"
|
||||
zip_error = str(e)
|
||||
finally:
|
||||
if local_zip and local_zip.exists():
|
||||
extractor.cleanup(local_zip)
|
||||
db.mark_zip_processed(remote_zip.remote_path, remote_zip.file_size, zip_status, zip_error)
|
||||
|
||||
log.info("Batch %d/%d done", batch_num, total_batches)
|
||||
|
||||
db.finish_sync_run(run_id, status="success", **counters)
|
||||
log.info(
|
||||
"Sync done. Zips: %d, Uploaded: %d, Skipped: %d, Errors: %d",
|
||||
"Sync complete. Total zips: %d, Uploaded: %d, Skipped: %d, Errors: %d",
|
||||
counters["zips_new"], counters["books_uploaded"],
|
||||
counters["books_skipped"], counters["books_errored"],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user