Added test and small fixes

This commit is contained in:
2026-05-10 16:00:58 +02:00
parent e9ec445445
commit 31bd274824
5 changed files with 126 additions and 10 deletions
+44 -3
View File
@@ -1,6 +1,7 @@
import io
import logging
import posixpath
import stat
from dataclasses import dataclass
from pathlib import Path
@@ -11,6 +12,13 @@ from config import SFTPConfig
log = logging.getLogger(__name__)
_KEY_CLASSES = [
paramiko.Ed25519Key,
paramiko.RSAKey,
paramiko.ECDSAKey,
paramiko.DSSKey,
]
@dataclass
class RemoteZip:
@@ -18,16 +26,51 @@ class RemoteZip:
file_size: int
def _load_private_key(pem: str) -> paramiko.PKey:
for cls in _KEY_CLASSES:
try:
return cls.from_private_key(io.StringIO(pem))
except Exception:
continue
raise ValueError("Could not parse private key — unsupported format or bad PEM data")
def get_key_fingerprint(pem: str) -> str | None:
if not pem.strip():
return None
try:
key = _load_private_key(pem)
fp = ":".join(f"{b:02x}" for b in key.get_fingerprint())
return f"{key.get_name()} MD5:{fp}"
except Exception as e:
return f"Invalid key: {e}"
def _make_transport(cfg: SFTPConfig) -> paramiko.Transport:
transport = paramiko.Transport((cfg.host, cfg.port))
if cfg.auth_method == "key" and cfg.key:
key = paramiko.RSAKey.from_private_key(io.StringIO(cfg.key))
key = _load_private_key(cfg.key)
transport.connect(username=cfg.user, pkey=key)
else:
transport.connect(username=cfg.user, password=cfg.password)
return transport
def test_connection(cfg: SFTPConfig) -> tuple[bool, str]:
try:
transport = _make_transport(cfg)
sftp = paramiko.SFTPClient.from_transport(transport)
try:
entries = sftp.listdir(cfg.remote_path)
zip_count = sum(1 for e in entries if e.lower().endswith(".zip"))
return True, f"Connected to {cfg.host}. {len(entries)} item(s) in {cfg.remote_path} ({zip_count} zip file(s) at top level)."
finally:
sftp.close()
transport.close()
except Exception as e:
return False, str(e)
def list_new_zips(cfg: SFTPConfig) -> list[RemoteZip]:
transport = _make_transport(cfg)
sftp = paramiko.SFTPClient.from_transport(transport)
@@ -44,7 +87,6 @@ def list_new_zips(cfg: SFTPConfig) -> list[RemoteZip]:
def download(cfg: SFTPConfig, remote_zip: RemoteZip, dest_dir: str) -> Path:
dest = Path(dest_dir)
dest.mkdir(parents=True, exist_ok=True)
local_path = dest / Path(remote_zip.remote_path).name
transport = _make_transport(cfg)
sftp = paramiko.SFTPClient.from_transport(transport)
@@ -67,7 +109,6 @@ def _walk_zips(sftp: paramiko.SFTPClient, remote_dir: str) -> list[RemoteZip]:
for entry in entries:
full_path = posixpath.join(remote_dir, entry.filename)
import stat
if stat.S_ISDIR(entry.st_mode):
results.extend(_walk_zips(sftp, full_path))
elif entry.filename.lower().endswith(".zip"):