diff --git a/db.py b/db.py index 02bc1f9..d4a8f74 100644 --- a/db.py +++ b/db.py @@ -68,6 +68,12 @@ def init_db() -> None: status TEXT DEFAULT 'running', error_msg TEXT ); + + CREATE TABLE IF NOT EXISTS remote_zip_cache ( + remote_path TEXT PRIMARY KEY, + file_size INTEGER NOT NULL, + cached_at TEXT NOT NULL + ); """) @@ -110,6 +116,39 @@ def get_all_processed_paths() -> set[str]: return {row["remote_path"] for row in rows} +# --- Remote zip cache --- + +def get_remote_zip_cache() -> list[tuple[str, int]]: + """Return cached (remote_path, file_size) tuples.""" + with get_db() as conn: + rows = conn.execute("SELECT remote_path, file_size FROM remote_zip_cache").fetchall() + return [(row["remote_path"], row["file_size"]) for row in rows] + + +def upsert_remote_zip_cache(zips: list[tuple[str, int]]) -> None: + """Bulk-insert or replace cache entries. zips is a list of (remote_path, file_size).""" + now = _now() + with get_db() as conn: + conn.executemany( + "INSERT INTO remote_zip_cache (remote_path, file_size, cached_at) VALUES (?,?,?)" + " ON CONFLICT(remote_path) DO UPDATE SET file_size=excluded.file_size, cached_at=excluded.cached_at", + [(path, size, now) for path, size in zips], + ) + + +def get_remote_cache_info() -> dict: + with get_db() as conn: + row = conn.execute( + "SELECT COUNT(*) as count, MAX(cached_at) as last_scan FROM remote_zip_cache" + ).fetchone() + return {"count": row["count"], "last_scan": row["last_scan"]} + + +def clear_remote_zip_cache() -> int: + with get_db() as conn: + return conn.execute("DELETE FROM remote_zip_cache").rowcount + + def mark_zip_processed(remote_path: str, file_size: int, status: str, error_msg: str | None = None) -> None: with get_db() as conn: conn.execute( @@ -212,11 +251,13 @@ def get_stats() -> dict: def clear_sync_data() -> dict: - """Delete all processed_zips, uploaded_books, and sync_runs rows. Settings are kept.""" + """Delete all processed_zips, uploaded_books, and sync_runs rows. Settings are kept. + Also resets the remote scan timestamp so the next sync does a full rescan.""" with get_db() as conn: zips = conn.execute("DELETE FROM processed_zips").rowcount books = conn.execute("DELETE FROM uploaded_books").rowcount runs = conn.execute("DELETE FROM sync_runs").rowcount + conn.execute("DELETE FROM settings WHERE key = 'remote_cache_last_scan'") return {"zips": zips, "books": books, "runs": runs} diff --git a/main.py b/main.py index 30605e2..aa4775e 100644 --- a/main.py +++ b/main.py @@ -63,6 +63,7 @@ async def dashboard(request: Request): zips = [dict(z) for z in db.get_recent_zips(20)] interval = int(db.get_setting("scheduler_interval_minutes", "0") or "0") batch_size = int(db.get_setting("sync_batch_size", "0") or "0") + cache_info = db.get_remote_cache_info() return templates.TemplateResponse(request, "index.html", { "stats": stats, "runs": runs, @@ -71,6 +72,7 @@ async def dashboard(request: Request): "next_run": next_run_time(), "interval": interval, "batch_size": batch_size, + "cache_info": cache_info, }) @@ -158,6 +160,15 @@ async def trigger_test_sync(background_tasks: BackgroundTasks): return RedirectResponse("/?test_started=1", status_code=303) +@app.post("/sync/rescan") +async def trigger_rescan(background_tasks: BackgroundTasks): + if sync.is_running(): + return RedirectResponse("/?already_running=1", status_code=303) + cfg = config.load() + background_tasks.add_task(sftp_module.refresh_remote_zip_cache, cfg.sftp) + return RedirectResponse("/?rescan_started=1", status_code=303) + + # --- Connection tests --- @app.get("/api/test/ssh") diff --git a/sftp.py b/sftp.py index 7981df1..1ba09b1 100644 --- a/sftp.py +++ b/sftp.py @@ -4,6 +4,7 @@ import shlex import socket import time from dataclasses import dataclass +from datetime import datetime, timedelta, timezone from pathlib import Path import paramiko @@ -80,29 +81,67 @@ def test_connection(cfg: SFTPConfig) -> tuple[bool, str]: def list_new_zips(cfg: SFTPConfig, max_results: int | None = None) -> list[RemoteZip]: + last_scan = db.get_setting("remote_cache_last_scan") + transport = _make_transport(cfg) try: t0 = time.monotonic() - all_zips = _find_remote_zips(transport, cfg.remote_path) - log.info("Remote find done in %.1fs — %d zip(s) found", time.monotonic() - t0, len(all_zips)) - - t1 = time.monotonic() - processed = db.get_all_processed_paths() - log.info("DB lookup done in %.1fs — %d path(s) already processed", time.monotonic() - t1, len(processed)) - - new_zips: list[RemoteZip] = [] - for zip_info in all_zips: - if zip_info.remote_path not in processed: - new_zips.append(zip_info) - if max_results and len(new_zips) >= max_results: - log.info("Reached limit of %d", max_results) - break - - log.info("%d new zip(s) to process", len(new_zips)) - return new_zips + if last_scan: + # Fast incremental: prune directories not modified since last scan. + # Adding a file/dir to a directory updates that directory's mtime, + # so we safely skip entire subtrees that haven't changed. + cutoff = _scan_cutoff(last_scan) + log.info("Incremental scan — looking for directories modified since %s ...", cutoff) + new_remote = _find_remote_zips(transport, cfg.remote_path, newer_than=cutoff) + log.info("Incremental scan done in %.1fs — %d new zip(s) on remote", time.monotonic() - t0, len(new_remote)) + else: + log.info("First run — full remote scan (may take several minutes for large trees) ...") + new_remote = _find_remote_zips(transport, cfg.remote_path) + log.info("Full scan done in %.1fs — %d zip(s) found", time.monotonic() - t0, len(new_remote)) finally: transport.close() + # Record scan time, then update cache with any new entries found + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + db.set_setting("remote_cache_last_scan", now_str) + if new_remote: + db.upsert_remote_zip_cache([(z.remote_path, z.file_size) for z in new_remote]) + log.info("Cache updated with %d new entry(ies)", len(new_remote)) + + # Filter full cache against already-processed paths + t1 = time.monotonic() + all_cached = db.get_remote_zip_cache() + processed = db.get_all_processed_paths() + log.info("DB lookup done in %.1fs — cache: %d, processed: %d", time.monotonic() - t1, len(all_cached), len(processed)) + + new_zips: list[RemoteZip] = [] + for path, size in all_cached: + if path not in processed: + new_zips.append(RemoteZip(remote_path=path, file_size=size)) + if max_results and len(new_zips) >= max_results: + break + + log.info("%d zip(s) to process", len(new_zips)) + return new_zips + + +def refresh_remote_zip_cache(cfg: SFTPConfig) -> int: + """Force a full remote scan, replacing the entire cache. Used by the manual rescan button.""" + log.info("Forced full remote cache refresh ...") + t0 = time.monotonic() + transport = _make_transport(cfg) + try: + all_zips = _find_remote_zips(transport, cfg.remote_path) + finally: + transport.close() + log.info("Full scan done in %.1fs — %d zip(s)", time.monotonic() - t0, len(all_zips)) + db.clear_remote_zip_cache() + db.upsert_remote_zip_cache([(z.remote_path, z.file_size) for z in all_zips]) + now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + db.set_setting("remote_cache_last_scan", now_str) + log.info("Cache refreshed: %d zip(s) stored", len(all_zips)) + return len(all_zips) + def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path: dest = Path(dest_dir) @@ -119,14 +158,27 @@ def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path: return local_path -def _find_remote_zips(transport: paramiko.Transport, remote_path: str) -> list[RemoteZip]: - """Single SSH exec: find all .zip files server-side. Vastly faster than per-directory SFTP calls.""" +def _find_remote_zips(transport: paramiko.Transport, remote_path: str, newer_than: str | None = None) -> list[RemoteZip]: + """Run find on the remote host, streaming results with progress logging every 30 s.""" channel = transport.open_session() - cmd = f"find {shlex.quote(remote_path)} -type f -iname '*.zip' -printf '%s\\t%p\\n'" - log.info("Running remote find under %s ...", remote_path) + + if newer_than: + # Prune entire directory subtrees whose mtime predates the cutoff. + # A directory's mtime is updated when entries are added inside it, + # so old-mtime dirs are guaranteed to contain no new files. + cmd = ( + f"find {shlex.quote(remote_path)}" + f" \\( -type d ! -newermt {shlex.quote(newer_than)} -prune \\)" + f" -o \\( -type f -iname '*.zip' -printf '%s\\t%p\\n' \\)" + ) + else: + cmd = f"find {shlex.quote(remote_path)} -type f -iname '*.zip' -printf '%s\\t%p\\n'" + channel.exec_command(cmd) zips: list[RemoteZip] = [] + last_log = time.monotonic() + for line in channel.makefile("r", -1): line = line.rstrip("\n") if "\t" not in line: @@ -137,9 +189,21 @@ def _find_remote_zips(transport: paramiko.Transport, remote_path: str) -> list[R except ValueError: continue + now = time.monotonic() + if now - last_log >= 30: + log.info("Find in progress: %d zip(s) found so far ...", len(zips)) + last_log = now + stderr_out = channel.makefile_stderr("r", -1).read().strip() if stderr_out: log.warning("find stderr: %s", stderr_out[:500]) channel.recv_exit_status() channel.close() return zips + + +def _scan_cutoff(last_scan: str) -> str: + """Subtract 5-minute safety buffer from last-scan timestamp to handle clock skew.""" + dt = datetime.strptime(last_scan, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) + dt -= timedelta(minutes=5) + return dt.strftime("%Y-%m-%d %H:%M:%S") diff --git a/static/style.css b/static/style.css index efba279..281303d 100644 --- a/static/style.css +++ b/static/style.css @@ -156,6 +156,7 @@ tr:hover td { background: rgba(255,255,255,0.02); } .btn-danger { background: #dc2626; color: #fff; border: 1px solid #dc2626; } .btn-danger:hover { background: #b91c1c; border-color: #b91c1c; } .danger-zone { border-color: rgba(220,38,38,0.4); } +.cache-status { margin-bottom: 1rem; } /* Forms */ .form-section { diff --git a/templates/index.html b/templates/index.html index e787b14..baf41e5 100644 --- a/templates/index.html +++ b/templates/index.html @@ -11,11 +11,14 @@ {% if next_run %} — next: {{ next_run }}{% endif %} {% endif %} +
{% if not sync_running %} @@ -35,10 +38,21 @@ {% if request.query_params.get("test_started") %}