feat: implement brand-specific configuration and update management for Agravity Bridge

This commit is contained in:
claudi 2026-03-10 16:02:24 +01:00
parent baf56e040f
commit b988532aaa
9 changed files with 461 additions and 48 deletions

View file

@ -101,14 +101,13 @@ class ConfigValidator:
class ConfigProfile:
"""Manages named configuration profiles.
Profiles are stored in ~/.webdrop_bridge/profiles/ directory as JSON files.
Profiles are stored in the brand-specific app config directory.
"""
PROFILES_DIR = Path.home() / ".webdrop_bridge" / "profiles"
def __init__(self) -> None:
def __init__(self, config_dir_name: str = "webdrop_bridge") -> None:
"""Initialize profile manager."""
self.PROFILES_DIR.mkdir(parents=True, exist_ok=True)
self.profiles_dir = Config.get_default_config_path(config_dir_name).parent / "profiles"
self.profiles_dir.mkdir(parents=True, exist_ok=True)
def save_profile(self, profile_name: str, config: Config) -> Path:
"""Save configuration as a named profile.
@ -126,7 +125,7 @@ class ConfigProfile:
if not profile_name or "/" in profile_name or "\\" in profile_name:
raise ConfigurationError(f"Invalid profile name: {profile_name}")
profile_path = self.PROFILES_DIR / f"{profile_name}.json"
profile_path = self.profiles_dir / f"{profile_name}.json"
config_data = {
"app_name": config.app_name,
@ -160,7 +159,7 @@ class ConfigProfile:
Raises:
ConfigurationError: If profile not found or invalid
"""
profile_path = self.PROFILES_DIR / f"{profile_name}.json"
profile_path = self.profiles_dir / f"{profile_name}.json"
if not profile_path.exists():
raise ConfigurationError(f"Profile not found: {profile_name}")
@ -179,10 +178,10 @@ class ConfigProfile:
Returns:
List of profile names (without .json extension)
"""
if not self.PROFILES_DIR.exists():
if not self.profiles_dir.exists():
return []
return sorted([p.stem for p in self.PROFILES_DIR.glob("*.json")])
return sorted([p.stem for p in self.profiles_dir.glob("*.json")])
def delete_profile(self, profile_name: str) -> None:
"""Delete a profile.
@ -193,7 +192,7 @@ class ConfigProfile:
Raises:
ConfigurationError: If profile not found
"""
profile_path = self.PROFILES_DIR / f"{profile_name}.json"
profile_path = self.profiles_dir / f"{profile_name}.json"
if not profile_path.exists():
raise ConfigurationError(f"Profile not found: {profile_name}")

View file

@ -5,9 +5,11 @@ verifying checksums from Forgejo releases.
"""
import asyncio
import fnmatch
import hashlib
import json
import logging
import platform
import socket
from dataclasses import dataclass
from datetime import datetime, timedelta
@ -34,7 +36,16 @@ class Release:
class UpdateManager:
"""Manages auto-updates via Forgejo releases API."""
def __init__(self, current_version: str, config_dir: Optional[Path] = None):
def __init__(
self,
current_version: str,
config_dir: Optional[Path] = None,
brand_id: str = "webdrop_bridge",
forgejo_url: str = "https://git.him-tools.de",
repo: str = "HIM-public/webdrop-bridge",
update_channel: str = "stable",
manifest_name: str = "release-manifest.json",
):
"""Initialize update manager.
Args:
@ -42,8 +53,11 @@ class UpdateManager:
config_dir: Directory for storing update cache. Defaults to temp.
"""
self.current_version = current_version
self.forgejo_url = "https://git.him-tools.de"
self.repo = "HIM-public/webdrop-bridge"
self.brand_id = brand_id
self.forgejo_url = forgejo_url.rstrip("/")
self.repo = repo
self.update_channel = update_channel
self.manifest_name = manifest_name
self.api_endpoint = f"{self.forgejo_url}/api/v1/repos/{self.repo}/releases/latest"
# Cache management
@ -52,6 +66,128 @@ class UpdateManager:
self.cache_file = self.cache_dir / "update_check.json"
self.cache_ttl = timedelta(hours=24)
def _get_platform_key(self) -> str:
"""Return the release-manifest platform key for the current system."""
system = platform.system()
machine = platform.machine().lower()
if system == "Windows":
arch = "x64" if machine in {"amd64", "x86_64"} else machine
return f"windows-{arch}"
if system == "Darwin":
return "macos-universal"
return f"{system.lower()}-{machine}"
def _find_asset(self, assets: list[dict], asset_name: str) -> Optional[dict]:
"""Find an asset by exact name."""
for asset in assets:
if asset.get("name") == asset_name:
return asset
return None
def _find_manifest_asset(self, release: Release) -> Optional[dict]:
"""Find the shared release manifest asset if present."""
return self._find_asset(release.assets, self.manifest_name)
def _download_json_asset(self, url: str) -> Optional[dict]:
"""Download and parse a JSON asset from a release."""
try:
with urlopen(url, timeout=10) as response:
return json.loads(response.read().decode("utf-8"))
except (URLError, json.JSONDecodeError) as e:
logger.error(f"Failed to download JSON asset: {e}")
return None
async def _load_release_manifest(self, release: Release) -> Optional[dict]:
"""Load the shared release manifest if present."""
manifest_asset = self._find_manifest_asset(release)
if not manifest_asset:
return None
loop = asyncio.get_event_loop()
return await asyncio.wait_for(
loop.run_in_executor(
None, self._download_json_asset, manifest_asset["browser_download_url"]
),
timeout=15,
)
def _resolve_assets_from_manifest(
self, release: Release, manifest: dict
) -> tuple[Optional[dict], Optional[dict]]:
"""Resolve installer and checksum assets from a shared release manifest."""
if manifest.get("channel") not in {None, "", self.update_channel}:
logger.info(
"Release manifest channel %s does not match configured channel %s",
manifest.get("channel"),
self.update_channel,
)
return None, None
brand_entry = manifest.get("brands", {}).get(self.brand_id, {})
platform_entry = brand_entry.get(self._get_platform_key(), {})
installer_name = platform_entry.get("installer")
checksum_name = platform_entry.get("checksum")
if not installer_name:
logger.warning(
"No installer entry found for brand=%s platform=%s in release manifest",
self.brand_id,
self._get_platform_key(),
)
return None, None
return self._find_asset(release.assets, installer_name), self._find_asset(
release.assets, checksum_name
)
def _resolve_assets_legacy(self, release: Release) -> tuple[Optional[dict], Optional[dict]]:
"""Resolve installer and checksum assets using legacy filename matching."""
is_windows = platform.system() == "Windows"
extension = ".msi" if is_windows else ".dmg"
brand_prefix = f"{self.brand_id}-*"
installer_asset = None
for asset in release.assets:
asset_name = asset.get("name", "")
if not asset_name.endswith(extension):
continue
if self.brand_id != "webdrop_bridge" and fnmatch.fnmatch(
asset_name.lower(), brand_prefix.lower()
):
installer_asset = asset
break
if self.brand_id == "webdrop_bridge":
installer_asset = asset
break
if not installer_asset:
return None, None
checksum_asset = self._find_asset(release.assets, f"{installer_asset['name']}.sha256")
return installer_asset, checksum_asset
async def _resolve_release_assets(
self, release: Release
) -> tuple[Optional[dict], Optional[dict]]:
"""Resolve installer and checksum assets for the configured brand."""
try:
manifest = await self._load_release_manifest(release)
except asyncio.TimeoutError:
logger.warning(
"Timed out while loading release manifest, falling back to legacy lookup"
)
manifest = None
if manifest:
installer_asset, checksum_asset = self._resolve_assets_from_manifest(release, manifest)
if installer_asset:
return installer_asset, checksum_asset
return self._resolve_assets_legacy(release)
def _parse_version(self, version_str: str) -> tuple[int, int, int]:
"""Parse semantic version string to tuple.
@ -253,12 +389,7 @@ class UpdateManager:
logger.error("No assets found in release")
return None
# Find .msi or .dmg file
installer_asset = None
for asset in release.assets:
if asset["name"].endswith((".msi", ".dmg")):
installer_asset = asset
break
installer_asset, _ = await self._resolve_release_assets(release)
if not installer_asset:
logger.error("No installer found in release assets")
@ -345,14 +476,11 @@ class UpdateManager:
Returns:
True if checksum matches, False otherwise
"""
# Find .sha256 file matching the installer name (e.g. Setup.msi.sha256)
# Fall back to any .sha256 only if no specific match exists
installer_name = file_path.name
checksum_asset = None
for asset in release.assets:
if asset["name"] == f"{installer_name}.sha256":
checksum_asset = asset
break
installer_asset, checksum_asset = await self._resolve_release_assets(release)
installer_name = installer_asset["name"] if installer_asset else file_path.name
if not checksum_asset:
checksum_asset = self._find_asset(release.assets, f"{installer_name}.sha256")
if not checksum_asset:
logger.warning("No checksum file found in release")