sync errors
This commit is contained in:
@@ -1,8 +1,7 @@
|
|||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
import posixpath
|
import shlex
|
||||||
import socket
|
import socket
|
||||||
import stat
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -81,22 +80,21 @@ def test_connection(cfg: SFTPConfig) -> tuple[bool, str]:
|
|||||||
|
|
||||||
def list_new_zips(cfg: SFTPConfig, max_results: int | None = None) -> list[RemoteZip]:
|
def list_new_zips(cfg: SFTPConfig, max_results: int | None = None) -> list[RemoteZip]:
|
||||||
transport = _make_transport(cfg)
|
transport = _make_transport(cfg)
|
||||||
sftp = paramiko.SFTPClient.from_transport(transport)
|
|
||||||
try:
|
try:
|
||||||
log.info("Walking remote directory tree from %s ...", cfg.remote_path)
|
all_zips = _find_remote_zips(transport, cfg.remote_path)
|
||||||
|
log.info("Remote scan done: %d zip(s) found", len(all_zips))
|
||||||
|
|
||||||
new_zips: list[RemoteZip] = []
|
new_zips: list[RemoteZip] = []
|
||||||
total_seen = 0
|
for zip_info in all_zips:
|
||||||
for zip_info in _walk_zips_iter(sftp, cfg.remote_path):
|
|
||||||
total_seen += 1
|
|
||||||
if not db.is_zip_processed(zip_info.remote_path):
|
if not db.is_zip_processed(zip_info.remote_path):
|
||||||
new_zips.append(zip_info)
|
new_zips.append(zip_info)
|
||||||
if max_results and len(new_zips) >= max_results:
|
if max_results and len(new_zips) >= max_results:
|
||||||
log.info("Reached limit of %d — stopping walk early", max_results)
|
log.info("Reached limit of %d", max_results)
|
||||||
break
|
break
|
||||||
log.info("Remote walk done: %d zip(s) seen, %d new", total_seen, len(new_zips))
|
|
||||||
|
log.info("%d new zip(s) to process", len(new_zips))
|
||||||
return new_zips
|
return new_zips
|
||||||
finally:
|
finally:
|
||||||
sftp.close()
|
|
||||||
transport.close()
|
transport.close()
|
||||||
|
|
||||||
|
|
||||||
@@ -115,24 +113,27 @@ def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path:
|
|||||||
return local_path
|
return local_path
|
||||||
|
|
||||||
|
|
||||||
def _walk_zips_iter(sftp: paramiko.SFTPClient, remote_dir: str):
|
def _find_remote_zips(transport: paramiko.Transport, remote_path: str) -> list[RemoteZip]:
|
||||||
log.info("Listing %s ...", remote_dir)
|
"""Single SSH exec: find all .zip files server-side. Vastly faster than per-directory SFTP calls."""
|
||||||
try:
|
channel = transport.open_session()
|
||||||
entries = sftp.listdir_attr(remote_dir)
|
cmd = f"find {shlex.quote(remote_path)} -type f -iname '*.zip' -printf '%s\\t%p\\n'"
|
||||||
except IOError as e:
|
log.info("Running remote find under %s ...", remote_path)
|
||||||
log.warning("Cannot list %s: %s", remote_dir, e)
|
channel.exec_command(cmd)
|
||||||
return
|
|
||||||
|
|
||||||
subdirs = []
|
zips: list[RemoteZip] = []
|
||||||
zips_here = 0
|
for line in channel.makefile("r", -1):
|
||||||
for entry in entries:
|
line = line.rstrip("\n")
|
||||||
full_path = posixpath.join(remote_dir, entry.filename)
|
if "\t" not in line:
|
||||||
if stat.S_ISDIR(entry.st_mode):
|
continue
|
||||||
subdirs.append(full_path)
|
size_str, path = line.split("\t", 1)
|
||||||
elif entry.filename.lower().endswith(".zip"):
|
try:
|
||||||
yield RemoteZip(remote_path=full_path, file_size=entry.st_size or 0)
|
zips.append(RemoteZip(remote_path=path, file_size=int(size_str)))
|
||||||
zips_here += 1
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
log.info(" %s: %d entries, %d zip(s), %d subdir(s)", remote_dir, len(entries), zips_here, len(subdirs))
|
stderr_out = channel.makefile_stderr("r", -1).read().strip()
|
||||||
for subdir in subdirs:
|
if stderr_out:
|
||||||
yield from _walk_zips_iter(sftp, subdir)
|
log.warning("find stderr: %s", stderr_out[:500])
|
||||||
|
channel.recv_exit_status()
|
||||||
|
channel.close()
|
||||||
|
return zips
|
||||||
|
|||||||
Reference in New Issue
Block a user