168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
"""
|
|
uploader.py

Daemon that watches the photos folder for new files and uploads them
via HTTP POST (default) or S3 (optional, requires boto3). Implements
retry with exponential backoff and stores upload state in a small JSON DB.
|
|
"""
|
|
import threading
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict
|
|
import requests
|
|
import logging
|
|
try:
|
|
import boto3
|
|
_HAS_BOTO3 = True
|
|
except Exception:
|
|
_HAS_BOTO3 = False
|
|
|
|
from Core.Logger import Logs
|
|
from Core import settings as config
|
|
|
|
# Module-level logger used by every function and method in this file.
# NOTE(review): Logs/LogEngine come from Core.Logger and are not visible here;
# the arguments presumably name the log group ("G1_Logs") and the component
# ("uploader") — confirm against Core.Logger.
sanad_logger = Logs()
sanad_logger.LogEngine("G1_Logs", "uploader")
|
|
|
|
|
|
class UploadDaemon:
    """Background worker that uploads files found under a photos folder.

    The worker thread repeatedly scans ``photos_dir`` (recursively), uploads
    every file that is not yet marked as uploaded via HTTP POST or S3
    (selected by ``config.UPLOAD_METHOD``), and records successful uploads in
    a JSON file at ``config.UPLOAD_DB`` keyed by the file's path string.

    Failed uploads are retried with exponential backoff, up to
    ``MAX_ATTEMPTS`` times per scan pass; files that still fail are picked up
    again on the next pass.
    """

    # Maximum upload attempts per file within a single scan pass.
    MAX_ATTEMPTS = 5
    # Delay (seconds) before the first retry; doubled after every failure.
    INITIAL_BACKOFF_S = 1.0
    # Upper bound (seconds) for the per-file backoff delay.
    MAX_BACKOFF_S = 30.0
    # Delay (seconds) before rescanning after an unexpected scan-loop error.
    LOOP_ERROR_DELAY_S = 5.0

    def __init__(self, photos_dir: Path = config.PHOTOS_DIR, poll_s: float = 2.0):
        """Prepare the daemon without starting its thread.

        Args:
            photos_dir: Folder to scan recursively for files to upload.
            poll_s: Seconds to wait between two scan passes.
        """
        self.photos_dir = Path(photos_dir)
        self.poll_s = float(poll_s)
        self._stop = threading.Event()
        self._thread = None  # threading.Thread once start() has been called
        self._db_path = Path(config.UPLOAD_DB)
        self._state = self._load_db()

    def _load_db(self) -> Dict[str, Dict]:
        """Read the upload-state DB from disk.

        Returns:
            The stored mapping of path string -> upload record, or an empty
            dict when the file is missing, unreadable, or not a JSON object.
        """
        try:
            if self._db_path.exists():
                data = json.loads(self._db_path.read_text(encoding="utf-8"))
                if isinstance(data, dict):
                    return data
                sanad_logger.print_and_log(
                    f"Upload db {self._db_path} is not a JSON object; starting empty",
                    "warning",
                )
        except Exception as e:
            # Best effort: a corrupt or unreadable DB must not prevent startup,
            # but it should be visible in the logs instead of silently ignored.
            sanad_logger.print_and_log(f"Failed to read upload db: {e}", "warning")
        return {}

    def _save_db(self):
        """Persist the upload state to disk; failures are logged, not raised."""
        try:
            # Write to a sibling temp file and rename it over the DB so an
            # interruption mid-write cannot leave a truncated DB behind.
            tmp_path = self._db_path.with_name(self._db_path.name + ".tmp")
            tmp_path.write_text(json.dumps(self._state, indent=2), encoding="utf-8")
            tmp_path.replace(self._db_path)
        except Exception as e:
            sanad_logger.print_and_log(f"Failed to write upload db: {e}", "warning")

    def start(self):
        """Start the worker thread; does nothing if it is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        sanad_logger.print_and_log("📤 Upload daemon started.", "info")

    def stop(self):
        """Signal the worker to stop and wait up to one second for it to exit."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=1.0)

    def _should_upload(self, path: Path) -> bool:
        """Return True unless ``path`` is recorded as already uploaded."""
        entry = self._state.get(str(path))
        if not isinstance(entry, dict):
            # Missing or malformed record: treat the file as not uploaded.
            return True
        return not entry.get("uploaded", False)

    def _mark_uploaded(self, path: Path, info: Dict):
        """Record a successful upload of ``path`` and persist the state DB."""
        self._state[str(path)] = {"uploaded": True, "info": info, "ts": time.time()}
        self._save_db()

    def _upload_http(self, path: Path) -> Dict:
        """Upload ``path`` as multipart form field ``file`` to ``config.UPLOAD_URL``.

        Returns:
            ``{"status": "ok", "resp": <response body text>}``.

        Raises:
            RuntimeError: If ``config.UPLOAD_URL`` is empty.
            requests.RequestException: On network errors or a non-2xx status.
            OSError: If the file cannot be opened.
        """
        url = config.UPLOAD_URL
        if not url:
            raise RuntimeError("UPLOAD_URL is not configured")
        with open(path, "rb") as f:
            r = requests.post(url, files={"file": f}, timeout=30)
        r.raise_for_status()
        return {"status": "ok", "resp": r.text}

    def _upload_s3(self, path: Path) -> Dict:
        """Upload ``path`` to the bucket named by ``config.UPLOAD_S3_BUCKET``.

        The object key is the file's path relative to ``photos_dir`` (POSIX
        separators), so same-named files in different subfolders do not
        overwrite each other. Files directly inside ``photos_dir`` keep their
        bare file name as key.

        Returns:
            ``{"status": "ok", "bucket": <bucket>, "key": <key>}``.

        Raises:
            RuntimeError: If boto3 is not installed or no bucket is configured.
            Exception: Whatever boto3 raises when the upload fails.
        """
        if not _HAS_BOTO3:
            raise RuntimeError("boto3 not installed; install it to use S3 uploads")
        bucket = config.UPLOAD_S3_BUCKET
        if not bucket:
            raise RuntimeError("UPLOAD_S3_BUCKET not configured")

        # Build client using provided keys or default env/iam
        kwargs = {}
        if config.UPLOAD_S3_KEY and config.UPLOAD_S3_SECRET:
            kwargs["aws_access_key_id"] = config.UPLOAD_S3_KEY
            kwargs["aws_secret_access_key"] = config.UPLOAD_S3_SECRET
        if config.UPLOAD_S3_REGION:
            kwargs["region_name"] = config.UPLOAD_S3_REGION

        s3 = boto3.client("s3", **kwargs)
        try:
            key = path.relative_to(self.photos_dir).as_posix()
        except ValueError:
            # Path is not under photos_dir; fall back to the bare file name.
            key = path.name
        s3.upload_file(str(path), bucket, key)
        return {"status": "ok", "bucket": bucket, "key": key}

    def _idle_reason(self) -> str:
        """Return a warning message if uploads cannot work as configured, else ''."""
        method = config.UPLOAD_METHOD
        if method == "http":
            if not str(config.UPLOAD_URL or "").strip():
                return "📤 Upload daemon idle: UPLOAD_URL is not configured."
            return ""
        if method == "s3":
            if not str(config.UPLOAD_S3_BUCKET or "").strip():
                return "📤 Upload daemon idle: UPLOAD_S3_BUCKET is not configured."
            return ""
        # Retrying every file against an unsupported method can never succeed.
        return f"📤 Upload daemon idle: UPLOAD_METHOD {method!r} is not supported."

    def _upload_once(self, path: Path) -> Dict:
        """Perform one upload attempt using the configured method."""
        if config.UPLOAD_METHOD == "http":
            return self._upload_http(path)
        if config.UPLOAD_METHOD == "s3":
            return self._upload_s3(path)
        raise NotImplementedError("UPLOAD_METHOD not implemented by daemon")

    def _upload_with_retry(self, path: Path) -> bool:
        """Upload ``path`` with exponential backoff between failed attempts.

        Returns:
            True if the file was uploaded and recorded, False if every attempt
            failed or a stop was requested first.
        """
        backoff = self.INITIAL_BACKOFF_S
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            if self._stop.is_set():
                return False
            try:
                sanad_logger.print_and_log(f"Uploading {path} (attempt {attempt})", "info")
                info = self._upload_once(path)
            except Exception as e:
                if attempt == self.MAX_ATTEMPTS:
                    # No retry follows, so waiting here would only delay the scan.
                    sanad_logger.print_and_log(
                        f"Upload failed ({e}), giving up after {attempt} attempts",
                        "warning",
                    )
                    return False
                sanad_logger.print_and_log(f"Upload failed ({e}), backoff {backoff}s", "warning")
                # Event.wait returns True as soon as stop() is called, so a
                # pending backoff never delays shutdown.
                if self._stop.wait(backoff):
                    return False
                backoff = min(self.MAX_BACKOFF_S, backoff * 2)
            else:
                self._mark_uploaded(path, info)
                sanad_logger.print_and_log(f"Uploaded {path}", "info")
                return True
        return False

    def _scan_once(self):
        """Upload every pending file currently found under ``photos_dir``."""
        for p in sorted(self.photos_dir.glob("**/*")):
            if self._stop.is_set():
                return
            if not p.is_file() or not self._should_upload(p):
                continue
            self._upload_with_retry(p)

    def _run(self):
        """Thread body: scan and upload until ``stop()`` is called."""
        # If the upload destination is not usable, keep the daemon alive but idle.
        reason = self._idle_reason()
        if reason:
            sanad_logger.print_and_log(reason, "warning")
            self._stop.wait()  # blocks until stop() sets the event
            return

        while not self._stop.is_set():
            try:
                self._scan_once()
                delay = self.poll_s
            except Exception as e:
                sanad_logger.print_and_log(f"Upload loop error: {e}", "error")
                delay = self.LOOP_ERROR_DELAY_S
            # Interruptible wait so stop() takes effect immediately.
            self._stop.wait(delay)
|
|
|
|
|
|
def start_uploader(photos_dir: Path = config.PHOTOS_DIR) -> UploadDaemon:
    """Create an UploadDaemon for ``photos_dir``, start it, and return it.

    Args:
        photos_dir: Folder the daemon should scan for files to upload.

    Returns:
        The started daemon; call its ``stop()`` method to shut it down.
    """
    daemon = UploadDaemon(photos_dir)
    daemon.start()
    return daemon
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the uploader with default settings until Ctrl+C.
    logging.basicConfig(level=logging.INFO)
    daemon = UploadDaemon()
    daemon.start()
    try:
        # The worker runs in a daemon thread, so the main thread has to stay
        # alive for uploads to continue.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        daemon.stop()
|