diff --git a/src/webdrop_bridge/core/updater.py b/src/webdrop_bridge/core/updater.py new file mode 100644 index 0000000..aa5d385 --- /dev/null +++ b/src/webdrop_bridge/core/updater.py @@ -0,0 +1,398 @@ +"""Auto-update system for WebDrop Bridge using Forgejo releases. + +This module manages checking for updates, downloading installers, and +verifying checksums from Forgejo releases. +""" + +import asyncio +import hashlib +import json +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional +from urllib.error import URLError +from urllib.request import urlopen + +logger = logging.getLogger(__name__) + + +@dataclass +class Release: + """Represents a Forgejo release.""" + + tag_name: str + name: str + version: str # Semantic version (e.g., "1.0.0") + body: str # Release notes/changelog + assets: list[dict] # List of {name, browser_download_url} + published_at: str # ISO format datetime + + +class UpdateManager: + """Manages auto-updates via Forgejo releases API.""" + + def __init__(self, current_version: str, config_dir: Optional[Path] = None): + """Initialize update manager. + + Args: + current_version: Current app version (e.g., "0.0.1") + config_dir: Directory for storing update cache. Defaults to temp. + """ + self.current_version = current_version + self.forgejo_url = "https://git.him-tools.de" + self.repo = "HIM-public/webdrop-bridge" + self.api_endpoint = ( + f"{self.forgejo_url}/api/v1/repos/{self.repo}/releases/latest" + ) + + # Cache management + self.cache_dir = config_dir or Path.home() / ".webdrop-bridge" + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.cache_file = self.cache_dir / "update_check.json" + self.cache_ttl = timedelta(hours=24) + + def _parse_version(self, version_str: str) -> tuple[int, int, int]: + """Parse semantic version string to tuple. + + Args: + version_str: Version string (e.g., "1.0.0" or "v1.0.0") + + Returns: + Tuple of (major, minor, patch) + + Raises: + ValueError: If version format is invalid + """ + # Remove 'v' prefix if present + version_str = version_str.lstrip("v") + + try: + parts = version_str.split(".") + if len(parts) != 3: + raise ValueError(f"Invalid version format: {version_str}") + return tuple(int(p) for p in parts) # type: ignore + except ValueError as e: + logger.error(f"Failed to parse version '{version_str}': {e}") + raise + + def _is_newer_version(self, latest_version: str) -> bool: + """Check if latest version is newer than current. + + Args: + latest_version: Latest version string + + Returns: + True if latest_version > current_version + """ + try: + current = self._parse_version(self.current_version) + latest = self._parse_version(latest_version) + return latest > current + except ValueError: + logger.error("Failed to compare versions") + return False + + def _load_cache(self) -> Optional[dict]: + """Load cached release info if valid. + + Returns: + Cached release dict if cache exists and is fresh, None otherwise + """ + if not self.cache_file.exists(): + return None + + try: + with open(self.cache_file) as f: + cached = json.load(f) + + # Check if cache is still valid + timestamp = datetime.fromisoformat(cached.get("timestamp", "")) + if datetime.now() - timestamp < self.cache_ttl: + logger.debug("Using cached release info") + return cached + else: + logger.debug("Cache expired") + self.cache_file.unlink() + return None + except (json.JSONDecodeError, ValueError) as e: + logger.warning(f"Failed to load cache: {e}") + self.cache_file.unlink() + return None + + def _save_cache(self, release_info: dict) -> None: + """Save release info to cache. + + Args: + release_info: Release information to cache + """ + try: + cache_data = { + "timestamp": datetime.now().isoformat(), + "release": release_info, + } + with open(self.cache_file, "w") as f: + json.dump(cache_data, f) + logger.debug("Cached release info") + except OSError as e: + logger.warning(f"Failed to save cache: {e}") + + async def check_for_updates(self) -> Optional[Release]: + """Check Forgejo API for latest release. + + Returns: + Release object if newer version available, None otherwise + """ + # Try cache first + cached = self._load_cache() + if cached: + release_data = cached.get("release") + if release_data: + version = release_data["tag_name"].lstrip("v") + if not self._is_newer_version(version): + logger.info("No newer version available (cached)") + return None + return Release(**release_data) + + # Fetch from API + try: + logger.info(f"Checking for updates from {self.api_endpoint}") + + # Run in thread pool to avoid blocking + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + None, self._fetch_release + ) + + if not response: + return None + + # Check if newer version + version = response["tag_name"].lstrip("v") + if not self._is_newer_version(version): + logger.info(f"Latest version {version} is not newer than {self.current_version}") + self._save_cache(response) + return None + + logger.info(f"New version available: {version}") + release = Release(**response) + self._save_cache(response) + return release + + except URLError as e: + logger.error(f"Network error checking updates: {e}") + return None + except Exception as e: + logger.error(f"Error checking for updates: {e}") + return None + + def _fetch_release(self) -> Optional[dict]: + """Fetch latest release from Forgejo API (blocking). + + Returns: + Release data dict or None on error + """ + try: + with urlopen(self.api_endpoint, timeout=10) as response: + data = json.loads(response.read()) + return { + "tag_name": data["tag_name"], + "name": data["name"], + "version": data["tag_name"].lstrip("v"), + "body": data["body"], + "assets": data.get("assets", []), + "published_at": data.get("published_at", ""), + } + except URLError as e: + logger.error(f"Failed to fetch release: {e}") + return None + + async def download_update( + self, release: Release, output_dir: Optional[Path] = None + ) -> Optional[Path]: + """Download installer from release assets. + + Args: + release: Release information + output_dir: Directory to save installer. Defaults to cache_dir. + + Returns: + Path to downloaded file or None on error + """ + if not release.assets: + logger.error("No assets found in release") + return None + + # Find .msi or .dmg file + installer_asset = None + for asset in release.assets: + if asset["name"].endswith((".msi", ".dmg")): + installer_asset = asset + break + + if not installer_asset: + logger.error("No installer found in release assets") + return None + + output_dir = output_dir or self.cache_dir + output_dir.mkdir(parents=True, exist_ok=True) + output_file = output_dir / installer_asset["name"] + + try: + logger.info(f"Downloading {installer_asset['name']}") + + # Run in thread pool to avoid blocking + loop = asyncio.get_event_loop() + success = await loop.run_in_executor( + None, + self._download_file, + installer_asset["browser_download_url"], + output_file, + ) + + if success: + logger.info(f"Downloaded to {output_file}") + return output_file + return None + + except Exception as e: + logger.error(f"Error downloading update: {e}") + if output_file.exists(): + output_file.unlink() + return None + + def _download_file(self, url: str, output_path: Path) -> bool: + """Download file from URL (blocking). + + Args: + url: URL to download from + output_path: Path to save file + + Returns: + True if successful, False otherwise + """ + try: + logger.debug(f"Downloading from {url}") + with urlopen(url, timeout=300) as response: # 5 min timeout + with open(output_path, "wb") as f: + f.write(response.read()) + logger.debug(f"Downloaded {output_path.stat().st_size} bytes") + return True + except URLError as e: + logger.error(f"Download failed: {e}") + return False + + async def verify_checksum( + self, file_path: Path, release: Release + ) -> bool: + """Verify file checksum against release checksum file. + + Args: + file_path: Path to downloaded installer + release: Release information + + Returns: + True if checksum matches, False otherwise + """ + # Find .sha256 file in release assets + checksum_asset = None + for asset in release.assets: + if asset["name"].endswith(".sha256"): + checksum_asset = asset + break + + if not checksum_asset: + logger.warning("No checksum file found in release") + return True # Continue anyway + + try: + logger.info("Verifying checksum...") + + # Download checksum file + loop = asyncio.get_event_loop() + checksum_content = await loop.run_in_executor( + None, + self._download_checksum, + checksum_asset["browser_download_url"], + ) + + if not checksum_content: + logger.warning("Failed to download checksum") + return False + + # Calculate file checksum + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256_hash.update(chunk) + + file_checksum = sha256_hash.hexdigest() + expected_checksum = checksum_content.strip() + + if file_checksum == expected_checksum: + logger.info("Checksum verification passed") + return True + else: + logger.error( + f"Checksum mismatch: {file_checksum} != {expected_checksum}" + ) + return False + + except Exception as e: + logger.error(f"Error verifying checksum: {e}") + return False + + def _download_checksum(self, url: str) -> Optional[str]: + """Download checksum file (blocking). + + Args: + url: URL to checksum file + + Returns: + Checksum content or None on error + """ + try: + with urlopen(url, timeout=10) as response: + return response.read().decode().strip() + except URLError as e: + logger.error(f"Failed to download checksum: {e}") + return None + + def install_update(self, installer_path: Path) -> bool: + """Launch installer for update. + + Args: + installer_path: Path to installer executable + + Returns: + True if installer launched, False otherwise + + Note: + The actual installation and restart are handled by the installer. + """ + if not installer_path.exists(): + logger.error(f"Installer not found: {installer_path}") + return False + + try: + import platform + import subprocess + + if platform.system() == "Windows": + # Windows: Run MSI installer + logger.info(f"Launching installer: {installer_path}") + subprocess.Popen([str(installer_path)]) + return True + elif platform.system() == "Darwin": + # macOS: Mount DMG and run installer + logger.info(f"Launching DMG: {installer_path}") + subprocess.Popen(["open", str(installer_path)]) + return True + else: + logger.error(f"Unsupported platform: {platform.system()}") + return False + + except Exception as e: + logger.error(f"Failed to launch installer: {e}") + return False diff --git a/src/webdrop_bridge/ui/update_manager_ui.py b/src/webdrop_bridge/ui/update_manager_ui.py index bd487ed..7a45e13 100644 --- a/src/webdrop_bridge/ui/update_manager_ui.py +++ b/src/webdrop_bridge/ui/update_manager_ui.py @@ -15,13 +15,13 @@ from pathlib import Path from PySide6.QtCore import Qt, Signal from PySide6.QtWidgets import ( QDialog, - QLabel, - QPushButton, - QProgressBar, - QVBoxLayout, QHBoxLayout, - QTextEdit, + QLabel, QMessageBox, + QProgressBar, + QPushButton, + QTextEdit, + QVBoxLayout, ) logger = logging.getLogger(__name__) diff --git a/tests/unit/test_update_manager_ui.py b/tests/unit/test_update_manager_ui.py index d56e2e5..e1c5f6a 100644 --- a/tests/unit/test_update_manager_ui.py +++ b/tests/unit/test_update_manager_ui.py @@ -1,17 +1,17 @@ """Tests for the update manager UI dialogs.""" import pytest -from PySide6.QtWidgets import QApplication, QMessageBox -from PySide6.QtTest import QTest from PySide6.QtCore import Qt +from PySide6.QtTest import QTest +from PySide6.QtWidgets import QApplication, QMessageBox from webdrop_bridge.ui.update_manager_ui import ( CheckingDialog, - UpdateAvailableDialog, DownloadingDialog, + ErrorDialog, InstallDialog, NoUpdateDialog, - ErrorDialog, + UpdateAvailableDialog, )