Files
dark_hal/download_manager.py
2026-03-13 12:56:43 -07:00

582 lines
22 KiB
Python

import os
import requests
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox
from typing import Optional, Dict, Any, List, Callable
from dataclasses import dataclass
from enum import Enum
import json
from datetime import datetime
import queue
class DownloadStatus(Enum):
"""Download status enumeration."""
QUEUED = "Queued"
DOWNLOADING = "Downloading"
PAUSED = "Paused"
COMPLETED = "Completed"
FAILED = "Failed"
CANCELLED = "Cancelled"
AUTH_REQUIRED = "Auth Required"
@dataclass
class DownloadItem:
"""Represents a download item."""
id: str
repo_id: str
filename: str
url: str
save_path: str
total_size: int = 0
downloaded_size: int = 0
status: DownloadStatus = DownloadStatus.QUEUED
error_message: str = ""
start_time: Optional[float] = None
end_time: Optional[float] = None
speed: float = 0.0
eta: int = 0
headers: Dict[str, str] = None
resume_position: int = 0
def __post_init__(self):
if self.headers is None:
self.headers = {}
@property
def progress(self) -> float:
"""Calculate download progress percentage."""
if self.total_size > 0:
return (self.downloaded_size / self.total_size) * 100
return 0.0
@property
def is_resumable(self) -> bool:
"""Check if download can be resumed."""
return self.status in [DownloadStatus.PAUSED, DownloadStatus.FAILED]
class DownloadManager:
"""Manages multiple downloads with pause/resume support."""
def __init__(self, max_concurrent: int = 3):
self.downloads: Dict[str, DownloadItem] = {}
self.download_queue: queue.Queue = queue.Queue()
self.active_downloads: Dict[str, threading.Thread] = {}
self.max_concurrent = max_concurrent
self.callbacks: Dict[str, List[Callable]] = {
'on_progress': [],
'on_status_change': [],
'on_complete': [],
'on_error': []
}
self._stop_flags: Dict[str, threading.Event] = {}
self._pause_flags: Dict[str, threading.Event] = {}
self._worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self._worker_thread.start()
def add_download(self, repo_id: str, filename: str, url: str, save_path: str,
headers: Optional[Dict[str, str]] = None) -> str:
"""Add a new download to the queue."""
download_id = f"{repo_id}_{filename}_{int(time.time())}"
# Create download item
item = DownloadItem(
id=download_id,
repo_id=repo_id,
filename=filename,
url=url,
save_path=save_path,
headers=headers or {}
)
self.downloads[download_id] = item
self.download_queue.put(download_id)
self._trigger_callback('on_status_change', item)
return download_id
def pause_download(self, download_id: str):
"""Pause a download."""
if download_id in self._pause_flags:
self._pause_flags[download_id].set()
if download_id in self.downloads:
self.downloads[download_id].status = DownloadStatus.PAUSED
self._trigger_callback('on_status_change', self.downloads[download_id])
def resume_download(self, download_id: str):
"""Resume a paused download."""
item = self.downloads.get(download_id)
if item and item.is_resumable:
item.status = DownloadStatus.QUEUED
item.resume_position = item.downloaded_size
self.download_queue.put(download_id)
self._trigger_callback('on_status_change', item)
def cancel_download(self, download_id: str):
"""Cancel a download."""
if download_id in self._stop_flags:
self._stop_flags[download_id].set()
if download_id in self.downloads:
self.downloads[download_id].status = DownloadStatus.CANCELLED
self._trigger_callback('on_status_change', self.downloads[download_id])
# Remove partial file
item = self.downloads[download_id]
if os.path.exists(item.save_path):
try:
os.remove(item.save_path)
except Exception:
pass
def retry_download(self, download_id: str):
"""Retry a failed download."""
item = self.downloads.get(download_id)
if item and item.status in [DownloadStatus.FAILED, DownloadStatus.AUTH_REQUIRED]:
item.status = DownloadStatus.QUEUED
item.downloaded_size = 0
item.resume_position = 0
item.error_message = ""
self.download_queue.put(download_id)
self._trigger_callback('on_status_change', item)
def _process_queue(self):
"""Process download queue."""
while True:
# Check if we can start more downloads
if len(self.active_downloads) < self.max_concurrent:
try:
download_id = self.download_queue.get(timeout=1)
if download_id in self.downloads:
thread = threading.Thread(
target=self._download_file,
args=(download_id,),
daemon=True
)
self.active_downloads[download_id] = thread
thread.start()
except queue.Empty:
pass
# Clean up finished downloads
finished = []
for download_id, thread in self.active_downloads.items():
if not thread.is_alive():
finished.append(download_id)
for download_id in finished:
del self.active_downloads[download_id]
time.sleep(0.5)
def _download_file(self, download_id: str):
"""Download a file with resume support."""
item = self.downloads[download_id]
# Create flags for this download
self._stop_flags[download_id] = threading.Event()
self._pause_flags[download_id] = threading.Event()
try:
# Update status
item.status = DownloadStatus.DOWNLOADING
item.start_time = time.time()
self._trigger_callback('on_status_change', item)
# Create directory if needed
os.makedirs(os.path.dirname(item.save_path), exist_ok=True)
# Setup headers for resume
headers = item.headers.copy()
if item.resume_position > 0:
headers['Range'] = f'bytes={item.resume_position}-'
# Make request
response = requests.get(item.url, headers=headers, stream=True, timeout=30)
# Check for authentication issues
if response.status_code == 401 or response.status_code == 403:
item.status = DownloadStatus.AUTH_REQUIRED
item.error_message = f"Authentication failed: {response.status_code}"
self._trigger_callback('on_error', item)
return
response.raise_for_status()
# Get total size
if item.resume_position == 0:
item.total_size = int(response.headers.get('content-length', 0))
# Open file for writing (append if resuming)
mode = 'ab' if item.resume_position > 0 else 'wb'
with open(item.save_path, mode) as f:
chunk_size = 8192
last_update = time.time()
bytes_since_update = 0
for chunk in response.iter_content(chunk_size=chunk_size):
# Check stop flag
if self._stop_flags[download_id].is_set():
return
# Check pause flag
if self._pause_flags[download_id].is_set():
item.status = DownloadStatus.PAUSED
self._trigger_callback('on_status_change', item)
return
if chunk:
f.write(chunk)
item.downloaded_size += len(chunk)
bytes_since_update += len(chunk)
# Calculate speed and ETA
current_time = time.time()
time_diff = current_time - last_update
if time_diff >= 1.0: # Update every second
item.speed = bytes_since_update / time_diff
if item.speed > 0:
remaining = item.total_size - item.downloaded_size
item.eta = int(remaining / item.speed)
self._trigger_callback('on_progress', item)
last_update = current_time
bytes_since_update = 0
# Download completed
item.status = DownloadStatus.COMPLETED
item.end_time = time.time()
self._trigger_callback('on_complete', item)
except requests.exceptions.RequestException as e:
item.status = DownloadStatus.FAILED
item.error_message = str(e)
self._trigger_callback('on_error', item)
except Exception as e:
item.status = DownloadStatus.FAILED
item.error_message = f"Unexpected error: {e}"
self._trigger_callback('on_error', item)
finally:
# Clean up flags
if download_id in self._stop_flags:
del self._stop_flags[download_id]
if download_id in self._pause_flags:
del self._pause_flags[download_id]
self._trigger_callback('on_status_change', item)
def register_callback(self, event: str, callback: Callable):
"""Register a callback for download events."""
if event in self.callbacks:
self.callbacks[event].append(callback)
def _trigger_callback(self, event: str, item: DownloadItem):
"""Trigger callbacks for an event."""
for callback in self.callbacks.get(event, []):
try:
callback(item)
except Exception as e:
print(f"Callback error: {e}")
def get_all_downloads(self) -> List[DownloadItem]:
"""Get all download items."""
return list(self.downloads.values())
def clear_completed(self):
"""Clear completed downloads from the list."""
to_remove = []
for download_id, item in self.downloads.items():
if item.status in [DownloadStatus.COMPLETED, DownloadStatus.CANCELLED]:
to_remove.append(download_id)
for download_id in to_remove:
del self.downloads[download_id]
class DownloadManagerTab:
"""Download Manager GUI tab."""
def __init__(self, parent: ttk.Frame, download_manager: DownloadManager):
self.parent = parent
self.manager = download_manager
self.item_widgets: Dict[str, Dict[str, Any]] = {}
# Register callbacks
self.manager.register_callback('on_progress', self._on_progress)
self.manager.register_callback('on_status_change', self._on_status_change)
self.manager.register_callback('on_complete', self._on_complete)
self.manager.register_callback('on_error', self._on_error)
self._build_ui()
# Start update timer
self._update_display()
def _build_ui(self):
"""Build the download manager UI."""
# Top controls
controls_frame = ttk.Frame(self.parent)
controls_frame.pack(fill=tk.X, padx=5, pady=5)
ttk.Button(controls_frame, text="Clear Completed",
command=self._clear_completed).pack(side=tk.LEFT, padx=5)
ttk.Button(controls_frame, text="Pause All",
command=self._pause_all).pack(side=tk.LEFT, padx=5)
ttk.Button(controls_frame, text="Resume All",
command=self._resume_all).pack(side=tk.LEFT, padx=5)
# Status summary
self.status_label = ttk.Label(controls_frame, text="Downloads: 0 active, 0 queued")
self.status_label.pack(side=tk.RIGHT, padx=5)
# Downloads list with scrollbar
list_frame = ttk.Frame(self.parent)
list_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Canvas for scrollable content
self.canvas = tk.Canvas(list_frame, bg='white')
scrollbar = ttk.Scrollbar(list_frame, orient="vertical", command=self.canvas.yview)
self.scrollable_frame = ttk.Frame(self.canvas)
self.scrollable_frame.bind(
"<Configure>",
lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
)
self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
self.canvas.configure(yscrollcommand=scrollbar.set)
self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
def add_download_widget(self, item: DownloadItem):
"""Add a download widget to the display."""
if item.id in self.item_widgets:
return
# Create frame for this download
frame = ttk.LabelFrame(self.scrollable_frame, text=f"{item.repo_id} / {item.filename}")
frame.pack(fill=tk.X, padx=5, pady=5)
# Info row
info_frame = ttk.Frame(frame)
info_frame.pack(fill=tk.X, padx=5, pady=5)
status_label = ttk.Label(info_frame, text=f"Status: {item.status.value}")
status_label.pack(side=tk.LEFT, padx=5)
size_label = ttk.Label(info_frame, text="Size: -")
size_label.pack(side=tk.LEFT, padx=5)
speed_label = ttk.Label(info_frame, text="Speed: -")
speed_label.pack(side=tk.LEFT, padx=5)
eta_label = ttk.Label(info_frame, text="ETA: -")
eta_label.pack(side=tk.LEFT, padx=5)
# Progress bar
progress_bar = ttk.Progressbar(frame, length=400, mode='determinate')
progress_bar.pack(fill=tk.X, padx=5, pady=5)
# Error label (hidden by default)
error_label = ttk.Label(frame, text="", foreground="red")
# Control buttons
controls_frame = ttk.Frame(frame)
controls_frame.pack(fill=tk.X, padx=5, pady=5)
pause_btn = ttk.Button(controls_frame, text="Pause", width=10,
command=lambda: self.manager.pause_download(item.id))
pause_btn.pack(side=tk.LEFT, padx=2)
resume_btn = ttk.Button(controls_frame, text="Resume", width=10,
command=lambda: self.manager.resume_download(item.id))
resume_btn.pack(side=tk.LEFT, padx=2)
cancel_btn = ttk.Button(controls_frame, text="Cancel", width=10,
command=lambda: self.manager.cancel_download(item.id))
cancel_btn.pack(side=tk.LEFT, padx=2)
retry_btn = ttk.Button(controls_frame, text="Retry", width=10,
command=lambda: self.manager.retry_download(item.id))
retry_btn.pack(side=tk.LEFT, padx=2)
# Store widgets
self.item_widgets[item.id] = {
'frame': frame,
'status_label': status_label,
'size_label': size_label,
'speed_label': speed_label,
'eta_label': eta_label,
'progress_bar': progress_bar,
'error_label': error_label,
'pause_btn': pause_btn,
'resume_btn': resume_btn,
'cancel_btn': cancel_btn,
'retry_btn': retry_btn
}
# Initial update
self._update_download_widget(item)
def _update_download_widget(self, item: DownloadItem):
"""Update a download widget."""
if item.id not in self.item_widgets:
self.add_download_widget(item)
return
widgets = self.item_widgets[item.id]
# Update status
widgets['status_label'].config(text=f"Status: {item.status.value}")
# Update size
if item.total_size > 0:
size_text = f"Size: {self._format_size(item.downloaded_size)} / {self._format_size(item.total_size)}"
else:
size_text = f"Size: {self._format_size(item.downloaded_size)}"
widgets['size_label'].config(text=size_text)
# Update speed
if item.speed > 0:
widgets['speed_label'].config(text=f"Speed: {self._format_size(item.speed)}/s")
else:
widgets['speed_label'].config(text="Speed: -")
# Update ETA
if item.eta > 0:
eta_text = self._format_time(item.eta)
widgets['eta_label'].config(text=f"ETA: {eta_text}")
else:
widgets['eta_label'].config(text="ETA: -")
# Update progress bar
widgets['progress_bar']['value'] = item.progress
# Update error message
if item.error_message:
widgets['error_label'].config(text=f"Error: {item.error_message}")
widgets['error_label'].pack(fill=tk.X, padx=5, pady=2)
else:
widgets['error_label'].pack_forget()
# Update button states
if item.status == DownloadStatus.DOWNLOADING:
widgets['pause_btn'].config(state="normal")
widgets['resume_btn'].config(state="disabled")
widgets['cancel_btn'].config(state="normal")
widgets['retry_btn'].config(state="disabled")
elif item.status == DownloadStatus.PAUSED:
widgets['pause_btn'].config(state="disabled")
widgets['resume_btn'].config(state="normal")
widgets['cancel_btn'].config(state="normal")
widgets['retry_btn'].config(state="disabled")
elif item.status in [DownloadStatus.FAILED, DownloadStatus.AUTH_REQUIRED]:
widgets['pause_btn'].config(state="disabled")
widgets['resume_btn'].config(state="disabled")
widgets['cancel_btn'].config(state="disabled")
widgets['retry_btn'].config(state="normal")
elif item.status == DownloadStatus.COMPLETED:
widgets['pause_btn'].config(state="disabled")
widgets['resume_btn'].config(state="disabled")
widgets['cancel_btn'].config(state="disabled")
widgets['retry_btn'].config(state="disabled")
else:
widgets['pause_btn'].config(state="disabled")
widgets['resume_btn'].config(state="disabled")
widgets['cancel_btn'].config(state="normal")
widgets['retry_btn'].config(state="disabled")
def _format_size(self, bytes_size: float) -> str:
"""Format bytes to human readable size."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} PB"
def _format_time(self, seconds: int) -> str:
"""Format seconds to human readable time."""
if seconds < 60:
return f"{seconds}s"
elif seconds < 3600:
minutes = seconds // 60
secs = seconds % 60
return f"{minutes}m {secs}s"
else:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
return f"{hours}h {minutes}m"
def _on_progress(self, item: DownloadItem):
"""Handle progress update."""
self.parent.after(0, lambda: self._update_download_widget(item))
def _on_status_change(self, item: DownloadItem):
"""Handle status change."""
self.parent.after(0, lambda: self._update_download_widget(item))
def _on_complete(self, item: DownloadItem):
"""Handle download completion."""
self.parent.after(0, lambda: self._update_download_widget(item))
def _on_error(self, item: DownloadItem):
"""Handle download error."""
self.parent.after(0, lambda: self._update_download_widget(item))
# Show error notification for auth issues
if item.status == DownloadStatus.AUTH_REQUIRED:
self.parent.after(0, lambda: messagebox.showerror(
"Authentication Required",
f"Authentication failed for {item.filename}.\n"
f"Please check your API key in Settings."
))
def _clear_completed(self):
"""Clear completed downloads."""
# Remove widgets for completed downloads
for download_id in list(self.item_widgets.keys()):
item = self.manager.downloads.get(download_id)
if item and item.status in [DownloadStatus.COMPLETED, DownloadStatus.CANCELLED]:
self.item_widgets[download_id]['frame'].destroy()
del self.item_widgets[download_id]
# Clear from manager
self.manager.clear_completed()
def _pause_all(self):
"""Pause all active downloads."""
for item in self.manager.get_all_downloads():
if item.status == DownloadStatus.DOWNLOADING:
self.manager.pause_download(item.id)
def _resume_all(self):
"""Resume all paused downloads."""
for item in self.manager.get_all_downloads():
if item.status == DownloadStatus.PAUSED:
self.manager.resume_download(item.id)
def _update_display(self):
"""Update the display periodically."""
# Update status summary
all_downloads = self.manager.get_all_downloads()
active = sum(1 for d in all_downloads if d.status == DownloadStatus.DOWNLOADING)
queued = sum(1 for d in all_downloads if d.status == DownloadStatus.QUEUED)
self.status_label.config(text=f"Downloads: {active} active, {queued} queued")
# Add any new downloads
for item in all_downloads:
if item.id not in self.item_widgets:
self.add_download_item(item)
# Schedule next update
self.parent.after(500, self._update_display)