198 lines
7.2 KiB
Python
198 lines
7.2 KiB
Python
#!/usr/bin/env python
|
|
"""Test script to verify the update feature no longer hangs the UI.
|
|
|
|
This script demonstrates that the update download happens in a background
|
|
thread and doesn't block the UI thread.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
from PySide6.QtCore import QCoreApplication, QThread, QTimer
|
|
|
|
from webdrop_bridge.config import Config
|
|
from webdrop_bridge.core.updater import Release, UpdateManager
|
|
from webdrop_bridge.ui.main_window import MainWindow, UpdateDownloadWorker
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def test_update_download_runs_in_background():
    """Run an ``UpdateDownloadWorker`` on a ``QThread`` and check its signals.

    The worker receives a mocked ``UpdateManager`` whose ``download_update``
    and ``verify_checksum`` are replaced by short async stand-ins. It is
    moved to a fresh ``QThread`` and started through ``thread.started``.
    The calling thread then pumps the Qt event loop until the worker's
    ``finished`` signal has been recorded or a timeout expires.

    Returns:
        True when a ``download_complete`` or ``download_failed`` signal and
        a ``finished`` signal were both recorded; False otherwise, including
        when the wait timed out.
    """
    import time  # local import: only the bounded wait below needs it

    print("\n=== Testing Update Download Background Thread ===\n")

    app = QCoreApplication.instance() or QCoreApplication([])

    # Upper bound for the whole wait; the mocked coroutines sleep 0.2 s total.
    wait_timeout_s = 10.0

    # Create a mock release
    release = Release(
        tag_name="v0.0.2",
        name="Release 0.0.2",
        version="0.0.2",
        body="Test release notes",
        assets=[{"name": "installer.msi", "browser_download_url": "http://example.com/installer.msi"}],
        published_at="2026-01-30T00:00:00Z",
    )

    # Create a mock update manager
    manager = Mock(spec=UpdateManager)

    # Track whether download_update was called, and on which thread
    download_called = False
    download_thread_id = None

    async def mock_download(rel):
        nonlocal download_called, download_thread_id
        download_called = True
        download_thread_id = QThread.currentThreadId()
        # Simulate network operation
        await asyncio.sleep(0.1)
        return Path("/tmp/fake_installer.msi")

    async def mock_verify(file_path, rel):
        await asyncio.sleep(0.1)
        return True

    manager.download_update = mock_download
    manager.verify_checksum = mock_verify

    # Create the worker
    worker = UpdateDownloadWorker(manager, release, "0.0.1")

    # Record every signal of interest as a tuple whose first item is its kind
    signals_emitted = []
    worker.download_complete.connect(lambda p: signals_emitted.append(("complete", p)))
    worker.download_failed.connect(lambda e: signals_emitted.append(("failed", e)))
    worker.finished.connect(lambda: signals_emitted.append(("finished",)))

    # Create a thread and move worker to it
    thread = QThread()
    worker.moveToThread(thread)

    main_thread_id = QThread.currentThreadId()
    worker_thread_id = None

    def on_worker_run_started():
        nonlocal worker_thread_id
        worker_thread_id = QThread.currentThreadId()
        logger.info("Worker running in thread: %s", worker_thread_id)
        logger.info("Main thread: %s", main_thread_id)

    thread.started.connect(on_worker_run_started)
    thread.started.connect(worker.run)

    def worker_finished():
        return any(sig[0] == "finished" for sig in signals_emitted)

    thread.start()

    # Pump the event loop until the worker reports that it is done. Only two
    # of the tracked signals can fire per run (complete OR failed, plus
    # finished), so the wait keys on "finished" rather than on a signal
    # count, and a monotonic deadline guarantees the loop terminates.
    deadline = time.monotonic() + wait_timeout_s
    while not worker_finished() and time.monotonic() < deadline:
        # Run the event loop in 10 ms slices so pending events are handled
        # while the exit condition is still re-checked regularly.
        QTimer.singleShot(10, app.quit)
        app.exec()

    if not worker_finished():
        print(f"\n❌ Timed out after {wait_timeout_s:.0f}s waiting for the worker to finish")

    # Cleanup: ask the thread's event loop to exit and wait for it to stop
    thread.quit()
    thread.wait()

    # Report results
    print(f"\n✓ Download called: {download_called}")
    print(f"✓ Signals emitted: {len(signals_emitted)}")

    # Informational only: does not influence the pass/fail result below.
    ran_outside_main_thread = download_called and download_thread_id != main_thread_id
    print(f"✓ Download ran outside main thread: {ran_outside_main_thread}")

    # A completion or failure signal shows the async operations ran to an end
    has_complete_or_failed = any(sig[0] in ("complete", "failed") for sig in signals_emitted)
    has_finished = worker_finished()

    print(f"✓ Completion/Failed signal emitted: {has_complete_or_failed}")
    print(f"✓ Finished signal emitted: {has_finished}")

    if has_complete_or_failed and has_finished:
        print("\n✅ SUCCESS: Update download runs asynchronously without blocking UI!")
        return True

    print("\n❌ FAILED: Signals not emitted properly")
    print(f"   Signals: {signals_emitted}")
    return False
|
|
|
|
|
|
def test_update_download_worker_exists():
    """Check the attributes that ``UpdateDownloadWorker`` is expected to expose.

    Asserts that the class has ``__init__``, each of the signal attributes
    ``download_complete``, ``download_failed``, ``update_status`` and
    ``finished``, and a ``run`` attribute, printing one line per passed check.

    Returns:
        True when every assertion passed.

    Raises:
        AssertionError: if any expected attribute is missing.
    """
    print("\n=== Testing UpdateDownloadWorker Class ===\n")

    worker_cls = UpdateDownloadWorker

    # The class itself
    has_init = hasattr(worker_cls, "__init__")
    assert has_init, "UpdateDownloadWorker missing __init__"
    print("✓ UpdateDownloadWorker class exists")

    # Signal attributes, checked in a fixed order
    for attr in ("download_complete", "download_failed", "update_status", "finished"):
        present = hasattr(worker_cls, attr)
        assert present, "Missing signal: " + attr
        print("✓ Signal '" + attr + "' defined")

    # Entry point invoked when the worker is started
    has_run = hasattr(worker_cls, "run")
    assert has_run, "UpdateDownloadWorker missing run method"
    print("✓ Method 'run' defined")

    print("\n✅ SUCCESS: UpdateDownloadWorker properly implemented!")
    return True
|
|
|
|
|
|
def test_main_window_uses_async_download():
    """Check which update-related attributes ``MainWindow`` defines.

    Asserts that ``_perform_update_async``, ``_on_download_complete`` and
    ``_on_download_failed`` are present on the class and that
    ``_perform_update`` is absent, printing one line per passed group.

    Returns:
        True when every assertion passed.

    Raises:
        AssertionError: if an expected attribute is missing or
            ``_perform_update`` is still present.
    """
    print("\n=== Testing MainWindow Async Download Integration ===\n")

    window_cls = MainWindow

    # The async entry point must be present
    has_async_entry = hasattr(window_cls, "_perform_update_async")
    assert has_async_entry, "MainWindow missing _perform_update_async"
    print("✓ Method '_perform_update_async' exists (new async version)")

    # The previous blocking entry point must be gone
    has_blocking_entry = hasattr(window_cls, "_perform_update")
    assert not has_blocking_entry, "MainWindow still has old blocking _perform_update method"
    print("✓ Old blocking '_perform_update' method removed")

    # Both result handlers must be present
    for handler in ("_on_download_complete", "_on_download_failed"):
        assert hasattr(window_cls, handler), f"MainWindow missing {handler}"
    print("✓ Download completion handlers exist")

    print("\n✅ SUCCESS: MainWindow properly integrated with async download!")
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the three checks in order, print a summary, and
    # exit with status 0 only when all of them passed. A failed check or an
    # exception (including AssertionError raised by a check) exits with 1 so
    # callers such as CI can detect the failure.
    import sys
    import traceback

    banner = "=" * 60

    print("\n" + banner)
    print("UPDATE FEATURE FIX VERIFICATION")
    print(banner)

    exit_code = 1
    try:
        # Test 1: Worker exists
        test1 = test_update_download_worker_exists()

        # Test 2: MainWindow integration
        test2 = test_main_window_uses_async_download()

        # Test 3: Async operation
        test3 = test_update_download_runs_in_background()

        print("\n" + banner)
        if test1 and test2 and test3:
            print("✅ ALL TESTS PASSED - UPDATE FEATURE HANG FIXED!")
            print(banner + "\n")
            print("Summary of changes:")
            print("- Created UpdateDownloadWorker class for async downloads")
            print("- Moved blocking operations from UI thread to background thread")
            print("- Added handlers for download completion/failure")
            print("- UI now stays responsive during update download")
            exit_code = 0
        else:
            print("❌ SOME TESTS FAILED")
            print(banner + "\n")
    except Exception as e:
        # Top-level boundary: report the error and fall through to a
        # non-zero exit status instead of ending silently with 0.
        print(f"\n❌ ERROR: {e}")
        traceback.print_exc()

    sys.exit(exit_code)
|