webdrop-bridge/src/webdrop_bridge/ui/main_window.py

1767 lines
69 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Main application window with web engine integration."""
import asyncio
import json
import logging
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from PySide6.QtCore import (
QEvent,
QObject,
QPoint,
QSize,
QStandardPaths,
Qt,
QThread,
QTimer,
QUrl,
Signal,
Slot,
)
from PySide6.QtGui import QIcon, QMouseEvent
from PySide6.QtWebChannel import QWebChannel
from PySide6.QtWebEngineCore import QWebEngineDownloadRequest, QWebEngineScript
from PySide6.QtWidgets import (
QLabel,
QMainWindow,
QSizePolicy,
QSpacerItem,
QStatusBar,
QToolBar,
QVBoxLayout,
QWidget,
QWidgetAction,
)
from webdrop_bridge.config import Config
from webdrop_bridge.core.drag_interceptor import DragInterceptor
from webdrop_bridge.core.validator import PathValidator
from webdrop_bridge.ui.restricted_web_view import RestrictedWebEngineView
logger = logging.getLogger(__name__)
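# Default welcome page HTML shown when no webapp is configured.
# The template is rendered with str.format(version=...), so literal CSS braces
# are doubled ({{ and }}) and {version} is the only placeholder.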
DEFAULT_WELCOME_PAGE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebDrop Bridge</title>
<style>
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}}
.container {{
background: white;
border-radius: 12px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
padding: 60px 40px;
max-width: 600px;
text-align: center;
}}
h1 {{
color: #667eea;
font-size: 2.5em;
margin-bottom: 10px;
}}
.version {{
color: #999;
font-size: 0.9em;
margin-bottom: 30px;
}}
p {{
color: #555;
font-size: 1.1em;
line-height: 1.6;
margin-bottom: 20px;
}}
.features {{
background: #f5f5f5;
border-radius: 8px;
padding: 30px;
margin: 30px 0;
text-align: left;
}}
.features h2 {{
color: #333;
font-size: 1.2em;
margin-bottom: 15px;
text-align: center;
}}
.features ul {{
list-style: none;
padding: 0;
}}
.features li {{
color: #666;
padding: 10px 0;
border-bottom: 1px solid #ddd;
}}
.features li:last-child {{
border-bottom: none;
}}
.features li:before {{
content: "";
color: #667eea;
font-weight: bold;
margin-right: 10px;
}}
.status {{
background: #fff3cd;
border: 1px solid #ffc107;
border-radius: 6px;
padding: 15px;
margin: 20px 0;
color: #856404;
}}
.status strong {{
display: block;
margin-bottom: 5px;
}}
.footer {{
color: #999;
font-size: 0.9em;
margin-top: 30px;
padding-top: 20px;
border-top: 1px solid #eee;
}}
</style>
</head>
<body>
<div class="container">
<h1>🌉 WebDrop Bridge</h1>
<div class="version">Professional Web-to-File Drag-and-Drop Bridge</div>
<div class="status">
<strong>✓ Application Ready</strong>
No web application is currently configured.
Configure WEBAPP_URL in your .env file to load your custom application.
</div>
<p>WebDrop Bridge is a professional desktop application that seamlessly converts web-based drag-and-drop interactions into native file operations on Windows and macOS.</p>
<div class="features">
<h2>Key Features</h2>
<ul>
<li>Drag-and-drop from web interface to desktop</li>
<li>Real-time drag state monitoring</li>
<li>Path validation and security controls</li>
<li>Cross-platform support (Windows & macOS)</li>
<li>Professional production-grade architecture</li>
<li>Comprehensive logging and monitoring</li>
</ul>
</div>
<p><strong>To configure your web application:</strong></p>
<ol style="text-align: left; color: #666; margin-top: 15px;">
<li>Create a <code>.env</code> file in your application directory</li>
<li>Set <code>WEBAPP_URL</code> to your HTML file path or HTTP URL</li>
<li>Example: <code>WEBAPP_URL=file:///./webapp/index.html</code></li>
<li>Restart the application</li>
</ol>
<div class="footer">
<strong>Version:</strong> {version}<br>
<strong>Status:</strong> Ready for configuration
</div>
</div>
</body>
</html>
"""
class _DragBridge(QObject):
    """QWebChannel-facing object through which page JavaScript requests drags.

    JavaScript sees this object under the name 'bridge'.
    """

    def __init__(self, window: 'MainWindow', parent: Optional[QObject] = None):
        """Create the bridge.

        Args:
            window: MainWindow whose drag interceptor performs the drag
            parent: Optional parent QObject
        """
        super().__init__(parent)
        self.window = window

    @Slot(str)
    def start_file_drag(self, path_text: str) -> None:
        """Schedule a native file drag for a local path or Azure URL.

        Invoked from JavaScript when the user drags an item. The actual work
        is posted to the event loop instead of running inside the slot, to
        avoid Qt drag manager state issues.

        Args:
            path_text: File path string or Azure URL to drag
        """
        logger.debug(f"Bridge: start_file_drag called for {path_text}")

        def _deferred_drag() -> None:
            # URL conversion and validation happen inside handle_drag()
            self.window.drag_interceptor.handle_drag(path_text)

        QTimer.singleShot(0, _deferred_drag)

    @Slot(str)
    def debug_log(self, message: str) -> None:
        """Forward a debug message from JavaScript to the Python logger.

        Args:
            message: Debug message from JavaScript
        """
        logger.debug(f"JS Debug: {message}")
class MainWindow(QMainWindow):
"""Main application window for WebDrop Bridge.
Displays web content in a QWebEngineView and provides drag-and-drop
integration with the native filesystem.
"""
# Signals
check_for_updates = Signal()
update_available = Signal(object) # Emits Release object
def __init__(
    self,
    config: Config,
    parent: Optional[QWidget] = None,
):
    """Build the window: web view, toolbar, status bar, drag bridge, downloads.

    Args:
        config: Application configuration
        parent: Parent widget
    """
    super().__init__(parent)
    self.config = config
    self._background_threads = []  # References to background threads
    self._background_workers = {}  # References to background workers
    self.checking_dialog = None  # Currently shown "checking" dialog, if any
    self._is_manual_check = False  # True while a manual update check runs (UI feedback)

    # Title and initial geometry
    self.setWindowTitle(config.window_title)
    self.setGeometry(100, 100, config.window_width, config.window_height)

    # Window icon: resolved from the PyInstaller bundle when present,
    # otherwise from the repository layout used in development
    icon_relative = Path("resources") / "icons" / "app.ico"
    if hasattr(sys, '_MEIPASS'):
        icon_path = Path(sys._MEIPASS) / icon_relative  # type: ignore
    else:
        icon_path = Path(__file__).parent.parent.parent.parent / icon_relative

    if not icon_path.exists():
        logger.warning(f"Window icon not found at {icon_path}")
    else:
        self.setWindowIcon(QIcon(str(icon_path)))
        logger.debug(f"Window icon set from {icon_path}")

    # Web engine view restricted to the configured URLs
    self.web_view = RestrictedWebEngineView(config.allowed_urls)

    # Both the window and the view accept drag events
    self.setAcceptDrops(True)
    self.web_view.setAcceptDrops(True)

    # URL of a drag currently originating from the web view
    self._current_drag_url = None

    # Route JavaScript console output to the Python logger
    self.web_view.page().javaScriptConsoleMessage = self._on_js_console_message

    # Toolbar (Kiosk-mode navigation) and status bar
    self._create_navigation_toolbar()
    self._create_status_bar()

    # Drag interceptor (built from config, includes URL converter) and its signals
    self.drag_interceptor = DragInterceptor(config)
    self.drag_interceptor.drag_started.connect(self._on_drag_started)
    self.drag_interceptor.drag_failed.connect(self._on_drag_failed)

    # JavaScript bridge over QWebChannel
    self._drag_bridge = _DragBridge(self)
    channel = QWebChannel(self)
    channel.registerObject("bridge", self._drag_bridge)
    self.web_view.page().setWebChannel(channel)

    # Inject the bridge script, then verify the injection after each load
    self._install_bridge_script()
    self.web_view.loadFinished.connect(self._on_page_loaded)

    # Downloads: the handler must be connected BEFORE any page loads
    web_profile = self.web_view.page().profile()
    logger.debug(f"Connecting download handler to profile: {web_profile}")
    web_profile.downloadRequested.connect(self._on_download_requested)

    # Point the profile at the system Downloads folder when one is reported
    download_dir = QStandardPaths.writableLocation(
        QStandardPaths.StandardLocation.DownloadLocation
    )
    if download_dir:
        web_profile.setDownloadPath(download_dir)
        logger.debug(f"Download path set to: {download_dir}")
    logger.debug("Download handler connected successfully")

    # Central widget: the web view in a margin-less vertical layout
    container = QWidget()
    container_layout = QVBoxLayout()
    container_layout.addWidget(self.web_view)
    container_layout.setContentsMargins(0, 0, 0, 0)
    container.setLayout(container_layout)
    self.setCentralWidget(container)

    # Finally load the web application and apply optional styling
    self._load_webapp()
    self._apply_stylesheet()
def _load_webapp(self) -> None:
    """Load the configured web application into the web view.

    http:// and https:// URLs are loaded directly. Any other value is
    treated as a local file: the path is tried as given, then relative to
    the application root (four directories above this file), then relative
    to the current working directory. If no file is found, or reading it
    fails, the default welcome page is shown instead.
    """
    webapp_url = self.config.webapp_url

    if webapp_url.startswith(("http://", "https://")):
        # Remote URL
        self.web_view.load(QUrl(webapp_url))
        return

    def strip_local_prefix(value: str) -> str:
        """Drop a leading 'file:' scheme, leading slashes and './' segments."""
        # str.lstrip() takes a *set of characters*, not a prefix; the former
        # lstrip("file:///").lstrip("./") also ate the beginning of names
        # such as "life/index.html" or ".hidden/index.html".
        if value.startswith("file:"):
            value = value[len("file:"):]
        value = value.lstrip("/")
        while value.startswith("./"):
            value = value[2:].lstrip("/")
        return value

    def show_welcome_page() -> None:
        """Render the built-in welcome page with the application version."""
        welcome_html = DEFAULT_WELCOME_PAGE.format(version=self.config.app_version)
        self.web_view.setHtml(welcome_html)

    try:
        file_path = Path(webapp_url).resolve()

        if not file_path.exists():
            relative_name = strip_local_prefix(webapp_url)
            # First fallback: relative to the application package root
            app_root = Path(__file__).parent.parent.parent.parent
            relative_path = app_root / relative_name
            if relative_path.exists():
                file_path = relative_path
            else:
                # Second fallback: relative to the current working directory
                alt_path = Path(relative_name).resolve()
                if alt_path.exists():
                    file_path = alt_path

        if not file_path.exists():
            # Nothing found: show the welcome page with instructions
            logger.warning(f"Webapp not found for WEBAPP_URL={webapp_url!r}, showing welcome page")
            show_welcome_page()
            return

        html_content = file_path.read_text(encoding='utf-8')
        injected_html = self._inject_drag_bridge(html_content)

        # NOTE(review): the base URL is the parent directory without a trailing
        # slash - confirm relative resources resolve against it as intended.
        self.web_view.setHtml(injected_html, QUrl.fromLocalFile(file_path.parent))

    except (OSError, ValueError) as e:
        # Unreadable or undecodable file: fall back to the welcome page
        logger.warning(f"Failed to load webapp from {webapp_url!r}: {e}")
        show_welcome_page()
def _install_bridge_script(self) -> None:
    """Install the drag bridge JavaScript via QWebEngineScript.

    The script is injected at DocumentCreation in the main world (top frame
    only) so it runs as early as possible. Its source is the concatenation of
    qwebchannel.js (read from Qt resources and embedded inline to avoid CSP
    issues with qrc:// URLs), the generated configuration snippet,
    bridge_script_intercept.js and, when present, download_interceptor.js.

    Failure to find or read the bridge script is logged but not raised, so
    the application still starts.
    """
    from PySide6.QtCore import QFile, QIODevice

    script = QWebEngineScript()
    script.setName("webdrop-bridge")
    script.setInjectionPoint(QWebEngineScript.InjectionPoint.DocumentCreation)
    script.setWorldId(QWebEngineScript.ScriptWorldId.MainWorld)
    script.setRunsOnSubFrames(False)

    # qwebchannel.js from Qt resources (embedded to avoid CSP blocking qrc:// URLs)
    qwebchannel_code = ""
    qwebchannel_file = QFile(":/qtwebchannel/qwebchannel.js")
    if qwebchannel_file.open(QIODevice.OpenModeFlag.ReadOnly | QIODevice.OpenModeFlag.Text):
        qwebchannel_code = bytes(qwebchannel_file.readAll()).decode('utf-8')  # type: ignore
        qwebchannel_file.close()
        logger.debug("Loaded qwebchannel.js inline to avoid CSP issues")
    else:
        logger.warning("Failed to load qwebchannel.js from resources")

    # Configuration consumed by the bridge script
    config_code = self._generate_config_injection_script()

    exe_dir = Path(sys.executable).parent

    def candidate_paths(filename: str) -> list:
        """Return lookup locations for a bundled JS file, in priority order."""
        # 1. Development mode: next to this module
        candidates = [Path(__file__).parent / filename]
        # 2. PyInstaller bundle (via sys._MEIPASS)
        if hasattr(sys, '_MEIPASS'):
            candidates.append(Path(sys._MEIPASS) / "webdrop_bridge" / "ui" / filename)  # type: ignore
        # 3. Next to the installed executable (MSI installation)
        candidates.append(exe_dir / "webdrop_bridge" / "ui" / filename)
        return candidates

    def first_existing(paths: list) -> Optional[Path]:
        """Return the first path that exists, or None."""
        return next((p for p in paths if p.exists()), None)

    search_paths = candidate_paths("bridge_script_intercept.js")
    script_path = first_existing(search_paths)
    if script_path is not None:
        logger.debug(f"Found bridge script at: {script_path}")
    else:
        # Log every attempted location to make packaging problems diagnosable
        logger.error("Bridge script NOT found at any expected location:")
        for i, path in enumerate(search_paths, 1):
            logger.error(f" [{i}] {path} (exists: {path.exists()})")
        logger.error(f"sys._MEIPASS: {getattr(sys, '_MEIPASS', 'NOT SET')}")
        logger.error(f"sys.executable: {sys.executable}")
        logger.error(f"__file__: {__file__}")

    try:
        if script_path is None:
            raise FileNotFoundError("bridge_script_intercept.js not found in any expected location")
        bridge_code = script_path.read_text(encoding='utf-8')

        # The download interceptor is optional and uses the same lookup order
        download_interceptor_code = ""
        download_interceptor_path = first_existing(candidate_paths("download_interceptor.js"))
        if download_interceptor_path:
            try:
                download_interceptor_code = download_interceptor_path.read_text(encoding='utf-8')
                logger.debug(f"Loaded download interceptor from {download_interceptor_path}")
            except (OSError, UnicodeDecodeError) as e:
                logger.warning(f"Download interceptor exists but failed to load: {e}")
        else:
            logger.debug("Download interceptor not found (optional)")

        # Combine: qwebchannel.js + config + bridge script + download interceptor
        combined_code = qwebchannel_code + "\n\n" + config_code + "\n\n" + bridge_code
        if download_interceptor_code:
            combined_code += "\n\n" + download_interceptor_code

        logger.debug(f"Combined script size: {len(combined_code)} chars "
                     f"(qwebchannel: {len(qwebchannel_code)}, "
                     f"config: {len(config_code)}, "
                     f"bridge: {len(bridge_code)}, "
                     f"interceptor: {len(download_interceptor_code)})")
        logger.debug(f"URL mappings in config: {len(self.config.url_mappings)}")
        for i, mapping in enumerate(self.config.url_mappings):
            # Separator added: prefix and path were previously logged run together
            logger.debug(f" Mapping {i+1}: {mapping.url_prefix} -> {mapping.local_path}")

        script.setSourceCode(combined_code)
        self.web_view.page().scripts().insert(script)
        logger.debug("✅ Successfully installed bridge script")
        logger.debug(f" Script size: {len(combined_code)} chars")
        logger.debug(f" Loaded from: {script_path}")
    except (OSError, UnicodeDecodeError) as e:
        # Covers the FileNotFoundError raised above as well as read/decode errors.
        # Not re-raised so the app can still start; the failure is visible in the logs.
        logger.error(f"❌ Failed to load bridge script: {e}")
        logger.error(" This will break drag-and-drop functionality!")
def _generate_config_injection_script(self) -> str:
    """Build the JavaScript snippet that publishes the configuration.

    The snippet assigns window.webdropConfig an object whose urlMappings
    list mirrors self.config.url_mappings, so the bridge script can match
    against configured patterns rather than hardcoded values.

    Returns:
        JavaScript code as string
    """
    # One plain dict per mapping, in the shape the bridge script reads
    url_mappings = [
        {"url_prefix": entry.url_prefix, "local_path": entry.local_path}
        for entry in self.config.url_mappings
    ]

    logger.debug(f"Generating config injection with {len(url_mappings)} URL mappings")
    for position, item in enumerate(url_mappings, start=1):
        logger.debug(f" [{position}] {item['url_prefix']} -> {item['local_path']}")

    # Serialise the whole config object as JSON
    config_json = json.dumps({"urlMappings": url_mappings})
    logger.debug(f"Config JSON size: {len(config_json)} bytes")

    # Self-invoking function with its own error handling around the assignment
    return f"""
(function() {{
try {{
// WebDrop Bridge - Configuration Injection
console.log('[WebDrop Config] Starting configuration injection...');
window.webdropConfig = {config_json};
console.log('[WebDrop Config] Configuration object created');
if (window.webdropConfig && window.webdropConfig.urlMappings) {{
console.log('[WebDrop Config] SUCCESS: ' + window.webdropConfig.urlMappings.length + ' URL mappings loaded');
for (var i = 0; i < window.webdropConfig.urlMappings.length; i++) {{
var m = window.webdropConfig.urlMappings[i];
console.log('[WebDrop Config] [' + (i+1) + '] ' + m.url_prefix + ' -> ' + m.local_path);
}}
}} else {{
console.warn('[WebDrop Config] WARNING: No valid URL mappings found in config object');
}}
}} catch(e) {{
console.error('[WebDrop Config] ERROR during configuration injection: ' + e.message);
if (e.stack) console.error('[WebDrop Config] Stack: ' + e.stack);
}}
}})();
"""
def _inject_drag_bridge(self, html_content: str) -> str:
"""Return HTML content unmodified.
The drag bridge script is now injected via QWebEngineScript in _install_bridge_script().
This method is kept for compatibility but does nothing.
Args:
html_content: Original HTML content
Returns:
HTML unchanged
"""
return html_content
def _apply_stylesheet(self) -> None:
    """Apply the application stylesheet if it is available.

    Reads resources/stylesheets/default.qss (located four directories above
    this file) as UTF-8 and applies it to the window. A missing file is not
    an error; any other read or decode failure is logged and the window
    keeps its default style.
    """
    stylesheet_path = (
        Path(__file__).parent.parent.parent.parent
        / "resources" / "stylesheets" / "default.qss"
    )
    try:
        # Explicit encoding: the platform default differs between systems,
        # and the other file reads in this module use UTF-8 as well.
        stylesheet = stylesheet_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # The stylesheet is optional
        return
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError and would otherwise propagate
        logger.warning(f"Could not read stylesheet {stylesheet_path}: {e}")
        return
    self.setStyleSheet(stylesheet)
def _on_drag_started(self, source: str, local_path: str) -> None:
    """React to a drag that was started successfully.

    Args:
        source: Original URL or path from web content
        local_path: Local file path that is being dragged
    """
    logger.info(f"Drag started: {source} -> {local_path}")

    # Only sources that start with 'http' go through the checkout prompt
    if not source.startswith('http'):
        return
    self._prompt_checkout(source, local_path)
def _prompt_checkout(self, azure_url: str, local_path: str) -> None:
    """Check checkout status and prompt the user if needed.

    Runs an async fetch in the page that stores its outcome (a JSON string)
    in a window variable, then reads that variable 500 ms later and passes
    it to _handle_checkout_status().

    Args:
        azure_url: Azure Blob Storage URL
        local_path: Local file path
    """
    import uuid

    # Filename for display
    filename = Path(local_path).name

    # Asset ID: the path segment directly before the filename
    match = re.search(r'/([^/]+)/[^/]+$', azure_url)
    if not match:
        logger.warning(f"Could not extract asset ID from URL: {azure_url}")
        return
    asset_id = match.group(1)

    # Unique key per check: with a key shared by all checks of this window,
    # a fetch that finishes after the 500 ms read would leave its result
    # behind for the next drag to pick up.
    callback_id = f"checkout_check_{id(self)}_{uuid.uuid4().hex}"

    # Embed values as JSON string literals so quotes or backslashes in the
    # URL-derived asset ID cannot break out of the JavaScript string.
    asset_id_js = json.dumps(asset_id)
    callback_js = json.dumps(callback_id)

    # Callback-variable approach since Qt doesn't handle Promise returns well
    js_code = f"""
    (async () => {{
        const assetId = {asset_id_js};
        const callbackKey = {callback_js};
        try {{
            const authToken = window.capturedAuthToken;
            if (!authToken) {{
                console.log('[Checkout Check] No auth token available');
                window[callbackKey] = JSON.stringify({{ error: 'No auth token' }});
                return;
            }}
            console.log('[Checkout Check] Fetching asset data for ' + assetId);
            const response = await fetch(
                'https://devagravityprivate.azurewebsites.net/api/assets/' + assetId + '?fields=checkout',
                {{
                    method: 'GET',
                    headers: {{
                        'Accept': 'application/json',
                        'Authorization': authToken
                    }}
                }}
            );
            console.log('[Checkout Check] Response status:', response.status);
            if (response.ok) {{
                const data = await response.json();
                console.log('[Checkout Check] Full data:', JSON.stringify(data));
                console.log('[Checkout Check] Checkout field:', data.checkout);
                const hasCheckout = !!(data.checkout && Object.keys(data.checkout).length > 0);
                console.log('[Checkout Check] Has checkout:', hasCheckout);
                window[callbackKey] = JSON.stringify({{ checkout: data.checkout || null, hasCheckout: hasCheckout }});
            }} else {{
                console.log('[Checkout Check] Failed to fetch, status:', response.status);
                window[callbackKey] = JSON.stringify({{ error: 'Failed to fetch asset', status: response.status }});
            }}
        }} catch (error) {{
            console.error('[Checkout Check] Error:', error);
            window[callbackKey] = JSON.stringify({{ error: error.toString() }});
        }}
    }})();
    """

    # Start the async fetch
    self.web_view.page().runJavaScript(js_code)

    def check_result():
        """Read the stored result and hand it to the status handler."""
        read_code = f"window[{callback_js}]"
        self.web_view.page().runJavaScript(
            read_code,
            lambda result: self._handle_checkout_status(result, azure_url, filename, callback_id),
        )

    # Fixed 500 ms wait for the fetch; if it has not finished by then the
    # variable is still undefined and the handler falls back to the dialog.
    QTimer.singleShot(500, check_result)
def _handle_checkout_status(self, result, azure_url: str, filename: str, callback_id: str) -> None:
    """Handle the result of the checkout status check.

    Shows the checkout dialog unless the result clearly says the asset is
    already checked out. Any missing, malformed or error result also leads
    to the dialog.

    Args:
        result: Result from JavaScript (JSON string)
        azure_url: Azure URL
        filename: Asset filename
        callback_id: Name of the window variable to clean up
    """
    # Remove the temporary window variable
    cleanup_code = f"delete window[{json.dumps(callback_id)}]"
    self.web_view.page().runJavaScript(cleanup_code)

    logger.debug(f"Checkout status result type: {type(result)}, value: {result}")

    if not result or not isinstance(result, str):
        logger.warning(f"Checkout status check returned invalid result: {result}")
        self._show_checkout_dialog(azure_url, filename)
        return

    try:
        parsed_result = json.loads(result)
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning(f"Failed to parse checkout status result: {e}")
        self._show_checkout_dialog(azure_url, filename)
        return

    # Valid JSON is not necessarily an object; .get() below needs a dict
    if not isinstance(parsed_result, dict):
        logger.warning(f"Checkout status result is not a JSON object: {parsed_result!r}")
        self._show_checkout_dialog(azure_url, filename)
        return

    if parsed_result.get('error'):
        logger.warning(f"Could not check checkout status: {parsed_result}")
        self._show_checkout_dialog(azure_url, filename)
        return

    # Already checked out: nothing to ask
    if parsed_result.get('hasCheckout', False):
        checkout_info = parsed_result.get('checkout', {})
        logger.info(f"Asset {filename} is already checked out: {checkout_info}, skipping dialog")
        return

    # Not checked out: ask the user
    logger.debug(f"Asset {filename} is not checked out, showing dialog")
    self._show_checkout_dialog(azure_url, filename)
def _show_checkout_dialog(self, azure_url: str, filename: str) -> None:
    """Ask the user whether the asset should be checked out.

    A Yes/No question box is shown with Yes preselected; on Yes the
    checkout API call is triggered.

    Args:
        azure_url: Azure Blob Storage URL
        filename: Asset filename
    """
    from PySide6.QtWidgets import QMessageBox

    buttons = QMessageBox.StandardButton
    answer = QMessageBox.question(
        self,
        "Checkout Asset",
        f"Do you want to check out this asset?\n\n{filename}",
        buttons.Yes | buttons.No,
        buttons.Yes,
    )

    if answer != buttons.Yes:
        logger.info(f"User declined checkout for {filename}")
        return

    logger.info(f"User confirmed checkout for {filename}")
    self._trigger_checkout_api(azure_url)
def _trigger_checkout_api(self, azure_url: str) -> None:
    """Trigger checkout via an API call made from JavaScript.

    The request is issued from the page so HttpOnly cookies are included
    automatically; the Authorization header uses window.capturedAuthToken.

    Example URL: https://devagravitystg.file.core.windows.net/devagravitysync/anPGZszKzgKaSz1SIx2HFgduy/filename
    Asset ID: anPGZszKzgKaSz1SIx2HFgduy

    Args:
        azure_url: Azure Blob Storage URL containing asset ID
    """
    try:
        # Asset ID is the path segment directly before the filename
        # Format: https://domain/container/ASSET_ID/filename
        match = re.search(r'/([^/]+)/[^/]+$', azure_url)
        if not match:
            logger.warning(f"Could not extract asset ID from URL: {azure_url}")
            return
        asset_id = match.group(1)
        logger.info(f"Extracted asset ID: {asset_id}")

        # Embed the ID as a JSON string literal so quotes or backslashes in
        # the URL-derived value cannot break out of the JavaScript string.
        asset_id_js = json.dumps(asset_id)

        js_code = f"""
        (async function() {{
            const assetId = {asset_id_js};
            try {{
                // Get captured auth token (from intercepted XHR)
                const authToken = window.capturedAuthToken;
                if (!authToken) {{
                    console.error('No authorization token available');
                    return {{ success: false, error: 'No auth token' }};
                }}
                const headers = {{
                    'Accept': 'application/json',
                    'Content-Type': 'application/json',
                    'Accept-Language': 'de',
                    'Authorization': authToken
                }};
                const response = await fetch(
                    'https://devagravityprivate.azurewebsites.net/api/assets/checkout/bulk?checkout=true',
                    {{
                        method: 'PUT',
                        headers: headers,
                        body: JSON.stringify({{asset_ids: [assetId]}})
                    }}
                );
                if (response.ok) {{
                    console.log('✅ Checkout API successful for asset ' + assetId);
                    return {{ success: true, status: response.status }};
                }} else {{
                    const text = await response.text();
                    console.warn('Checkout API returned status ' + response.status + ': ' + text.substring(0, 200));
                    return {{ success: false, status: response.status, error: text }};
                }}
            }} catch (error) {{
                console.error('Checkout API call failed:', error);
                return {{ success: false, error: error.toString() }};
            }}
        }})();
        """

        def on_result(result):
            """Log the outcome reported by the JavaScript call."""
            # NOTE(review): the script is an async function, so the value
            # delivered here may not be the resolved object - confirm. Any
            # non-dict or empty value falls through to the debug branch.
            if result and isinstance(result, dict):
                if result.get('success'):
                    logger.info(f"✅ Checkout successful for asset {asset_id}")
                else:
                    status = result.get('status', 'unknown')
                    error = result.get('error', 'unknown error')
                    logger.warning(f"Checkout API returned status {status}: {error}")
            else:
                logger.debug(f"Checkout API call completed (result: {result})")

        # Execute JavaScript (async, non-blocking)
        self.web_view.page().runJavaScript(js_code, on_result)
    except Exception as e:
        # Best effort: a failed checkout must not break the drag itself
        logger.exception(f"Error triggering checkout API: {e}")
def _on_drag_failed(self, source: str, error: str) -> None:
    """Log a drag operation that could not be carried out.

    Args:
        source: Original URL or path from web content
        error: Error message
    """
    # Logging only for now; a user notification or status bar message could go here
    logger.warning(f"Drag failed for {source}: {error}")
def _on_download_requested(self, download: QWebEngineDownloadRequest) -> None:
    """Handle download requests from the embedded web view.

    The download is accepted and saved into the user's Downloads folder
    (or the home directory if no Downloads folder is reported). Only the
    final name component of the suggested filename is used, so a name from
    the remote side cannot redirect the file to another directory.

    Args:
        download: Download request from the web engine
    """
    try:
        # Download details for debugging
        logger.debug(f"Download URL: {download.url().toString()}")
        logger.debug(f"Download filename: {download.downloadFileName()}")
        logger.debug(f"Download mime type: {download.mimeType()}")
        logger.debug(f"Download suggested filename: {download.suggestedFileName()}")
        logger.debug(f"Download state: {download.state()}")

        # System Downloads folder, falling back to the home directory
        downloads_path = QStandardPaths.writableLocation(
            QStandardPaths.StandardLocation.DownloadLocation
        )
        if not downloads_path:
            downloads_path = str(Path.home())
            logger.warning("Downloads folder not found, using home directory")

        # Prefer the suggested filename, fall back to downloadFileName
        raw_name = download.suggestedFileName() or download.downloadFileName()
        # Keep only the last path component; '.' and '..' are not usable names
        filename = Path(raw_name).name if raw_name else ""
        if filename in ("", ".", ".."):
            filename = "download"
            logger.warning("No filename suggested, using 'download'")

        download_file = Path(downloads_path) / filename
        logger.debug(f"Download will be saved to: {download_file}")

        # Set target directory and name, then accept
        download.setDownloadDirectory(str(download_file.parent))
        download.setDownloadFileName(download_file.name)
        download.accept()
        logger.info(f"Download started: {filename}")

        # Temporary status bar message
        self.status_bar.showMessage(
            f"📥 Download: {filename}", 3000
        )

        # State changes are logged for progress tracking
        download.stateChanged.connect(
            lambda state: logger.debug(f"Download state changed to: {state}")
        )
        # Completion feedback
        download.isFinishedChanged.connect(
            lambda: self._on_download_finished(download, download_file)
        )
    except Exception as e:
        # A failing download handler must not take the window down
        logger.error(f"Error handling download: {e}", exc_info=True)
        self.status_bar.showMessage(f"Download error: {e}", 5000)
def _on_download_finished(self, download: QWebEngineDownloadRequest, file_path: Path) -> None:
    """Report the final state of a download in the log and status bar.

    Does nothing while the download is not finished yet.

    Args:
        download: The completed download request
        file_path: Path where file was saved
    """
    try:
        if not download.isFinished():
            return

        final_state = download.state()
        logger.debug(f"Download finished with state: {final_state}")

        states = QWebEngineDownloadRequest.DownloadState
        saved_name = file_path.name

        if final_state == states.DownloadCompleted:
            logger.info(f"Download completed: {saved_name}")
            self.status_bar.showMessage(f"Download completed: {saved_name}", 5000)
        elif final_state == states.DownloadCancelled:
            logger.info(f"Download cancelled: {saved_name}")
            self.status_bar.showMessage(f"⚠️ Download abgebrochen: {saved_name}", 3000)
        elif final_state == states.DownloadInterrupted:
            logger.warning(f"Download interrupted: {saved_name}")
            self.status_bar.showMessage(f"❌ Download fehlgeschlagen: {saved_name}", 5000)
    except Exception as e:
        logger.error(f"Error in download finished handler: {e}", exc_info=True)
def dragEnterEvent(self, event):
    """Ignore drags entering the main window (from WebView or external).

    With the intercept script, ALT-drags are prevented in JavaScript and
    handled through bridge.start_file_drag(); whatever still arrives here
    is left unhandled.

    Args:
        event: QDragEnterEvent
    """
    event.ignore()
def dragMoveEvent(self, event):
    """Ignore drags moving over the main window.

    Args:
        event: QDragMoveEvent
    """
    event.ignore()
def dragLeaveEvent(self, event):
    """Ignore drags leaving the main window.

    Args:
        event: QDragLeaveEvent
    """
    event.ignore()
def dropEvent(self, event):
    """Ignore drops on the main window.

    Args:
        event: QDropEvent
    """
    event.ignore()
def _on_js_console_message(self, level, message, line_number, source_id):
    """Forward a JavaScript console message to the Python logger.

    Info messages are logged at debug level; warnings at warning level and
    everything else at error level, each followed by a debug line with the
    message origin.

    Args:
        level: Console message level (JavaScriptConsoleMessageLevel enum)
        message: The console message
        line_number: Line number where the message originated
        source_id: Source file/URL where the message originated
    """
    from PySide6.QtWebEngineCore import QWebEnginePage

    levels = QWebEnginePage.JavaScriptConsoleMessageLevel

    if level == levels.InfoMessageLevel:
        logger.debug(f"JS Console: {message}")
        return

    # Warning level, or anything else (treated as error)
    if level == levels.WarningMessageLevel:
        emit = logger.warning
    else:
        emit = logger.error
    emit(f"JS Console: {message}")
    logger.debug(f" at {source_id}:{line_number}")
def _on_page_loaded(self, success: bool) -> None:
    """Verify the bridge script after a page load.

    On a successful load, asks the page whether
    window.__webdrop_bridge_injected is set to true and logs the answer.

    Args:
        success: True if page loaded successfully
    """
    if not success:
        logger.warning("Page failed to load")
        return

    def report_injection(is_injected):
        """Log whether the page reports the bridge script as present."""
        if not is_injected:
            logger.error("WebDrop Bridge script NOT loaded!")
            logger.error("Drag-and-drop conversion will not work")
            return
        logger.debug("WebDrop Bridge script is active")
        logger.debug("QWebChannel bridge is ready")

    # Evaluated inside the page; the boolean result is passed to the callback
    probe = "typeof window.__webdrop_bridge_injected !== 'undefined' && window.__webdrop_bridge_injected === true"
    self.web_view.page().runJavaScript(probe, report_injection)
def _create_navigation_toolbar(self) -> None:
    """Build the top toolbar: Back, Forward, Home, Refresh and help actions.

    The history actions are the web view's own page actions. An expanding
    spacer pushes the About, Settings and Check-for-Updates actions to the
    right-hand side of the toolbar.
    """
    nav_bar = QToolBar("Navigation")
    nav_bar.setMovable(False)
    nav_bar.setIconSize(QSize(24, 24))
    self.addToolBar(Qt.ToolBarArea.TopToolBarArea, nav_bar)

    web_actions = self.web_view.page().WebAction

    # Back and Forward reuse the actions provided by the web view.
    for history_step in (web_actions.Back, web_actions.Forward):
        nav_bar.addAction(self.web_view.pageAction(history_step))

    nav_bar.addSeparator()

    # Home uses a standard platform icon and no text.
    widget_style = self.style()
    home_icon = widget_style.standardIcon(widget_style.StandardPixmap.SP_DirHomeIcon)
    home_button = nav_bar.addAction(home_icon, "")
    home_button.setToolTip("Home")
    home_button.triggered.connect(self._navigate_home)

    # Refresh is again the web view's own page action.
    nav_bar.addAction(self.web_view.pageAction(web_actions.Reload))

    # Expanding filler so that everything added after it sits on the right.
    filler = QWidget()
    filler.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum)
    nav_bar.addWidget(filler)

    # (label, tooltip, handler) for the right-aligned help actions.
    # NOTE(review): the About label is an empty string here; presumably an
    # info glyph was intended - confirm against the original file.
    right_side_actions = (
        ("", "About WebDrop Bridge", self._show_about_dialog),
        ("⚙️", "Settings", self._show_settings_dialog),
        ("🔄", "Check for Updates", self._on_manual_check_for_updates),
    )
    for label, tooltip, handler in right_side_actions:
        help_action = nav_bar.addAction(label)
        help_action.setToolTip(tooltip)
        help_action.triggered.connect(handler)
def _create_status_bar(self) -> None:
    """Set up the status bar and its permanent update-status label."""
    self.status_bar = self.statusBar()

    # The label starts at "Ready" and is later rewritten by set_update_status().
    status_label = QLabel("Ready")
    status_label.setStyleSheet("margin-right: 10px;")
    self.update_status_label = status_label
    self.status_bar.addPermanentWidget(status_label)
def set_update_status(self, status: str, emoji: str = "") -> None:
    """Show update information in the status-bar label.

    Args:
        status: Status text to display
        emoji: Optional prefix; when non-empty it is placed before the
            status text, separated by a single space
    """
    label_text = f"{emoji} {status}" if emoji else status
    self.update_status_label.setText(label_text)
def _on_manual_check_for_updates(self) -> None:
    """Run a user-requested update check with a progress dialog.

    Marks the check as manual (so result handlers show feedback dialogs),
    starts the check via check_for_updates_startup() and then shows a
    non-modal "checking" dialog.

    NOTE(review): whether the update cache is bypassed for manual checks is
    not visible from this method - confirm in UpdateManager.
    """
    logger.info("Manual update check requested from menu")

    from webdrop_bridge.ui.update_manager_ui import CheckingDialog

    # Kept on self so the result handlers can close it later.
    self.checking_dialog = CheckingDialog(self)
    self._is_manual_check = True

    # Kick off the background check first, then reveal the dialog.
    self.check_for_updates_startup()
    self.checking_dialog.show()
def _show_about_dialog(self) -> None:
    """Show the About box with app name, version and vendor contact details."""
    from PySide6.QtWidgets import QMessageBox

    app_name = self.config.app_name

    # Rich-text fragments, concatenated in order into a single HTML string.
    html_parts = [
        f"<b>{app_name}</b><br>",
        f"Version: {self.config.app_version}<br>",
        "<br>",
        "Bridges web-based drag-and-drop workflows with native file operations ",
        "for professional desktop applications.<br>",
        "<br>",
        "<b>Product of:</b><br>",
        "<b>Hörl Information Management GmbH</b><br>",
        "Silberburgstraße 126<br>",
        "70176 Stuttgart, Germany<br>",
        "<br>",
        "<small>",
        "<b>Email:</b> <a href='mailto:info@hoerl-im.de'>info@hoerl-im.de</a><br>",
        "<b>Phone:</b> +49 (0) 711 933 42 52 0<br>",
        "<b>Web:</b> <a href='https://www.hoerl-im.de/'>https://www.hoerl-im.de/</a><br>",
        "</small>",
        "<br>",
        "<small>© 2026 Hörl Information Management GmbH. All rights reserved.</small>",
    ]
    QMessageBox.about(self, f"About {app_name}", "".join(html_parts))
def _show_settings_dialog(self) -> None:
    """Open the modal Settings dialog for the current configuration."""
    from webdrop_bridge.ui.settings_dialog import SettingsDialog

    # The dialog is parented to this window and run modally.
    SettingsDialog(self.config, self).exec()
def _navigate_home(self) -> None:
    """Navigate the web view to the configured home (start) URL.

    An http:// or https:// URL is loaded as-is. Any other value is treated
    as a local file path, resolved to an absolute path and loaded via its
    file:// URI. If the path cannot be converted, the failure is logged and
    the current page is left untouched.
    """
    home_url = self.config.webapp_url

    if home_url.startswith(("http://", "https://")):
        self.web_view.load(QUrl(home_url))
        return

    # Only the path-to-URI conversion is expected to raise; keep the try
    # narrow so unrelated errors from load() are not masked.
    try:
        file_url = Path(home_url).resolve().as_uri()
    except (OSError, ValueError) as exc:
        # Previously swallowed silently, which made a dead Home button
        # impossible to diagnose from the logs.
        logger.warning("Cannot navigate to home location %r: %s", home_url, exc)
        return

    self.web_view.load(QUrl(file_url))
def closeEvent(self, event) -> None:
    """Tear down the web engine page, then accept the close.

    The page is disconnected, scheduled for deletion and detached from the
    view before the window closes. This is intended to avoid the
    "Release of profile requested but WebEnginePage still not deleted"
    warning and to let session data (cookies, login state) be saved.

    Args:
        event: Close event
    """
    logger.debug("Closing application - cleaning up web engine resources")

    view = getattr(self, "web_view", None)
    page = view.page() if view else None

    if page:
        # Stop load callbacks from firing while shutting down.
        try:
            page.loadFinished.disconnect()
        except RuntimeError:
            pass  # Already disconnected or never connected
        page.deleteLater()
        logger.debug("WebEnginePage scheduled for deletion")

    if view:
        # Detach the page from the view.
        view.setPage(None)  # type: ignore

    event.accept()
def check_for_updates_startup(self) -> None:
    """Start a background update check.

    Builds an UpdateManager whose config directory is ~/.webdrop-bridge and
    hands it to _run_async_check(), so the UI thread is not blocked. Any
    failure while setting this up is logged and otherwise ignored.
    """
    from webdrop_bridge.core.updater import UpdateManager

    try:
        state_dir = Path.home() / ".webdrop-bridge"
        update_manager = UpdateManager(
            current_version=self.config.app_version,
            config_dir=state_dir,
        )
        # The actual check runs on a worker thread.
        self._run_async_check(update_manager)
    except Exception as e:
        logger.error(f"Failed to initialize update check: {e}")
def _run_async_check(self, manager) -> None:
    """Run the update check on a background thread with a 30 s safety timeout.

    An UpdateCheckWorker is moved to a fresh QThread and its result signals
    are wired to the window's handlers. A single-shot timer closes the
    "checking" dialog and warns the user if no result has arrived after
    30 seconds.

    Args:
        manager: UpdateManager instance
    """
    try:
        logger.debug("_run_async_check() starting")

        thread = QThread()
        worker = UpdateCheckWorker(manager, self.config.app_version)

        # IMPORTANT: keep Python references to both objects, otherwise they
        # can be garbage collected while the thread is still running.
        self._background_threads.append(thread)
        self._background_workers = getattr(self, '_background_workers', {})
        self._background_workers[id(thread)] = worker
        logger.debug(f"Created worker and thread, thread id: {id(thread)}")

        # One-element list so the nested callbacks can mutate the flag.
        check_completed = [False]

        def force_close_timeout():
            # A result may have arrived just before the timer fired.
            if check_completed[0]:
                logger.debug("Timeout fired but check already completed, suppressing error")
                return

            logger.warning("Update check taking too long (30s timeout)")
            if hasattr(self, 'checking_dialog') and self.checking_dialog:
                self.checking_dialog.close()
            self.set_update_status("Check timed out - no server response", emoji="⏱️")

            from PySide6.QtWidgets import QMessageBox
            QMessageBox.warning(
                self,
                "Update Check Timeout",
                "The server did not respond within 30 seconds.\n\n"
                "This may be due to a network issue or server unavailability.\n\n"
                "Please check your connection and try again."
            )

        # Parent the timer to the window. A parentless QTimer referenced only
        # by a local variable is destroyed as soon as this method returns,
        # so the safety timeout would never fire.
        safety_timer = QTimer(self)
        safety_timer.setSingleShot(True)
        safety_timer.setInterval(30000)  # 30 seconds
        safety_timer.timeout.connect(force_close_timeout)

        def on_check_done():
            logger.debug("Check finished, marking as completed to prevent timeout error")
            check_completed[0] = True

        # Result signals. update_status is deliberately NOT tied to
        # on_check_done: the worker emits "Checking for updates" at the very
        # start, which would mark the check as done before it has begun.
        worker.update_available.connect(self._on_update_available)
        worker.update_available.connect(on_check_done)
        worker.update_status.connect(self._on_update_status)
        worker.check_failed.connect(self._on_check_failed)
        worker.check_failed.connect(on_check_done)
        # The worker emits finished after every outcome, including "Ready".
        worker.finished.connect(on_check_done)
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)
        # Stop and dispose of the timer once the thread is done. These are
        # QObject slots, so they run in the timer's own (GUI) thread.
        thread.finished.connect(safety_timer.stop)
        thread.finished.connect(safety_timer.deleteLater)

        def cleanup_thread():
            # Drop the keep-alive references taken above.
            logger.debug(f"Cleaning up thread {id(thread)}")
            if thread in self._background_threads:
                self._background_threads.remove(thread)
            if id(thread) in self._background_workers:
                del self._background_workers[id(thread)]

        thread.finished.connect(cleanup_thread)

        logger.debug("Moving worker to thread and connecting started signal")
        worker.moveToThread(thread)
        thread.started.connect(worker.run)

        logger.debug("Starting thread...")
        thread.start()
        logger.debug("Thread started, starting safety timer")

        safety_timer.start()

    except Exception as e:
        logger.error(f"Failed to start update check thread: {e}", exc_info=True)
def _on_update_status(self, status: str, emoji: str) -> None:
"""Handle update status changes.
Args:
status: Status text
emoji: Status emoji
"""
self.set_update_status(status, emoji)
# If this is a manual check and we get the "Ready" status, it means no updates
if self._is_manual_check and status == "Ready":
# Close checking dialog first, then show result
if hasattr(self, 'checking_dialog') and self.checking_dialog:
self.checking_dialog.close()
from webdrop_bridge.ui.update_manager_ui import NoUpdateDialog
dialog = NoUpdateDialog(parent=self)
self._is_manual_check = False
dialog.exec()
def _on_check_failed(self, error_message: str) -> None:
    """Report a failed update check to the log, status bar and user.

    Args:
        error_message: Error description
    """
    logger.error(f"Update check failed: {error_message}")
    self.set_update_status(f"Check failed: {error_message}", emoji="")
    self._is_manual_check = False

    # Dismiss the progress dialog before the warning box appears.
    progress_dialog = getattr(self, "checking_dialog", None)
    if progress_dialog:
        progress_dialog.close()

    from PySide6.QtWidgets import QMessageBox

    dialog_body = f"Could not check for updates:\n\n{error_message}\n\nPlease try again later."
    QMessageBox.warning(self, "Update Check Failed", dialog_body)
def _on_update_available(self, release) -> None:
    """Tell the user that a newer release exists and offer to install it.

    Updates the status bar, dismisses any open "checking" dialog left over
    from a manual check, and shows the modal update dialog whose buttons are
    routed to the update-now / later / skip-version handlers.

    Args:
        release: Release object with update info
    """
    self.set_update_status(f"Update available: v{release.version}", emoji="")

    # The other result handlers (_on_update_status, _on_check_failed) end a
    # manual check by resetting the flag and closing the progress dialog.
    # Do the same here so the dialog is not left open behind the prompt.
    self._is_manual_check = False
    checking_dialog = getattr(self, "checking_dialog", None)
    if checking_dialog:
        checking_dialog.close()

    from webdrop_bridge.ui.update_manager_ui import UpdateAvailableDialog

    dialog = UpdateAvailableDialog(
        version=release.version,
        changelog=release.body,
        parent=self
    )

    # Route the dialog's buttons to the matching handlers.
    dialog.update_now.connect(lambda: self._on_user_update_now(release))
    dialog.update_later.connect(lambda: self._on_user_update_later())
    dialog.skip_version.connect(lambda: self._on_user_skip_version(release.version))

    # Modal: blocks until the user picks an option.
    dialog.exec()
def _on_user_update_now(self, release) -> None:
    """React to the 'Update Now' button by starting the download.

    Args:
        release: Release object to download and install
    """
    logger.info(f"User clicked 'Update Now' for v{release.version}")
    # Hand over to the download pipeline.
    self._start_update_download(release)
def _on_user_update_later(self) -> None:
    """React to the 'Later' button: log it and note the deferral in the status bar."""
    logger.info("User deferred update")
    self.set_update_status("Update deferred", emoji="")
def _on_user_skip_version(self, version: str) -> None:
    """React to the 'Skip Version' button by recording the skipped version.

    The version string is written to
    ~/.webdrop-bridge/skipped_version.txt, creating the directory if needed.

    Args:
        version: Version to skip
    """
    logger.info(f"User skipped version {version}")

    prefs_dir = Path.home() / ".webdrop-bridge"
    prefs_dir.mkdir(parents=True, exist_ok=True)
    (prefs_dir / "skipped_version.txt").write_text(version)

    self.set_update_status(f"Skipped v{version}", emoji="")
def _start_update_download(self, release) -> None:
    """Announce the download in the status bar and start it in the background.

    Args:
        release: Release object to download
    """
    version = release.version
    logger.info(f"Starting download for v{version}")
    self.set_update_status(f"Downloading v{version}", emoji="⬇️")

    # The download itself runs on a worker thread so the UI stays responsive.
    self._perform_update_async(release)
def _perform_update_async(self, release) -> None:
    """Download and verify an update on a background thread.

    An UpdateDownloadWorker is moved to a fresh QThread and its signals are
    wired to the window's download handlers. A single-shot 10-minute timer
    reports a failure if the worker has not produced a result by then.

    Args:
        release: Release object to download and install
    """
    from webdrop_bridge.core.updater import UpdateManager

    try:
        logger.debug("_perform_update_async() starting")

        manager = UpdateManager(
            current_version=self.config.app_version,
            config_dir=Path.home() / ".webdrop-bridge"
        )

        thread = QThread()
        worker = UpdateDownloadWorker(manager, release, self.config.app_version)

        # IMPORTANT: keep Python references to prevent garbage collection.
        # The worker registry is created on demand, as _run_async_check does,
        # so this method does not depend on a check having run first.
        self._background_threads.append(thread)
        self._background_workers = getattr(self, '_background_workers', {})
        self._background_workers[id(thread)] = worker
        logger.debug(f"Created download worker and thread, thread id: {id(thread)}")

        worker.download_complete.connect(self._on_download_complete)
        worker.download_failed.connect(self._on_download_failed)
        worker.update_status.connect(self._on_update_status)
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)

        # One-element list so the nested callbacks can mutate the flag.
        download_completed = [False]

        def force_timeout():
            # A result may have arrived just before the timer fired.
            if download_completed[0]:
                logger.debug("Timeout fired but download already completed, suppressing error")
                return

            logger.error("Download taking too long (10 minute timeout)")
            self.set_update_status("Download timed out - no server response", emoji="⏱️")
            worker.download_failed.emit("Download took too long with no response")
            thread.quit()
            # Bounded wait: quit() only stops the thread's event loop, so a
            # worker stuck inside run() would make an unbounded wait() freeze
            # the UI indefinitely.
            if not thread.wait(5000):
                logger.warning("Download thread did not stop within 5 seconds after timeout")

        # Parent the timer to the window. A parentless QTimer referenced only
        # by a local variable is destroyed as soon as this method returns,
        # so the safety timeout would never fire.
        safety_timer = QTimer(self)
        safety_timer.setSingleShot(True)
        safety_timer.setInterval(600000)  # 10 minutes
        safety_timer.timeout.connect(force_timeout)

        def on_download_done():
            logger.debug("Download finished, marking as completed to prevent timeout error")
            download_completed[0] = True

        worker.download_complete.connect(on_download_done)
        worker.download_failed.connect(on_download_done)

        # Stop and dispose of the timer once the thread is done. These are
        # QObject slots, so they run in the timer's own (GUI) thread.
        thread.finished.connect(safety_timer.stop)
        thread.finished.connect(safety_timer.deleteLater)

        def cleanup_thread():
            # Drop the keep-alive references taken above.
            logger.debug(f"Cleaning up download thread {id(thread)}")
            if thread in self._background_threads:
                self._background_threads.remove(thread)
            if id(thread) in self._background_workers:
                del self._background_workers[id(thread)]

        thread.finished.connect(cleanup_thread)

        logger.debug("Moving download worker to thread and connecting started signal")
        worker.moveToThread(thread)
        thread.started.connect(worker.run)

        logger.debug("Starting download thread...")
        thread.start()
        logger.debug("Download thread started, starting safety timer")

        safety_timer.start()

    except Exception as e:
        logger.error(f"Failed to start update download: {e}", exc_info=True)
        self.set_update_status(f"Update failed: {str(e)[:30]}", emoji="")
def _on_download_complete(self, installer_path: Path) -> None:
    """Offer to install once the worker reports a finished download.

    Args:
        installer_path: Path to downloaded and verified installer
    """
    from webdrop_bridge.ui.update_manager_ui import InstallDialog

    logger.info(f"Download complete: {installer_path}")
    self.set_update_status("Ready to install", emoji="")

    def launch_installer():
        # Runs only if the user confirms in the dialog.
        self._do_install(installer_path)

    confirm_dialog = InstallDialog(parent=self)
    confirm_dialog.install_now.connect(launch_installer)
    confirm_dialog.exec()
def _on_download_failed(self, error: str) -> None:
    """Report a failed download to the log, status bar and user.

    Args:
        error: Error message
    """
    logger.error(f"Download failed: {error}")
    self.set_update_status(error, emoji="")

    from PySide6.QtWidgets import QMessageBox

    dialog_body = f"Could not download the update:\n\n{error}\n\nPlease try again later."
    QMessageBox.critical(self, "Download Failed", dialog_body)
def _do_install(self, installer_path: Path) -> None:
    """Launch the downloaded installer through an UpdateManager.

    The outcome reported by install_update() is reflected in the status bar
    and the log.

    Args:
        installer_path: Path to installer executable
    """
    logger.info(f"Installing from {installer_path}")

    from webdrop_bridge.core.updater import UpdateManager

    update_manager = UpdateManager(
        current_version=self.config.app_version,
        config_dir=Path.home() / ".webdrop-bridge",
    )

    launched = update_manager.install_update(installer_path)
    if not launched:
        self.set_update_status("Installation failed", emoji="")
        logger.error("Failed to launch update installer")
        return

    self.set_update_status("Installation started", emoji="")
    logger.info("Update installer launched successfully")
class UpdateCheckWorker(QObject):
    """Worker that runs a single update check, meant to be moved to a QThread.

    Exactly one of update_available, update_status("Ready", "") or
    check_failed is emitted as the result, and finished is always emitted
    last.
    """

    # Signals must be declared at class level.
    update_available = Signal(object)  # Emits Release object
    update_status = Signal(str, str)  # Emits (status_text, emoji)
    check_failed = Signal(str)  # Emits error message
    finished = Signal()

    def __init__(self, manager, current_version: str):
        """Initialize worker.

        Args:
            manager: UpdateManager instance
            current_version: Current app version
        """
        super().__init__()
        self.manager = manager
        self.current_version = current_version

    def run(self) -> None:
        """Run the update check on a private asyncio event loop.

        The check is bounded by a 10-second timeout. The event loop is closed
        and finished is emitted even if the check raises.
        """
        loop = None
        try:
            logger.debug("UpdateCheckWorker.run() starting")

            self.update_status.emit("Checking for updates", "🔄")

            # Each run gets a fresh loop of its own.
            logger.debug("Creating new event loop for worker thread")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            try:
                logger.debug("Starting update check with 10-second timeout")
                release = loop.run_until_complete(
                    asyncio.wait_for(
                        self.manager.check_for_updates(),
                        timeout=10
                    )
                )
                logger.debug(f"Update check completed, release={release}")

                if release:
                    logger.info(f"Update available: {release.version}")
                    self.update_available.emit(release)
                else:
                    # No update available - report the idle status.
                    logger.info("No update available")
                    self.update_status.emit("Ready", "")

            except asyncio.TimeoutError:
                logger.warning("Update check timed out - server not responding")
                self.check_failed.emit("Server not responding - check again later")

            except Exception as e:
                logger.error(f"Update check failed: {e}", exc_info=True)
                # Emit only the reason. MainWindow._on_check_failed already
                # prefixes the status with "Check failed: ", so repeating the
                # prefix here produced "Check failed: Check failed: ...".
                # Fall back to the exception type when the message is empty.
                reason = str(e)[:50] or type(e).__name__
                self.check_failed.emit(reason)

        finally:
            # Always close the loop and announce completion.
            if loop is not None:
                try:
                    if not loop.is_closed():
                        loop.close()
                        logger.debug("Event loop closed")
                except Exception as e:
                    logger.warning(f"Error closing event loop: {e}")
            self.finished.emit()
class UpdateDownloadWorker(QObject):
    """Worker that downloads a release and verifies its checksum.

    Meant to be moved to a QThread. The outcome is reported through
    download_complete or download_failed, and finished is always emitted
    last.
    """

    # Signals must be declared at class level.
    download_complete = Signal(Path)  # Emits installer_path
    download_failed = Signal(str)  # Emits error message
    update_status = Signal(str, str)  # Emits (status_text, emoji)
    finished = Signal()

    def __init__(self, manager, release, current_version: str):
        """Store what the worker needs for its run.

        Args:
            manager: UpdateManager instance
            release: Release object to download
            current_version: Current app version
        """
        super().__init__()
        self.manager = manager
        self.release = release
        self.current_version = current_version

    def run(self) -> None:
        """Download the installer, verify it, and report the outcome.

        Uses a private asyncio event loop. The download is bounded by a
        300-second timeout and the checksum verification by a 30-second
        timeout. The loop is closed and finished is emitted on every path.
        """
        event_loop = None
        try:
            self.update_status.emit(f"Downloading v{self.release.version}", "⬇️")

            # Each run gets a fresh loop of its own.
            event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(event_loop)

            try:
                # Step 1: download (5 minute limit).
                logger.info("Starting download with 5-minute timeout")
                download_job = asyncio.wait_for(
                    self.manager.download_update(self.release),
                    timeout=300,
                )
                installer_path = event_loop.run_until_complete(download_job)

                if not installer_path:
                    self.update_status.emit("Download failed", "")
                    self.download_failed.emit("No installer found in release")
                    logger.error("Download failed - no installer found")
                    return

                logger.info(f"Downloaded to {installer_path}")
                self.update_status.emit("Verifying download", "🔍")

                # Step 2: checksum verification (30 second limit).
                logger.info("Starting checksum verification")
                verify_job = asyncio.wait_for(
                    self.manager.verify_checksum(installer_path, self.release),
                    timeout=30,
                )
                checksum_ok = event_loop.run_until_complete(verify_job)

                if checksum_ok:
                    logger.info("Checksum verification passed")
                    self.download_complete.emit(installer_path)
                else:
                    self.update_status.emit("Verification failed", "")
                    self.download_failed.emit("Checksum verification failed")
                    logger.error("Checksum verification failed")

            except asyncio.TimeoutError as e:
                logger.error(f"Download/verification timed out: {e}")
                self.update_status.emit("Operation timed out", "⏱️")
                self.download_failed.emit("Download or verification timed out (no response from server)")

            except Exception as e:
                logger.error(f"Error during download: {e}")
                self.download_failed.emit(f"Download error: {str(e)[:50]}")

        except Exception as e:
            # Failures outside the inner try (e.g. creating the event loop).
            logger.error(f"Download worker failed: {e}")
            self.download_failed.emit(f"Download error: {str(e)[:50]}")

        finally:
            # Always close the loop and announce completion.
            if event_loop is not None:
                try:
                    if not event_loop.is_closed():
                        event_loop.close()
                        logger.debug("Event loop closed")
                except Exception as e:
                    logger.warning(f"Error closing event loop: {e}")
            self.finished.emit()