Initial commit
This commit is contained in:
@@ -0,0 +1,75 @@
|
||||
import io
|
||||
import logging
|
||||
import posixpath
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import paramiko
|
||||
|
||||
import db
|
||||
from config import SFTPConfig
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RemoteZip:
|
||||
remote_path: str
|
||||
file_size: int
|
||||
|
||||
|
||||
def _make_transport(cfg: SFTPConfig) -> paramiko.Transport:
|
||||
transport = paramiko.Transport((cfg.host, cfg.port))
|
||||
if cfg.auth_method == "key" and cfg.key:
|
||||
key = paramiko.RSAKey.from_private_key(io.StringIO(cfg.key))
|
||||
transport.connect(username=cfg.user, pkey=key)
|
||||
else:
|
||||
transport.connect(username=cfg.user, password=cfg.password)
|
||||
return transport
|
||||
|
||||
|
||||
def list_new_zips(cfg: SFTPConfig) -> list[RemoteZip]:
|
||||
transport = _make_transport(cfg)
|
||||
sftp = paramiko.SFTPClient.from_transport(transport)
|
||||
try:
|
||||
all_zips = _walk_zips(sftp, cfg.remote_path)
|
||||
new_zips = [z for z in all_zips if not db.is_zip_processed(z.remote_path)]
|
||||
log.info("Remote: %d zip(s) total, %d new", len(all_zips), len(new_zips))
|
||||
return new_zips
|
||||
finally:
|
||||
sftp.close()
|
||||
transport.close()
|
||||
|
||||
|
||||
def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path:
|
||||
dest = Path(dest_dir)
|
||||
dest.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
local_path = dest / Path(remote_zip.remote_path).name
|
||||
transport = _make_transport(cfg)
|
||||
sftp = paramiko.SFTPClient.from_transport(transport)
|
||||
try:
|
||||
log.info("Downloading %s → %s", remote_zip.remote_path, local_path)
|
||||
sftp.get(remote_zip.remote_path, str(local_path))
|
||||
finally:
|
||||
sftp.close()
|
||||
transport.close()
|
||||
return local_path
|
||||
|
||||
|
||||
def _walk_zips(sftp: paramiko.SFTPClient, remote_dir: str) -> list[RemoteZip]:
|
||||
results: list[RemoteZip] = []
|
||||
try:
|
||||
entries = sftp.listdir_attr(remote_dir)
|
||||
except IOError as e:
|
||||
log.warning("Cannot list %s: %s", remote_dir, e)
|
||||
return results
|
||||
|
||||
for entry in entries:
|
||||
full_path = posixpath.join(remote_dir, entry.filename)
|
||||
import stat
|
||||
if stat.S_ISDIR(entry.st_mode):
|
||||
results.extend(_walk_zips(sftp, full_path))
|
||||
elif entry.filename.lower().endswith(".zip"):
|
||||
results.append(RemoteZip(remote_path=full_path, file_size=entry.st_size or 0))
|
||||
return results
|
||||
Reference in New Issue
Block a user