sync errors
This commit is contained in:
@@ -4,6 +4,7 @@ import shlex
|
||||
import socket
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import paramiko
|
||||
@@ -80,29 +81,67 @@ def test_connection(cfg: SFTPConfig) -> tuple[bool, str]:
|
||||
|
||||
|
||||
def list_new_zips(cfg: SFTPConfig, max_results: int | None = None) -> list[RemoteZip]:
|
||||
last_scan = db.get_setting("remote_cache_last_scan")
|
||||
|
||||
transport = _make_transport(cfg)
|
||||
try:
|
||||
t0 = time.monotonic()
|
||||
all_zips = _find_remote_zips(transport, cfg.remote_path)
|
||||
log.info("Remote find done in %.1fs — %d zip(s) found", time.monotonic() - t0, len(all_zips))
|
||||
|
||||
t1 = time.monotonic()
|
||||
processed = db.get_all_processed_paths()
|
||||
log.info("DB lookup done in %.1fs — %d path(s) already processed", time.monotonic() - t1, len(processed))
|
||||
|
||||
new_zips: list[RemoteZip] = []
|
||||
for zip_info in all_zips:
|
||||
if zip_info.remote_path not in processed:
|
||||
new_zips.append(zip_info)
|
||||
if max_results and len(new_zips) >= max_results:
|
||||
log.info("Reached limit of %d", max_results)
|
||||
break
|
||||
|
||||
log.info("%d new zip(s) to process", len(new_zips))
|
||||
return new_zips
|
||||
if last_scan:
|
||||
# Fast incremental: prune directories not modified since last scan.
|
||||
# Adding a file/dir to a directory updates that directory's mtime,
|
||||
# so we safely skip entire subtrees that haven't changed.
|
||||
cutoff = _scan_cutoff(last_scan)
|
||||
log.info("Incremental scan — looking for directories modified since %s ...", cutoff)
|
||||
new_remote = _find_remote_zips(transport, cfg.remote_path, newer_than=cutoff)
|
||||
log.info("Incremental scan done in %.1fs — %d new zip(s) on remote", time.monotonic() - t0, len(new_remote))
|
||||
else:
|
||||
log.info("First run — full remote scan (may take several minutes for large trees) ...")
|
||||
new_remote = _find_remote_zips(transport, cfg.remote_path)
|
||||
log.info("Full scan done in %.1fs — %d zip(s) found", time.monotonic() - t0, len(new_remote))
|
||||
finally:
|
||||
transport.close()
|
||||
|
||||
# Record scan time, then update cache with any new entries found
|
||||
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
db.set_setting("remote_cache_last_scan", now_str)
|
||||
if new_remote:
|
||||
db.upsert_remote_zip_cache([(z.remote_path, z.file_size) for z in new_remote])
|
||||
log.info("Cache updated with %d new entry(ies)", len(new_remote))
|
||||
|
||||
# Filter full cache against already-processed paths
|
||||
t1 = time.monotonic()
|
||||
all_cached = db.get_remote_zip_cache()
|
||||
processed = db.get_all_processed_paths()
|
||||
log.info("DB lookup done in %.1fs — cache: %d, processed: %d", time.monotonic() - t1, len(all_cached), len(processed))
|
||||
|
||||
new_zips: list[RemoteZip] = []
|
||||
for path, size in all_cached:
|
||||
if path not in processed:
|
||||
new_zips.append(RemoteZip(remote_path=path, file_size=size))
|
||||
if max_results and len(new_zips) >= max_results:
|
||||
break
|
||||
|
||||
log.info("%d zip(s) to process", len(new_zips))
|
||||
return new_zips
|
||||
|
||||
|
||||
def refresh_remote_zip_cache(cfg: SFTPConfig) -> int:
|
||||
"""Force a full remote scan, replacing the entire cache. Used by the manual rescan button."""
|
||||
log.info("Forced full remote cache refresh ...")
|
||||
t0 = time.monotonic()
|
||||
transport = _make_transport(cfg)
|
||||
try:
|
||||
all_zips = _find_remote_zips(transport, cfg.remote_path)
|
||||
finally:
|
||||
transport.close()
|
||||
log.info("Full scan done in %.1fs — %d zip(s)", time.monotonic() - t0, len(all_zips))
|
||||
db.clear_remote_zip_cache()
|
||||
db.upsert_remote_zip_cache([(z.remote_path, z.file_size) for z in all_zips])
|
||||
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
db.set_setting("remote_cache_last_scan", now_str)
|
||||
log.info("Cache refreshed: %d zip(s) stored", len(all_zips))
|
||||
return len(all_zips)
|
||||
|
||||
|
||||
def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path:
|
||||
dest = Path(dest_dir)
|
||||
@@ -119,14 +158,27 @@ def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path:
|
||||
return local_path
|
||||
|
||||
|
||||
def _find_remote_zips(transport: paramiko.Transport, remote_path: str) -> list[RemoteZip]:
|
||||
"""Single SSH exec: find all .zip files server-side. Vastly faster than per-directory SFTP calls."""
|
||||
def _find_remote_zips(transport: paramiko.Transport, remote_path: str, newer_than: str | None = None) -> list[RemoteZip]:
|
||||
"""Run find on the remote host, streaming results with progress logging every 30 s."""
|
||||
channel = transport.open_session()
|
||||
cmd = f"find {shlex.quote(remote_path)} -type f -iname '*.zip' -printf '%s\\t%p\\n'"
|
||||
log.info("Running remote find under %s ...", remote_path)
|
||||
|
||||
if newer_than:
|
||||
# Prune entire directory subtrees whose mtime predates the cutoff.
|
||||
# A directory's mtime is updated when entries are added inside it,
|
||||
# so old-mtime dirs are guaranteed to contain no new files.
|
||||
cmd = (
|
||||
f"find {shlex.quote(remote_path)}"
|
||||
f" \\( -type d ! -newermt {shlex.quote(newer_than)} -prune \\)"
|
||||
f" -o \\( -type f -iname '*.zip' -printf '%s\\t%p\\n' \\)"
|
||||
)
|
||||
else:
|
||||
cmd = f"find {shlex.quote(remote_path)} -type f -iname '*.zip' -printf '%s\\t%p\\n'"
|
||||
|
||||
channel.exec_command(cmd)
|
||||
|
||||
zips: list[RemoteZip] = []
|
||||
last_log = time.monotonic()
|
||||
|
||||
for line in channel.makefile("r", -1):
|
||||
line = line.rstrip("\n")
|
||||
if "\t" not in line:
|
||||
@@ -137,9 +189,21 @@ def _find_remote_zips(transport: paramiko.Transport, remote_path: str) -> list[R
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
now = time.monotonic()
|
||||
if now - last_log >= 30:
|
||||
log.info("Find in progress: %d zip(s) found so far ...", len(zips))
|
||||
last_log = now
|
||||
|
||||
stderr_out = channel.makefile_stderr("r", -1).read().strip()
|
||||
if stderr_out:
|
||||
log.warning("find stderr: %s", stderr_out[:500])
|
||||
channel.recv_exit_status()
|
||||
channel.close()
|
||||
return zips
|
||||
|
||||
|
||||
def _scan_cutoff(last_scan: str) -> str:
|
||||
"""Subtract 5-minute safety buffer from last-scan timestamp to handle clock skew."""
|
||||
dt = datetime.strptime(last_scan, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
|
||||
dt -= timedelta(minutes=5)
|
||||
return dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
Reference in New Issue
Block a user