"""Auto-update system for WebDrop Bridge using Forgejo releases. This module manages checking for updates, downloading installers, and verifying checksums from Forgejo releases. """ import asyncio import hashlib import json import logging import socket from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Optional from urllib.error import URLError from urllib.request import urlopen logger = logging.getLogger(__name__) @dataclass class Release: """Represents a Forgejo release.""" tag_name: str name: str version: str # Semantic version (e.g., "1.0.0") body: str # Release notes/changelog assets: list[dict] # List of {name, browser_download_url} published_at: str # ISO format datetime class UpdateManager: """Manages auto-updates via Forgejo releases API.""" def __init__(self, current_version: str, config_dir: Optional[Path] = None): """Initialize update manager. Args: current_version: Current app version (e.g., "0.0.1") config_dir: Directory for storing update cache. Defaults to temp. """ self.current_version = current_version self.forgejo_url = "https://git.him-tools.de" self.repo = "HIM-public/webdrop-bridge" self.api_endpoint = f"{self.forgejo_url}/api/v1/repos/{self.repo}/releases/latest" # Cache management self.cache_dir = config_dir or Path.home() / ".webdrop-bridge" self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_file = self.cache_dir / "update_check.json" self.cache_ttl = timedelta(hours=24) def _parse_version(self, version_str: str) -> tuple[int, int, int]: """Parse semantic version string to tuple. Args: version_str: Version string (e.g., "1.0.0" or "v1.0.0") Returns: Tuple of (major, minor, patch) Raises: ValueError: If version format is invalid """ # Remove 'v' prefix if present version_str = version_str.lstrip("v") try: parts = version_str.split(".") if len(parts) != 3: raise ValueError(f"Invalid version format: {version_str}") return tuple(int(p) for p in parts) # type: ignore except ValueError as e: logger.error(f"Failed to parse version '{version_str}': {e}") raise def _is_newer_version(self, latest_version: str) -> bool: """Check if latest version is newer than current. Args: latest_version: Latest version string Returns: True if latest_version > current_version """ try: current = self._parse_version(self.current_version) latest = self._parse_version(latest_version) return latest > current except ValueError: logger.error("Failed to compare versions") return False def _load_cache(self) -> Optional[dict]: """Load cached release info if valid. Returns: Cached release dict if cache exists and is fresh, None otherwise """ if not self.cache_file.exists(): return None try: with open(self.cache_file) as f: cached = json.load(f) # Check if cache is still valid timestamp = datetime.fromisoformat(cached.get("timestamp", "")) if datetime.now() - timestamp < self.cache_ttl: logger.debug("Using cached release info") return cached else: logger.debug("Cache expired") self.cache_file.unlink() return None except (json.JSONDecodeError, ValueError) as e: logger.warning(f"Failed to load cache: {e}") self.cache_file.unlink() return None def _save_cache(self, release_info: dict) -> None: """Save release info to cache. Args: release_info: Release information to cache """ try: cache_data = { "timestamp": datetime.now().isoformat(), "release": release_info, } with open(self.cache_file, "w") as f: json.dump(cache_data, f) logger.debug("Cached release info") except OSError as e: logger.warning(f"Failed to save cache: {e}") async def check_for_updates(self) -> Optional[Release]: """Check Forgejo API for latest release. Returns: Release object if newer version available, None otherwise """ logger.debug(f"check_for_updates() called, current version: {self.current_version}") # Only use cache when a pending update was already found (avoids # showing the update dialog on every start). "No update" is never # cached so that a freshly published release is visible immediately. logger.debug("Checking cache for pending update...") cached = self._load_cache() if cached: release_data = cached.get("release") if release_data: version = release_data["tag_name"].lstrip("v") logger.debug(f"Cached pending update version: {version}") if self._is_newer_version(version): logger.info(f"Returning cached pending update: {version}") return Release(**release_data) else: # Current version is >= cached release (e.g. already updated) logger.debug("Cached release is no longer newer — discarding cache") self.cache_file.unlink(missing_ok=True) # Always fetch fresh from API so new releases are seen immediately logger.debug("Fetching from API...") try: logger.info(f"Checking for updates from {self.api_endpoint}") loop = asyncio.get_event_loop() response = await asyncio.wait_for( loop.run_in_executor(None, self._fetch_release), timeout=8, ) if not response: return None version = response["tag_name"].lstrip("v") if not self._is_newer_version(version): logger.info(f"Latest version {version} is not newer than {self.current_version}") return None # Cache the found update so repeated starts don't hammer the API logger.info(f"New version available: {version}") release = Release(**response) self._save_cache(response) return release except asyncio.TimeoutError: logger.warning("Update check timed out - API server not responding") return None except Exception as e: logger.error(f"Error checking for updates: {e}") return None def _fetch_release(self) -> Optional[dict]: """Fetch latest release from Forgejo API (blocking). Returns: Release data dict or None on error """ try: logger.debug(f"Fetching release from {self.api_endpoint}") # Set socket timeout to prevent hanging old_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(5) try: logger.debug("Opening URL connection...") with urlopen(self.api_endpoint, timeout=5) as response: logger.debug(f"Response status: {response.status}, reading data...") response_data = response.read() logger.debug(f"Read {len(response_data)} bytes, parsing JSON...") data = json.loads(response_data) logger.info(f"Successfully fetched release: {data.get('tag_name', 'unknown')}") return { "tag_name": data["tag_name"], "name": data["name"], "version": data["tag_name"].lstrip("v"), "body": data["body"], "assets": data.get("assets", []), "published_at": data.get("published_at", ""), } finally: socket.setdefaulttimeout(old_timeout) except socket.timeout as e: logger.error(f"Socket timeout (5s) connecting to {self.api_endpoint}") return None except Exception as e: logger.error(f"Failed to fetch release: {type(e).__name__}: {e}") import traceback logger.debug(traceback.format_exc()) return None async def download_update( self, release: Release, output_dir: Optional[Path] = None, progress_callback=None ) -> Optional[Path]: """Download installer from release assets. Args: release: Release information output_dir: Directory to save installer. Defaults to cache_dir. Returns: Path to downloaded file or None on error """ if not release.assets: logger.error("No assets found in release") return None # Find .msi or .dmg file installer_asset = None for asset in release.assets: if asset["name"].endswith((".msi", ".dmg")): installer_asset = asset break if not installer_asset: logger.error("No installer found in release assets") return None output_dir = output_dir or self.cache_dir output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / installer_asset["name"] try: logger.info(f"Downloading {installer_asset['name']}") # Run in thread pool with 5-minute timeout for large files loop = asyncio.get_event_loop() success = await asyncio.wait_for( loop.run_in_executor( None, self._download_file, installer_asset["browser_download_url"], output_file, progress_callback, ), timeout=300, ) if success: logger.info(f"Downloaded to {output_file}") return output_file return None except asyncio.TimeoutError: logger.error(f"Download timed out: {installer_asset['name']}") if output_file.exists(): output_file.unlink() return None except Exception as e: logger.error(f"Error downloading update: {e}") if output_file.exists(): output_file.unlink() return None def _download_file(self, url: str, output_path: Path, progress_callback=None) -> bool: """Download file from URL (blocking). Args: url: URL to download from output_path: Path to save file progress_callback: Optional callable(bytes_downloaded, total_bytes) Returns: True if successful, False otherwise """ try: logger.debug(f"Downloading from {url}") with urlopen(url, timeout=300) as response: # 5 min timeout total = int(response.headers.get("Content-Length", 0)) downloaded = 0 chunk_size = 65536 # 64 KB chunks with open(output_path, "wb") as f: while True: chunk = response.read(chunk_size) if not chunk: break f.write(chunk) downloaded += len(chunk) if progress_callback: try: progress_callback(downloaded, total) except Exception: pass # Never let progress errors abort the download logger.debug(f"Downloaded {output_path.stat().st_size} bytes") return True except URLError as e: logger.error(f"Download failed: {e}") return False async def verify_checksum(self, file_path: Path, release: Release) -> bool: """Verify file checksum against release checksum file. Args: file_path: Path to downloaded installer release: Release information Returns: True if checksum matches, False otherwise """ # Find .sha256 file matching the installer name (e.g. Setup.msi.sha256) # Fall back to any .sha256 only if no specific match exists installer_name = file_path.name checksum_asset = None for asset in release.assets: if asset["name"] == f"{installer_name}.sha256": checksum_asset = asset break if not checksum_asset: logger.warning("No checksum file found in release") return True # Continue anyway try: logger.info("Verifying checksum...") # Download checksum file with 30 second timeout loop = asyncio.get_event_loop() checksum_content = await asyncio.wait_for( loop.run_in_executor( None, self._download_checksum, checksum_asset["browser_download_url"], ), timeout=30, ) if not checksum_content: logger.warning("Failed to download checksum") return False # Calculate file checksum sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): sha256_hash.update(chunk) file_checksum = sha256_hash.hexdigest() expected_checksum = checksum_content.strip() if file_checksum == expected_checksum: logger.info("Checksum verification passed") return True else: logger.error(f"Checksum mismatch: {file_checksum} != {expected_checksum}") return False except asyncio.TimeoutError: logger.error("Checksum verification timed out") return False except Exception as e: logger.error(f"Error verifying checksum: {e}") return False def _download_checksum(self, url: str) -> Optional[str]: """Download checksum file (blocking). Args: url: URL to checksum file Returns: Checksum content or None on error """ try: with urlopen(url, timeout=10) as response: return response.read().decode().strip() except URLError as e: logger.error(f"Failed to download checksum: {e}") return None def install_update(self, installer_path: Path) -> bool: """Launch installer for update. Args: installer_path: Path to installer executable Returns: True if installer launched, False otherwise Note: The actual installation and restart are handled by the installer. """ if not installer_path.exists(): logger.error(f"Installer not found: {installer_path}") return False try: import platform import subprocess if platform.system() == "Windows": # Windows: MSI files must be launched via msiexec logger.info(f"Launching installer: {installer_path}") if str(installer_path).lower().endswith(".msi"): subprocess.Popen(["msiexec.exe", "/i", str(installer_path)]) else: subprocess.Popen([str(installer_path)]) return True elif platform.system() == "Darwin": # macOS: Mount DMG and run installer logger.info(f"Launching DMG: {installer_path}") subprocess.Popen(["open", str(installer_path)]) return True else: logger.error(f"Unsupported platform: {platform.system()}") return False except Exception as e: logger.error(f"Failed to launch installer: {e}") return False