feat: Implement auto-update system with Forgejo releases management

Author: claudi
Date: 2026-01-29 08:19:42 +01:00
Parent: b221ba8436
Commit: af8e417197
3 changed files with 407 additions and 9 deletions

@@ -0,0 +1,398 @@
"""Auto-update system for WebDrop Bridge using Forgejo releases.
This module manages checking for updates, downloading installers, and
verifying checksums from Forgejo releases.
"""
import asyncio
import hashlib
import json
import logging
import shutil
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.error import URLError
from urllib.request import urlopen
logger = logging.getLogger(__name__)
@dataclass
class Release:
"""Represents a Forgejo release."""
tag_name: str
name: str
version: str # Semantic version (e.g., "1.0.0")
body: str # Release notes/changelog
assets: list[dict] # List of {name, browser_download_url}
published_at: str # ISO format datetime
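# Illustrative sketch of a populated Release (all values hypothetical,
# shown only to document the expected field shapes):
#
#   Release(
#       tag_name="v1.2.0",
#       name="WebDrop Bridge 1.2.0",
#       version="1.2.0",
#       body="Release notes ...",
#       assets=[{"name": "webdrop-bridge-1.2.0.msi",
#                "browser_download_url": "https://git.him-tools.de/..."}],
#       published_at="2026-01-28T12:00:00Z",
#   )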
class UpdateManager:
"""Manages auto-updates via Forgejo releases API."""
def __init__(self, current_version: str, config_dir: Optional[Path] = None):
"""Initialize update manager.
Args:
current_version: Current app version (e.g., "0.0.1")
config_dir: Directory for storing update cache. Defaults to temp.
"""
self.current_version = current_version
self.forgejo_url = "https://git.him-tools.de"
self.repo = "HIM-public/webdrop-bridge"
self.api_endpoint = (
f"{self.forgejo_url}/api/v1/repos/{self.repo}/releases/latest"
)
# Cache management
self.cache_dir = config_dir or Path.home() / ".webdrop-bridge"
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file = self.cache_dir / "update_check.json"
self.cache_ttl = timedelta(hours=24)
def _parse_version(self, version_str: str) -> tuple[int, int, int]:
"""Parse semantic version string to tuple.
Args:
version_str: Version string (e.g., "1.0.0" or "v1.0.0")
Returns:
Tuple of (major, minor, patch)
Raises:
ValueError: If version format is invalid
"""
# Remove 'v' prefix if present
version_str = version_str.lstrip("v")
try:
parts = version_str.split(".")
if len(parts) != 3:
raise ValueError(f"Invalid version format: {version_str}")
return tuple(int(p) for p in parts) # type: ignore
except ValueError as e:
logger.error(f"Failed to parse version '{version_str}': {e}")
raise
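# Sketch of accepted and rejected inputs (hypothetical manager instance,
# shown doctest-style for documentation only):
#
#   >>> manager._parse_version("v1.2.3")
#   (1, 2, 3)
#   >>> manager._parse_version("1.2")  # not MAJOR.MINOR.PATCH
#   ValueError: Invalid version format: 1.2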
def _is_newer_version(self, latest_version: str) -> bool:
"""Check if latest version is newer than current.
Args:
latest_version: Latest version string
Returns:
True if latest_version > current_version
"""
try:
current = self._parse_version(self.current_version)
latest = self._parse_version(latest_version)
return latest > current
except ValueError:
logger.error("Failed to compare versions")
return False
def _load_cache(self) -> Optional[dict]:
"""Load cached release info if valid.
Returns:
Cached release dict if cache exists and is fresh, None otherwise
"""
if not self.cache_file.exists():
return None
try:
with open(self.cache_file) as f:
cached = json.load(f)
# Check if cache is still valid
timestamp = datetime.fromisoformat(cached.get("timestamp", ""))
if datetime.now() - timestamp < self.cache_ttl:
logger.debug("Using cached release info")
return cached
else:
logger.debug("Cache expired")
self.cache_file.unlink()
return None
except (OSError, json.JSONDecodeError, ValueError) as e:
logger.warning(f"Failed to load cache: {e}")
self.cache_file.unlink(missing_ok=True)
return None
def _save_cache(self, release_info: dict) -> None:
"""Save release info to cache.
Args:
release_info: Release information to cache
"""
try:
cache_data = {
"timestamp": datetime.now().isoformat(),
"release": release_info,
}
with open(self.cache_file, "w") as f:
json.dump(cache_data, f)
logger.debug("Cached release info")
except OSError as e:
logger.warning(f"Failed to save cache: {e}")
async def check_for_updates(self) -> Optional[Release]:
"""Check Forgejo API for latest release.
Returns:
Release object if newer version available, None otherwise
"""
# Try cache first
cached = self._load_cache()
if cached:
release_data = cached.get("release")
if release_data:
version = release_data["tag_name"].lstrip("v")
if not self._is_newer_version(version):
logger.info("No newer version available (cached)")
return None
return Release(**release_data)
# Fetch from API
try:
logger.info(f"Checking for updates from {self.api_endpoint}")
# Run in thread pool to avoid blocking
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
None, self._fetch_release
)
if not response:
return None
# Check if newer version
version = response["tag_name"].lstrip("v")
if not self._is_newer_version(version):
logger.info(f"Latest version {version} is not newer than {self.current_version}")
self._save_cache(response)
return None
logger.info(f"New version available: {version}")
release = Release(**response)
self._save_cache(response)
return release
except URLError as e:
logger.error(f"Network error checking updates: {e}")
return None
except Exception as e:
logger.error(f"Error checking for updates: {e}")
return None
def _fetch_release(self) -> Optional[dict]:
"""Fetch latest release from Forgejo API (blocking).
Returns:
Release data dict or None on error
"""
try:
with urlopen(self.api_endpoint, timeout=10) as response:
data = json.loads(response.read())
return {
"tag_name": data["tag_name"],
"name": data["name"],
"version": data["tag_name"].lstrip("v"),
"body": data["body"],
"assets": data.get("assets", []),
"published_at": data.get("published_at", ""),
}
except (URLError, json.JSONDecodeError, KeyError) as e:
logger.error(f"Failed to fetch release: {e}")
return None
async def download_update(
self, release: Release, output_dir: Optional[Path] = None
) -> Optional[Path]:
"""Download installer from release assets.
Args:
release: Release information
output_dir: Directory to save installer. Defaults to cache_dir.
Returns:
Path to downloaded file or None on error
"""
if not release.assets:
logger.error("No assets found in release")
return None
# Find .msi or .dmg file
installer_asset = None
for asset in release.assets:
if asset["name"].endswith((".msi", ".dmg")):
installer_asset = asset
break
if not installer_asset:
logger.error("No installer found in release assets")
return None
output_dir = output_dir or self.cache_dir
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / installer_asset["name"]
try:
logger.info(f"Downloading {installer_asset['name']}")
# Run in thread pool to avoid blocking
loop = asyncio.get_running_loop()
success = await loop.run_in_executor(
None,
self._download_file,
installer_asset["browser_download_url"],
output_file,
)
if success:
logger.info(f"Downloaded to {output_file}")
return output_file
return None
except Exception as e:
logger.error(f"Error downloading update: {e}")
if output_file.exists():
output_file.unlink()
return None
def _download_file(self, url: str, output_path: Path) -> bool:
"""Download file from URL (blocking).
Args:
url: URL to download from
output_path: Path to save file
Returns:
True if successful, False otherwise
"""
try:
logger.debug(f"Downloading from {url}")
# Stream to disk in chunks instead of buffering the whole
# installer in memory
with urlopen(url, timeout=300) as response:  # 5 min timeout
with open(output_path, "wb") as f:
shutil.copyfileobj(response, f)
logger.debug(f"Downloaded {output_path.stat().st_size} bytes")
return True
except OSError as e:  # URLError is a subclass of OSError
logger.error(f"Download failed: {e}")
return False
async def verify_checksum(
self, file_path: Path, release: Release
) -> bool:
"""Verify file checksum against release checksum file.
Args:
file_path: Path to downloaded installer
release: Release information
Returns:
True if checksum matches, False otherwise
"""
# Find .sha256 file in release assets
checksum_asset = None
for asset in release.assets:
if asset["name"].endswith(".sha256"):
checksum_asset = asset
break
if not checksum_asset:
logger.warning("No checksum file found in release")
return True  # No checksum published; proceed without verification
try:
logger.info("Verifying checksum...")
# Download checksum file
loop = asyncio.get_running_loop()
checksum_content = await loop.run_in_executor(
None,
self._download_checksum,
checksum_asset["browser_download_url"],
)
if not checksum_content:
logger.warning("Failed to download checksum")
return False
# Calculate file checksum
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256_hash.update(chunk)
file_checksum = sha256_hash.hexdigest()
# Checksum files commonly follow the `sha256sum` format
# ("<digest>  <filename>"), so compare only the first token
expected_checksum = checksum_content.split()[0].lower()
if file_checksum == expected_checksum:
logger.info("Checksum verification passed")
return True
else:
logger.error(
f"Checksum mismatch: {file_checksum} != {expected_checksum}"
)
return False
except Exception as e:
logger.error(f"Error verifying checksum: {e}")
return False
def _download_checksum(self, url: str) -> Optional[str]:
"""Download checksum file (blocking).
Args:
url: URL to checksum file
Returns:
Checksum content or None on error
"""
try:
with urlopen(url, timeout=10) as response:
return response.read().decode().strip()
except URLError as e:
logger.error(f"Failed to download checksum: {e}")
return None
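# The comparison in verify_checksum assumes the conventional
# `sha256sum` output format for the .sha256 asset, e.g. (hypothetical
# digest and filename):
#
#   0f343b0931126a20...  webdrop-bridge-1.2.0.msi
#
# which is why only the first whitespace-separated token is used.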
def install_update(self, installer_path: Path) -> bool:
"""Launch installer for update.
Args:
installer_path: Path to installer executable
Returns:
True if installer launched, False otherwise
Note:
The actual installation and restart are handled by the installer.
"""
if not installer_path.exists():
logger.error(f"Installer not found: {installer_path}")
return False
try:
import platform
import subprocess
if platform.system() == "Windows":
# Windows: Run MSI installer
logger.info(f"Launching installer: {installer_path}")
subprocess.Popen([str(installer_path)])
return True
elif platform.system() == "Darwin":
# macOS: Mount DMG and run installer
logger.info(f"Launching DMG: {installer_path}")
subprocess.Popen(["open", str(installer_path)])
return True
else:
logger.error(f"Unsupported platform: {platform.system()}")
return False
except Exception as e:
logger.error(f"Failed to launch installer: {e}")
return False
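# Minimal end-to-end sketch of the intended update flow. This demo block
# is illustrative only: it assumes a reachable Forgejo instance and a
# release carrying an installer plus a .sha256 asset.
if __name__ == "__main__":
    async def _demo() -> None:
        manager = UpdateManager(current_version="0.0.1")
        release = await manager.check_for_updates()
        if release is None:
            print("Already up to date")
            return
        installer = await manager.download_update(release)
        if installer and await manager.verify_checksum(installer, release):
            manager.install_update(installer)

    asyncio.run(_demo())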