"""Main application window with web engine integration."""
import asyncio
import json
import logging
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Union
from PySide6.QtCore import (
QEvent,
QObject,
QPoint,
QSize,
QStandardPaths,
Qt,
QThread,
QTimer,
QUrl,
Signal,
Slot,
)
from PySide6.QtGui import QDesktopServices, QIcon, QKeySequence, QMouseEvent, QShortcut
from PySide6.QtWebChannel import QWebChannel
from PySide6.QtWebEngineCore import QWebEngineDownloadRequest, QWebEngineScript
from PySide6.QtWidgets import (
QLabel,
QMainWindow,
QMessageBox,
QSizePolicy,
QSpacerItem,
QStatusBar,
QToolBar,
QVBoxLayout,
QWidget,
QWidgetAction,
)
from webdrop_bridge.config import Config
from webdrop_bridge.core.drag_interceptor import DragInterceptor
from webdrop_bridge.core.validator import PathValidator
from webdrop_bridge.ui.restricted_web_view import RestrictedWebEngineView
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Fallback page shown by MainWindow._load_webapp() when the configured web
# application cannot be found or read. The text is passed through
# ``str.format(version=...)`` before it is displayed, so any literal brace
# added to it must be doubled ("{{" / "}}").
DEFAULT_WELCOME_PAGE = """
WebDrop Bridge
🌉 WebDrop Bridge
Professional Web-to-File Drag-and-Drop Bridge
✓ Application Ready
No web application is currently configured.
Configure WEBAPP_URL in your .env file to load your custom application.
WebDrop Bridge is a professional desktop application that seamlessly converts web-based drag-and-drop interactions into native file operations on Windows and macOS.
Key Features
- Drag-and-drop from web interface to desktop
- Real-time drag state monitoring
- Path validation and security controls
- Cross-platform support (Windows & macOS)
- Professional production-grade architecture
- Comprehensive logging and monitoring
To configure your web application:
- Create a
.env file in your application directory
- Set
WEBAPP_URL to your HTML file path or HTTP URL
- Example:
WEBAPP_URL=file:///./webapp/index.html
- Restart the application
"""
class OpenDropZone(QWidget):
    """Toolbar drop target that launches dropped files in their default application.

    The widget shows an "open folder" icon. Every local file URL dropped on it
    is handed to ``QDesktopServices.openUrl()``, which asks the operating
    system to open the file with whatever programme is associated with it.

    While a drag carrying URLs hovers over the widget, the icon label switches
    to a highlighted style so the user can tell the target is active.

    Signals:
        file_opened (str): Local path of a file that was opened successfully.
        file_open_failed (str, str): ``(path, error_message)`` for a file the
            OS could not open.
    """

    file_opened = Signal(str)
    file_open_failed = Signal(str, str)

    # Style sheets for the icon label: idle, and while a valid drag hovers.
    _NORMAL_STYLE = "QLabel { padding: 4px; border: 2px solid transparent; border-radius: 4px; }"
    _HOVER_STYLE = (
        "QLabel { padding: 4px; border: 2px solid #4CAF50; border-radius: 4px;"
        + " background: #E8F5E9; }"
    )

    def __init__(self, parent: Optional[QWidget] = None) -> None:
        """Build the icon label and enable drops on the widget.

        Args:
            parent: Parent widget.
        """
        super().__init__(parent)
        self.setAcceptDrops(True)

        box = QVBoxLayout(self)
        box.setContentsMargins(0, 0, 0, 0)

        widget_style = self.style()
        folder_icon = widget_style.standardIcon(widget_style.StandardPixmap.SP_DirOpenIcon)

        self._icon_label = QLabel(self)
        self._icon_label.setPixmap(folder_icon.pixmap(QSize(32, 32)))
        self._icon_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self._icon_label.setStyleSheet(self._NORMAL_STYLE)
        self._icon_label.setToolTip("Drop a file here to open it with its default application")
        box.addWidget(self._icon_label)

        self.setMinimumSize(QSize(44, 44))
        self.setMaximumSize(QSize(48, 48))

    def set_icon(self, icon: QIcon) -> None:
        """Replace the displayed icon; a null icon is ignored."""
        if not icon.isNull():
            self._icon_label.setPixmap(icon.pixmap(QSize(32, 32)))

    # ------------------------------------------------------------------
    # Drop handling
    # ------------------------------------------------------------------
    def dragEnterEvent(self, event) -> None:  # type: ignore[override]
        """Accept drags that carry URLs and switch to the hover style."""
        if not event.mimeData().hasUrls():
            event.ignore()
            return
        event.acceptProposedAction()
        self._icon_label.setStyleSheet(self._HOVER_STYLE)

    def dragLeaveEvent(self, event) -> None:  # type: ignore[override]
        """Restore the idle style once the drag leaves the widget."""
        self._icon_label.setStyleSheet(self._NORMAL_STYLE)
        super().dragLeaveEvent(event)

    def dropEvent(self, event) -> None:  # type: ignore[override]
        """Open every dropped local file with the system default application.

        The drop action is accepted before any file is opened, so the drag
        source sees the drop as accepted. Non-local URLs are skipped with a
        debug log entry.
        """
        self._icon_label.setStyleSheet(self._NORMAL_STYLE)

        payload = event.mimeData()
        if not payload.hasUrls():
            event.ignore()
            return
        event.acceptProposedAction()

        for dropped in payload.urls():
            if not dropped.isLocalFile():
                logger.debug(f"OpenDropZone: skipping non-local URL {dropped.toString()}")
                continue

            local = dropped.toLocalFile()
            logger.info(f"OpenDropZone: opening '{local}' with system default app")
            if QDesktopServices.openUrl(dropped):
                self.file_opened.emit(local)
                continue

            msg = "OS could not open the file"
            logger.warning(f"OpenDropZone: {msg}: {local}")
            self.file_open_failed.emit(local, msg)
class OpenWithDropZone(OpenDropZone):
    """Drop target that asks for an application chooser instead of opening directly.

    For each local file dropped on the widget, ``file_open_with_requested`` is
    emitted; the receiver is expected to perform the platform-specific
    "Open With" behaviour.

    Signals:
        file_open_with_requested (str): Local path of a dropped file.
    """

    file_open_with_requested = Signal(str)

    def __init__(self, parent: Optional[QWidget] = None) -> None:
        """Create the widget and override the inherited tooltip.

        Args:
            parent: Parent widget.
        """
        super().__init__(parent)
        self._icon_label.setToolTip("Drop a file here to choose which app should open it")

    def dropEvent(self, event) -> None:  # type: ignore[override]
        """Emit one chooser request per dropped local file."""
        self._icon_label.setStyleSheet(self._NORMAL_STYLE)

        payload = event.mimeData()
        if not payload.hasUrls():
            event.ignore()
            return
        event.acceptProposedAction()

        for dropped in payload.urls():
            if not dropped.isLocalFile():
                logger.debug(f"OpenWithDropZone: skipping non-local URL {dropped.toString()}")
                continue
            local = dropped.toLocalFile()
            logger.info(f"OpenWithDropZone: request app chooser for '{local}'")
            self.file_open_with_requested.emit(local)
class _DragBridge(QObject):
    """QWebChannel object through which page JavaScript talks to the window.

    Registered with the web channel under the name 'bridge'.
    """

    def __init__(self, window: "MainWindow", parent: Optional[QObject] = None):
        """Remember the owning window.

        Args:
            window: MainWindow instance whose drag interceptor handles drags
            parent: Parent QObject
        """
        super().__init__(parent)
        self.window = window

    @Slot(str)
    def start_file_drag(self, paths_text: str) -> None:
        """Queue a native file drag for the path(s) or URL(s) sent by JavaScript.

        The argument is either a single path/URL, or a JSON array string of
        paths/URLs when several items are dragged at once. Text that starts
        with ``[`` but is not a valid JSON list is forwarded unchanged as a
        single string.

        The drag is started from a zero-delay timer rather than inline, to
        avoid Qt drag manager state issues.

        Args:
            paths_text: Single path/URL, or JSON array string of paths/URLs
        """
        logger.debug(f"Bridge: start_file_drag called with {len(paths_text)} chars")

        payload: Union[str, list] = paths_text
        if paths_text.startswith("["):
            try:
                decoded = json.loads(paths_text)
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"Failed to parse JSON array: {e}, treating as single string")
            else:
                if isinstance(decoded, list):
                    payload = decoded
                    logger.debug(f"Parsed JSON array with {len(decoded)} item(s)")

        # The interceptor is looked up when the timer fires, not here.
        QTimer.singleShot(0, lambda: self.window.drag_interceptor.handle_drag(payload))

    @Slot(str)
    def debug_log(self, message: str) -> None:
        """Forward a JavaScript debug message to the Python logger.

        Args:
            message: Debug message from JavaScript
        """
        logger.debug(f"JS Debug: {message}")
class MainWindow(QMainWindow):
    """Main application window for WebDrop Bridge.

    Displays web content in a QWebEngineView and provides drag-and-drop
    integration with the native filesystem.
    """

    # Signals
    check_for_updates = Signal()  # Carries no arguments
    update_available = Signal(object)  # Emits Release object
def __init__(
    self,
    config: Config,
    parent: Optional[QWidget] = None,
) -> None:
    """Initialize the main window.

    Construction happens in a fixed order: window title/geometry/icon, the
    restricted web view, the navigation toolbar, the F12 shortcut, the status
    bar, the drag interceptor, the QWebChannel bridge and its injected
    script, the download handler, the central widget, and finally loading
    the web application and applying the stylesheet.

    Args:
        config: Application configuration
        parent: Parent widget
    """
    super().__init__(parent)
    self.config = config
    self._background_threads = []  # Keep references to background threads
    self._background_workers = {}  # Keep references to background workers
    self.checking_dialog = None  # Track the checking dialog
    self.downloading_dialog = None  # Track the download dialog
    self._is_manual_check = False  # Track if this is a manual check (for UI feedback)
    # Set window properties
    self.setWindowTitle(config.window_title)
    self.setGeometry(
        100,
        100,
        config.window_width,
        config.window_height,
    )
    # Set window icon
    # Support both development mode and PyInstaller bundle
    if hasattr(sys, "_MEIPASS"):
        # Running as PyInstaller bundle
        icon_path = Path(sys._MEIPASS) / "resources" / "icons" / "app.ico"  # type: ignore
    else:
        # Running in development mode: the icon is looked up four directory
        # levels above this file.
        icon_path = (
            Path(__file__).parent.parent.parent.parent / "resources" / "icons" / "app.ico"
        )
    if icon_path.exists():
        self.setWindowIcon(QIcon(str(icon_path)))
        logger.debug(f"Window icon set from {icon_path}")
    else:
        # A missing icon is not fatal; the window keeps the default icon.
        logger.warning(f"Window icon not found at {icon_path}")
    # Create web engine view with URL for profile isolation
    self.web_view = RestrictedWebEngineView(
        allowed_urls=config.allowed_urls, webapp_url=config.webapp_url
    )
    # Enable the main window and web view to receive drag events
    self.setAcceptDrops(True)
    self.web_view.setAcceptDrops(True)
    # Track ongoing drags from web view
    self._current_drag_url = None
    # Redirect JavaScript console messages to Python logger
    # (done by assigning the handler onto the page instance)
    self.web_view.page().javaScriptConsoleMessage = self._on_js_console_message
    # Create navigation toolbar (Kiosk-mode navigation)
    self._create_navigation_toolbar()
    # Set up F12 keyboard shortcut for Developer Tools
    f12_shortcut = QShortcut(QKeySequence(Qt.Key.Key_F12), self)
    f12_shortcut.activated.connect(self._open_developer_tools)
    logger.debug("F12 shortcut registered for Developer Tools")
    # Create status bar
    self._create_status_bar()
    # Create drag interceptor with config (includes URL converter)
    self.drag_interceptor = DragInterceptor(config)
    # Connect drag interceptor signals
    self.drag_interceptor.drag_started.connect(self._on_drag_started)
    self.drag_interceptor.drag_failed.connect(self._on_drag_failed)
    # Set up JavaScript bridge with QWebChannel; the bridge object is
    # registered under the name "bridge".
    self._drag_bridge = _DragBridge(self)
    web_channel = QWebChannel(self)
    web_channel.registerObject("bridge", self._drag_bridge)
    self.web_view.page().setWebChannel(web_channel)
    # Install the drag bridge script
    self._install_bridge_script()
    # Connect to loadFinished to verify script injection
    self.web_view.loadFinished.connect(self._on_page_loaded)
    # Set up download handler
    profile = self.web_view.page().profile()
    logger.debug(f"Connecting download handler to profile: {profile}")
    # CRITICAL: Connect download handler BEFORE any page loads
    profile.downloadRequested.connect(self._on_download_requested)
    # Enable downloads by setting download path
    downloads_path = QStandardPaths.writableLocation(
        QStandardPaths.StandardLocation.DownloadLocation
    )
    # The download path is only set when a non-empty location was returned.
    if downloads_path:
        profile.setDownloadPath(downloads_path)
        logger.debug(f"Download path set to: {downloads_path}")
    logger.debug("Download handler connected successfully")
    # Set up central widget with layout (web view fills it, no margins)
    central_widget = QWidget()
    layout = QVBoxLayout()
    layout.addWidget(self.web_view)
    layout.setContentsMargins(0, 0, 0, 0)
    central_widget.setLayout(layout)
    self.setCentralWidget(central_widget)
    # Load web application
    self._load_webapp()
    # Apply styling if available
    self._apply_stylesheet()
def _load_webapp(self) -> None:
    """Load the configured web application into the web view.

    ``http://`` and ``https://`` URLs are loaded directly. Anything else is
    treated as a local file: the configured value is tried as given, then
    (with a leading ``file:///`` scheme and leading ``./`` removed) relative
    to the application root four directory levels above this file, and
    finally relative to the current working directory.

    If no candidate exists, or the file cannot be read, the default welcome
    page is shown instead and the reason is logged.
    """
    webapp_url = self.config.webapp_url

    if webapp_url.startswith(("http://", "https://")):
        # Remote URL
        self.web_view.load(QUrl(webapp_url))
        return

    def show_welcome_page() -> None:
        """Display the built-in welcome page with configuration instructions."""
        welcome_html = DEFAULT_WELCOME_PAGE.format(version=self.config.app_version)
        self.web_view.setHtml(welcome_html)

    try:
        file_path = Path(webapp_url).resolve()

        if not file_path.exists():
            # Remove the scheme and leading "./" as *prefixes*. str.lstrip()
            # must not be used here: it strips any of the given characters,
            # so "lib/index.html".lstrip("file:///") would yield
            # "b/index.html".
            relative_part = webapp_url
            if relative_part.startswith("file:///"):
                relative_part = relative_part[len("file:///"):]
            while relative_part.startswith("./"):
                relative_part = relative_part[2:]

            app_root = Path(__file__).parent.parent.parent.parent
            for candidate in (app_root / relative_part, Path(relative_part).resolve()):
                if candidate.exists():
                    file_path = candidate
                    break

        if not file_path.exists():
            logger.warning(
                f"Web application not found for '{webapp_url}'; showing welcome page"
            )
            show_welcome_page()
            return

        html_content = file_path.read_text(encoding="utf-8")
        injected_html = self._inject_drag_bridge(html_content)
        # NOTE(review): the base URL is the containing directory without a
        # trailing slash; relative references in the page may therefore
        # resolve against its parent - confirm this is intended.
        self.web_view.setHtml(injected_html, QUrl.fromLocalFile(file_path.parent))
    except (OSError, ValueError) as e:
        # Unreadable or undecodable file: fall back to the welcome page, but
        # leave a trace of why.
        logger.warning(f"Failed to load web application from '{webapp_url}': {e}")
        show_welcome_page()
def _install_bridge_script(self) -> None:
    """Install the drag bridge JavaScript via QWebEngineScript.

    The injected source is the concatenation of, in order:

    1. ``qwebchannel.js`` read from Qt resources (embedded inline to avoid
       CSP issues with ``qrc://`` URLs),
    2. the configuration script from ``_generate_config_injection_script()``,
    3. ``bridge_script_intercept.js`` (required),
    4. ``download_interceptor.js`` (optional),
    5. ``mouse_event_emulator.js`` (optional, last so it runs after the rest).

    The script uses the Deferred injection point so it runs after the DOM is
    ready. If the required bridge script cannot be found or read, an error is
    logged and nothing is installed; the application still starts.
    """
    from PySide6.QtCore import QFile, QIODevice

    script = QWebEngineScript()
    script.setName("webdrop-bridge")
    # Deferred instead of DocumentCreation: lets the DOM become ready first
    # and prevents race conditions with JavaScript event listeners.
    script.setInjectionPoint(QWebEngineScript.InjectionPoint.Deferred)
    script.setWorldId(QWebEngineScript.ScriptWorldId.MainWorld)
    script.setRunsOnSubFrames(False)

    # Load qwebchannel.js from Qt resources (avoids CSP blocking qrc:// URLs)
    qwebchannel_code = ""
    qwebchannel_file = QFile(":/qtwebchannel/qwebchannel.js")
    if qwebchannel_file.open(QIODevice.OpenModeFlag.ReadOnly | QIODevice.OpenModeFlag.Text):
        qwebchannel_code = bytes(qwebchannel_file.readAll()).decode("utf-8")  # type: ignore
        qwebchannel_file.close()
        logger.debug("Loaded qwebchannel.js inline to avoid CSP issues")
    else:
        logger.warning("Failed to load qwebchannel.js from resources")

    # Generate configuration injection script
    config_code = self._generate_config_injection_script()

    # Locate the required bridge script.
    search_paths = self._bridge_resource_candidates("bridge_script_intercept.js")
    script_path = next((path for path in search_paths if path.exists()), None)
    if script_path is not None:
        logger.debug(f"Found bridge script at: {script_path}")
    else:
        # Log all attempted paths for debugging
        logger.error("Bridge script NOT found at any expected location:")
        for i, path in enumerate(search_paths, 1):
            logger.error(f" [{i}] {path} (exists: {path.exists()})")
        logger.error(f"sys._MEIPASS: {getattr(sys, '_MEIPASS', 'NOT SET')}")
        logger.error(f"sys.executable: {sys.executable}")
        logger.error(f"__file__: {__file__}")

    try:
        if script_path is None:
            raise FileNotFoundError(
                "bridge_script_intercept.js not found in any expected location"
            )
        bridge_code = script_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"❌ Failed to load bridge script: {e}")
        logger.error(" This will break drag-and-drop functionality!")
        # Don't re-raise - allow app to start (will show error in logs)
        return

    download_interceptor_code = self._load_optional_script(
        "download_interceptor.js", "download interceptor"
    )
    # Mouse event emulator provides hover effect support
    mouse_emulator_code = self._load_optional_script(
        "mouse_event_emulator.js", "mouse event emulator"
    )

    # Combine: qwebchannel.js + config + bridge script + download interceptor + mouse emulator
    parts = [qwebchannel_code, config_code, bridge_code]
    if download_interceptor_code:
        parts.append(download_interceptor_code)
    # Mouse event emulator goes last to ensure it runs after all other scripts
    if mouse_emulator_code:
        parts.append(mouse_emulator_code)
    combined_code = "\n\n".join(parts)

    logger.debug(
        f"Combined script size: {len(combined_code)} chars "
        f"(qwebchannel: {len(qwebchannel_code)}, "
        f"config: {len(config_code)}, "
        f"bridge: {len(bridge_code)}, "
        f"interceptor: {len(download_interceptor_code)}, "
        f"mouse_emulator: {len(mouse_emulator_code)})"
    )
    logger.debug(f"URL mappings in config: {len(self.config.url_mappings)}")
    for i, mapping in enumerate(self.config.url_mappings, 1):
        logger.debug(f" Mapping {i}: {mapping.url_prefix} → {mapping.local_path}")

    script.setSourceCode(combined_code)
    self.web_view.page().scripts().insert(script)
    logger.debug("✅ Successfully installed bridge script")
    logger.debug(f" Script size: {len(combined_code)} chars")
    logger.debug(f" Loaded from: {script_path}")

@staticmethod
def _bridge_resource_candidates(filename: str) -> list:
    """Return the locations where a bundled UI script may live, in search order.

    Order: next to this module (development), inside the PyInstaller bundle
    (only when ``sys._MEIPASS`` is set), then beside the executable
    (installed layout).
    """
    candidates = [Path(__file__).parent / filename]
    if hasattr(sys, "_MEIPASS"):
        candidates.append(Path(sys._MEIPASS) / "webdrop_bridge" / "ui" / filename)  # type: ignore
    candidates.append(Path(sys.executable).parent / "webdrop_bridge" / "ui" / filename)
    return candidates

def _load_optional_script(self, filename: str, label: str) -> str:
    """Return the text of the first readable copy of an optional script, or ''.

    A candidate that exists but cannot be read is logged and skipped, so a
    later candidate still gets a chance.
    """
    for path in self._bridge_resource_candidates(filename):
        if not path.exists():
            continue
        try:
            code = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            logger.warning(f"{label.capitalize()} exists but failed to load: {e}")
            continue
        logger.debug(f"Loaded {label} from {path}")
        return code
    logger.debug(f"{label.capitalize()} not found (optional)")
    return ""
def _generate_config_injection_script(self) -> str:
    """Build the JavaScript snippet that publishes ``window.webdropConfig``.

    The configured URL mappings and the checkout flag are serialised to JSON
    and embedded in a self-invoking function, so the bridge script can match
    against the configured patterns rather than hardcoded values.

    Returns:
        JavaScript code as string
    """
    # One plain dict per mapping, in the shape the bridge script reads.
    url_mappings = [
        {"url_prefix": entry.url_prefix, "local_path": entry.local_path}
        for entry in self.config.url_mappings
    ]
    logger.debug(f"Generating config injection with {len(url_mappings)} URL mappings")
    for position, entry in enumerate(url_mappings, start=1):
        logger.debug(f" [{position}] {entry['url_prefix']} -> {entry['local_path']}")

    checkout_enabled = self.config.enable_checkout
    config_json = json.dumps({"urlMappings": url_mappings, "enableCheckout": checkout_enabled})
    logger.debug(f"Config JSON size: {len(config_json)} bytes")
    logger.debug(f"Checkout enabled: {checkout_enabled}")

    # The whole snippet is wrapped in try/catch so a failure is reported on
    # the JavaScript console.
    return f"""
(function() {{
try {{
// WebDrop Bridge - Configuration Injection
console.log('[WebDrop Config] Starting configuration injection...');
window.webdropConfig = {config_json};
console.log('[WebDrop Config] Configuration object created');
console.log('[WebDrop Config] Checkout enabled: ' + window.webdropConfig.enableCheckout);
if (window.webdropConfig && window.webdropConfig.urlMappings) {{
console.log('[WebDrop Config] SUCCESS: ' + window.webdropConfig.urlMappings.length + ' URL mappings loaded');
for (var i = 0; i < window.webdropConfig.urlMappings.length; i++) {{
var m = window.webdropConfig.urlMappings[i];
console.log('[WebDrop Config] [' + (i+1) + '] ' + m.url_prefix + ' -> ' + m.local_path);
}}
}} else {{
console.warn('[WebDrop Config] WARNING: No valid URL mappings found in config object');
}}
}} catch(e) {{
console.error('[WebDrop Config] ERROR during configuration injection: ' + e.message);
if (e.stack) console.error('[WebDrop Config] Stack: ' + e.stack);
}}
}})();
"""
def _inject_drag_bridge(self, html_content: str) -> str:
"""Return HTML content unmodified.
The drag bridge script is now injected via QWebEngineScript in _install_bridge_script().
This method is kept for compatibility but does nothing.
Args:
html_content: Original HTML content
Returns:
HTML unchanged
"""
return html_content
def _apply_stylesheet(self) -> None:
    """Apply ``resources/stylesheets/default.qss`` if it is available.

    The stylesheet is optional: a missing file is ignored silently. A file
    that exists but cannot be read or decoded is logged and skipped, so the
    window simply keeps its default look.
    """
    stylesheet_path = (
        Path(__file__).parent.parent.parent.parent / "resources" / "stylesheets" / "default.qss"
    )
    try:
        # Explicit UTF-8, so decoding does not depend on the platform locale.
        stylesheet = stylesheet_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # No stylesheet shipped - nothing to apply.
        return
    except (OSError, UnicodeDecodeError) as e:
        logger.warning(f"Could not read stylesheet {stylesheet_path}: {e}")
        return
    self.setStyleSheet(stylesheet)
def _on_drag_started(self, source: str, local_path: str) -> None:
    """React to a drag that was started successfully.

    Args:
        source: Original URL or path from web content
        local_path: Local file path that is being dragged
    """
    logger.info(f"Drag started: {source} -> {local_path}")

    # Checkout only applies to sources that start with "http".
    if not source.startswith("http"):
        return
    # ...and only when the feature is switched on in the configuration.
    if self.config.enable_checkout:
        self._prompt_checkout(source, local_path)
def _prompt_checkout(self, azure_url: str, local_path: str) -> None:
    """Check checkout status and prompt user if needed.

    Runs an asynchronous ``fetch`` in the page that stores its outcome as a
    JSON string in a ``window`` property, then reads that property back
    500 ms later and passes it to ``_handle_checkout_status()``.

    Each call uses its own result property, so a late result from one request
    cannot be read as the status of another asset. The asset ID is embedded
    as a JSON-encoded literal, so quotes or backslashes in the URL cannot
    break out of the generated script.

    Args:
        azure_url: Azure Blob Storage URL
        local_path: Local file path
    """
    import uuid

    # Extract filename for display
    filename = Path(local_path).name

    # Extract asset ID (the path segment just before the final segment)
    match = re.search(r"/([^/]+)/[^/]+$", azure_url)
    if not match:
        logger.warning(f"Could not extract asset ID from URL: {azure_url}")
        return
    asset_id = match.group(1)

    # Unique per request; hex characters only, safe inside a quoted JS string.
    callback_id = f"checkout_check_{uuid.uuid4().hex}"
    # json.dumps yields a JavaScript string literal with all escaping applied.
    asset_id_js = json.dumps(asset_id)

    # Check checkout status - use callback approach since Qt doesn't handle Promise returns well
    js_code = f"""
(async () => {{
    const assetId = {asset_id_js};
    const resultKey = '{callback_id}';
    try {{
        const authToken = window.capturedAuthToken;
        if (!authToken) {{
            console.log('[Checkout Check] No auth token available');
            window[resultKey] = JSON.stringify({{ error: 'No auth token' }});
            return;
        }}
        console.log('[Checkout Check] Fetching asset data for ' + assetId);
        const response = await fetch(
            'https://devagravityprivate.azurewebsites.net/api/assets/'
                + encodeURIComponent(assetId) + '?fields=checkout',
            {{
                method: 'GET',
                headers: {{
                    'Accept': 'application/json',
                    'Authorization': authToken
                }}
            }}
        );
        console.log('[Checkout Check] Response status:', response.status);
        if (response.ok) {{
            const data = await response.json();
            console.log('[Checkout Check] Full data:', JSON.stringify(data));
            console.log('[Checkout Check] Checkout field:', data.checkout);
            const hasCheckout = !!(data.checkout && Object.keys(data.checkout).length > 0);
            console.log('[Checkout Check] Has checkout:', hasCheckout);
            window[resultKey] = JSON.stringify({{ checkout: data.checkout || null, hasCheckout: hasCheckout }});
        }} else {{
            console.log('[Checkout Check] Failed to fetch, status:', response.status);
            window[resultKey] = JSON.stringify({{ error: 'Failed to fetch asset', status: response.status }});
        }}
    }} catch (error) {{
        console.error('[Checkout Check] Error:', error);
        window[resultKey] = JSON.stringify({{ error: error.toString() }});
    }}
}})();
"""
    # Execute the async fetch
    self.web_view.page().runJavaScript(js_code)

    def check_result():
        """Read the stored result back and hand it to the status handler."""
        read_code = f"window['{callback_id}']"
        self.web_view.page().runJavaScript(
            read_code,
            lambda result: self._handle_checkout_status(
                result, azure_url, filename, callback_id
            ),
        )

    # Wait 500ms for async fetch to complete. If it has not finished by then,
    # the read yields no value and the handler treats the status as unknown.
    QTimer.singleShot(500, check_result)
def _handle_checkout_status(
    self, result, azure_url: str, filename: str, callback_id: str
) -> None:
    """Handle the result of checkout status check.

    The confirmation dialog is skipped only when the result positively states
    that the asset is already checked out. Every other outcome (no result,
    unparsable result, a result that is not a JSON object, or an error
    reported by the page) falls back to showing the dialog.

    Args:
        result: Result from JavaScript (expected to be a JSON string)
        azure_url: Azure URL
        filename: Asset filename
        callback_id: Name of the ``window`` property to clean up
    """
    # Clean up window variable
    cleanup_code = f"delete window['{callback_id}']"
    self.web_view.page().runJavaScript(cleanup_code)

    logger.debug(f"Checkout status result type: {type(result)}, value: {result}")

    if not result or not isinstance(result, str):
        logger.warning(f"Checkout status check returned invalid result: {result}")
        self._show_checkout_dialog(azure_url, filename)
        return

    # Parse JSON string
    try:
        parsed_result = json.loads(result)
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning(f"Failed to parse checkout status result: {e}")
        self._show_checkout_dialog(azure_url, filename)
        return

    # Valid JSON that is not an object (e.g. "null" or "[]") has no .get();
    # treat it as an unknown status instead of raising inside the callback.
    if not isinstance(parsed_result, dict):
        logger.warning(f"Checkout status result is not a JSON object: {parsed_result!r}")
        self._show_checkout_dialog(azure_url, filename)
        return

    if parsed_result.get("error"):
        logger.warning(f"Could not check checkout status: {parsed_result}")
        self._show_checkout_dialog(azure_url, filename)
        return

    # Check if already checked out
    if parsed_result.get("hasCheckout", False):
        checkout_info = parsed_result.get("checkout", {})
        logger.info(
            f"Asset {filename} is already checked out: {checkout_info}, skipping dialog"
        )
        return

    # Not checked out, show confirmation dialog
    logger.debug(f"Asset {filename} is not checked out, showing dialog")
    self._show_checkout_dialog(azure_url, filename)
def _show_checkout_dialog(self, azure_url: str, filename: str) -> None:
    """Ask the user whether the asset should be checked out.

    A "Yes" answer triggers the checkout API call; any other answer is only
    logged.

    Args:
        azure_url: Azure Blob Storage URL
        filename: Asset filename
    """
    from PySide6.QtWidgets import QMessageBox

    buttons = QMessageBox.StandardButton
    answer = QMessageBox.question(
        self,
        "Checkout Asset",
        f"Do you want to check out this asset?\n\n{filename}",
        buttons.Yes | buttons.No,
        buttons.Yes,  # default button
    )

    if answer != buttons.Yes:
        logger.info(f"User declined checkout for {filename}")
        return

    logger.info(f"User confirmed checkout for {filename}")
    self._trigger_checkout_api(azure_url)
def _trigger_checkout_api(self, azure_url: str) -> None:
    """Trigger checkout via API call using JavaScript.

    The request is issued from the page with the auth token the page has
    captured in ``window.capturedAuthToken``. The asset ID is embedded as a
    JSON-encoded literal so that quotes or backslashes in the URL cannot
    break out of the generated script.

    Example URL: https://devagravitystg.file.core.windows.net/devagravitysync/anPGZszKzgKaSz1SIx2HFgduy/filename
    Asset ID: anPGZszKzgKaSz1SIx2HFgduy

    Args:
        azure_url: Azure Blob Storage URL containing asset ID
    """
    try:
        # Extract asset ID from URL (middle segment between domain and filename)
        # Format: https://domain/container/ASSET_ID/filename
        match = re.search(r"/([^/]+)/[^/]+$", azure_url)
        if not match:
            logger.warning(f"Could not extract asset ID from URL: {azure_url}")
            return
        asset_id = match.group(1)
        logger.info(f"Extracted asset ID: {asset_id}")

        # json.dumps yields a JavaScript string literal with all escaping applied.
        asset_id_js = json.dumps(asset_id)

        # Call API from JavaScript with Authorization header
        js_code = f"""
(async function() {{
    const assetId = {asset_id_js};
    try {{
        // Get captured auth token (from intercepted XHR)
        const authToken = window.capturedAuthToken;
        if (!authToken) {{
            console.error('No authorization token available');
            return {{ success: false, error: 'No auth token' }};
        }}
        const headers = {{
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Accept-Language': 'de',
            'Authorization': authToken
        }};
        const response = await fetch(
            'https://devagravityprivate.azurewebsites.net/api/assets/checkout/bulk?checkout=true',
            {{
                method: 'PUT',
                headers: headers,
                body: JSON.stringify({{asset_ids: [assetId]}})
            }}
        );
        if (response.ok) {{
            console.log('✅ Checkout API successful for asset ' + assetId);
            return {{ success: true, status: response.status }};
        }} else {{
            const text = await response.text();
            console.warn('Checkout API returned status ' + response.status + ': ' + text.substring(0, 200));
            return {{ success: false, status: response.status, error: text }};
        }}
    }} catch (error) {{
        console.error('Checkout API call failed:', error);
        return {{ success: false, error: error.toString() }};
    }}
}})();
"""

        def on_result(result):
            """Callback when JavaScript completes."""
            # NOTE(review): the script's value is a Promise; if runJavaScript
            # does not resolve it, `result` is not a dict and only the debug
            # branch below runs - confirm.
            if result and isinstance(result, dict):
                if result.get("success"):
                    logger.info(f"✅ Checkout successful for asset {asset_id}")
                else:
                    status = result.get("status", "unknown")
                    error = result.get("error", "unknown error")
                    logger.warning(f"Checkout API returned status {status}: {error}")
            else:
                logger.debug(f"Checkout API call completed (result: {result})")

        # Execute JavaScript (async, non-blocking)
        self.web_view.page().runJavaScript(js_code, on_result)
    except Exception as e:
        # Best-effort boundary: a failed checkout must not abort the drag
        # handling that called this method.
        logger.exception(f"Error triggering checkout API: {e}")
def _on_drag_failed(self, source: str, error: str) -> None:
    """Log a failed drag and tell the user about it.

    Args:
        source: Original URL or path from web content
        error: Error message
    """
    logger.warning(f"Drag failed for {source}: {error}")

    # Only the error text is shown to the user; the source goes to the log.
    dialog_text = f"Could not complete the drag-and-drop operation.\n\nError: {error}"
    QMessageBox.warning(self, "Drag-and-Drop Error", dialog_text)
def _on_file_opened_via_drop(self, file_path: str) -> None:
    """Report a file that was opened through the OpenDropZone.

    Args:
        file_path: Local file path that was opened.
    """
    logger.info(f"Opened via drop zone: {file_path}")

    # The status bar shows just the file name, for four seconds.
    opened_name = Path(file_path).name
    self.statusBar().showMessage(f"Opened: {opened_name}", 4000)
def _on_file_open_failed_via_drop(self, file_path: str, error: str) -> None:
    """Report a file dropped on the OpenDropZone that could not be opened.

    Args:
        file_path: Local file path that could not be opened.
        error: Error description.
    """
    logger.warning(f"Failed to open via drop zone '{file_path}': {error}")

    details = (
        "Could not open the file with its default application.\n\n"
        f"File: {file_path}\nError: {error}"
    )
    QMessageBox.warning(self, "Open File Error", details)
def _on_file_open_with_requested(self, file_path: str) -> None:
    """Open an application chooser for a file dropped on the OpenWithDropZone.

    Args:
        file_path: Local file path to open using an app chooser.
    """
    if not self._open_with_app_chooser(file_path):
        logger.warning(f"Could not open app chooser for '{file_path}'")
        QMessageBox.warning(
            self,
            "Open With Error",
            "Could not open an application chooser for this file on your platform.",
        )
        return

    chosen_name = Path(file_path).name
    self.statusBar().showMessage(f"Choose app for: {chosen_name}", 4000)
    logger.info(f"Opened app chooser for '{file_path}'")
def _open_with_app_chooser(self, file_path: str) -> bool:
    """Open the OS-specific "open with" application chooser for a local file.

    Windows: tries the shell ``openas`` verb and falls back to
    ``rundll32 shell32.dll,OpenAs_RunDLL``. macOS: runs an AppleScript that
    lets the user choose an application and opens the file with it. Other
    platforms are not supported.

    Args:
        file_path: Local file path.

    Returns:
        True if the chooser was presented (including the case where the user
        dismissed it on macOS), False if the file does not exist, the
        platform is unsupported, or the chooser could not be started.
    """
    try:
        normalized_path = str(Path(file_path))
        if not Path(normalized_path).exists():
            logger.warning(f"Open-with target does not exist: {normalized_path}")
            return False

        if sys.platform.startswith("win"):
            # First try the native shell "openas" verb.
            import ctypes

            result = ctypes.windll.shell32.ShellExecuteW(
                None, "openas", normalized_path, None, None, 1
            )
            # ShellExecuteW reports success with a value greater than 32.
            if result > 32:
                return True
            logger.warning(f"ShellExecuteW(openas) failed with code {result}; trying fallback")
            # Fallback for systems where openas verb is not available/reliable.
            subprocess.Popen(["rundll32.exe", "shell32.dll,OpenAs_RunDLL", normalized_path])
            return True

        if sys.platform == "darwin":
            # Prompt for an app and open the file with the selected app.
            script = (
                "on run argv\n"
                "set targetFile to POSIX file (item 1 of argv)\n"
                "set chosenApp to choose application\n"
                'tell application "Finder" to open targetFile using chosenApp\n'
                "end run"
            )
            # Pass the same normalized path that was validated above.
            result = subprocess.run(
                ["osascript", "-e", script, normalized_path],
                check=False,
                capture_output=True,
                text=True,
            )
            if result.returncode == 0:
                return True

            stderr_text = (result.stderr or "").strip()
            # AppleScript error -128 means the user cancelled the dialog. The
            # chooser did open, so this must not be reported as a failure.
            if "(-128)" in stderr_text:
                logger.info(f"App chooser dismissed by user for '{normalized_path}'")
                return True

            logger.warning(
                f"osascript app chooser failed (exit code {result.returncode}): {stderr_text}"
            )
            return False

        logger.warning(f"Open-with chooser not implemented for platform: {sys.platform}")
        return False
    except Exception as e:
        # Best-effort feature: any launch problem is reported as "not opened".
        logger.warning(f"Failed to open app chooser for '{file_path}': {e}")
        return False
def _on_download_requested(self, download: QWebEngineDownloadRequest) -> None:
    """Accept a download from the embedded web view into the Downloads folder.

    The target directory is the system Downloads location (home directory as
    a fallback). The file name comes from the request; any directory
    components are stripped so web content cannot steer the file outside the
    target directory.

    Args:
        download: Download request from the web engine.
    """
    try:
        # Log download details for debugging
        logger.debug(f"Download URL: {download.url().toString()}")
        logger.debug(f"Download filename: {download.downloadFileName()}")
        logger.debug(f"Download mime type: {download.mimeType()}")
        logger.debug(f"Download suggested filename: {download.suggestedFileName()}")
        logger.debug(f"Download state: {download.state()}")

        # Get the system's Downloads folder
        downloads_path = QStandardPaths.writableLocation(
            QStandardPaths.StandardLocation.DownloadLocation
        )
        if not downloads_path:
            # Fallback to user's home directory if Downloads folder not available
            downloads_path = str(Path.home())
            logger.warning("Downloads folder not found, using home directory")

        # Use suggested filename if available, fallback to downloadFileName
        requested_name = download.suggestedFileName() or download.downloadFileName()

        # Keep only the final path component; the name originates from web
        # content and must not contain directories or parent references.
        filename = Path(requested_name).name if requested_name else ""
        if filename in ("", ".", ".."):
            filename = "download"
            logger.warning("No filename suggested, using 'download'")

        # Construct full download path
        download_file = Path(downloads_path) / filename
        logger.debug(f"Download will be saved to: {download_file}")

        # Set download path and accept
        download.setDownloadDirectory(str(download_file.parent))
        download.setDownloadFileName(download_file.name)
        download.accept()
        logger.info(f"Download started: {filename}")

        # Update status bar (temporarily)
        self.status_bar.showMessage(f"📥 Download: {filename}", 3000)

        # Connect to state changed for progress tracking
        download.stateChanged.connect(
            lambda state: logger.debug(f"Download state changed to: {state}")
        )
        # Connect to finished signal for completion feedback
        download.isFinishedChanged.connect(
            lambda: self._on_download_finished(download, download_file)
        )
    except Exception as e:
        logger.error(f"Error handling download: {e}", exc_info=True)
        self.status_bar.showMessage(f"Download error: {e}", 5000)
def _on_download_finished(self, download: QWebEngineDownloadRequest, file_path: Path) -> None:
    """Give feedback once a download has reached a final state.

    Does nothing while the download is still running. For completed,
    cancelled and interrupted downloads a log entry is written and a
    temporary status bar message is shown.

    Args:
        download: The download request being tracked.
        file_path: Path the file was saved to.
    """
    try:
        if not download.isFinished():
            return

        final_state = download.state()
        logger.debug(f"Download finished with state: {final_state}")

        states = QWebEngineDownloadRequest.DownloadState
        if final_state == states.DownloadCompleted:
            logger.info(f"Download completed: {file_path.name}")
            self.status_bar.showMessage(f"Download completed: {file_path.name}", 5000)
            return
        if final_state == states.DownloadCancelled:
            logger.info(f"Download cancelled: {file_path.name}")
            self.status_bar.showMessage(f"⚠️ Download abgebrochen: {file_path.name}", 3000)
            return
        if final_state == states.DownloadInterrupted:
            logger.warning(f"Download interrupted: {file_path.name}")
            self.status_bar.showMessage(f"❌ Download fehlgeschlagen: {file_path.name}", 5000)
    except Exception as e:
        logger.error(f"Error in download finished handler: {e}", exc_info=True)
def dragEnterEvent(self, event):
    """Decline drags that enter the main window.

    ALT-drags are already cancelled by the injected intercept script and
    routed through bridge.start_file_drag(), so whatever still reaches the
    window itself is simply ignored.

    Args:
        event: QDragEnterEvent
    """
    event.ignore()
def dragMoveEvent(self, event):
    """Decline drag-move events over the main window.

    Args:
        event: QDragMoveEvent
    """
    event.ignore()
def dragLeaveEvent(self, event):
    """Decline drag-leave events for the main window.

    Args:
        event: QDragLeaveEvent
    """
    event.ignore()
def dropEvent(self, event):
    """Decline drops that land on the main window itself.

    Args:
        event: QDropEvent
    """
    event.ignore()
def _on_js_console_message(self, level, message, line_number, source_id):
    """Forward a JavaScript console message to the Python logger.

    Info-level messages are logged at DEBUG. Warnings are logged at WARNING
    and everything else at ERROR, each followed by a DEBUG line with the
    source location.

    Args:
        level: Console message level (JavaScriptConsoleMessageLevel enum)
        message: The console message
        line_number: Line number where the message originated
        source_id: Source file/URL where the message originated
    """
    from PySide6.QtWebEngineCore import QWebEnginePage

    js_levels = QWebEnginePage.JavaScriptConsoleMessageLevel

    if level == js_levels.InfoMessageLevel:
        logger.debug(f"JS Console: {message}")
        return

    # Anything that is neither info nor warning is treated as an error.
    log_call = logger.warning if level == js_levels.WarningMessageLevel else logger.error
    log_call(f"JS Console: {message}")
    logger.debug(f" at {source_id}:{line_number}")
def _on_page_loaded(self, success: bool) -> None:
    """Verify the bridge script after a page load finishes.

    On a successful load, JavaScript is run in the page to test the
    ``window.__webdrop_intercept_injected`` flag and the outcome is logged.

    Args:
        success: True if page loaded successfully
    """
    if not success:
        logger.warning("Page failed to load")
        return

    def report_injection_state(injected):
        # Callback receiving the boolean result of the probe expression.
        if not injected:
            logger.error("WebDrop Bridge script NOT loaded!")
            logger.error("Drag-and-drop conversion will not work")
            return
        logger.debug("WebDrop Bridge script is active")
        logger.debug("QWebChannel bridge is ready")

    probe_js = "typeof window.__webdrop_intercept_injected !== 'undefined' && window.__webdrop_intercept_injected === true"
    self.web_view.page().runJavaScript(probe_js, report_injection_state)
def _create_navigation_toolbar(self) -> None:
    """Build the fixed top toolbar.

    Left to right: Back, Forward, separator, Home, Reload, the two file
    drop zones, an expanding spacer, and then the About / Settings /
    Check for Updates / Clear Cache / Log File / Developer Tools buttons.
    Custom ``.ico`` icons are used when present in the resources folder.
    """
    nav_bar = QToolBar("Navigation")
    nav_bar.setMovable(False)
    nav_bar.setIconSize(QSize(24, 24))
    self.addToolBar(Qt.ToolBarArea.TopToolBarArea, nav_bar)

    # History navigation uses the web view's built-in page actions.
    web_actions = self.web_view.page().WebAction
    for history_step in (web_actions.Back, web_actions.Forward):
        nav_bar.addAction(self.web_view.pageAction(history_step))

    nav_bar.addSeparator()

    # Icons live inside the PyInstaller bundle when frozen, otherwise in
    # the repository's resources folder.
    if hasattr(sys, "_MEIPASS"):
        icons_dir = Path(sys._MEIPASS) / "resources" / "icons"  # type: ignore[attr-defined]
    else:
        icons_dir = Path(__file__).parent.parent.parent.parent / "resources" / "icons"

    # Home button, with a standard-style fallback icon.
    home_icon_file = icons_dir / "home.ico"
    if home_icon_file.exists():
        home_icon = QIcon(str(home_icon_file))
    else:
        home_icon = self.style().standardIcon(self.style().StandardPixmap.SP_DirHomeIcon)
    home_button = nav_bar.addAction(home_icon, "")
    home_button.setToolTip("Home")
    home_button.triggered.connect(self._navigate_home)

    # Reload button, keeping the default icon unless a custom one exists.
    reload_button = self.web_view.pageAction(web_actions.Reload)
    reload_icon_file = icons_dir / "reload.ico"
    if reload_icon_file.exists():
        reload_button.setIcon(QIcon(str(reload_icon_file)))
    nav_bar.addAction(reload_button)

    # Drop zone that opens files with the system default application.
    self._open_drop_zone = OpenDropZone()
    open_icon_file = icons_dir / "open.ico"
    if open_icon_file.exists():
        self._open_drop_zone.set_icon(QIcon(str(open_icon_file)))
    self._open_drop_zone.file_opened.connect(self._on_file_opened_via_drop)
    self._open_drop_zone.file_open_failed.connect(self._on_file_open_failed_via_drop)
    open_zone_action = QWidgetAction(nav_bar)
    open_zone_action.setDefaultWidget(self._open_drop_zone)
    nav_bar.addAction(open_zone_action)

    # Drop zone that shows an application chooser.
    self._open_with_drop_zone = OpenWithDropZone()
    open_with_icon_file = icons_dir / "openwith.ico"
    if open_with_icon_file.exists():
        self._open_with_drop_zone.set_icon(QIcon(str(open_with_icon_file)))
    self._open_with_drop_zone.file_open_with_requested.connect(
        self._on_file_open_with_requested
    )
    open_with_zone_action = QWidgetAction(nav_bar)
    open_with_zone_action.setDefaultWidget(self._open_with_drop_zone)
    nav_bar.addAction(open_with_zone_action)

    # Expanding spacer pushes the remaining buttons to the right edge.
    filler = QWidget()
    filler.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum)
    nav_bar.addWidget(filler)

    # Right-hand buttons, in display order: (label, tooltip, handler).
    right_side_buttons = (
        ("ℹ️", "About WebDrop Bridge", self._show_about_dialog),
        ("⚙️", "Settings", self._show_settings_dialog),
        ("🔄", "Check for Updates", self._on_manual_check_for_updates),
        ("🗑️", "Clear Cache and Cookies", self._clear_cache_and_cookies),
        ("📋", "Open Log File", self._open_log_file),
        ("🔧", "Developer Tools (F12)", self._open_developer_tools),
    )
    for label, tooltip, handler in right_side_buttons:
        button = nav_bar.addAction(label)
        button.setToolTip(tooltip)
        button.triggered.connect(handler)
def _open_log_file(self) -> None:
    """Open the application log file with the system's default handler.

    Uses the path from the configuration when one is set, otherwise
    ``logs/webdrop_bridge.log`` below the AppData location. If the file
    does not exist an informational message is shown instead.
    """
    configured_path = self.config.log_file
    if configured_path:
        log_file = Path(configured_path)
    else:
        # Default location: /webdrop_bridge/logs/webdrop_bridge.log
        app_data_dir = QStandardPaths.writableLocation(
            QStandardPaths.StandardLocation.AppDataLocation
        )
        log_file = Path(app_data_dir) / "logs" / "webdrop_bridge.log"

    if not log_file.exists():
        QMessageBox.information(
            self,
            "Log File Not Found",
            f"No log file found at:\n{log_file}",
        )
        return

    QDesktopServices.openUrl(QUrl.fromLocalFile(str(log_file)))
def _open_developer_tools(self) -> None:
    """Show the Developer Tools window, creating it on first use.

    The window hosts a DeveloperToolsWidget bound to the web view. It is
    deleted when closed, and the stored reference is reset on destruction
    so that the next call builds a fresh window. An existing window is
    just shown and brought to the front.
    """
    try:
        if getattr(self, "_dev_tools_window", None) is None:
            from webdrop_bridge.ui.developer_tools import DeveloperToolsWidget

            tools_window = QMainWindow()
            self._dev_tools_window = tools_window
            tools_window.setWindowTitle("🔧 Developer Tools")
            tools_window.setGeometry(100, 100, 1200, 700)
            tools_window.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
            tools_window.setCentralWidget(DeveloperToolsWidget(self.web_view))

            def forget_window():
                # Drop the stale reference once Qt has destroyed the window.
                self._dev_tools_window = None

            tools_window.destroyed.connect(forget_window)

        # Show or bring to front
        self._dev_tools_window.show()
        self._dev_tools_window.raise_()
        self._dev_tools_window.activateWindow()
        logger.info("Developer Tools window opened")
    except Exception as e:
        logger.error(f"Failed to open Developer Tools window: {e}", exc_info=True)
        QMessageBox.warning(
            self,
            "Developer Tools",
            f"Could not open Developer Tools:\n{e}",
        )
def _create_status_bar(self) -> None:
    """Set up the status bar and its permanent update-status label."""
    bar = self.statusBar()
    self.status_bar = bar

    # Permanent label on the right; starts out showing "Ready".
    status_label = QLabel("Ready")
    self.update_status_label = status_label
    status_label.setStyleSheet("margin-right: 10px;")
    bar.addPermanentWidget(status_label)
def set_update_status(self, status: str, emoji: str = "") -> None:
    """Set the text of the update-status label in the status bar.

    Args:
        status: Status text to display.
        emoji: Optional prefix; when non-empty it is placed before the
            status text, separated by a single space.
    """
    label_text = f"{emoji} {status}" if emoji else status
    self.update_status_label.setText(label_text)
def _on_manual_check_for_updates(self) -> None:
    """Run an update check that the user requested explicitly.

    Creates the "checking" dialog, flags the check as manual so result
    handlers give dialog feedback, starts the check and then shows the
    dialog.
    """
    logger.info("Manual update check requested from menu")

    from webdrop_bridge.ui.update_manager_ui import CheckingDialog

    progress_dialog = CheckingDialog(self)
    self.checking_dialog = progress_dialog
    self._is_manual_check = True

    # Kick off the check before showing the dialog.
    self.check_for_updates_startup()
    self.checking_dialog.show()
def _clear_cache_and_cookies(self) -> None:
    """Clear the web view's cache and cookies and report the outcome.

    Delegates to ``self.web_view.clear_cache_and_cookies()`` and shows a
    confirmation dialog, or a warning dialog if anything raises.
    """
    logger.info("Clearing cache and cookies")
    try:
        self.web_view.clear_cache_and_cookies()

        success_text = (
            "Browser cache and cookies have been cleared successfully.\n\n"
            "You may need to reload the page or restart the application for changes to take effect."
        )
        QMessageBox.information(self, "Cache Cleared", success_text)
        logger.info("Cache and cookies cleared successfully")
    except Exception as e:
        logger.error(f"Failed to clear cache and cookies: {e}")
        failure_text = f"Failed to clear cache and cookies: {e!s}"
        QMessageBox.warning(self, "Error", failure_text)
def _show_about_dialog(self) -> None:
    """Show the modal About box for the application.

    The text is assembled from ``self.config.app_name`` and
    ``self.config.app_version`` together with static product, drop-zone and
    vendor contact information, and is displayed via ``QMessageBox.about``
    with this window as parent and "About <app name>" as title.
    """
    # Function-local import of the message box class used below.
    from PySide6.QtWidgets import QMessageBox
    about_text = (
        f"{self.config.app_name}
"
        f"Version: {self.config.app_version}
"
        f"
"
        f"Bridges web-based drag-and-drop workflows with native file operations "
        f"for professional desktop applications.
"
        f"
"
        f"Toolbar Drop Zones:
"
        f"Open icon: Opens dropped files with the system default app.
"
        f"Open-with icon: Shows an app chooser for dropped files.
"
        f"
"
        f"Product of:
"
        f"hörl Information Management GmbH
"
        f"Silberburgstraße 126
"
        f"70176 Stuttgart, Germany
"
        f"
"
        f""
        f"Email: info@hoerl-im.de
"
        f"Phone: +49 (0) 711 933 42 52 – 0
"
        f"Web: https://www.hoerl-im.de/
"
        f""
        f"
"
        f"© 2026 hörl Information Management GmbH. All rights reserved."
    )
    QMessageBox.about(self, f"About {self.config.app_name}", about_text)
def _show_settings_dialog(self) -> None:
    """Run the Settings dialog and react to a changed web application URL.

    If the URL is unchanged nothing else happens. A changed host leads to
    the restart dialog; a change within the same host clears the cache and
    reloads the home page shortly after the dialog has closed.
    """
    from webdrop_bridge.ui.settings_dialog import SettingsDialog

    url_before = self.config.webapp_url

    dialog = SettingsDialog(self.config, self)
    dialog.exec()

    url_after = self.config.webapp_url
    if url_before == url_after:
        return

    logger.info(f"Web application URL changed: {url_before} → {url_after}")

    if self._check_domain_changed(url_before, url_after):
        logger.warning("Domain has changed - recommending restart")
        self._handle_domain_change_restart()
        return

    logger.info("Path changed but domain is same - reloading...")
    self.web_view.clear_cache_and_cookies()
    # Deferred so the settings dialog is fully closed before reloading.
    QTimer.singleShot(100, self._navigate_home)
def _check_domain_changed(self, old_url: str, new_url: str) -> bool:
"""Check if the domain/host has changed between two URLs.
Args:
old_url: Previous URL
new_url: New URL
Returns:
True if domain changed, False if only path changed
"""
from urllib.parse import urlparse
try:
old_parts = urlparse(old_url)
new_parts = urlparse(new_url)
old_host = old_parts.netloc or old_parts.path
new_host = new_parts.netloc or new_parts.path
return old_host != new_host
except Exception as e:
logger.warning(f"Could not parse URLs for domain comparison: {e}")
return True # Assume domain changed if we can't parse
def _handle_domain_change_restart(self) -> None:
    """Ask the user to restart after the web application domain changed.

    The cache and cookies are cleared before the dialog is shown, so the
    dialog's statement that they "have been cleared" holds for both
    choices. The dialog offers two options:
    - Restart Now: relaunch the application automatically.
    - Restart Later: keep running and load the new URL right away.
    """
    # Clear first: the dialog text below tells the user this already
    # happened, and it must apply to the restart path as well.
    self.web_view.clear_cache_and_cookies()

    msg = QMessageBox(self)
    msg.setWindowTitle("Domain Changed - Restart Recommended")
    msg.setIcon(QMessageBox.Icon.Warning)
    msg.setText(
        "Web Application Domain Has Changed\n\n"
        "You've switched to a different domain. For maximum stability and "
        "to ensure proper authentication, the application should be restarted.\n\n"
        "The profile and cache have been cleared, but we recommend restarting."
    )

    # Add custom buttons; only the "now" button needs to be identified later.
    restart_now_btn = msg.addButton("Restart Now", QMessageBox.ButtonRole.AcceptRole)
    msg.addButton("Restart Later", QMessageBox.ButtonRole.RejectRole)
    msg.exec()

    if msg.clickedButton() == restart_now_btn:
        logger.info("User chose to restart application now")
        self._restart_application()
    else:
        logger.info("User chose to restart later - loading new URL")
        self._navigate_home()
def _restart_application(self) -> None:
    """Restart the application automatically.

    Launches a detached copy of the application with the current
    command-line arguments and then exits this process after a short delay.
    When running as a PyInstaller bundle the executable is the application
    itself; in development mode the interpreter is started with
    ``-m webdrop_bridge.main``. If the new process cannot be started, a
    warning dialog asks the user to restart manually.
    """
    from PySide6.QtCore import QProcess

    logger.info("Restarting application...")
    try:
        executable = sys.executable
        forwarded_args = list(sys.argv[1:])

        if hasattr(sys, "_MEIPASS"):
            # Frozen bundle: sys.executable is the app, not a Python
            # interpreter, so "-m <module>" must not be passed to it.
            launch_args = forwarded_args
        else:
            # Development mode: run the package entry module.
            launch_args = ["-m", "webdrop_bridge.main", *forwarded_args]

        # Depending on the binding, the static call yields a bool or a
        # (success, pid) tuple; accept both shapes.
        outcome = QProcess.startDetached(executable, launch_args)
        started = outcome[0] if isinstance(outcome, tuple) else bool(outcome)
        if not started:
            raise RuntimeError(f"Could not start new process: {executable}")

        logger.info("New application process started successfully")

        # Close current application after a small delay to allow process to start
        QTimer.singleShot(500, lambda: sys.exit(0))
    except Exception as e:
        logger.error(f"Failed to restart application: {e}")
        QMessageBox.warning(
            self,
            "Restart Failed",
            f"Could not automatically restart the application:\n\n{str(e)}\n\n"
            "Please restart manually.",
        )
def _navigate_home(self) -> None:
    """Navigate to the home (start) URL from the configuration.

    ``http://`` and ``https://`` URLs are loaded as-is. Anything else is
    treated as a local file path, resolved to an absolute path and loaded
    as a ``file://`` URL. A path that cannot be converted is logged and the
    current page is left unchanged.
    """
    home_url = self.config.webapp_url

    if home_url.startswith(("http://", "https://")):
        self.web_view.load(QUrl(home_url))
        return

    try:
        file_url = Path(home_url).resolve().as_uri()
    except (OSError, ValueError) as e:
        # Previously swallowed silently, which left a dead Home button
        # with no trace of why nothing happened.
        logger.warning(f"Cannot navigate home, invalid local path '{home_url}': {e}")
        return

    self.web_view.load(QUrl(file_url))
def closeEvent(self, event) -> None:
    """Tear down the web engine page before the window closes.

    Deleting the page explicitly avoids the "Release of profile requested
    but WebEnginePage still not deleted" warning and lets session data
    (cookies, login state) be saved properly.

    Args:
        event: Close event
    """
    logger.debug("Closing application - cleaning up web engine resources")

    view = getattr(self, "web_view", None)
    if view:
        current_page = view.page()
        if current_page:
            # No load callbacks should fire while shutting down.
            try:
                current_page.loadFinished.disconnect()
            except RuntimeError:
                pass  # Already disconnected or never connected
            current_page.deleteLater()
            logger.debug("WebEnginePage scheduled for deletion")
        # Detach the page from the view.
        view.setPage(None)  # type: ignore

    event.accept()
def check_for_updates_startup(self) -> None:
    """Start a background update check.

    Builds an UpdateManager for the current application version with
    ``~/.webdrop_bridge`` as its config directory and hands it to the
    background check. Failures while setting this up are logged only.
    """
    from webdrop_bridge.core.updater import UpdateManager

    try:
        update_manager = UpdateManager(
            current_version=self.config.app_version,
            config_dir=Path.home() / ".webdrop_bridge",
        )
        self._run_async_check(update_manager)
    except Exception as e:
        logger.error(f"Failed to initialize update check: {e}")
def _run_async_check(self, manager) -> None:
    """Run the update check in a background thread with a 30 s safety timeout.

    An UpdateCheckWorker is moved to a fresh QThread and its result signals
    are wired to this window's handlers. A single-shot timer warns the user
    if no result has arrived after 30 seconds.

    Args:
        manager: UpdateManager instance
    """
    try:
        logger.debug("_run_async_check() starting")

        # Create background thread and worker
        thread = QThread()
        worker = UpdateCheckWorker(manager, self.config.app_version)

        # IMPORTANT: Keep references to prevent garbage collection
        self._background_threads.append(thread)
        self._background_workers = getattr(self, "_background_workers", {})
        self._background_workers[id(thread)] = worker
        logger.debug(f"Created worker and thread, thread id: {id(thread)}")

        # Flag-based completion tracking: the timer is never stopped, it
        # just finds the flag set when it fires after a finished check.
        check_completed = [False]

        def force_close_timeout():
            # Check if already completed - if so, don't show error
            if check_completed[0]:
                logger.debug("Timeout fired but check already completed, suppressing error")
                return
            logger.warning("Update check taking too long (30s timeout)")
            if hasattr(self, "checking_dialog") and self.checking_dialog:
                self.checking_dialog.close()
            self.set_update_status("Check timed out - no server response", emoji="⏱️")
            # Show error dialog
            QMessageBox.warning(
                self,
                "Update Check Timeout",
                "The server did not respond within 30 seconds.\n\n"
                "This may be due to a network issue or server unavailability.\n\n"
                "Please check your connection and try again.",
            )

        # Parent the timer to the window: an unparented local QTimer is
        # destroyed as soon as this method returns and would never fire.
        safety_timer = QTimer(self)
        safety_timer.setSingleShot(True)
        safety_timer.setInterval(30000)  # 30 seconds
        safety_timer.timeout.connect(force_close_timeout)
        # The timer has served its purpose once it fired; release it.
        safety_timer.timeout.connect(safety_timer.deleteLater)

        # Mark check as completed when a result arrives
        def on_check_done():
            logger.debug("Check finished, marking as completed to prevent timeout error")
            check_completed[0] = True

        def on_status_for_completion(status, _emoji):
            # Only the final "Ready" status means the check is done; the
            # worker's initial "Checking for updates" status must not
            # disarm the timeout.
            if status == "Ready":
                on_check_done()

        # Connect signals
        worker.update_available.connect(self._on_update_available)
        worker.update_available.connect(on_check_done)
        worker.update_status.connect(self._on_update_status)
        worker.update_status.connect(on_status_for_completion)
        worker.check_failed.connect(self._on_check_failed)
        worker.check_failed.connect(on_check_done)
        worker.finished.connect(on_check_done)
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)

        # Clean up finished threads and workers from the bookkeeping containers
        def cleanup_thread():
            logger.debug(f"Cleaning up thread {id(thread)}")
            if thread in self._background_threads:
                self._background_threads.remove(thread)
            if id(thread) in self._background_workers:
                del self._background_workers[id(thread)]

        thread.finished.connect(cleanup_thread)

        # Move worker to thread and start
        logger.debug("Moving worker to thread and connecting started signal")
        worker.moveToThread(thread)
        thread.started.connect(worker.run)
        logger.debug("Starting thread...")
        thread.start()
        logger.debug("Thread started, starting safety timer")

        # Start the safety timeout
        safety_timer.start()
    except Exception as e:
        logger.error(f"Failed to start update check thread: {e}", exc_info=True)
def _on_update_status(self, status: str, emoji: str) -> None:
    """Show a new update status and finish a manual check if it is done.

    During a manual check the "Ready" status means no update was found:
    the checking dialog is closed and a "no update" dialog is shown.

    Args:
        status: Status text
        emoji: Status emoji
    """
    self.set_update_status(status, emoji)

    manual_check_without_update = self._is_manual_check and status == "Ready"
    if not manual_check_without_update:
        return

    # Close checking dialog first, then show result
    open_dialog = getattr(self, "checking_dialog", None)
    if open_dialog:
        open_dialog.close()

    from webdrop_bridge.ui.update_manager_ui import NoUpdateDialog

    result_dialog = NoUpdateDialog(parent=self)
    self._is_manual_check = False
    result_dialog.exec()
def _on_check_failed(self, error_message: str) -> None:
    """Report a failed update check in the log, status bar and a dialog.

    Args:
        error_message: Error description
    """
    logger.error(f"Update check failed: {error_message}")
    self.set_update_status(f"Check failed: {error_message}", emoji="❌")
    self._is_manual_check = False

    # The progress dialog has to go before the warning is shown.
    open_dialog = getattr(self, "checking_dialog", None)
    if open_dialog:
        open_dialog.close()

    warning_text = f"Could not check for updates:\n\n{error_message}\n\nPlease try again later."
    QMessageBox.warning(self, "Update Check Failed", warning_text)
def _on_update_available(self, release) -> None:
    """Tell the user that a newer release exists.

    Closes a still-open checking dialog, updates the status label and shows
    a modal dialog offering to update now or later.

    Args:
        release: Release object with update info
    """
    # A checking dialog is only around for manually triggered checks.
    open_dialog = getattr(self, "checking_dialog", None)
    if open_dialog:
        open_dialog.close()
        self.checking_dialog = None
        self._is_manual_check = False

    self.set_update_status(f"Update available: v{release.version}", emoji="✅")

    from webdrop_bridge.ui.update_manager_ui import UpdateAvailableDialog

    offer = UpdateAvailableDialog(version=release.version, changelog=release.body, parent=self)
    offer.update_now.connect(lambda: self._on_user_update_now(release))
    offer.update_later.connect(lambda: self._on_user_update_later())

    # Modal: returns once the user has chosen.
    offer.exec()
def _on_user_update_now(self, release) -> None:
    """Start the update download after the user chose 'Update Now'.

    Args:
        release: Release object to download and install
    """
    chosen_version = release.version
    logger.info(f"User clicked 'Update Now' for v{chosen_version}")
    self._start_update_download(release)
def _on_user_update_later(self) -> None:
    """Record that the user postponed the update via the 'Later' button."""
    logger.info("User deferred update")
    deferred_text = "Update deferred"
    self.set_update_status(deferred_text, emoji="")
def _start_update_download(self, release) -> None:
    """Show the download progress dialog and begin the background download.

    Args:
        release: Release object to download
    """
    version_label = f"v{release.version}"
    logger.info(f"Starting download for {version_label}")
    self.set_update_status(f"Downloading {version_label}", emoji="⬇️")

    from webdrop_bridge.ui.update_manager_ui import DownloadingDialog

    progress_dialog = DownloadingDialog(self)
    self.downloading_dialog = progress_dialog
    progress_dialog.show()

    # The actual transfer happens off the UI thread.
    self._perform_update_async(release)
def _perform_update_async(self, release) -> None:
    """Download the update in a background thread with a 10 min safety timeout.

    An UpdateDownloadWorker is moved to a fresh QThread and its signals are
    wired to this window's handlers. If no completion or failure has been
    reported after ten minutes, a failure is emitted on the worker's behalf
    and the thread is asked to quit.

    Args:
        release: Release object to download and install
    """
    from webdrop_bridge.core.updater import UpdateManager

    try:
        logger.debug("_perform_update_async() starting")

        # Create update manager
        manager = UpdateManager(
            current_version=self.config.app_version, config_dir=Path.home() / ".webdrop_bridge"
        )

        # Create background thread and worker
        thread = QThread()
        worker = UpdateDownloadWorker(manager, release, self.config.app_version)

        # IMPORTANT: Keep references to prevent garbage collection.
        # Same guarded dict creation as in _run_async_check.
        self._background_threads.append(thread)
        self._background_workers = getattr(self, "_background_workers", {})
        self._background_workers[id(thread)] = worker
        logger.debug(f"Created download worker and thread, thread id: {id(thread)}")

        # Connect signals
        worker.download_complete.connect(self._on_download_complete)
        worker.download_failed.connect(self._on_download_failed)
        worker.download_progress.connect(self._on_download_progress)
        worker.update_status.connect(self._on_update_status)
        worker.finished.connect(thread.quit)
        worker.finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)

        # Flag-based completion tracking: the timer is never stopped, it
        # just finds the flag set when it fires after a finished download.
        download_completed = [False]

        def force_timeout():
            # Check if already completed - if so, don't show error
            if download_completed[0]:
                logger.debug("Timeout fired but download already completed, suppressing error")
                return
            logger.error("Download taking too long (10 minute timeout)")
            self.set_update_status("Download timed out - no server response", emoji="⏱️")
            worker.download_failed.emit("Download took too long with no response")
            # Only request the stop. Waiting here without a limit would
            # freeze the UI for as long as the worker stays blocked.
            thread.quit()

        # Parent the timer to the window: an unparented local QTimer is
        # destroyed as soon as this method returns and would never fire.
        safety_timer = QTimer(self)
        safety_timer.setSingleShot(True)
        safety_timer.setInterval(600000)  # 10 minutes
        safety_timer.timeout.connect(force_timeout)
        # The timer has served its purpose once it fired; release it.
        safety_timer.timeout.connect(safety_timer.deleteLater)

        # Mark download as completed when it finishes either way
        def on_download_done():
            logger.debug("Download finished, marking as completed to prevent timeout error")
            download_completed[0] = True

        worker.download_complete.connect(on_download_done)
        worker.download_failed.connect(on_download_done)

        # Clean up finished threads from the bookkeeping containers
        def cleanup_thread():
            logger.debug(f"Cleaning up download thread {id(thread)}")
            if thread in self._background_threads:
                self._background_threads.remove(thread)
            if id(thread) in self._background_workers:
                del self._background_workers[id(thread)]

        thread.finished.connect(cleanup_thread)

        # Start thread
        logger.debug("Moving download worker to thread and connecting started signal")
        worker.moveToThread(thread)
        thread.started.connect(worker.run)
        logger.debug("Starting download thread...")
        thread.start()
        logger.debug("Download thread started, starting safety timer")

        # Start the safety timeout
        safety_timer.start()
    except Exception as e:
        logger.error(f"Failed to start update download: {e}")
        self.set_update_status(f"Update failed: {str(e)[:30]}", emoji="❌")
def _on_download_complete(self, installer_path: Path) -> None:
    """Offer installation once the installer has been downloaded.

    Closes the progress dialog, updates the status label and shows a modal
    dialog whose "install now" signal starts the installer.

    Args:
        installer_path: Path to downloaded and verified installer
    """
    from webdrop_bridge.ui.update_manager_ui import InstallDialog

    progress_dialog = getattr(self, "downloading_dialog", None)
    if progress_dialog:
        progress_dialog.close()
        self.downloading_dialog = None

    logger.info(f"Download complete: {installer_path}")
    self.set_update_status("Ready to install", emoji="✅")

    confirm_dialog = InstallDialog(parent=self)
    confirm_dialog.install_now.connect(lambda: self._do_install(installer_path))
    confirm_dialog.exec()
def _on_download_failed(self, error: str) -> None:
    """Report a failed update download.

    Closes the progress dialog, shows the error as status text and raises
    a critical message box.

    Args:
        error: Error message
    """
    progress_dialog = getattr(self, "downloading_dialog", None)
    if progress_dialog:
        progress_dialog.close()
        self.downloading_dialog = None

    logger.error(f"Download failed: {error}")
    self.set_update_status(error, emoji="❌")

    failure_text = f"Could not download the update:\n\n{error}\n\nPlease try again later."
    QMessageBox.critical(self, "Download Failed", failure_text)
def _on_download_progress(self, downloaded: int, total: int) -> None:
"""Forward download progress to the downloading dialog.
Args:
downloaded: Bytes downloaded so far
total: Total bytes (0 if unknown)
"""
if hasattr(self, "downloading_dialog") and self.downloading_dialog:
self.downloading_dialog.set_progress(downloaded, total)
def _do_install(self, installer_path: Path) -> None:
    """Launch the installer and reflect the outcome in the status display.

    Args:
        installer_path: Path to installer executable.
    """
    logger.info(f"Installing from {installer_path}")

    from webdrop_bridge.core.updater import UpdateManager

    app_version = self.config.app_version
    config_dir = Path.home() / ".webdrop_bridge"
    updater = UpdateManager(current_version=app_version, config_dir=config_dir)

    launched = updater.install_update(installer_path)
    if not launched:
        self.set_update_status("Installation failed", emoji="❌")
        logger.error("Failed to launch update installer")
        return

    self.set_update_status("Installation started", emoji="✅")
    logger.info("Update installer launched successfully")
class UpdateCheckWorker(QObject):
    """Worker object that performs an update check and reports via signals.

    ``run`` executes the manager's ``check_for_updates`` coroutine on a private
    asyncio event loop with a 10-second timeout. The outcome is reported through
    the signals below; ``finished`` is always emitted last.
    """

    # Signals must be defined at class level.
    update_available = Signal(object)  # Emits Release object
    update_status = Signal(str, str)  # Emits (status_text, emoji)
    check_failed = Signal(str)  # Emits error message
    finished = Signal()  # Emitted at the end of run(), whatever the outcome

    def __init__(self, manager, current_version: str):
        """Initialize worker.

        Args:
            manager: UpdateManager instance whose ``check_for_updates``
                coroutine is awaited by ``run``.
            current_version: Current app version (stored on the worker; not
                read by ``run``).
        """
        super().__init__()
        self.manager = manager
        self.current_version = current_version

    def run(self) -> None:
        """Run the update check and report the result via signals.

        Emits ``update_status`` when the check starts, followed by exactly one of:

        * ``update_available(release)`` when the manager returns a release,
        * ``update_status("Ready", "")`` when it returns nothing,
        * ``check_failed(message)`` on timeout or any other error, including
          failures while setting up the event loop.

        ``finished`` is emitted last in every case.
        """
        loop = None
        try:
            logger.debug("UpdateCheckWorker.run() starting")

            # Notify checking status
            self.update_status.emit("Checking for updates", "🔄")

            # Create a fresh event loop for this thread
            logger.debug("Creating new event loop for worker thread")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            # Check for updates with short timeout (network call has its own timeout)
            logger.debug("Starting update check with 10-second timeout")
            release = loop.run_until_complete(
                asyncio.wait_for(self.manager.check_for_updates(), timeout=10)
            )
            logger.debug(f"Update check completed, release={release}")

            if release:
                logger.info(f"Update available: {release.version}")
                self.update_available.emit(release)
            else:
                # No update available - show ready status
                logger.info("No update available")
                self.update_status.emit("Ready", "")
        except asyncio.TimeoutError:
            logger.warning("Update check timed out - server not responding")
            self.check_failed.emit("Server not responding - check again later")
        except Exception as e:
            # Catch-all boundary: a worker must never let an exception escape
            # into the Qt event machinery without telling the UI.
            logger.error(f"Update check failed: {e}", exc_info=True)
            self.check_failed.emit(f"Check failed: {str(e)[:50]}")
        finally:
            # Properly close the event loop; cleanup must never prevent
            # ``finished`` from being emitted.
            if loop is not None:
                try:
                    if not loop.is_closed():
                        loop.close()
                        logger.debug("Event loop closed")
                    # Do not leave a closed loop installed as this thread's
                    # current event loop.
                    asyncio.set_event_loop(None)
                except Exception as e:
                    logger.warning(f"Error closing event loop: {e}")
            self.finished.emit()
class UpdateDownloadWorker(QObject):
    """Worker object that downloads an update and verifies its checksum.

    ``run`` drives the manager's ``download_update`` and ``verify_checksum``
    coroutines on a private asyncio event loop and reports the outcome through
    the signals below. ``finished`` is emitted exactly once per ``run`` call.
    """

    # Signals must be defined at class level.
    download_complete = Signal(Path)  # Emits installer_path
    download_failed = Signal(str)  # Emits error message
    download_progress = Signal(int, int)  # Emits (bytes_downloaded, total_bytes)
    update_status = Signal(str, str)  # Emits (status_text, emoji)
    finished = Signal()  # Emitted once at the end of run(), whatever the outcome

    def __init__(self, manager, release, current_version: str):
        """Initialize worker.

        Args:
            manager: UpdateManager instance providing ``download_update`` and
                ``verify_checksum``.
            release: Release object to download.
            current_version: Current app version (stored on the worker; not
                read by ``run``).
        """
        super().__init__()
        self.manager = manager
        self.release = release
        self.current_version = current_version

    def run(self) -> None:
        """Run the download and verification.

        Emits ``update_status`` as the work progresses and ``download_progress``
        from the manager's progress callback. Ends with ``download_complete``
        (installer downloaded and checksum verified) or ``download_failed``
        (no installer, checksum mismatch, timeout or any other error), and then
        emits ``finished`` exactly once.
        """
        loop = None
        try:
            # Download the update
            self.update_status.emit(f"Downloading v{self.release.version}", "⬇️")

            # Create a fresh event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            try:
                # Download with 5 minute timeout (300 seconds)
                logger.info("Starting download with 5-minute timeout")
                installer_path = loop.run_until_complete(
                    asyncio.wait_for(
                        self.manager.download_update(
                            self.release,
                            progress_callback=lambda cur, tot: self.download_progress.emit(
                                cur, tot
                            ),
                        ),
                        timeout=300,
                    )
                )

                if not installer_path:
                    self.update_status.emit("Download failed", "❌")
                    self.download_failed.emit("No installer found in release")
                    logger.error("Download failed - no installer found")
                    return  # the ``finally`` below still emits ``finished``

                logger.info(f"Downloaded to {installer_path}")
                self.update_status.emit("Verifying download", "🔍")

                # Verify checksum with 30 second timeout
                logger.info("Starting checksum verification")
                checksum_ok = loop.run_until_complete(
                    asyncio.wait_for(
                        self.manager.verify_checksum(installer_path, self.release), timeout=30
                    )
                )

                if not checksum_ok:
                    self.update_status.emit("Verification failed", "❌")
                    self.download_failed.emit("Checksum verification failed")
                    logger.error("Checksum verification failed")
                    return  # the ``finally`` below still emits ``finished``

                logger.info("Checksum verification passed")
                self.download_complete.emit(installer_path)

            except asyncio.TimeoutError as e:
                logger.error(f"Download/verification timed out: {e}")
                self.update_status.emit("Operation timed out", "⏱️")
                self.download_failed.emit(
                    "Download or verification timed out (no response from server)"
                )
            except Exception as e:
                logger.error(f"Error during download: {e}", exc_info=True)
                self.download_failed.emit(f"Download error: {str(e)[:50]}")

        except Exception as e:
            # Failures outside the download itself (e.g. event-loop setup).
            logger.error(f"Download worker failed: {e}", exc_info=True)
            self.download_failed.emit(f"Download error: {str(e)[:50]}")
        finally:
            # Properly close the event loop; cleanup must never prevent
            # ``finished`` from being emitted.
            if loop is not None:
                try:
                    if not loop.is_closed():
                        loop.close()
                        logger.debug("Event loop closed")
                    # Do not leave a closed loop installed as this thread's
                    # current event loop.
                    asyncio.set_event_loop(None)
                except Exception as e:
                    logger.warning(f"Error closing event loop: {e}")
            # Emit completion exactly once so connected slots do not run twice.
            self.finished.emit()