""" uploader.py Daemon that watches the photos folder for new files and uploads them via HTTP POST (default) or S3 (optional, requires boto3). Implements retry with exponential backoff and stores upload state in a small JSON DB. """ import threading import time import json from pathlib import Path from typing import Dict import requests import logging try: import boto3 _HAS_BOTO3 = True except Exception: _HAS_BOTO3 = False from Core.Logger import Logs from Core import settings as config sanad_logger = Logs() sanad_logger.LogEngine("G1_Logs", "uploader") class UploadDaemon: def __init__(self, photos_dir: Path = config.PHOTOS_DIR, poll_s: float = 2.0): self.photos_dir = Path(photos_dir) self.poll_s = float(poll_s) self._stop = threading.Event() self._thread = None self._db_path = Path(config.UPLOAD_DB) self._state = self._load_db() def _load_db(self) -> Dict[str, Dict]: try: if self._db_path.exists(): return json.loads(self._db_path.read_text()) except Exception: pass return {} def _save_db(self): try: self._db_path.write_text(json.dumps(self._state, indent=2)) except Exception as e: sanad_logger.print_and_log(f"Failed to write upload db: {e}", "warning") def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() sanad_logger.print_and_log("📤 Upload daemon started.", "info") def stop(self): self._stop.set() if self._thread: self._thread.join(timeout=1.0) def _should_upload(self, path: Path) -> bool: s = self._state.get(str(path), {}) return not s.get("uploaded", False) def _mark_uploaded(self, path: Path, info: Dict): self._state[str(path)] = {"uploaded": True, "info": info, "ts": time.time()} self._save_db() def _upload_http(self, path: Path) -> Dict: url = config.UPLOAD_URL if not url: raise RuntimeError("UPLOAD_URL is not configured") with open(path, "rb") as f: r = requests.post(url, files={"file": f}, timeout=30) r.raise_for_status() return {"status": "ok", "resp": r.text} def _upload_s3(self, path: Path) -> Dict: if not _HAS_BOTO3: raise RuntimeError("boto3 not installed; install it to use S3 uploads") bucket = config.UPLOAD_S3_BUCKET if not bucket: raise RuntimeError("UPLOAD_S3_BUCKET not configured") # Build client using provided keys or default env/iam kwargs = {} if config.UPLOAD_S3_KEY and config.UPLOAD_S3_SECRET: kwargs["aws_access_key_id"] = config.UPLOAD_S3_KEY kwargs["aws_secret_access_key"] = config.UPLOAD_S3_SECRET if config.UPLOAD_S3_REGION: kwargs["region_name"] = config.UPLOAD_S3_REGION s3 = boto3.client("s3", **kwargs) key = path.name try: s3.upload_file(str(path), bucket, key) return {"status": "ok", "bucket": bucket, "key": key} except Exception: raise def _run(self): # If upload destination is not configured, keep daemon alive but idle. if config.UPLOAD_METHOD == "http" and not str(config.UPLOAD_URL or "").strip(): sanad_logger.print_and_log("📤 Upload daemon idle: UPLOAD_URL is not configured.", "warning") while not self._stop.is_set(): time.sleep(max(2.0, self.poll_s)) return if config.UPLOAD_METHOD == "s3" and not str(config.UPLOAD_S3_BUCKET or "").strip(): sanad_logger.print_and_log("📤 Upload daemon idle: UPLOAD_S3_BUCKET is not configured.", "warning") while not self._stop.is_set(): time.sleep(max(2.0, self.poll_s)) return while not self._stop.is_set(): try: for p in sorted(self.photos_dir.glob("**/*")): if not p.is_file(): continue if not self._should_upload(p): continue # Exponential backoff per-file attempts = 0 backoff = 1.0 success = False while attempts < 5 and not success and not self._stop.is_set(): try: sanad_logger.print_and_log(f"Uploading {p} (attempt {attempts+1})", "info") if config.UPLOAD_METHOD == "http": info = self._upload_http(p) elif config.UPLOAD_METHOD == "s3": info = self._upload_s3(p) else: raise NotImplementedError("UPLOAD_METHOD not implemented by daemon") self._mark_uploaded(p, info) sanad_logger.print_and_log(f"Uploaded {p}", "info") success = True except Exception as e: attempts += 1 sanad_logger.print_and_log(f"Upload failed ({e}), backoff {backoff}s", "warning") time.sleep(backoff) backoff = min(30.0, backoff * 2) time.sleep(self.poll_s) except Exception as e: sanad_logger.print_and_log(f"Upload loop error: {e}", "error") time.sleep(5.0) def start_uploader(photos_dir: Path = config.PHOTOS_DIR) -> UploadDaemon: d = UploadDaemon(photos_dir) d.start() return d if __name__ == "__main__": logging.basicConfig(level=logging.INFO) d = UploadDaemon() d.start() try: while True: time.sleep(1) except KeyboardInterrupt: d.stop()