Files

3094 lines
143 KiB
Python
Raw Permalink Normal View History

2026-03-13 12:56:43 -07:00
import argparse
import sys
import threading
import time
import tkinter as tk
import traceback
import linecache
from tkinter import ttk, filedialog, messagebox
from typing import List, Dict, Any, Optional
import queue
import subprocess
import os
from dotenv import load_dotenv
import requests
from settings_manager import SettingsManager, open_settings_dialog
from grouped_download_manager import GroupedDownloadManager, FileSelectionDialog
from grouped_download_gui import GroupedDownloadManagerTab
from model_library import ModelLibraryTab
from mcp_config import open_mcp_config
from splash_screen import SplashManager
from mcp_tab import MCPTab
from model_converter import ModelConverterTab
from chess_tab import ChessTab
from chat_templates import get_template_manager, ChatTemplateDialog
# from finetune_tab import FineTuneTab # Temporarily disabled for debugging
# Try to import torch for GPU functionality
try:
import torch
except ImportError:
torch = None
# Load from your custom env file
load_dotenv("HUGGINGFACE.env")
# Global execution tracer state.
# _trace_enabled gates all output of execution_tracer; _trace_filters lists the
# substrings a frame's filename must contain for its lines to be printed.
_trace_enabled = False
_trace_filters = ['llm_runtime', 'main.py', 'autogptq', 'transformers']


def execution_tracer(frame, event, arg):
    """Trace hook that prints executed source lines of the filtered files.

    The (frame, event, arg) signature matches what ``sys.settrace`` expects;
    where the hook is installed is not visible in this part of the file.

    Args:
        frame: The frame being executed; its code filename and line number are read.
        event: Trace event name; only ``'line'`` events produce output.
        arg: Unused event argument.

    Returns:
        ``None`` while tracing is disabled (which stops tracing of that scope),
        otherwise this function itself so tracing continues.
    """
    if not _trace_enabled:
        return None
    if event == 'line':
        filename = frame.f_code.co_filename
        # Only trace files we care about
        if any(filter_term in filename for filter_term in _trace_filters):
            lineno = frame.f_lineno
            try:
                line = linecache.getline(filename, lineno).strip()
                # Normalise separators first so mixed '/' and '\\' paths still
                # reduce to the bare file name.
                short_filename = filename.replace('\\', '/').rsplit('/', 1)[-1]
                print(f"[TRACE] {short_filename}:{lineno} | {line}")
            except Exception:
                # Tracing is best-effort diagnostics: never let it break the traced
                # code, but do not swallow KeyboardInterrupt/SystemExit either.
                pass
    return execution_tracer
def start_tracing():
    """Switch the module-level trace flag on and announce it on stdout."""
    global _trace_enabled
    _trace_enabled = True
    print("[TRACE] Execution tracing STARTED")
def stop_tracing():
    """Switch the module-level trace flag off and announce it on stdout."""
    global _trace_enabled
    _trace_enabled = False
    print("[TRACE] Execution tracing STOPPED")
# Access the key
hf_key = os.getenv("HF_API_KEY")
# Embedded llama.cpp (self-contained, no external daemons)
try:
from llama_cpp import Llama
except Exception:
print(
"The 'llama-cpp-python' package is required. Please install dependencies with: pip install -r requirements.txt",
file=sys.stderr)
raise
# ChessGPT support (optional torch/transformers stack)
# Defined before the import attempt so the name exists even when the import fails.
MIN_TRANSFORMERS_VERSION = '4.25.1'


def _version_tuple(version: str) -> tuple:
    """Parse the leading numeric components of a dotted version string.

    ``"4.25.1"`` -> ``(4, 25, 1)``; parsing stops at the first component that
    does not start with a digit, so ``"4.26.0.dev0"`` -> ``(4, 26, 0)``.
    Comparing these tuples orders versions numerically, unlike comparing the
    raw strings (where ``"4.9.0" > "4.25.1"``).
    """
    parts = []
    for piece in str(version).split('.'):
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


try:
    import torch
    import transformers
    from transformers import AutoTokenizer, AutoModelForCausalLM
    TRANSFORMERS_AVAILABLE = True
    # Numeric comparison: a plain string comparison mis-orders e.g. 4.9 vs 4.25.
    if _version_tuple(transformers.__version__) < _version_tuple(MIN_TRANSFORMERS_VERSION):
        print(
            f"Warning: transformers version {transformers.__version__} may not be compatible. Recommended: {MIN_TRANSFORMERS_VERSION}+")
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("Warning: transformers not available. Chess mode will not work. Install with: pip install torch transformers")
# Single-slot cache for the llama.cpp model: _get_llama returns "llm" when the
# requested configuration equals "key", otherwise it loads a new model and
# overwrites both entries.
_llama_cache = {
    "key": None,  # (model_path, lora_path, n_ctx, n_gpu_layers, n_threads, mtime) as built in _get_llama
    "llm": None,
}
# ChessGPT model cache
# NOTE(review): nothing in the visible part of this file reads or writes this
# dict — confirm it is still used before relying on it.
_chessgpt_cache = {
    "tokenizer": None,
    "model": None,
    "loaded": False
}
# Serialize llama native calls when sharing one instance across threads
# (held around every llm(...) / create_chat_completion call in this module).
_LLAMA_LOCK = threading.RLock()
# Default system prompt and anti-echo stop tokens
# The prompt is prepended by run_prompt/chat_stream whenever the conversation
# does not already start with a "system" message.
DEFAULT_SYSTEM_PROMPT = (
    "You are a helpful, concise assistant. Answer the user's question directly. "
    "Do not repeat or paraphrase the user's prompt; provide only your answer."
)
# Conservative defaults to reduce echo
DEFAULT_TEMPERATURE = 0.2
# Used when the caller passes max_tokens as None or a non-positive value.
DEFAULT_MAX_TOKENS = 256
# Add common model chat-template markers to stop tokens to avoid template echoes like [INST] <<SYS>> ...
# The same list is passed as `stop=` to generation and scrubbed from the final
# text by _strip_echo_from_response.
STOP_TOKENS = [
    "\nUser:", "\nYou:", "User:", "You:",
    "<|user|>", "<|assistant|>", "<|eot_id|>", "<|eom_id|>",
    "[INST]", "[/INST]", "<<SYS>>", "</SYS>", "<</SYS>>", "</SYS>>"
]
def _strip_echo_from_response(text: str, last_user_prompt: Optional[str]) -> str:
    """Remove an echoed user prompt and leaked template markers from model output.

    Args:
        text: Raw model output; ``None``/empty is treated as ``""``.
        last_user_prompt: The most recent user message. If the output begins with
            it (ignoring leading whitespace in the output), that prefix is dropped.

    Returns:
        The cleaned text with every ``STOP_TOKENS`` entry replaced by a space and
        all whitespace runs collapsed to single spaces. If cleaning fails for any
        reason the original text (or ``""``) is returned unchanged.
    """
    try:
        # Leading whitespace is discarded by the final collapse anyway; removing
        # it up front lets the echo check match output such as "\n<prompt> ...",
        # which previously passed the startswith test but was never cut.
        cleaned = (text or "").lstrip()
        echoed = (last_user_prompt or "").strip()
        if echoed and cleaned.startswith(echoed):
            cleaned = cleaned[len(echoed):]
        # Remove common stop tokens/templates that may leak
        for tok in STOP_TOKENS:
            cleaned = cleaned.replace(tok, " ")
        # Collapse repeated whitespace (this also trims both ends)
        return " ".join(cleaned.split())
    except Exception:
        # Best-effort post-processing: never lose the model's answer over it.
        return text or ""
# Safe fallbacks so early references and static analysis do not fail.
# A real implementation defined later in the file replaces the stub.
if "_extract_gguf_metadata" not in globals():
    def _extract_gguf_metadata(path: str, key: str) -> Optional[str]:
        """Fallback stub: reports that no string metadata is available."""
        return None
# Same fallback pattern for the integer-valued metadata reader.
if "_extract_gguf_int_metadata" not in globals():
    def _extract_gguf_int_metadata(path: str, key: str) -> Optional[int]:
        """Fallback stub: reports that no integer metadata is available."""
        return None
def _is_gguf_model(path: str) -> bool:
    """Return True when *path* names an existing file with a ``.gguf`` extension.

    Surrounding whitespace and double quotes are removed before checking, and
    the extension test is case-insensitive.
    """
    if path:
        candidate = path.strip().strip('"')
        if candidate.lower().endswith('.gguf'):
            return os.path.isfile(candidate)
    return False
def _is_valid_model(path: str) -> bool:
    """Check if the given path is a valid model file of any supported format.

    Args:
        path: A file path, a directory path, or a HuggingFace repo ID. Surrounding
            whitespace and double quotes are ignored.

    Returns:
        True when
        * the path does not exist but contains "/" (treated as a possible
          HuggingFace repo ID and left for the loader to validate),
        * the path ends in a supported weight extension (GGUF, SafeTensors,
          PyTorch ``.bin``/``.pt``/``.pth``, EXL2; GPTQ/AWQ weights use these
          same extensions), or
        * the path is a directory holding ``config.json`` or at least one file
          with a supported extension.
        False otherwise, including for a directory that cannot be listed.
    """
    if not path:
        return False
    p = path.strip().strip('"')
    if not os.path.exists(p):
        # Might be a HuggingFace repo ID such as "org/name": let the loader
        # validate it. NOTE(review): any missing path containing "/" passes here.
        return "/" in p
    model_suffixes = ('.gguf', '.safetensors', '.bin', '.pt', '.pth', '.exl2')
    if p.lower().endswith(model_suffixes):
        return True
    if os.path.isdir(p):
        # Standard HuggingFace layout
        if os.path.exists(os.path.join(p, "config.json")):
            return True
        try:
            entries = os.listdir(p)  # list once instead of once per format
        except OSError:
            return False
        # Case-insensitive, matching the single-file check above.
        return any(name.lower().endswith(model_suffixes) for name in entries)
    return False
def _get_llama(model_path: str, n_ctx: int = 4096, n_gpu_layers: int = 0, lora_path: Optional[str] = None,
               n_threads: Optional[int] = None) -> "Llama":
    """Return a ``Llama`` instance for the given configuration, reusing the cached one.

    The module keeps a single cached model. It is reused only when the model
    path, LoRA path, context size, GPU layer count, thread count and the model
    file's modification time all match the previous call; otherwise a new
    model is loaded and replaces the cached entry.

    Args:
        model_path: Path to the model file; surrounding whitespace/quotes are stripped.
        n_ctx: Context size; values that cannot be converted to int become 0.
        n_gpu_layers: Layers to offload to the GPU; unconvertible values become 0.
        lora_path: Optional LoRA adapter path, passed through unchanged.
        n_threads: Worker threads; ``None`` or a non-positive value selects the CPU count.
    """

    def _as_int(value) -> int:
        # Normalise to int for the cache key; anything unusable counts as 0.
        try:
            return int(value) if value is not None else 0
        except Exception:
            return 0

    resolved_path = model_path.strip().strip('"')
    if n_threads is None or n_threads <= 0:
        n_threads = max(1, os.cpu_count() or 1)
    # The modification time is part of the key so a replaced file is reloaded.
    try:
        modified_at = os.path.getmtime(resolved_path)
    except Exception:
        modified_at = None
    ctx_size = _as_int(n_ctx)
    gpu_layers = _as_int(n_gpu_layers)
    cache_key = (resolved_path, lora_path, ctx_size, gpu_layers, int(n_threads), modified_at)

    cached = _llama_cache["llm"]
    if cached is not None and _llama_cache["key"] == cache_key:
        return cached

    # Pass the requested context size and gpu layers through to Llama. Per the
    # original note, an n_ctx of 0 makes the library use the model's trained n_ctx.
    print(f"[GGUF_DEBUG] Loading GGUF model with n_ctx={ctx_size}, n_gpu_layers={gpu_layers}")
    model = Llama(
        model_path=resolved_path,
        n_ctx=ctx_size,
        n_gpu_layers=gpu_layers,
        lora_path=lora_path,
        n_threads=n_threads,
        verbose=False,
    )
    print(f"[GGUF_DEBUG] GGUF model loaded successfully with GPU layers: {gpu_layers}")
    _llama_cache.update(key=cache_key, llm=model)
    return model
def _get_chessgpt():
    """Placeholder loader for ChessGPT: always returns ``(None, None)``.

    Nothing is loaded or downloaded here; per the original notes the ChessGPT
    GGUF model is reached through ``_get_llama()`` instead.
    """
    tokenizer = model = None
    return tokenizer, model
def _run_chessgpt_prompt(prompt: str, model_path: str = None, on_chunk: Optional[callable] = None,
                         max_tokens: int = 128, n_ctx: int = 2048, n_gpu_layers: int = 32) -> str:
    """Run a prompt through ChessGPT GGUF model using the ChessGPT conversation format.

    Args:
        prompt: The chess question; it is wrapped in the "Human 0 / Human 1"
            conversation template together with an instruction to answer in UCI.
        model_path: Path of the GGUF model to use. Required despite the ``None``
            default, which is kept for signature compatibility.
        on_chunk: Optional callback; receives the reply one character at a time
            after generation has finished.
        max_tokens: Upper bound requested by the caller. A move needs very few
            tokens, so the effective limit is ``min(max_tokens, 20)``;
            ``None``/non-positive values use 20.
        n_ctx: Context size handed to ``_get_llama`` (default 2048, the size the
            original code hard-coded for ChessGPT).
        n_gpu_layers: GPU layers handed to ``_get_llama`` (default 32, as before).
            Values differing from the currently cached model make ``_get_llama``
            load a new instance.

    Returns:
        The stripped text of the first completion choice.

    Raises:
        ValueError: If ``model_path`` is missing or blank.
        Exception: Any loader/generation error is logged and re-raised.
    """
    if not model_path or not model_path.strip():
        # Fail with a clear message instead of an AttributeError deep in the loader.
        raise ValueError("ChessGPT requires the path of a GGUF model (model_path is empty)")
    try:
        requested = int(max_tokens) if max_tokens is not None else 0
    except (TypeError, ValueError):
        requested = 0
    move_tokens = min(requested, 20) if requested > 0 else 20
    try:
        # Format prompt for ChessGPT conversation style
        # Add explicit instruction to return only the move
        chess_prompt = f"A friendly, helpful chat between some humans.<|endoftext|>Human 0: {prompt}\nRespond with ONLY the chess move in UCI format (like e2e4).<|endoftext|>Human 1:"
        llm = _get_llama(model_path, n_ctx=n_ctx, n_gpu_layers=n_gpu_layers)
        # Lower temperature for more deterministic moves; stop at the first
        # newline or speaker marker so only the move is returned.
        with _LLAMA_LOCK:
            response = llm(
                chess_prompt,
                max_tokens=move_tokens,
                temperature=0.3,
                top_p=0.9,
                top_k=40,
                echo=False,
                stop=["<|endoftext|>", "Human 0:", "Human 1:", "\n"]
            )
        output_str = response['choices'][0]['text'].strip()
        print(f"[CHESS DEBUG] ChessGPT raw response: '{output_str}'")
        # Stream output if callback provided
        if on_chunk:
            for char in output_str:
                on_chunk(char)
        return output_str
    except Exception as e:
        print(f"ChessGPT GGUF generation failed: {e}")
        raise
def run_prompt(model_path: str, prompt: str, stream: bool, n_ctx: int = 4096, n_gpu_layers: int = 0,
               lora_path: Optional[str] = None, on_chunk: Optional[callable] = None, n_threads: Optional[int] = None,
               max_tokens: Optional[int] = None, history: Optional[List[Dict[str, Any]]] = None,
               cancel_event: Optional[threading.Event] = None, chess_mode: bool = False) -> str:
    """Answer a single prompt with the GGUF model and return the cleaned reply.

    Args:
        model_path, n_ctx, n_gpu_layers, lora_path, n_threads: Forwarded to ``_get_llama``.
        prompt: The new user message.
        stream: When True, generate incrementally and forward each piece to ``on_chunk``.
        on_chunk: Optional callback for streamed pieces.
        max_tokens: Generation limit; ``None`` or a non-positive value selects
            ``DEFAULT_MAX_TOKENS``.
        history: Earlier chat messages. The default system prompt is prepended
            unless the history already starts with a system message.
        cancel_event: When set during streaming, generation stops early.
        chess_mode: Try the ChessGPT path first; on failure fall back to the
            regular path with the context capped at 2048.

    Returns:
        The reply after ``_strip_echo_from_response`` has removed echoes/markers.
    """
    if chess_mode:
        try:
            return _run_chessgpt_prompt(prompt, model_path=model_path, on_chunk=on_chunk,
                                        max_tokens=max_tokens or 128)
        except Exception as e:
            print(f"ChessGPT GGUF failed, falling back to regular model: {e}")
            # Continue with the regular model, limiting context to avoid overflow
            n_ctx = min(n_ctx, 2048)

    llm = _get_llama(model_path, n_ctx=n_ctx, n_gpu_layers=n_gpu_layers, lora_path=lora_path, n_threads=n_threads)

    # Conversation = optional default system prompt + history + new user message
    starts_with_system = bool(history) and history[0].get("role") == "system"
    messages: List[Dict[str, Any]] = []
    if not starts_with_system:
        messages.append({"role": "system", "content": DEFAULT_SYSTEM_PROMPT})
    messages.extend(history or [])
    messages.append({"role": "user", "content": prompt})

    if max_tokens is None or int(max_tokens) <= 0:
        token_limit = DEFAULT_MAX_TOKENS
    else:
        token_limit = int(max_tokens)

    def _cancelled() -> bool:
        return cancel_event is not None and cancel_event.is_set()

    if not stream:
        with _LLAMA_LOCK:
            completion = llm.create_chat_completion(messages=messages, stop=STOP_TOKENS,
                                                    temperature=DEFAULT_TEMPERATURE,
                                                    repeat_penalty=1.2, max_tokens=token_limit)
        raw_text = completion.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
        return _strip_echo_from_response(raw_text, prompt)

    pieces: List[str] = []
    with _LLAMA_LOCK:
        for part in llm.create_chat_completion(messages=messages, stream=True, stop=STOP_TOKENS,
                                               temperature=DEFAULT_TEMPERATURE, repeat_penalty=1.2,
                                               max_tokens=token_limit):
            if _cancelled():
                break
            try:
                piece = part["choices"][0]["delta"].get("content", "")
            except Exception:
                # Some parts carry a full "message" instead of a "delta"
                piece = part.get("choices", [{}])[0].get("message", {}).get("content", "")
            if piece:
                pieces.append(piece)
                if on_chunk:
                    on_chunk(piece)
            # Re-check after the callback so a cancel issued meanwhile is honoured
            if _cancelled():
                break
    return _strip_echo_from_response("".join(pieces), prompt)
def chat_stream(model_path: str, messages: List[Dict[str, Any]], n_ctx: int = 4096, n_gpu_layers: int = 0,
                lora_path: Optional[str] = None, on_chunk: Optional[callable] = None, n_threads: Optional[int] = None,
                max_tokens: Optional[int] = None, cancel_event: Optional[threading.Event] = None,
                chess_mode: bool = False, chat_template: Optional[str] = None, session_id: Optional[str] = None) -> str:
    """Stream a chat completion for *messages* and return the cleaned full reply.

    GGUF files go through the llama.cpp loader (``_get_llama``). Every other
    model type uses the unified model stored on ``chat_stream._unified_model_cache``,
    which must have been set by the model-loading code before this is called.

    Args:
        model_path, n_ctx, n_gpu_layers, lora_path, n_threads: Used for the GGUF path.
        messages: Conversation so far. The default system prompt is prepended
            when the first message is not a system message (the caller's list is
            not modified).
        on_chunk: Optional callback invoked with each streamed piece of text.
        max_tokens: Generation limit; ``None``/non-positive selects ``DEFAULT_MAX_TOKENS``.
        cancel_event: When set, streaming stops at the next piece.
        chess_mode: Try the ChessGPT path with the last user message first; any
            failure falls back to the regular path.
        chat_template: Template name for non-GGUF models; ``None``/``"None"``
            selects the plain ``User:/Assistant:`` format.
        session_id: KV-cache session for the unified model (``"default"`` if omitted).

    Returns:
        The reply after ``_strip_echo_from_response``.

    Raises:
        RuntimeError: Non-GGUF model requested but no unified model is cached.
    """

    def last_user_content(default):
        # Content of the most recent user message in the current `messages`.
        for msg in reversed(messages):
            if msg.get("role") == "user":
                return msg.get("content", "")
        return default

    def cancelled() -> bool:
        return cancel_event is not None and cancel_event.is_set()

    # Use ChessGPT if chess mode is enabled
    if chess_mode:
        try:
            last_user_message = last_user_content("")
            if last_user_message:
                # model_path must be forwarded: without it the ChessGPT helper has
                # no model to load, fails, and chess mode always falls back.
                return _run_chessgpt_prompt(last_user_message, model_path=model_path, on_chunk=on_chunk,
                                            max_tokens=max_tokens or 128)
        except Exception as e:
            print(f"ChessGPT failed, falling back to regular model: {e}")
            # Fall through to regular model

    # Ensure a system message exists at the start
    if not messages or messages[0].get("role") != "system":
        messages = [{"role": "system", "content": DEFAULT_SYSTEM_PROMPT}] + list(messages or [])
    collected: List[str] = []
    max_new_tokens = DEFAULT_MAX_TOKENS if (max_tokens is None or int(max_tokens) <= 0) else int(max_tokens)

    # Use appropriate loader based on model type
    if _is_gguf_model(model_path):
        llm = _get_llama(model_path, n_ctx=n_ctx, n_gpu_layers=n_gpu_layers, lora_path=lora_path, n_threads=n_threads)
        with _LLAMA_LOCK:
            for part in llm.create_chat_completion(messages=messages, stream=True, stop=STOP_TOKENS,
                                                   temperature=DEFAULT_TEMPERATURE, repeat_penalty=1.2,
                                                   max_tokens=max_new_tokens):
                if cancelled():
                    break
                try:
                    delta = part["choices"][0]["delta"].get("content", "")
                except Exception:
                    delta = part.get("choices", [{}])[0].get("message", {}).get("content", "")
                if delta:
                    collected.append(delta)
                    if on_chunk:
                        on_chunk(delta)
                if cancelled():
                    break
    else:
        # Use unified loader for other model types - reuse cached model
        from llm_runtime import GenerateConfig
        llm = getattr(chat_stream, '_unified_model_cache', None)
        if llm is None:
            raise RuntimeError("Model not loaded. Please load a model first using the 'Load Model' button.")
        print("DEBUG: Using cached unified model")

        # Convert messages to prompt format using chat template
        use_template = bool(chat_template) and chat_template != "None"
        template_manager = None
        if use_template:
            from chat_templates import get_template_manager
            template_manager = get_template_manager()
            prompt = template_manager.format_conversation(chat_template, messages, add_generation_prompt=True)
            print(f"DEBUG: Using chat template '{chat_template}'")
        else:
            # Fallback to simple format for backward compatibility; messages with
            # any other role are skipped.
            role_labels = {"system": "System", "user": "User", "assistant": "Assistant"}
            prompt_parts = []
            for msg in messages:
                label = role_labels.get(msg.get("role", "user"))
                if label is not None:
                    prompt_parts.append(f"{label}: {msg.get('content', '')}")
            prompt = "\n".join(prompt_parts) + "\nAssistant:"
            print("DEBUG: Using fallback User:/Assistant: format")
        print(f"DEBUG: Generated prompt: '{prompt}'")

        # Get appropriate stop tokens from template or use defaults
        stop_tokens = STOP_TOKENS
        if use_template:
            stop_tokens = template_manager.get_stop_tokens(chat_template) or STOP_TOKENS

        # Generate with unified API and KV caching
        cfg = GenerateConfig(max_tokens=max_new_tokens, temperature=DEFAULT_TEMPERATURE, top_p=0.9, stop=stop_tokens)
        # Session ID keeps one conversation's KV cache apart from other chats
        if session_id is None:
            session_id = "default"  # Fallback for CLI usage
        if hasattr(llm, 'get_session_info'):
            session_info = llm.get_session_info(session_id)
            print(f"[KV_CACHE] Pre-generation session info: {session_info}")
        print(f"[CHAT] Starting streaming generation with KV caching enabled")
        generation_start = time.time()
        token_count = 0
        for delta in llm.stream(prompt, cfg=cfg, session_id=session_id):
            if cancelled():
                break
            if delta:
                collected.append(delta)
                token_count += 1
                if on_chunk:
                    on_chunk(delta)
            if cancelled():
                break
        generation_time = time.time() - generation_start
        # The elapsed time can be 0 (instant cancel, coarse clocks); avoid a
        # ZeroDivisionError in what is only a diagnostic print.
        tokens_per_second = token_count / generation_time if generation_time > 0 else 0.0
        print(f"[KV_CACHE] Generated {token_count} tokens in {generation_time:.2f}s ({tokens_per_second:.1f} tokens/s)")
        if hasattr(llm, 'get_session_info'):
            session_info = llm.get_session_info(session_id)
            print(f"[KV_CACHE] Post-generation session info: {session_info}")
        if hasattr(llm, 'get_kv_cache_stats'):
            cache_stats = llm.get_kv_cache_stats()
            print(f"[KV_CACHE] Cache statistics: {cache_stats}")

    result = "".join(collected)
    # Find the last user message in provided messages and attempt to strip echoes
    try:
        last_user = last_user_content(None)
    except Exception:
        last_user = None
    return _strip_echo_from_response(result, last_user)
# ---------------- GUI (Embedded only) -----------------
class EmbeddedGUI:
def __init__(self, root: tk.Tk):
    """Build the main window: settings, Tk variables, change hooks and notebook tabs.

    Statement order matters: the settings manager must exist before any Tk
    variable reads its default from it, and the tabs are built after the
    variables they bind to.

    Args:
        root: The Tk root window this GUI takes over (title, icon, size, close handler).
    """
    print("[APP_DEBUG] EmbeddedGUI.__init__() started")
    self.root = root
    self.root.title("DarkHal 2.0 - AI Model Management Platform")
    # Set window icon (best effort: a missing or unsupported icon is ignored)
    try:
        icon_path = os.path.join(os.path.dirname(__file__), "assets", "Halico.ico")
        if os.path.exists(icon_path):
            self.root.iconbitmap(icon_path)
    except Exception:
        pass
    # Set minimum window size
    self.root.minsize(1000, 700)
    # Initialize settings manager
    self.settings_manager = SettingsManager()
    # Initialize grouped download manager
    max_concurrent = self.settings_manager.get('download_settings.max_concurrent_downloads', 3)
    self.download_manager = GroupedDownloadManager(max_concurrent=max_concurrent)
    # Initialize agent mode attributes
    self.agent_enabled = False
    self.dhal_agent = None
    # Create menu bar (its callbacks are only resolved when a menu item is used,
    # so widgets they touch may be created later in this method)
    self._create_menu_bar()
    # Load settings and initialize variables
    self.model_var = tk.StringVar(value=self.settings_manager.get('paths.last_model_path', ''))
    self.stream_var = tk.BooleanVar(value=self.settings_manager.get('model_settings.stream_by_default', True))
    self.n_ctx_var = tk.IntVar(value=self.settings_manager.get('model_settings.default_n_ctx', 4096))
    # Set default GPU layers - use higher default if GPU is available and auto-config is enabled
    default_gpu_layers = self.settings_manager.get('model_settings.default_n_gpu_layers', 0)
    if default_gpu_layers == 0 and self.settings_manager.get('model_settings.auto_gpu', True):
        # If auto-GPU is enabled and no custom default is set, use a reasonable default for GPU systems
        try:
            import torch
            if torch.cuda.is_available():
                default_gpu_layers = 16  # Reasonable default for most 7B models
        except:
            # NOTE(review): bare except also hides KeyboardInterrupt/SystemExit;
            # torch being absent or broken simply keeps the default of 0.
            pass
    self.n_gpu_layers_var = tk.IntVar(value=default_gpu_layers)
    self.lora_var = tk.StringVar(value=self.settings_manager.get('paths.last_lora_path', ''))
    self.model_status_var = tk.StringVar(value="[not loaded]")
    self.max_tokens_var = tk.IntVar(
        value=self.settings_manager.get('model_settings.default_max_tokens', DEFAULT_MAX_TOKENS))
    self.chess_mode_var = tk.BooleanVar(value=self.settings_manager.get('model_settings.chess_mode', False))
    self.agent_mode_var = tk.BooleanVar(value=False)  # Agent mode always starts disabled for safety
    # Advanced loading options
    self.quantization_var = tk.StringVar(value=self.settings_manager.get('model_settings.quantization', 'none'))
    self.device_strategy_var = tk.StringVar(value=self.settings_manager.get('model_settings.device_strategy', 'auto'))
    self.chat_template_var = tk.StringVar(value="None")
    self.gpu_memory_limit_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.gpu_memory_limit', 6.0))
    # Sampling parameters
    self.temperature_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.temperature', 0.7))
    self.top_p_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.top_p', 0.9))
    self.repetition_penalty_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.repetition_penalty', 1.1))
    self.no_repeat_ngram_size_var = tk.IntVar(value=self.settings_manager.get('model_settings.no_repeat_ngram_size', 0))
    self.min_p_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.min_p', 0.0))
    self.typical_p_var = tk.DoubleVar(value=self.settings_manager.get('model_settings.typical_p', 1.0))
    # UI queue for thread-safe widget updates from worker threads
    self._ui_queue: "queue.SimpleQueue[callable]" = queue.SimpleQueue()
    # First drain of the queue is scheduled 30 ms from now
    self.root.after(30, self._drain_ui_queue)
    # Reset loaded status when key settings change
    try:
        self.model_var.trace_add("write", lambda *a: self._mark_model_unloaded())
        self.n_ctx_var.trace_add("write", lambda *a: self._mark_model_unloaded())
        self.n_gpu_layers_var.trace_add("write", lambda *a: self._mark_model_unloaded())
        self.lora_var.trace_add("write", lambda *a: self._mark_model_unloaded())
        self.chess_mode_var.trace_add("write", lambda *a: self._mark_model_unloaded())
        # Advanced loading options - also mark model as unloaded and save settings
        self.quantization_var.trace_add("write", lambda *a: self._on_advanced_setting_changed())
        self.device_strategy_var.trace_add("write", lambda *a: self._on_advanced_setting_changed())
        self.gpu_memory_limit_var.trace_add("write", lambda *a: self._on_advanced_setting_changed())
        # Sampling parameters - save settings when changed
        self.temperature_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
        self.top_p_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
        self.repetition_penalty_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
        self.no_repeat_ngram_size_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
        self.min_p_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
        self.typical_p_var.trace_add("write", lambda *a: self._on_sampling_setting_changed())
    except Exception:
        # Fallback to the older trace("w", ...) API.
        # NOTE(review): only the five "model unloaded" hooks are re-registered
        # here; the advanced-loading and sampling hooks are not.
        try:
            self.model_var.trace("w", lambda *a: self._mark_model_unloaded())
            self.n_ctx_var.trace("w", lambda *a: self._mark_model_unloaded())
            self.n_gpu_layers_var.trace("w", lambda *a: self._mark_model_unloaded())
            self.lora_var.trace("w", lambda *a: self._mark_model_unloaded())
            self.chess_mode_var.trace("w", lambda *a: self._mark_model_unloaded())
        except Exception:
            pass
    # Local models support with settings
    self.models_dir_var = tk.StringVar(value=self.settings_manager.get('paths.models_directory', './models'))
    self.local_model_var = tk.StringVar()
    self._local_model_paths: Dict[str, str] = {}
    nb = ttk.Notebook(root)
    nb.pack(fill=tk.BOTH, expand=True)
    # Single Run tab
    self.run_frame = ttk.Frame(nb)
    nb.add(self.run_frame, text="Run")
    self._build_run_tab(self.run_frame)
    # Model Library tab
    self.library_frame = ttk.Frame(nb)
    nb.add(self.library_frame, text="Model Library")
    self.library_tab = ModelLibraryTab(self.library_frame, self.settings_manager)
    # Model Converter tab
    self.converter_frame = ttk.Frame(nb)
    nb.add(self.converter_frame, text="Model Converter")
    self.converter_tab = ModelConverterTab(self.converter_frame, self.settings_manager)
    # Chess tab
    self.chess_frame = ttk.Frame(nb)
    nb.add(self.chess_frame, text="Chess")
    self.chess_tab = ChessTab(self.chess_frame, self.settings_manager)
    # Fine Tune tab - temporarily disabled for debugging
    # self.finetune_frame = ttk.Frame(nb)
    # nb.add(self.finetune_frame, text="Fine Tune")
    # self.finetune_tab = FineTuneTab(self.finetune_frame, self.settings_manager)
    # Initialize local models list if a folder is preset
    if self.models_dir_var.get():
        self._refresh_local_models()
    self.chat_history: List[Dict[str, Any]] = []
    self._current_cancel: Optional[threading.Event] = None
    # Initialize unique session ID for KV cache isolation
    import uuid
    self._session_id = f"chat_session_{uuid.uuid4().hex[:8]}"
    # Initialize chat template manager
    self.template_manager = get_template_manager()
    # Needs self.chat_template_combo, which is not created in this method —
    # presumably by _build_run_tab above; TODO confirm.
    self._refresh_chat_templates()
    # Apply window size from settings
    width = self.settings_manager.get('ui_preferences.window_width', 1200)
    height = self.settings_manager.get('ui_preferences.window_height', 700)
    self.root.geometry(f"{width}x{height}")
    # Save window size on close
    self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
def _create_menu_bar(self):
    """Build the File / Edit / Tools / Agents / Help menus and attach them to the root window."""
    bar = tk.Menu(self.root)
    self.root.config(menu=bar)

    def add_submenu(parent, label):
        # Create a non-tearoff menu and hang it under `parent` in one step.
        child = tk.Menu(parent, tearoff=0)
        parent.add_cascade(label=label, menu=child)
        return child

    def add_commands(menu, entries):
        for label, handler in entries:
            menu.add_command(label=label, command=handler)

    # File
    file_items = add_submenu(bar, "File")
    file_items.add_command(label="Open Model...", command=self._browse_gguf)
    file_items.add_separator()
    file_items.add_command(label="Settings...", command=self._open_settings)
    file_items.add_separator()
    file_items.add_command(label="Exit", command=self._on_closing)

    # Edit
    edit_items = add_submenu(bar, "Edit")
    add_commands(edit_items, [
        ("Clear Output", lambda: self.output_text.delete('1.0', tk.END)),
        ("Clear Chat History", self._clear_chat_history),
        ("Clear KV Cache", self._clear_kv_cache),
    ])

    # Tools
    tool_items = add_submenu(bar, "Tools")
    add_commands(tool_items, [
        ("HuggingFace Downloader", self._open_hf_downloader),
        ("Downloads Manager", self._open_downloads_manager),
        ("MCP Server", self._open_mcp_server),
        ("Resource Monitor", self._open_resource_monitor),
    ])
    # Debug submenu sits below a separator
    debug_items = tk.Menu(tool_items, tearoff=0)
    tool_items.add_separator()
    tool_items.add_cascade(label="Debug", menu=debug_items)
    debug_items.add_command(label="Inspect Model Devices", command=self._inspect_model_devices)
    tool_items.add_command(label="Refresh Local Models", command=self._refresh_local_models)
    tool_items.add_separator()
    add_commands(tool_items, [
        ("Clear Completed Downloads", self._clear_completed_downloads),
        ("MCP Server Config", self._open_mcp_config),
    ])

    # Agents, with the DarkHal submenu first and Metasploit after it
    agent_items = add_submenu(bar, "Agents")
    darkhal_items = add_submenu(agent_items, "DarkHal")
    add_commands(darkhal_items, [
        ("Dhal", self._open_dhal_agent),
        ("Agent Dev Kit (ADK)", self._open_adk),
    ])
    agent_items.add_command(label="Metasploit", command=self._open_metasploit)

    # Help
    help_items = add_submenu(bar, "Help")
    help_items.add_command(label="About", command=self._show_about)
def _open_settings(self):
    """Show the settings dialog, then re-read the stored settings into the UI variables."""
    manager = self.settings_manager
    open_settings_dialog(self.root, manager)
    # Pick up whatever the dialog changed
    self._reload_settings()
def _reload_settings(self):
    """Reload settings after they've been changed.

    Copies the context size, GPU layers, max tokens and streaming defaults
    from the settings manager into their Tk variables. If this instance has an
    ``hf_api`` attribute, the HuggingFace API client is rebuilt with the token
    and organization currently configured. A failure to rebuild is reported on
    stdout and leaves the previous client in place.
    """
    get_setting = self.settings_manager.get
    # Update variables from settings
    self.n_ctx_var.set(get_setting('model_settings.default_n_ctx', 4096))
    self.n_gpu_layers_var.set(get_setting('model_settings.default_n_gpu_layers', 0))
    self.max_tokens_var.set(get_setting('model_settings.default_max_tokens', DEFAULT_MAX_TOKENS))
    self.stream_var.set(get_setting('model_settings.stream_by_default', True))
    # Reload HF API if token settings changed
    if not hasattr(self, 'hf_api'):
        return
    try:
        from hf_downloader import HuggingFaceAPI
        api_key = None
        organization = None
        if not get_setting('api.use_env_token', True):
            # `or ''` guards against a stored null value, which would otherwise
            # raise on .strip() and abort the reload.
            api_key = (get_setting('api.huggingface_token', '') or '').strip()
        if get_setting('api.use_organization', False):
            organization = (get_setting('api.organization', '') or '').strip()
        self.hf_api = HuggingFaceAPI(api_key=api_key, organization=organization)
    except Exception as e:
        # Keep the old client, but make the failure visible instead of hiding it.
        print(f"Error reloading HuggingFace API settings: {e}")
def _clear_chat_history(self):
    """Forget the conversation: empty the history and output, reset the KV-cache session."""
    self.chat_history.clear()
    self.output_text.delete('1.0', tk.END)
    # Drop the cached KV state of the old session so it cannot leak into the next chat
    try:
        cached_model = getattr(chat_stream, '_unified_model_cache', None)
        if cached_model and hasattr(cached_model, 'clear_session_cache'):
            cached_model.clear_session_cache(self._session_id)
            print(f"[KV_CACHE] Cleared cache for session {self._session_id}")
    except Exception as e:
        print(f"[KV_CACHE] Error clearing session cache: {e}")
    # A fresh session ID starts the next conversation from scratch
    import uuid
    new_suffix = uuid.uuid4().hex[:8]
    self._session_id = f"chat_session_{new_suffix}"
    self._append_output("[Chat history cleared - KV cache reset]\n")
def _refresh_chat_templates(self):
    """Repopulate the template dropdown; fall back to "None" if the selection vanished."""
    try:
        choices = ["None"] + self.template_manager.get_template_names()
        self.chat_template_combo['values'] = choices
        selected = self.chat_template_var.get()
        if selected in choices:
            return
        # The previously selected template no longer exists
        self.chat_template_var.set("None")
    except Exception as e:
        print(f"Error refreshing chat templates: {e}")
def _load_chat_template(self):
    """Import chat templates from a user-chosen JSON file and report how many were loaded.

    The file is read as a mapping of template name to the keyword arguments
    of ``ChatTemplate``. Templates that already exist are updated only after
    the user confirms. A template that fails to load is logged and skipped.
    """
    try:
        chosen_file = filedialog.askopenfilename(
            title="Load Chat Templates",
            filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
            parent=self.root
        )
        if not chosen_file:
            return  # dialog cancelled
        import json
        with open(chosen_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        loaded_count = 0
        for name, template_data in data.items():
            try:
                from chat_templates import ChatTemplate
                template = ChatTemplate(**template_data)
                if self.template_manager.add_template(template):
                    loaded_count += 1
                elif messagebox.askyesno("Template Exists",
                                         f"Template '{name}' already exists. Update it?"):
                    # Existing template: overwrite only with the user's consent
                    self.template_manager.update_template(template)
                    loaded_count += 1
            except Exception as e:
                print(f"Error loading template '{name}': {e}")
        self._refresh_chat_templates()
        messagebox.showinfo("Templates Loaded", f"Successfully loaded {loaded_count} template(s)")
    except Exception as e:
        messagebox.showerror("Error", f"Error loading templates: {e}")
def _add_chat_template(self):
    """Create a chat template through the template dialog and register it.

    A new template is added and selected; if the name already exists, the
    user is asked whether to update the existing one.
    """
    try:
        dialog = ChatTemplateDialog(self.root)
        self.root.wait_window(dialog.dialog)
        template = dialog.result
        if not template:
            return  # dialog closed without a result
        if self.template_manager.add_template(template):
            self._refresh_chat_templates()
            self.chat_template_var.set(template.name)
            messagebox.showinfo("Success", f"Template '{template.name}' added successfully")
            return
        # Name already taken: update only with the user's consent
        wants_update = messagebox.askyesno("Template Exists",
                                           f"Template '{template.name}' already exists. Update it?")
        if wants_update:
            self.template_manager.update_template(template)
            self._refresh_chat_templates()
            messagebox.showinfo("Success", f"Template '{template.name}' updated successfully")
    except Exception as e:
        messagebox.showerror("Error", f"Error adding template: {e}")
def _clear_kv_cache(self):
    """Clear the current session's KV cache on the cached unified model and report the result."""
    try:
        # Only the unified (non-GGUF) model cached on chat_stream is consulted here
        cached_model = getattr(chat_stream, '_unified_model_cache', None)
        if not cached_model:
            self._append_output("[No cached model found - KV cache already clear]\n")
            return
        if not hasattr(cached_model, 'clear_session_cache'):
            self._append_output("[KV Cache clear not supported for this model]\n")
            return
        cached_model.clear_session_cache(self._session_id)
        self._append_output(f"[KV Cache cleared for session {self._session_id}]\n")
        # Report the remaining sessions when the model exposes statistics
        if hasattr(cached_model, 'get_kv_cache_stats'):
            cache_stats = cached_model.get_kv_cache_stats()
            self._append_output(f"[Cache Stats] Active sessions: {cache_stats.get('active_sessions', 0)}\n")
    except Exception as e:
        self._append_output(f"[Error clearing KV cache: {e}]\n")
def _open_hf_downloader(self):
    """Launch the standalone HuggingFace downloader in its own top-level window.

    The downloader module is imported on demand; any failure is reported in
    an error message box.
    """
    try:
        from hf_downloader import HuggingFaceDownloaderGUI
        window = tk.Toplevel(self.root)
        HuggingFaceDownloaderGUI(window)
    except Exception as exc:
        messagebox.showerror("Error", f"Failed to open HuggingFace downloader: {exc}")
def _show_about(self):
    """Display the application's "About" message box."""
    title = "LLM_Train - Advanced Local Model Manager"
    tagline = "A comprehensive local GGUF model runner with cloud integration"
    features = (
        "• Run local GGUF models with optimized performance",
        "• Search and download from HuggingFace Hub",
        "• Advanced download manager with pause/resume/retry",
        "• Model Library with smart scanning and indexing",
        "• Multi-model MCP server for Claude integration",
        "• Organization support for HuggingFace teams",
        "• Chat and single prompt modes",
        "• Customizable settings and preferences",
        "• Optimized USB/SSD write speeds",
    )
    footer = "Powered by llama-cpp-python and MCP protocol"
    # Sections are separated by a blank line; feature bullets sit one per line.
    about_text = "\n\n".join((
        title,
        tagline,
        "Features:\n" + "\n".join(features),
        footer,
    ))
    messagebox.showinfo("About", about_text)
def _clear_completed_downloads(self):
"""Clear completed downloads from download manager."""
if hasattr(self, 'download_tab'):
self.download_tab._clear_completed()
def _open_mcp_config(self):
    """Show the MCP server configuration dialog, reporting failures in an error box."""
    try:
        open_mcp_config(self.root)
    except Exception as exc:
        messagebox.showerror("Error", f"Failed to open MCP configuration: {exc}")
def _open_resource_monitor(self):
    """Open the Resource Monitor in a separate top-level window."""
    window = tk.Toplevel(self.root)
    window.title("Resource Monitor")
    window.geometry("800x600")
    window.transient(self.root)
    # Reuse the resource-tab builder to populate the new window.
    self._build_resource_tab(window)
def _open_downloads_manager(self):
    """Open Downloads Manager in a new window.

    The tab class is imported before the window is created so that a failed
    import does not leave an empty window behind. Any failure is reported in
    an error message box, matching the other window-opening handlers, and a
    window that was already created is destroyed again.
    """
    downloads_window = None
    try:
        # NOTE(review): the file-level imports pull GroupedDownloadManagerTab
        # from grouped_download_gui, while this handler imports it from
        # download_manager_tab - confirm which module is the intended source.
        from download_manager_tab import GroupedDownloadManagerTab
        downloads_window = tk.Toplevel(self.root)
        downloads_window.title("Downloads Manager")
        downloads_window.geometry("900x700")
        downloads_window.transient(self.root)
        # Create download manager in the new window
        GroupedDownloadManagerTab(downloads_window, self.download_manager)
    except Exception as e:
        # Do not leave a half-built, empty window on screen.
        if downloads_window is not None:
            downloads_window.destroy()
        messagebox.showerror("Error", f"Failed to open Downloads Manager: {e}")
def _open_mcp_server(self):
    """Open the MCP Server panel in a separate top-level window."""
    window = tk.Toplevel(self.root)
    window.title("MCP Server")
    window.geometry("800x600")
    window.transient(self.root)
    # The tab builds its own widgets inside the window it is given.
    MCPTab(window, self.settings_manager)
def _open_dhal_agent(self):
    """Open Dhal Dark Agent in a new window.

    The agent module is imported before the window is created so that a
    failed import does not leave an empty window behind. Any failure is
    reported in an error message box and a window that was already created
    is destroyed again.
    """
    dhal_window = None
    try:
        from dark_agent import DarkAgentTab
        dhal_window = tk.Toplevel(self.root)
        dhal_window.title("Dhal - Dark Agent")
        dhal_window.geometry("1000x700")
        dhal_window.transient(self.root)
        # Create Dark Agent tab in the new window with proper main_app reference
        dark_agent_tab = DarkAgentTab(dhal_window, self.settings_manager, self)
        # Ensure the main_app reference is properly set
        dark_agent_tab.main_app = self
    except Exception as e:
        # Do not leave a half-built, empty window on screen.
        if dhal_window is not None:
            dhal_window.destroy()
        messagebox.showerror("Error", f"Failed to open Dhal Dark Agent: {e}")
def _open_adk(self):
    """Open the Agent Development Kit placeholder window."""
    window = tk.Toplevel(self.root)
    window.title("Agent Development Kit (ADK)")
    window.geometry("900x600")
    window.transient(self.root)

    # Single padded frame holding three stacked labels.
    body = ttk.Frame(window)
    body.pack(fill=tk.BOTH, expand=True, padx=20, pady=20)

    heading = ttk.Label(body, text="Agent Development Kit", font=("Arial", 16, "bold"))
    heading.pack(pady=10)
    summary = ttk.Label(body, text="Advanced tools for creating and managing AI agents")
    summary.pack(pady=5)
    notice = ttk.Label(body, text="Coming Soon...", font=("Arial", 12, "italic"))
    notice.pack(pady=20)
def _open_metasploit(self):
    """Open Metasploit interface in a new window.

    The interface module is imported before the window is created so that a
    failed import does not leave an empty window behind. Any failure is
    reported in an error message box and a window that was already created
    is destroyed again.
    """
    metasploit_window = None
    try:
        from pentestgpt import PentestGPTTab
        metasploit_window = tk.Toplevel(self.root)
        metasploit_window.title("Metasploit")
        metasploit_window.geometry("1000x700")
        metasploit_window.transient(self.root)
        # Create Metasploit interface
        PentestGPTTab(metasploit_window, self.settings_manager, self)
    except Exception as e:
        # Do not leave a half-built, empty window on screen.
        if metasploit_window is not None:
            metasploit_window.destroy()
        messagebox.showerror("Error", f"Failed to open Metasploit interface: {e}")
def _inspect_model_devices(self):
    """Show a window with the device-inspection report for the current model.

    Warns and returns when no model path is selected. Otherwise the cached
    unified model on ``chat_stream`` is inspected if present; failing that
    the selected model path is inspected. The report (or the error text) is
    shown in a read-only text widget.
    """
    from tools.inspect_devices import inspect_loaded_model, inspect_model_devices

    model_path = self.model_var.get()
    if not model_path or model_path == "Select a model...":
        tk.messagebox.showwarning("No Model", "Please load a model first.")
        return

    # Dark-themed report window
    window = tk.Toplevel(self.root)
    window.title("Model Device Inspection")
    window.geometry("800x600")
    window.configure(bg='#2b2b2b')

    # Text area with a vertical scrollbar
    container = tk.Frame(window, bg='#2b2b2b')
    container.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
    report_box = tk.Text(container, bg='#1e1e1e', fg='#ffffff', font=('Consolas', 10), wrap=tk.WORD)
    scroll = tk.Scrollbar(container, orient=tk.VERTICAL, command=report_box.yview)
    report_box.configure(yscrollcommand=scroll.set)
    report_box.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroll.pack(side=tk.RIGHT, fill=tk.Y)

    def show(content):
        # Replace whatever the text box currently displays.
        report_box.delete('1.0', tk.END)
        report_box.insert(tk.END, content)

    # Placeholder text, flushed to the screen before the inspection runs.
    report_box.insert(tk.END, "Inspecting model devices...\n\n")
    report_box.update()

    try:
        if hasattr(chat_stream, '_unified_model_cache') and chat_stream._unified_model_cache:
            result = inspect_loaded_model(chat_stream._unified_model_cache.model)
        else:
            # No cached model: inspect by path instead
            result = inspect_model_devices(model_path)
        show(result)
    except Exception as e:
        show(f"Error inspecting model:\n{str(e)}")

    # Make the report read-only.
    report_box.config(state=tk.DISABLED)
def _on_closing(self):
"""Handle window closing event."""
# Save current window size
self.settings_manager.set('ui_preferences.window_width', self.root.winfo_width())
self.settings_manager.set('ui_preferences.window_height', self.root.winfo_height())
# Save other current values
if self.model_var.get():
self.settings_manager.set('paths.last_model_path', self.model_var.get())
if self.lora_var.get():
self.settings_manager.set('paths.last_lora_path', self.lora_var.get())
self.settings_manager.save_settings()
self.root.destroy()
def _build_run_tab(self, frame: ttk.Frame):
    """Populate the Run tab with a nested notebook of "Chat" and "Model Settings" pages.

    Each page frame is stored on the instance (``chat_frame``,
    ``model_settings_frame``) and filled by its dedicated builder.
    """
    tabs = ttk.Notebook(frame)
    tabs.pack(fill=tk.BOTH, expand=True)

    pages = (
        ("chat_frame", "Chat", self._build_chat_subtab),
        ("model_settings_frame", "Model Settings", self._build_model_settings_tab),
    )
    for attr_name, title, builder in pages:
        page = ttk.Frame(tabs)
        setattr(self, attr_name, page)
        tabs.add(page, text=title)
        builder(page)
def _build_chat_subtab(self, frame: ttk.Frame):
    """Lay out the Chat page.

    From top to bottom: model selection (path entry, browse/load buttons,
    chat-template row, status line), option checkboxes, the prompt input,
    the action buttons and the output text box.
    """
    # ---- Model selection -------------------------------------------------
    picker = ttk.LabelFrame(frame, text="Model Selection", padding="10")
    picker.pack(fill=tk.X, padx=8, pady=8)

    ttk.Label(picker, text="Model:").grid(row=0, column=0, sticky=tk.W)
    self.model_entry = ttk.Entry(picker, textvariable=self.model_var, width=50)
    self.model_entry.grid(row=0, column=1, sticky=tk.EW, padx=5)

    # (attribute name, caption, callback, grid column, horizontal padding)
    model_buttons = (
        ("browse_model_btn", "Browse Model", self._browse_gguf, 2, 2),
        ("browse_folder_btn", "Browse Folder", self._browse_folder, 3, 2),
        ("load_model_btn", "Load Model", self._on_load_unload_model, 4, 5),
    )
    for attr_name, caption, callback, column, pad in model_buttons:
        button = ttk.Button(picker, text=caption, command=callback)
        setattr(self, attr_name, button)
        button.grid(row=0, column=column, padx=pad)

    # Chat template row
    ttk.Label(picker, text="Chat Template:").grid(row=1, column=0, sticky=tk.W, pady=(5, 0))
    self.chat_template_combo = ttk.Combobox(
        picker,
        textvariable=self.chat_template_var,
        values=["None"],
        state="readonly",
        width=20,
    )
    self.chat_template_combo.grid(row=1, column=1, sticky=tk.W, padx=5, pady=(5, 0))
    load_template_btn = ttk.Button(picker, text="Load", command=self._load_chat_template)
    load_template_btn.grid(row=1, column=2, padx=2, pady=(5, 0))
    add_template_btn = ttk.Button(picker, text="Add", command=self._add_chat_template)
    add_template_btn.grid(row=1, column=3, padx=2, pady=(5, 0))

    # Status line: live model status followed by a static format hint
    status_row = ttk.Frame(picker)
    status_row.grid(row=2, column=0, columnspan=5, sticky=tk.W, pady=(5, 0))
    ttk.Label(status_row, textvariable=self.model_status_var).pack(side=tk.LEFT)
    formats_hint = ttk.Label(
        status_row,
        text=" | Supports: GGUF, SafeTensors, GPTQ, AWQ, EXL2, PyTorch",
        font=('Arial', 8),
        foreground='gray',
    )
    formats_hint.pack(side=tk.LEFT, padx=(10, 0))

    # Let the path entry column absorb extra horizontal space.
    picker.grid_columnconfigure(1, weight=1)

    # ---- Options ---------------------------------------------------------
    options_row = ttk.Frame(frame)
    options_row.pack(fill=tk.X, padx=8, pady=(0, 8))
    chess_check = ttk.Checkbutton(
        options_row,
        text="Chess Mode (ChessGPT)",
        variable=self.chess_mode_var,
        command=self._on_chess_mode_changed,
    )
    chess_check.pack(side=tk.LEFT)
    stream_check = ttk.Checkbutton(options_row, text="Stream Output", variable=self.stream_var)
    stream_check.pack(side=tk.LEFT, padx=(20, 0))

    # Agent mode toggle (off by default)
    self.agent_mode_var = tk.BooleanVar(value=False)
    agent_check = ttk.Checkbutton(
        options_row,
        text="🤖 Agent Mode (SYSTEM ACCESS)",
        variable=self.agent_mode_var,
        command=self._on_agent_mode_changed,
    )
    agent_check.pack(side=tk.LEFT, padx=(20, 0))

    # Agent handler starts unset, then agent mode is initialised.
    self.agent_handler = None
    self._init_agent_mode()

    # ---- Prompt input ----------------------------------------------------
    prompt_area = ttk.Frame(frame)
    prompt_area.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
    ttk.Label(prompt_area, text="Prompt / Chat Input:").pack(anchor=tk.W)
    self.prompt_text = tk.Text(prompt_area, height=6)
    self.prompt_text.pack(fill=tk.BOTH, expand=True)

    # ---- Action buttons --------------------------------------------------
    button_row = ttk.Frame(frame)
    button_row.pack(fill=tk.X, padx=8, pady=4)
    self.send_btn = ttk.Button(button_row, text="Send (Chat)", command=self._on_chat)
    self.send_btn.pack(side=tk.LEFT)
    self.stop_btn = ttk.Button(button_row, text="Stop", command=self._on_stop, state="disabled")
    self.stop_btn.pack(side=tk.LEFT, padx=6)

    def clear_output():
        # Looked up at click time; output_text is created further below.
        return self.output_text.delete('1.0', tk.END)

    ttk.Button(button_row, text="Clear Output", command=clear_output).pack(side=tk.LEFT, padx=6)

    # ---- Output ----------------------------------------------------------
    output_area = ttk.Frame(frame)
    output_area.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
    ttk.Label(output_area, text="Output:").pack(anchor=tk.W)
    self.output_text = tk.Text(output_area, height=12)
    self.output_text.pack(fill=tk.BOTH, expand=True)
def _add_tooltip(self, widget, text):
"""Add a tooltip to a widget"""
def create_tooltip(widget, text):
def on_enter(event):
# Prevent multiple tooltips
if hasattr(widget, 'tooltip') and widget.tooltip:
return
try:
tooltip = tk.Toplevel()
tooltip.wm_overrideredirect(True)
x = widget.winfo_rootx() + 20
y = widget.winfo_rooty() + 20
tooltip.wm_geometry(f"+{x}+{y}")
label = tk.Label(tooltip, text=text, background="lightyellow",
relief="solid", borderwidth=1, font=("Arial", "9", "normal"))
label.pack()
widget.tooltip = tooltip
except:
# Ignore tooltip creation errors
pass
def on_leave(event):
try:
if hasattr(widget, 'tooltip') and widget.tooltip:
widget.tooltip.destroy()
widget.tooltip = None
except:
# Ignore tooltip destruction errors
pass
widget.bind("<Enter>", on_enter)
widget.bind("<Leave>", on_leave)
create_tooltip(widget, text)
def _build_model_settings_tab(self, frame: ttk.Frame):
    """Lay out the Model Settings page.

    Sections, top to bottom: context/GPU settings (with auto-configure
    toggles that disable the manual entries), LoRA adapter path, generation
    sampling parameters, advanced loading options, and the local models
    browser.
    """
    settings = self.settings_manager

    # ---- Context & GPU ---------------------------------------------------
    ctx_box = ttk.LabelFrame(frame, text="Context & GPU Settings", padding="10")
    ctx_box.pack(fill=tk.X, padx=8, pady=8)

    self.auto_context_var = tk.BooleanVar(value=settings.get('model_settings.auto_context', True))
    self.auto_context_check = ttk.Checkbutton(
        ctx_box,
        text="Auto-configure context size based on model",
        variable=self.auto_context_var,
        command=self._on_auto_context_changed,
    )
    self.auto_context_check.grid(row=0, column=0, columnspan=3, sticky=tk.W, pady=(0, 10))
    self._add_tooltip(
        self.auto_context_check,
        "Automatically use the model's trained context size (n_ctx_train) for optimal performance.\nDisable to manually set context size.")

    ttk.Label(ctx_box, text="Context Length (n_ctx):").grid(row=1, column=0, sticky=tk.W)
    self.n_ctx_spin = ttk.Entry(ctx_box, textvariable=self.n_ctx_var, width=15)
    self.n_ctx_spin.grid(row=1, column=1, sticky=tk.W, padx=5)
    self._add_tooltip(
        self.n_ctx_spin,
        "Maximum number of tokens the model can process at once.\nHigher values use more memory but allow longer conversations.")
    ttk.Label(ctx_box, text="tokens").grid(row=1, column=2, sticky=tk.W)
    # Manual entry is locked while auto-configuration is on.
    if self.auto_context_var.get():
        self.n_ctx_spin.configure(state='disabled')

    self.auto_gpu_var = tk.BooleanVar(value=settings.get('model_settings.auto_gpu', True))
    self.auto_gpu_check = ttk.Checkbutton(
        ctx_box,
        text="Auto-configure GPU layers for optimal performance",
        variable=self.auto_gpu_var,
        command=self._on_auto_gpu_changed,
    )
    self.auto_gpu_check.grid(row=2, column=0, columnspan=3, sticky=tk.W, pady=(10, 5))
    self._add_tooltip(
        self.auto_gpu_check,
        "Automatically set GPU layers based on your VRAM and model size.\nDisable to manually set GPU layers.")

    ttk.Label(ctx_box, text="GPU Layers (n_gpu_layers):").grid(row=3, column=0, sticky=tk.W, pady=(5, 0))
    self.n_gpu_spin = ttk.Entry(ctx_box, textvariable=self.n_gpu_layers_var, width=15)
    self.n_gpu_spin.grid(row=3, column=1, sticky=tk.W, padx=5, pady=(5, 0))
    self._add_tooltip(
        self.n_gpu_spin,
        "Number of model layers to offload to GPU.\nHigher values improve speed but use more VRAM.\nUse 0 for CPU-only.")
    ttk.Label(ctx_box, text="layers").grid(row=3, column=2, sticky=tk.W, pady=(5, 0))
    # Manual entry is locked while auto-configuration is on.
    if self.auto_gpu_var.get():
        self.n_gpu_spin.configure(state='disabled')

    # ---- LoRA adapter ----------------------------------------------------
    lora_box = ttk.LabelFrame(frame, text="LoRA Adapter Settings", padding="10")
    lora_box.pack(fill=tk.X, padx=8, pady=8)
    ttk.Label(lora_box, text="LoRA Path (optional):").grid(row=0, column=0, sticky=tk.W)
    self.lora_entry = ttk.Entry(lora_box, textvariable=self.lora_var, width=60)
    self.lora_entry.grid(row=0, column=1, sticky=tk.EW, padx=5)
    self._add_tooltip(
        self.lora_entry,
        "Path to LoRA (Low-Rank Adaptation) adapter file.\nLoRA adapters fine-tune model behavior without changing base weights.\nLeave empty if not using LoRA.")
    self.lora_btn = ttk.Button(lora_box, text="Browse", command=self._browse_lora)
    self.lora_btn.grid(row=0, column=2, padx=5)
    lora_box.grid_columnconfigure(1, weight=1)

    # ---- Generation ------------------------------------------------------
    gen_box = ttk.LabelFrame(frame, text="Generation Settings", padding="10")
    gen_box.pack(fill=tk.X, padx=8, pady=8)

    # Row 0: max new tokens (kept on the instance, no top padding)
    ttk.Label(gen_box, text="Max New Tokens (n_predict):").grid(row=0, column=0, sticky=tk.W)
    self.max_tokens_spin = tk.Spinbox(
        gen_box, from_=16, to=8192, increment=16,
        textvariable=self.max_tokens_var, width=15)
    self.max_tokens_spin.grid(row=0, column=1, sticky=tk.W, padx=5)
    self._add_tooltip(
        self.max_tokens_spin,
        "Maximum number of new tokens to generate.\nHigher values allow longer responses but take more time.")

    # Rows 1..6 share one layout: (label, spinbox options, tooltip).
    sampler_rows = (
        ("Temperature:",
         dict(from_=0.0, to=2.0, increment=0.1, textvariable=self.temperature_var,
              width=15, format="%.1f"),
         "Controls randomness in generation.\n0.0 = deterministic, 1.0 = balanced, 2.0 = very creative.\nLower values for factual tasks, higher for creative tasks."),
        ("Top P:",
         dict(from_=0.0, to=1.0, increment=0.1, textvariable=self.top_p_var,
              width=15, format="%.1f"),
         "Nucleus sampling parameter.\nOnly consider tokens in the top P probability mass.\n0.9 is typical, lower values for more focused responses."),
        ("Repetition Penalty:",
         dict(from_=0.5, to=2.0, increment=0.1, textvariable=self.repetition_penalty_var,
              width=15, format="%.1f"),
         "Penalty for repeating tokens.\n1.0 = no penalty, >1.0 = discourage repetition.\n1.1 is typical, higher values reduce repetition more."),
        ("No Repeat N-gram Size:",
         dict(from_=0, to=10, increment=1, textvariable=self.no_repeat_ngram_size_var,
              width=15),
         "Prevent repeating N-grams (sequences of N tokens).\n0 = disabled, 2-4 = typical values.\nHigher values prevent more repetitive patterns."),
        ("Min P:",
         dict(from_=0.0, to=1.0, increment=0.01, textvariable=self.min_p_var,
              width=15, format="%.2f"),
         "Minimum probability threshold.\nTokens below this probability are excluded.\n0.0 = disabled, 0.05 = typical value."),
        ("Typical P:",
         dict(from_=0.0, to=1.0, increment=0.1, textvariable=self.typical_p_var,
              width=15, format="%.1f"),
         "Typical sampling parameter.\nFocuses on tokens with 'typical' information content.\n1.0 = disabled, 0.95 = typical value."),
    )
    for row, (caption, spin_options, hint) in enumerate(sampler_rows, start=1):
        ttk.Label(gen_box, text=caption).grid(row=row, column=0, sticky=tk.W, pady=(10, 0))
        spin = tk.Spinbox(gen_box, **spin_options)
        spin.grid(row=row, column=1, sticky=tk.W, padx=5, pady=(10, 0))
        self._add_tooltip(spin, hint)

    # ---- Advanced loading ------------------------------------------------
    adv_box = ttk.LabelFrame(frame, text="Advanced Loading Options", padding="10")
    adv_box.pack(fill=tk.X, padx=8, pady=8)

    # Quantization
    ttk.Label(adv_box, text="Quantization:").grid(row=0, column=0, sticky=tk.W)
    self.quantization_combo = ttk.Combobox(
        adv_box,
        textvariable=self.quantization_var,
        values=["none", "4bit", "8bit", "gptq", "awq", "exl2"],
        state="readonly",
        width=20,
    )
    self.quantization_combo.grid(row=0, column=1, sticky=tk.W, padx=5)
    self._add_tooltip(
        self.quantization_combo,
        "Reduce model memory usage by using lower precision.\nnone = full precision\n4bit/8bit = bitsandbytes quantization\ngptq/awq/exl2 = specialized quantization formats")
    ttk.Label(adv_box, text="(auto-detected for pre-quantized models)").grid(
        row=0, column=2, sticky=tk.W, padx=(10, 0))

    # Device strategy
    ttk.Label(adv_box, text="Device Strategy:").grid(row=1, column=0, sticky=tk.W, pady=(10, 0))
    self.device_combo = ttk.Combobox(
        adv_box,
        textvariable=self.device_strategy_var,
        values=["auto", "force_gpu", "balanced_split", "cpu_only"],
        state="readonly",
        width=20,
    )
    self.device_combo.grid(row=1, column=1, sticky=tk.W, padx=5, pady=(10, 0))
    self._add_tooltip(
        self.device_combo,
        "How to distribute model across devices.\nauto = automatic distribution\nforce_gpu = all on GPU\nbalanced_split = split between CPU/GPU\ncpu_only = CPU only")
    ttk.Label(adv_box, text="(balanced_split for large models)").grid(
        row=1, column=2, sticky=tk.W, padx=(10, 0), pady=(10, 0))

    # GPU memory limit
    ttk.Label(adv_box, text="GPU Memory Limit:").grid(row=2, column=0, sticky=tk.W, pady=(10, 0))
    self.gpu_mem_spin = tk.Spinbox(
        adv_box, from_=1.0, to=24.0, increment=0.5,
        textvariable=self.gpu_memory_limit_var, width=15, format="%.1f")
    self.gpu_mem_spin.grid(row=2, column=1, sticky=tk.W, padx=5, pady=(10, 0))
    self._add_tooltip(
        self.gpu_mem_spin,
        "Maximum GPU memory to use (in GB).\nUsed with balanced_split strategy.\nSet below your GPU's total VRAM to leave room for other applications.")
    ttk.Label(adv_box, text="GB (for balanced_split)").grid(
        row=2, column=2, sticky=tk.W, padx=(10, 0), pady=(10, 0))

    # ---- Local models browser ---------------------------------------------
    local_box = ttk.LabelFrame(frame, text="Local Models Browser", padding="10")
    local_box.pack(fill=tk.X, padx=8, pady=8)
    ttk.Label(local_box, text="Local Models:").grid(row=0, column=0, sticky=tk.W)
    self.local_models_combo = ttk.Combobox(
        local_box, textvariable=self.local_model_var, width=50, state="readonly")
    self.local_models_combo.grid(row=0, column=1, sticky=tk.EW, padx=5)
    self.local_models_combo.bind("<<ComboboxSelected>>", self._on_local_model_selected)
    folder_btn = ttk.Button(local_box, text="Folder...", command=self._choose_models_folder)
    folder_btn.grid(row=0, column=2, padx=5)
    refresh_btn = ttk.Button(local_box, text="Refresh", command=self._refresh_local_models)
    refresh_btn.grid(row=0, column=3, padx=5)
    local_box.grid_columnconfigure(1, weight=1)
def _build_resource_tab(self, frame: ttk.Frame):
    """Lay out the resource monitor: GPU info, CPU/RAM info and monitoring controls.

    Creates the ``StringVar`` / ``BooleanVar`` objects the labels and the
    monitoring checkbox are bound to, then starts resource monitoring
    initialisation.
    """
    # ---- GPU -------------------------------------------------------------
    gpu_box = ttk.LabelFrame(frame, text="GPU Information", padding="10")
    gpu_box.pack(fill=tk.X, padx=8, pady=8)
    self.gpu_info_var = tk.StringVar(value="Checking GPU...")
    self.gpu_memory_var = tk.StringVar(value="Memory: Unknown")
    self.gpu_usage_var = tk.StringVar(value="Usage: Unknown")
    gpu_name_label = ttk.Label(gpu_box, textvariable=self.gpu_info_var)
    gpu_name_label.grid(row=0, column=0, sticky=tk.W, columnspan=3)
    gpu_memory_label = ttk.Label(gpu_box, textvariable=self.gpu_memory_var)
    gpu_memory_label.grid(row=1, column=0, sticky=tk.W, pady=(5, 0))
    gpu_usage_label = ttk.Label(gpu_box, textvariable=self.gpu_usage_var)
    gpu_usage_label.grid(row=2, column=0, sticky=tk.W, pady=(5, 0))
    # The test button spans the memory and usage rows.
    test_btn = ttk.Button(gpu_box, text="Test GPU", command=self._test_gpu)
    test_btn.grid(row=1, column=2, padx=5, rowspan=2)

    # ---- CPU / RAM -------------------------------------------------------
    cpu_box = ttk.LabelFrame(frame, text="CPU Information", padding="10")
    cpu_box.pack(fill=tk.X, padx=8, pady=8)
    self.cpu_info_var = tk.StringVar(value="Detecting CPU...")
    self.cpu_usage_var = tk.StringVar(value="Usage: Unknown")
    self.ram_usage_var = tk.StringVar(value="RAM: Unknown")
    cpu_name_label = ttk.Label(cpu_box, textvariable=self.cpu_info_var)
    cpu_name_label.grid(row=0, column=0, sticky=tk.W, columnspan=2)
    cpu_usage_label = ttk.Label(cpu_box, textvariable=self.cpu_usage_var)
    cpu_usage_label.grid(row=1, column=0, sticky=tk.W, pady=(5, 0))
    ram_usage_label = ttk.Label(cpu_box, textvariable=self.ram_usage_var)
    ram_usage_label.grid(row=2, column=0, sticky=tk.W, pady=(5, 0))

    # ---- Monitoring controls ----------------------------------------------
    controls_box = ttk.LabelFrame(frame, text="Monitoring Controls", padding="10")
    controls_box.pack(fill=tk.X, padx=8, pady=8)
    self.monitor_var = tk.BooleanVar(value=False)
    monitor_check = ttk.Checkbutton(
        controls_box,
        text="Enable Real-time Monitoring",
        variable=self.monitor_var,
        command=self._toggle_monitoring,
    )
    monitor_check.pack(side=tk.LEFT)
    refresh_btn = ttk.Button(controls_box, text="Refresh Now", command=self._refresh_resources)
    refresh_btn.pack(side=tk.LEFT, padx=(20, 0))

    # Kick off resource monitoring setup.
    self._initialize_resource_monitoring()
# ---------------- HuggingFace Browser Tab -----------------
def _build_hf_tab(self, frame: ttk.Frame):
    """Build the HuggingFace browser tab.

    Creates the API client from the stored settings, then lays out the
    search bar, the results tree with both scrollbars, the sort filter
    checkboxes, the download button and the status line. If the API client
    cannot be created, a single explanatory label is shown instead and the
    rest of the tab is not built.
    """
    # Import the HuggingFace downloader module on demand
    try:
        from hf_downloader import HuggingFaceAPI
        # Configure API based on settings
        api_key = None
        organization = None
        if not self.settings_manager.get('api.use_env_token', True):
            api_key = self.settings_manager.get('api.huggingface_token', '').strip()
        if self.settings_manager.get('api.use_organization', False):
            organization = self.settings_manager.get('api.organization', '').strip()
        self.hf_api = HuggingFaceAPI(api_key=api_key, organization=organization)
    except ImportError:
        ttk.Label(frame,
                  text="hf_downloader module not found. Please ensure hf_downloader.py is in the same directory.").pack(
            padx=8, pady=8)
        return
    except ValueError as e:
        ttk.Label(frame, text=f"API Key Error: {e}").pack(padx=8, pady=8)
        return
    except Exception as e:
        ttk.Label(frame, text=f"Error initializing HuggingFace API: {e}").pack(padx=8, pady=8)
        return

    # Search bar with dropdown (using settings defaults)
    search_row = ttk.Frame(frame)
    search_row.pack(fill=tk.X, padx=8, pady=8)
    self.hf_search_query = tk.StringVar()
    self.hf_search_type = tk.StringVar(
        value=self.settings_manager.get('search_preferences.default_search_type', 'Models'))
    # Search entry; pressing Return triggers a search
    self.hf_search_entry = ttk.Entry(search_row, textvariable=self.hf_search_query, width=60)
    self.hf_search_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
    self.hf_search_entry.bind("<Return>", lambda e: self._hf_search())
    # Search type dropdown
    self.hf_type_combo = ttk.Combobox(search_row, values=["Models", "Datasets"],
                                      textvariable=self.hf_search_type, state="readonly", width=15)
    self.hf_type_combo.pack(side=tk.LEFT, padx=(10, 0))
    # Search button
    ttk.Button(search_row, text="Search", command=self._hf_search).pack(side=tk.LEFT, padx=(10, 0))

    # Results area
    results_frame = ttk.Frame(frame)
    results_frame.pack(fill=tk.BOTH, expand=True, padx=8, pady=(0, 8))
    # (column id, heading text, width)
    column_specs = (
        ("creator", "Creator", 120),
        ("name", "Name", 200),
        ("description", "Description", 250),
        ("keywords", "Keywords", 150),
        ("size", "Size", 80),
        ("metadata", "Metadata", 150),
    )
    cols = tuple(spec[0] for spec in column_specs)
    self.hf_tree = ttk.Treeview(results_frame, columns=cols, show="headings", height=15)
    for column_id, heading, _width in column_specs:
        self.hf_tree.heading(column_id, text=heading)
    for column_id, _heading, width in column_specs:
        self.hf_tree.column(column_id, width=width)
    # Double-click downloads the clicked row
    self.hf_tree.bind("<Double-Button-1>", self._hf_download_selected)

    # Scrollbars
    vsb = ttk.Scrollbar(results_frame, orient="vertical", command=self.hf_tree.yview)
    hsb = ttk.Scrollbar(results_frame, orient="horizontal", command=self.hf_tree.xview)
    self.hf_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
    # Pack the scrollbars before the tree so each claims its edge first and
    # the tree fills what is left. The horizontal bar was previously wired
    # up but never packed, so it was never visible.
    hsb.pack(side=tk.BOTTOM, fill=tk.X)
    vsb.pack(side=tk.RIGHT, fill=tk.Y)
    self.hf_tree.pack(fill=tk.BOTH, expand=True, side=tk.LEFT)

    # Filter footer with checkboxes
    filter_frame = ttk.Frame(frame)
    filter_frame.pack(fill=tk.X, padx=8, pady=(0, 8))
    ttk.Label(filter_frame, text="Filter:").pack(side=tk.LEFT)
    # Initialize filters based on default sort preference.
    # NOTE(review): the "Size" checkbox is initialised from the
    # 'lastModified' sort preference - confirm the label matches the intent.
    default_sort = self.settings_manager.get('search_preferences.default_sort', 'downloads')
    self.filter_most_downloaded = tk.BooleanVar(value=(default_sort == 'downloads'))
    self.filter_most_liked = tk.BooleanVar(value=(default_sort == 'likes'))
    self.filter_size = tk.BooleanVar(value=(default_sort == 'lastModified'))
    ttk.Checkbutton(filter_frame, text="Most Downloaded",
                    variable=self.filter_most_downloaded).pack(side=tk.LEFT, padx=(10, 0))
    ttk.Checkbutton(filter_frame, text="Most Liked",
                    variable=self.filter_most_liked).pack(side=tk.LEFT, padx=(10, 0))
    ttk.Checkbutton(filter_frame, text="Size",
                    variable=self.filter_size).pack(side=tk.LEFT, padx=(10, 0))
    # Download button on the right
    ttk.Button(filter_frame, text="Download Selected",
               command=self._hf_download_selected).pack(side=tk.RIGHT, padx=5)

    # Status area for HF tab
    self.hf_status_var = tk.StringVar(value="Ready")
    ttk.Label(frame, textvariable=self.hf_status_var).pack(fill=tk.X, padx=8, pady=(0, 8))
    # Holder for last results (to act on selection)
    self._hf_results: List[Dict[str, Any]] = []
def _hf_set_status(self, text: str):
try:
self.hf_status_var.set(text)
except Exception:
pass
def _format_bytes(self, n: Optional[int]) -> str:
try:
if not n or n <= 0:
return "-"
units = ["B", "KB", "MB", "GB", "TB"]
i = 0
f = float(n)
while f >= 1024 and i < len(units) - 1:
f /= 1024.0
i += 1
return f"{f:.1f} {units[i]}"
except Exception:
return "-"
def _format_number(self, num: int) -> str:
"""Format large numbers with K, M suffixes."""
if num >= 1_000_000:
return f"{num / 1_000_000:.1f}M"
elif num >= 1_000:
return f"{num / 1_000:.1f}K"
return str(num)
def _hf_search(self):
query = (self.hf_search_query.get() or "").strip()
search_type = self.hf_search_type.get()
# Determine sort parameter based on filters
sort = "downloads"
if self.filter_most_liked.get() and not self.filter_most_downloaded.get():
sort = "likes"
elif self.filter_size.get() and not self.filter_most_downloaded.get() and not self.filter_most_liked.get():
sort = "lastModified"
self._hf_set_status("Searching ...")
self.hf_tree.delete(*self.hf_tree.get_children())
self._hf_results = []
threading.Thread(target=self._hf_perform_search_thread, args=(search_type, query, sort), daemon=True).start()
def _hf_perform_search_thread(self, search_type: str, query: str, sort: str):
try:
# Use the new API
if search_type == "Models":
results = self.hf_api.search_models(query, limit=50, sort=sort)
else:
results = self.hf_api.search_datasets(query, limit=50, sort=sort)
rows = []
packed = []
for item in results:
try:
# Extract fields based on search type
if search_type == "Models":
repo_id = item.get("modelId", item.get("id", ""))
pipeline_tag = item.get("pipeline_tag", "")
tags = item.get("tags", [])
keywords = ", ".join(tags[:3]) if tags else pipeline_tag
description = item.get("description", "")
else:
repo_id = item.get("id", "")
task_ids = item.get("cardData", {}).get("task_ids", [])
keywords = ", ".join(task_ids[:3]) if task_ids else "dataset"
card_data = item.get("cardData", {})
description = card_data.get("description", card_data.get("summary", ""))
creator = repo_id.split("/")[0] if "/" in repo_id else ""
name = repo_id.split("/")[1] if "/" in repo_id else repo_id
# Truncate description
if len(description) > 100:
description = description[:97] + "..."
# Calculate size
size_bytes = 0
siblings = item.get("siblings", [])
for sibling in siblings:
if isinstance(sibling, dict):
size = sibling.get("size", 0)
if isinstance(size, (int, float)):
size_bytes += size
size_str = self._format_bytes(size_bytes) if size_bytes > 0 else "-"
# Get metadata
metadata_parts = []
downloads = item.get("downloads", 0)
likes = item.get("likes", 0)
if downloads > 0:
metadata_parts.append(f"{self._format_number(downloads)}")
if likes > 0:
metadata_parts.append(f"{self._format_number(likes)}")
if search_type == "Models":
library = item.get("library_name", "")
if library:
metadata_parts.append(library)
metadata = " | ".join(metadata_parts)
# Prepare row
rows.append((creator, name, description, keywords, size_str, metadata))
packed.append({
"type": search_type,
"repo_id": repo_id,
})
except Exception:
continue
def apply_rows():
try:
for row in rows:
self.hf_tree.insert("", tk.END, values=row)
self._hf_results = packed
self._hf_set_status(f"Found {len(rows)} {search_type.lower()}")
except Exception:
pass
self._enqueue_ui(apply_rows)
except Exception as e:
self._enqueue_ui(lambda: self._hf_set_status(f"Search error: {e}"))
def _hf_download_selected(self, event=None):
    """Download the selected model or dataset using grouped download manager.

    Rebuilds the repository id from the selected row, asks for a target
    directory, then fetches the file list on a background thread. The file
    selection dialog and all status/output updates are handed back through
    ``_enqueue_ui``.

    Args:
        event: Optional Tk event; present when invoked from the tree's
            double-click binding, unused.
    """
    selection = self.hf_tree.selection()
    if not selection:
        messagebox.showinfo("No Selection", "Please select an item to download")
        return
    item = self.hf_tree.item(selection[0])
    values = item['values']
    if len(values) < 2:
        return
    creator = values[0]
    name = values[1]
    repo_id = f"{creator}/{name}" if creator else name
    # Ask for download location (use default from settings)
    initial_dir = self.settings_manager.get('paths.downloads_directory', './downloads')
    download_dir = filedialog.askdirectory(
        title="Select Download Directory",
        initialdir=initial_dir
    )
    if not download_dir:
        return
    self._hf_set_status(f"Fetching file list for {repo_id}...")

    def prepare_downloads():
        try:
            # Get file list
            files = self.hf_api.get_model_files(repo_id)
            if not files:
                self._enqueue_ui(lambda: self._append_output(f"[Error] No files found for {repo_id}\n"))
                self._enqueue_ui(lambda: self._hf_set_status("No files found"))
                return
            # Always show file selection dialog for user choice
            self._enqueue_ui(lambda: self._show_file_selection_dialog(repo_id, files, download_dir))
        except Exception as e:
            # Build the message now: Python unbinds `e` when this except
            # block exits, so a callback that reads `e` later would raise
            # NameError instead of reporting the error.
            error_text = f"[Download error] {e}\n"
            self._enqueue_ui(lambda: self._append_output(error_text))
            self._enqueue_ui(lambda: self._hf_set_status("Download preparation failed"))

    threading.Thread(target=prepare_downloads, daemon=True).start()
def _show_file_selection_dialog(self, repo_id: str, files: List[Dict], download_dir: str):
    """Let the user pick repository files and queue them as one download group.

    Opens a ``FileSelectionDialog`` for ``files``.  If the dialog reports
    ``'download'`` with a non-empty selection, a download group is created
    and each chosen file is added to it with a ``resolve/main`` URL and a
    save path under ``download_dir``; otherwise the status is set to
    "Download cancelled".  Any exception is reported in the output pane.
    """
    try:
        picker = FileSelectionDialog(
            parent=self.root,
            repo_id=repo_id,
            files=files,
            title=f"Select Files to Download - {repo_id}"
        )
        outcome, chosen = picker.show()

        if outcome != 'download' or not chosen:
            self._hf_set_status("Download cancelled")
            return

        # The group is named after the last path component of the repo id.
        group_name = repo_id.rsplit('/', 1)[-1]
        group_id = self.download_manager.create_download_group(
            repo_id=repo_id,
            name=group_name,
            description=f"Files from {repo_id}"
        )

        # Files land in "<download_dir>/<owner>_<repo>/<filename>".
        target_root = repo_id.replace("/", "_")
        queued = 0
        for filename, _file_info in chosen:
            self.download_manager.add_file_to_group(
                group_id=group_id,
                filename=filename,
                url=f"{self.hf_api.base_url}/{repo_id}/resolve/main/{filename}",
                save_path=os.path.join(download_dir, target_root, filename),
                headers=self.hf_api.headers,
                selected=True
            )
            queued += 1

        self._hf_set_status(f"Added {queued} file(s) to download queue")
        self._append_output(f"Created download group '{group_name}' with {queued} files\n")
        # Switch to downloads tab to show the new group
        self.notebook.select(self.downloads_frame)
    except Exception as e:
        self._append_output(f"[Error] Failed to show file selection dialog: {e}\n")
        self._hf_set_status("Error showing file selection")
def _browse_gguf(self):
    """Ask the user for a model file and remember the choice.

    Opens a file dialog starting in the configured models directory.  When a
    file is chosen, its path is written to ``self.model_var`` and persisted
    as ``paths.last_model_path``.
    """
    initial_dir = self.settings_manager.get('paths.models_directory', '.')
    # Multi-pattern entries are tuples rather than ";"-joined strings: Tk
    # reads the pattern field as a list, and a ";"-joined string is a single
    # glob that only the native Windows dialog happens to split.
    filetypes = [
        ("All Model files", ("*.gguf", "*.safetensors", "*.bin", "*.pt", "*.pth", "*.exl2")),
        ("GGUF files", "*.gguf"),
        ("SafeTensors files", "*.safetensors"),
        ("PyTorch files", ("*.bin", "*.pt", "*.pth")),
        ("GPTQ models", ("*gptq*.safetensors", "*gptq*.bin")),
        ("AWQ models", ("*awq*.safetensors", "*awq*.bin")),
        ("EXL2 files", "*.exl2"),
        ("All files", "*.*"),
    ]
    path = filedialog.askopenfilename(
        title="Select Model (GGUF, Safetensors, GPTQ, AWQ, EXL2)",
        initialdir=initial_dir,
        filetypes=filetypes
    )
    if path:
        self.model_var.set(path)
        # Save last model path
        self.settings_manager.set('paths.last_model_path', path)
        self.settings_manager.save_settings()
def _browse_folder(self):
    """Ask the user for a model directory and remember the choice.

    The dialog starts in the configured models directory.  A chosen path is
    written to ``self.model_var`` and persisted as ``paths.last_model_path``;
    cancelling the dialog changes nothing.
    """
    start_dir = self.settings_manager.get('paths.models_directory', '.')
    chosen_dir = filedialog.askdirectory(
        title="Select Model Directory (HuggingFace format)",
        initialdir=start_dir
    )
    if not chosen_dir:
        return
    self.model_var.set(chosen_dir)
    # Persist the selection as the last used model path.
    self.settings_manager.set('paths.last_model_path', chosen_dir)
    self.settings_manager.save_settings()
def _browse_lora(self):
    """Ask the user for a LoRA/adapter file and store its path in ``self.lora_var``."""
    adapter_path = filedialog.askopenfilename(
        title="Select LoRA/adapter file",
        filetypes=[("All files", "*.*")],
    )
    if not adapter_path:
        return
    self.lora_var.set(adapter_path)
def _mark_model_unloaded(self):
    """Drop every reference to the loaded model and reset the related UI state.

    Clears ``self.current_model``, the cache attribute on ``chat_stream``,
    the module-level ``_llama_cache`` slots and the agent reference, then
    sets the status to "[not loaded]", refreshes the load button label and
    empties the chat history.
    """
    global _llama_cache

    # Model references (only touched when the attribute already exists).
    if hasattr(self, 'current_model'):
        self.current_model = None
    if hasattr(chat_stream, '_unified_model_cache'):
        chat_stream._unified_model_cache = None

    # Cached llama instance.
    for slot in ("key", "llm"):
        _llama_cache[slot] = None

    # Agent reference.
    if hasattr(self, 'dhal_agent'):
        self.dhal_agent = None

    # UI state.
    self.model_status_var.set("[not loaded]")
    self._update_load_button_text()

    # The history belongs to the model that was just dropped.
    self.chat_history = []
def _disable_model_settings(self):
"""Disable model settings that would unload the model"""
try:
# Disable model path entry and browse buttons
if hasattr(self, 'model_entry'):
self.model_entry.configure(state='disabled')
if hasattr(self, 'browse_model_btn'):
self.browse_model_btn.configure(state='disabled')
if hasattr(self, 'browse_folder_btn'):
self.browse_folder_btn.configure(state='disabled')
# Disable settings that would trigger model reload
if hasattr(self, 'n_ctx_spin'):
self.n_ctx_spin.configure(state='disabled')
if hasattr(self, 'n_gpu_spin'):
self.n_gpu_spin.configure(state='disabled')
if hasattr(self, 'lora_entry'):
self.lora_entry.configure(state='disabled')
if hasattr(self, 'lora_btn'):
self.lora_btn.configure(state='disabled')
if hasattr(self, 'quantization_combo'):
self.quantization_combo.configure(state='disabled')
if hasattr(self, 'device_combo'):
self.device_combo.configure(state='disabled')
if hasattr(self, 'gpu_mem_spin'):
self.gpu_mem_spin.configure(state='disabled')
except Exception as e:
print(f"Error disabling model settings: {e}")
def _enable_model_settings(self):
"""Re-enable model settings"""
try:
# Re-enable model path entry and browse buttons
if hasattr(self, 'model_entry'):
self.model_entry.configure(state='normal')
if hasattr(self, 'browse_model_btn'):
self.browse_model_btn.configure(state='normal')
if hasattr(self, 'browse_folder_btn'):
self.browse_folder_btn.configure(state='normal')
# Re-enable settings
if hasattr(self, 'n_ctx_spin'):
self.n_ctx_spin.configure(state='normal')
if hasattr(self, 'n_gpu_spin'):
self.n_gpu_spin.configure(state='normal')
if hasattr(self, 'lora_entry'):
self.lora_entry.configure(state='normal')
if hasattr(self, 'lora_btn'):
self.lora_btn.configure(state='normal')
if hasattr(self, 'quantization_combo'):
self.quantization_combo.configure(state='readonly')
if hasattr(self, 'device_combo'):
self.device_combo.configure(state='readonly')
if hasattr(self, 'gpu_mem_spin'):
self.gpu_mem_spin.configure(state='normal')
except Exception as e:
print(f"Error enabling model settings: {e}")
def _init_agent_mode(self):
"""Initialize agent mode handler"""
try:
# Import agent components
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'agent_dhal'))
from agent_dhal.hal import create_dhal, DhalConfig, HalModelClient
self.agent_enabled = True
self.dhal_agent = None
except Exception as e:
print(f"[Agent Mode] Could not import Hal components: {e}")
self.agent_enabled = False
self.agent_mode_var.set(False)
def _on_agent_mode_changed(self):
"""Handle agent mode toggle.

When the agent-mode checkbox variable is on, the user must confirm a
warning dialog; declining switches the variable back off.  After
confirmation, and only if ``self.agent_enabled`` is set and no agent exists
yet, the already loaded model is wrapped in ``LocalModelClient`` and handed
to ``create_dhal``.  When the variable is off, only a deactivation notice
is written to the output.

NOTE(review): indentation was lost in this view of the file; the nesting
described here is inferred from the ``try``/``except``/``else`` keywords.
"""
if self.agent_mode_var.get():
# Show warning
result = messagebox.askyesno(
"⚠️ Enable Agent Mode",
"WARNING: Agent mode gives the AI UNRESTRICTED access to:\n\n"
"• Your file system (read/write/delete)\n"
"• Shell commands (PowerShell, Bash, CMD)\n"
"• Mouse and keyboard control\n"
"• Python code execution\n"
"• Network requests\n"
"• System settings\n\n"
"The AI can control your computer completely!\n\n"
"Only enable if you trust the model and understand the risks.\n\n"
"Continue?",
icon='warning'
)
# User declined: revert the checkbox variable and stop.
if not result:
self.agent_mode_var.set(False)
return
# Initialize agent if needed
# NOTE(review): when `self.agent_enabled` is false nothing below runs and no
# message is shown, yet the checkbox variable stays on - confirm intended.
if self.agent_enabled and not self.dhal_agent:
try:
# Check if we have a loaded model
# A model counts as loaded if either `self.current_model` or the cache
# attribute on `chat_stream` is truthy.
if not (hasattr(self, 'current_model') and self.current_model) and \
not (hasattr(chat_stream, '_unified_model_cache') and chat_stream._unified_model_cache):
# No model loaded - show error
messagebox.showerror(
"No Model Loaded",
"Please load a model first before enabling Agent Mode.\n\n"
"Agent Mode requires a loaded language model to function."
)
self.agent_mode_var.set(False)
return
# Use the existing loaded model
from agent_dhal.hal import create_dhal
# Create a wrapper for the existing local model
# The wrapper exposes an async `create_chat_completion` and returns an object
# shaped like `response.choices[0].message.content`.
class LocalModelClient:
def __init__(self, model):
self.llm_model = model
async def create_chat_completion(self, messages, **kwargs):
# Convert messages to prompt for local model
prompt = ""
for msg in messages:
# Messages may be objects with .role/.content or plain dicts.
if hasattr(msg, 'role') and hasattr(msg, 'content'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Roles other than system/user/assistant are dropped from the prompt.
if role == "system":
prompt += f"System: {content}\n"
elif role == "user":
prompt += f"User: {content}\n"
elif role == "assistant":
prompt += f"Assistant: {content}\n"
prompt += "Assistant: "
# Generate response using local model
from llm_runtime import GenerateConfig
# Only `max_tokens` and `temperature` are read from kwargs; others are ignored.
cfg = GenerateConfig(
max_tokens=kwargs.get('max_tokens', 800),
temperature=kwargs.get('temperature', 0.7)
)
# NOTE(review): `generate` is called without `await` inside an async method -
# presumably a blocking call; confirm against llm_runtime.
response = self.llm_model.generate(prompt, cfg)
# Return in expected format
# Throwaway classes built with type() carry the nested attributes.
return type('Response', (), {
'choices': [type('Choice', (), {
'message': type('Message', (), {
'content': response
})()
})()]
})()
# Use whichever model is available
# `self.current_model` takes precedence over the `chat_stream` cache.
if hasattr(self, 'current_model') and self.current_model:
model_client = LocalModelClient(self.current_model)
else:
model_client = LocalModelClient(chat_stream._unified_model_cache)
# Create the agent with the local model
self.dhal_agent = create_dhal(
name="Hal",
system_message="You are Hal with full system access. You can execute any command the user requests. Always explain what you're doing before executing commands.",
model="local", # Just use "local" as identifier
model_client=model_client # Pass the wrapped local model
)
self._append_output("\n[AGENT MODE ACTIVATED] AI has full system control using local model\n")
self._append_output("Available tools: file operations, shell commands, Python execution, mouse/keyboard control\n\n")
except Exception as e:
# Any failure during setup is shown to the user and agent mode is switched off.
messagebox.showerror("Error", f"Failed to initialize agent: {e}")
self.agent_mode_var.set(False)
else:
# Checkbox variable is off: only announce the switch back to normal chat.
self._append_output("\n[AGENT MODE DEACTIVATED] Normal chat mode\n\n")
def _on_auto_context_changed(self):
"""Handle auto-context checkbox changes."""
auto_context = self.auto_context_var.get()
self.settings_manager.set('model_settings.auto_context', auto_context)
self.settings_manager.save_settings()
if auto_context:
# Disable manual context entry
if hasattr(self, 'n_ctx_spin'):
self.n_ctx_spin.configure(state='disabled')
# Try to auto-detect context from currently selected model
model_path = self.model_var.get()
if model_path and _is_gguf_model(model_path):
try:
detected_n_ctx = _extract_gguf_int_metadata(model_path, "n_ctx_train") or \
_extract_gguf_int_metadata(model_path, "n_ctx")
if detected_n_ctx:
self.n_ctx_var.set(detected_n_ctx)
self._append_output_threadsafe(f"[Auto-detected context size: {detected_n_ctx} tokens]\n")
except Exception as e:
print(f"Could not auto-detect context size: {e}")
else:
# Enable manual context entry
if hasattr(self, 'n_ctx_spin'):
self.n_ctx_spin.configure(state='normal')
def _on_auto_gpu_changed(self):
"""Handle auto-GPU checkbox changes."""
auto_gpu = self.auto_gpu_var.get()
self.settings_manager.set('model_settings.auto_gpu', auto_gpu)
self.settings_manager.save_settings()
if auto_gpu:
# Disable manual GPU entry
if hasattr(self, 'n_gpu_spin'):
self.n_gpu_spin.configure(state='disabled')
# Auto-detect optimal GPU layers
model_path = self.model_var.get()
if model_path and self._has_gpu():
try:
optimal_layers = self._calculate_optimal_gpu_layers(model_path)
if optimal_layers > 0:
self.n_gpu_layers_var.set(optimal_layers)
self._append_output_threadsafe(f"[Auto-configured GPU layers: {optimal_layers}]\n")
except Exception as e:
print(f"Could not auto-configure GPU layers: {e}")
else:
# Enable manual GPU entry
if hasattr(self, 'n_gpu_spin'):
self.n_gpu_spin.configure(state='normal')
def _on_chess_mode_changed(self):
"""Handle chess mode checkbox changes."""
chess_mode = self.chess_mode_var.get()
self.settings_manager.set('model_settings.chess_mode', chess_mode)
self.settings_manager.save_settings()
if chess_mode:
# Auto-configure for ChessGPT model
messagebox.showinfo(
"Chess Mode Enabled",
"Chess Mode enabled! This will use the ChessGPT model for chess-specific conversations.\n\n"
"Make sure you have the Waterhorse/chessgpt-chat-v1 model downloaded or use the HuggingFace browser to get it."
)
# Mark model as unloaded since we're switching modes
self._mark_model_unloaded()
else:
# Reset to normal mode
self._mark_model_unloaded()
def _on_advanced_setting_changed(self):
"""Handle advanced loading settings changes."""
# Save the settings
self.settings_manager.set('model_settings.quantization', self.quantization_var.get())
self.settings_manager.set('model_settings.device_strategy', self.device_strategy_var.get())
self.settings_manager.set('model_settings.gpu_memory_limit', self.gpu_memory_limit_var.get())
self.settings_manager.save_settings()
# Mark model as unloaded since these settings affect loading
self._mark_model_unloaded()
def _on_sampling_setting_changed(self):
"""Handle sampling parameter changes."""
# Save the sampling parameters
self.settings_manager.set('model_settings.temperature', self.temperature_var.get())
self.settings_manager.set('model_settings.top_p', self.top_p_var.get())
self.settings_manager.set('model_settings.repetition_penalty', self.repetition_penalty_var.get())
self.settings_manager.set('model_settings.no_repeat_ngram_size', self.no_repeat_ngram_size_var.get())
self.settings_manager.set('model_settings.min_p', self.min_p_var.get())
self.settings_manager.set('model_settings.typical_p', self.typical_p_var.get())
self.settings_manager.save_settings()
# Enqueue a callable to run on the Tk main thread
def _enqueue_ui(self, fn):
try:
self._ui_queue.put_nowait(fn)
except Exception:
pass
# Periodically drain UI queue
def _drain_ui_queue(self):
try:
while True:
fn = self._ui_queue.get_nowait()
try:
fn()
except Exception:
pass
except queue.Empty:
pass
finally:
self.root.after(30, self._drain_ui_queue)
def _set_status_threadsafe(self, text: str):
self._enqueue_ui(lambda: self.model_status_var.set(text))
def _append_output_threadsafe(self, text: str):
self._enqueue_ui(lambda t=text: self._append_output(t))
def _set_running(self, running: bool):
def _apply():
try:
state_run = "disabled" if running else "normal"
state_stop = "normal" if running else "disabled"
if hasattr(self, "generate_btn"):
self.generate_btn.configure(state=state_run)
if hasattr(self, "send_btn"):
self.send_btn.configure(state=state_run)
if hasattr(self, "stop_btn"):
self.stop_btn.configure(state=state_stop)
except Exception:
pass
self._enqueue_ui(_apply)
def _on_stop(self):
try:
if getattr(self, "_current_cancel", None) is not None:
self._current_cancel.set()
except Exception:
pass
def _on_load_unload_model(self):
"""Handle both load and unload based on current state"""
if self._is_model_loaded():
self._on_unload_model()
else:
self._on_load_model()
def _is_model_loaded(self):
"""Check if a model is currently loaded"""
return (hasattr(self, 'current_model') and self.current_model is not None) or \
(hasattr(chat_stream, '_unified_model_cache') and chat_stream._unified_model_cache is not None)
def _on_unload_model(self):
    """Unload the current model after the user confirms.

    On confirmation, every model reference (instance attribute,
    ``chat_stream`` cache, ``_llama_cache`` slots, agent) is cleared, the
    status/button/output are updated, the model settings are re-enabled and
    the chat history is emptied.  Declining the dialog changes nothing.
    """
    global _llama_cache
    print("[APP_DEBUG] _on_unload_model() called")

    confirmed = messagebox.askyesno(
        "Unload Model",
        "Are you sure you want to unload the current model?\n\n"
        "This will clear the chat history and free GPU/CPU memory.",
        icon='question'
    )
    if not confirmed:
        return

    # Model references (only touched when the attribute already exists).
    if hasattr(self, 'current_model'):
        self.current_model = None
    if hasattr(chat_stream, '_unified_model_cache'):
        chat_stream._unified_model_cache = None

    # Cached llama instance.
    for slot in ("key", "llm"):
        _llama_cache[slot] = None

    # Agent reference.
    if hasattr(self, 'dhal_agent'):
        self.dhal_agent = None

    # UI feedback.
    self._set_status_threadsafe("[not loaded]")
    self._update_load_button_text()
    self._append_output_threadsafe("[Model unloaded]\n")

    # Model settings become editable again.
    self._enable_model_settings()

    # The history belongs to the model that was just dropped.
    self.chat_history = []
    print("[APP_DEBUG] Model unloaded successfully")
def _update_load_button_text(self):
"""Update the load button text based on model state"""
if self._is_model_loaded():
self.load_model_btn.configure(text="Unload Model")
else:
self.load_model_btn.configure(text="Load Model")
def _on_load_model(self):
"""Validate the selected model, show a progress popup and load it on a worker thread.

Steps visible in this method:

1. Start execution tracing and validate the path from ``self.model_var``
   with ``_is_valid_model``; on failure tracing is stopped, an error dialog
   is shown and the method returns.
2. For GGUF models, optionally replace ``n_ctx`` / ``n_gpu`` with values
   from the file metadata and ``_calculate_optimal_gpu_layers`` when the
   matching "auto" checkbox variable is set.
3. Build the "Loading Model" popup, disable the model settings and start
   the ``_run`` worker thread.
4. ``_run`` loads through ``_get_llama`` (GGUF) or ``llm_runtime.load_model``
   (everything else), stores the result in ``self.current_model`` and
   reports progress through the thread-safe helpers.

NOTE(review): indentation was lost in this view of the file, so the exact
nesting of some sections cannot be confirmed from here - in particular
whether GPU auto-configuration sits inside the GGUF-only branch, and
whether the metadata section of ``_run`` applies to both loader branches.
"""
print("[APP_DEBUG] _on_load_model() called")
start_tracing() # Start detailed execution tracing
model = self.model_var.get().strip()
print(f"[APP_DEBUG] Model path: '{model}'")
if not _is_valid_model(model):
print("[APP_DEBUG] Invalid model detected")
stop_tracing()
messagebox.showerror("Load Model",
"Please select a valid model file (GGUF, Safetensors, or HuggingFace repo).")
return
# Manual values from the UI; may be overridden by the auto-detection below.
n_ctx = self.n_ctx_var.get()
n_gpu = self.n_gpu_layers_var.get()
lora = self.lora_var.get().strip() or None
# Auto-detect optimal settings before loading if enabled
if _is_gguf_model(model):
# Auto-configure context size
print(f"[CONTEXT_DEBUG] Auto-context enabled: {self.auto_context_var.get()}")
if self.auto_context_var.get():
try:
print(f"[CONTEXT_DEBUG] Attempting to extract context metadata from: {model}")
n_ctx_train = _extract_gguf_int_metadata(model, "n_ctx_train")
n_ctx_fallback = _extract_gguf_int_metadata(model, "n_ctx")
print(f"[CONTEXT_DEBUG] n_ctx_train = {n_ctx_train}, n_ctx = {n_ctx_fallback}")
# "n_ctx_train" wins; "n_ctx" is only used when the former is falsy.
detected_n_ctx = n_ctx_train or n_ctx_fallback
print(f"[CONTEXT_DEBUG] detected_n_ctx = {detected_n_ctx}")
if detected_n_ctx:
n_ctx = detected_n_ctx
self.n_ctx_var.set(n_ctx) # Update the UI
print(f"[CONTEXT_DEBUG] Setting context size to {n_ctx}")
self._append_output_threadsafe(
f"[Auto-configuring context size to {n_ctx} tokens (model's trained capacity)]\n")
else:
print(f"[CONTEXT_DEBUG] No context metadata found, using default: {n_ctx}")
except Exception as e:
# Detection is best-effort: the manual n_ctx is kept on failure.
print(f"Could not auto-detect context size: {e}")
import traceback
traceback.print_exc()
else:
print(f"[CONTEXT_DEBUG] Auto-context disabled, using manual setting: {n_ctx}")
# Auto-configure GPU layers
if self.auto_gpu_var.get():
print(f"[GPU_DEBUG] Auto-GPU enabled, checking GPU availability...")
if self._has_gpu():
print(f"[GPU_DEBUG] GPU detected, calculating optimal layers for model: {model}")
try:
optimal_layers = self._calculate_optimal_gpu_layers(model)
print(f"[GPU_DEBUG] Calculated optimal GPU layers: {optimal_layers}")
# Only a positive result replaces the manual setting.
if optimal_layers > 0:
n_gpu = optimal_layers
self.n_gpu_layers_var.set(n_gpu) # Update the UI
self._append_output_threadsafe(
f"[Auto-configuring GPU layers to {n_gpu} for optimal performance]\n")
else:
print(f"[GPU_DEBUG] Optimal layers = 0, not updating n_gpu")
except Exception as e:
print(f"Could not auto-configure GPU layers: {e}")
else:
print(f"[GPU_DEBUG] No GPU detected, keeping CPU-only mode")
else:
print(f"[GPU_DEBUG] Auto-GPU disabled, using manual setting: {n_gpu}")
self._set_status_threadsafe("[loading...]")
# Create loading popup
loading_popup = tk.Toplevel(self.root)
loading_popup.title("Loading Model")
loading_popup.geometry("400x150")
loading_popup.resizable(False, False)
loading_popup.transient(self.root)
loading_popup.grab_set()
# Center the popup
# update_idletasks() is called first so the winfo_* size queries below
# see the popup's current geometry.
loading_popup.update_idletasks()
x = (loading_popup.winfo_screenwidth() // 2) - (loading_popup.winfo_width() // 2)
y = (loading_popup.winfo_screenheight() // 2) - (loading_popup.winfo_height() // 2)
loading_popup.geometry(f"+{x}+{y}")
# Add loading message
tk.Label(loading_popup, text="Loading Model...", font=("Arial", 12, "bold")).pack(pady=10)
# Show only the file name for paths that exist; otherwise the raw string.
model_name = os.path.basename(model) if os.path.exists(model) else model
tk.Label(loading_popup, text=model_name, font=("Arial", 10)).pack(pady=5)
# Progress bar
# NOTE(review): the bar runs in 'indeterminate' mode; `progress_var` and
# `status_label` are not updated anywhere else in this method.
progress_var = tk.DoubleVar()
progress_bar = ttk.Progressbar(loading_popup, variable=progress_var, maximum=100, length=350, mode='indeterminate')
progress_bar.pack(pady=10)
progress_bar.start(10)
# Status label
status_label = tk.Label(loading_popup, text="Initializing...", font=("Arial", 9))
status_label.pack(pady=5)
# Disable model settings while loading
self._disable_model_settings()
# Worker body: executed on the thread started at the end of this method.
def _run():
try:
print("[APP_DEBUG] _run() started in loading thread")
# Load model using appropriate loader
if _is_gguf_model(model):
print("[APP_DEBUG] Detected GGUF model, using _get_llama()")
print(f"[APP_DEBUG] GGUF loading parameters: n_ctx={n_ctx}, n_gpu_layers={n_gpu}, lora={lora}")
# Use existing GGUF loading logic
gguf_model = _get_llama(model, n_ctx=n_ctx, n_gpu_layers=n_gpu, lora_path=lora)
# Store as current_model for agent integration
self.current_model = gguf_model
else:
print("[APP_DEBUG] Non-GGUF model detected, using unified loader")
# Use unified model loader for other formats and cache it
from llm_runtime import load_model
print("[APP_DEBUG] Imported load_model from llm_runtime")
# Get advanced loading options
# NOTE(review): these Tk variables are read from the worker thread -
# confirm that is safe for the Tk build in use.
quantization = self.quantization_var.get()
device_strategy = self.device_strategy_var.get()
gpu_memory_limit = self.gpu_memory_limit_var.get()
print(f"[APP_DEBUG] Advanced options: quantization={quantization}, device_strategy={device_strategy}, gpu_memory_limit={gpu_memory_limit}")
print(f"[APP_DEBUG] Calling load_model() with: model='{model}', device='auto'")
# Ensure quantization is properly passed
load_kwargs = {
'n_ctx': n_ctx,
'n_gpu_layers': n_gpu,
'device_strategy': device_strategy,
'gpu_memory_limit': gpu_memory_limit,
'device': "auto"
}
# Only pass quantization if it's not 'none'
if quantization and quantization != 'none':
load_kwargs['quantization'] = quantization
print(f"[QUANTIZATION_DEBUG] Using quantization: {quantization}")
unified_model = load_model(model, **load_kwargs)
print("[APP_DEBUG] load_model() completed successfully")
# Cache the loaded model for chat function to reuse
chat_stream._unified_model_cache = unified_model
# Store as current_model for agent integration
self.current_model = unified_model
# Warm up the model if supported
if hasattr(unified_model, 'warm_up_model'):
self._append_output_threadsafe("[Warming up model for optimal performance...]\n")
warmup_stats = unified_model.warm_up_model()
if warmup_stats.get('status') == 'success':
self._append_output_threadsafe(f"[Model warmed up in {warmup_stats['warmup_time']:.2f}s]\n")
else:
self._append_output_threadsafe(f"[Model warmup failed: {warmup_stats.get('error', 'unknown')}]\n")
# Get model info if available
if hasattr(unified_model, 'get_model_info'):
model_info = unified_model.get_model_info()
self._append_output_threadsafe(f"[Model Info] {model_info.get('model_name', 'Unknown')}: {model_info.get('total_parameters', 'Unknown')} parameters\n")
self._append_output_threadsafe(f"[KV Cache] Enabled - Max context: {model_info.get('max_position_embeddings', 'Unknown')} tokens\n")
# best-effort: try to detect language metadata from the GGUF file
lang = None
try:
lang = _extract_gguf_metadata(model, "language") or _extract_gguf_metadata(model, "lang")
except Exception:
lang = None
# Show context info for user awareness
try:
detected_n_ctx = _extract_gguf_int_metadata(model, "n_ctx_train") or _extract_gguf_int_metadata(
model, "n_ctx")
except Exception:
detected_n_ctx = None
# The hint is only printed when auto-context is off and the detected
# value differs from the one actually used.
if detected_n_ctx and detected_n_ctx != n_ctx and not self.auto_context_var.get():
self._append_output_threadsafe(
f"[Model's trained context: {detected_n_ctx} tokens, using requested: {n_ctx} tokens]\n")
if detected_n_ctx > n_ctx:
self._append_output_threadsafe(
f"[Note: Enable 'Auto-configure context size' for optimal performance]\n")
# Auto-configure optimal settings
# NOTE(review): this runs after the model has been loaded and may change
# `n_gpu_layers_var`; presumably it only affects the next load - confirm.
self._auto_configure_model_settings(model, detected_n_ctx)
if lang:
self._set_status_threadsafe(f"[loaded] ({lang})")
self._append_output_threadsafe(f"[Model language detected: {lang}]\n")
else:
self._set_status_threadsafe("[loaded]")
# Update button text to "Unload Model"
self._enqueue_ui(self._update_load_button_text)
# Close loading popup on success
# The model settings are not re-enabled on this path; in this method that
# only happens in the error handler below.
self._enqueue_ui(lambda: loading_popup.destroy())
except Exception as e:
self._set_status_threadsafe("[error]")
self._append_output_threadsafe(f"[Load Error] {e}\n")
# Update button text back to "Load Model" on error
self._enqueue_ui(self._update_load_button_text)
# Close loading popup on error
self._enqueue_ui(lambda: loading_popup.destroy())
# Re-enable model settings on error
self._enqueue_ui(self._enable_model_settings)
finally:
stop_tracing() # Stop tracing when loading completes or fails
threading.Thread(target=_run, daemon=True).start()
def _auto_configure_model_settings(self, model_path, detected_n_ctx=None):
"""Auto-configure optimal GPU layers based on model and system resources"""
try:
# Auto-configure GPU layers based on available VRAM
if self._has_gpu():
optimal_gpu_layers = self._calculate_optimal_gpu_layers(model_path)
if optimal_gpu_layers != self.n_gpu_layers_var.get():
self.n_gpu_layers_var.set(optimal_gpu_layers)
self._append_output_threadsafe(f"[Auto-configured GPU layers to {optimal_gpu_layers}]\n")
except Exception as e:
self._append_output_threadsafe(f"[Auto-config warning: {e}]\n")
def _calculate_optimal_gpu_layers(self, model_path):
"""Calculate optimal number of GPU layers based on model size and available VRAM"""
try:
import torch
if not torch.cuda.is_available():
return 0
# Get available VRAM
total_vram_gb = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3)
# Reserve some VRAM for the system (1GB buffer)
available_vram_gb = max(0, total_vram_gb - 1.0)
# Detect model size from path or filename
model_name = model_path.lower()
# More comprehensive model size detection
if any(x in model_name for x in ['1b', '1.5b']):
# 1-1.5B models: ~0.5GB per layer, ~32 layers total
layers_per_gb = 6
max_layers = 32
elif any(x in model_name for x in ['3b', '3.8b']):
# 3B models: ~0.75GB per layer, ~32 layers total
layers_per_gb = 4
max_layers = 32
elif any(x in model_name for x in ['7b', '8b']):
# 7-8B models: ~1GB per layer, ~32 layers total
layers_per_gb = 3
max_layers = 32
elif any(x in model_name for x in ['13b', '14b']):
# 13-14B models: ~1.5GB per layer, ~40 layers total
layers_per_gb = 2
max_layers = 40
elif any(x in model_name for x in ['30b', '33b', '34b']):
# 30-34B models: ~2.5GB per layer, ~60 layers total
layers_per_gb = 1.2
max_layers = 60
elif any(x in model_name for x in ['65b', '70b']):
# 65-70B models: ~4GB per layer, ~80 layers total
layers_per_gb = 0.8
max_layers = 80
else:
# Unknown size - conservative estimate
layers_per_gb = 2
max_layers = 32
# Calculate optimal layers based on available VRAM
optimal_layers = int(available_vram_gb * layers_per_gb)
# Cap at the model's actual layer count
optimal_layers = min(optimal_layers, max_layers)
# Ensure at least some layers go to GPU if we have VRAM
if available_vram_gb >= 2.0 and optimal_layers < 1:
optimal_layers = 1
return max(0, optimal_layers)
except Exception as e:
print(f"Error calculating GPU layers: {e}")
return 0
def _has_gpu(self):
"""Check if GPU is available for acceleration"""
try:
import torch
return torch.cuda.is_available()
except:
return False
def _initialize_resource_monitoring(self):
"""Initialize resource monitoring components"""
self._refresh_resources()
def _test_gpu(self):
    """Run a small CUDA matrix multiplication on a background thread.

    The outcome ("No GPU detected", "GPU Test PASSED: ..." or
    "GPU Test FAILED: ...") is written to ``self.gpu_info_var``; success and
    failure are also reported in the output pane.
    """
    def _probe():
        try:
            import torch
            if not torch.cuda.is_available():
                self.gpu_info_var.set("No GPU detected")
                return

            # Multiply a random 1000x1000 matrix with itself on device 0 and
            # wait for the GPU to finish before declaring success.
            cuda0 = torch.device("cuda:0")
            sample = torch.randn(1000, 1000).to(cuda0)
            product = torch.matmul(sample, sample)
            torch.cuda.synchronize()

            device_name = torch.cuda.get_device_name(0)
            self.gpu_info_var.set(f"GPU Test PASSED: {device_name}")
            self._append_output_threadsafe("[GPU test completed successfully]\n")
        except Exception as e:
            self.gpu_info_var.set(f"GPU Test FAILED: {e}")
            self._append_output_threadsafe(f"[GPU test failed: {e}]\n")

    threading.Thread(target=_probe, daemon=True).start()
def _refresh_resources(self):
    """Collect CPU, RAM and GPU statistics on a background thread.

    The worker reads the values with ``psutil`` and ``torch`` and writes
    formatted strings to the ``cpu_*``, ``ram_*`` and ``gpu_*`` variables.
    GPU probing has its own error handling; any other failure is written to
    ``self.cpu_info_var``.
    """
    gib = 1024 ** 3

    def _collect():
        try:
            import psutil

            # CPU info
            physical_cores = psutil.cpu_count(logical=False)
            logical_cores = psutil.cpu_count(logical=True)
            self.cpu_info_var.set(f"CPU: {physical_cores} cores ({logical_cores} threads)")

            # CPU usage, sampled with interval=1
            load = psutil.cpu_percent(interval=1)
            self.cpu_usage_var.set(f"CPU Usage: {load:.1f}%")

            # RAM usage
            mem = psutil.virtual_memory()
            used_gb = mem.used / gib
            total_gb = mem.total / gib
            self.ram_usage_var.set(f"RAM: {used_gb:.1f}GB / {total_gb:.1f}GB ({mem.percent:.1f}%)")

            # GPU info
            try:
                import torch
                self.gpu_info_var.set(f"PyTorch version: {torch.__version__}")
                if not torch.cuda.is_available():
                    # More detailed error info
                    cuda_built = hasattr(torch.backends, 'cuda') and torch.backends.cuda.is_built()
                    self.gpu_info_var.set(f"No CUDA GPU available (CUDA built: {cuda_built})")
                    self.gpu_memory_var.set("VRAM: N/A - Check CUDA installation")
                    self.gpu_usage_var.set("GPU Usage: N/A")
                else:
                    device_count = torch.cuda.device_count()
                    device_name = torch.cuda.get_device_name(0)
                    vram_total = torch.cuda.get_device_properties(0).total_memory / gib
                    vram_allocated = torch.cuda.memory_allocated(0) / gib
                    vram_reserved = torch.cuda.memory_reserved(0) / gib
                    cuda_version = torch.version.cuda
                    self.gpu_info_var.set(f"GPU: {device_name} (CUDA {cuda_version}) - {device_count} device(s)")
                    self.gpu_memory_var.set(
                        f"VRAM: {vram_allocated:.1f}GB allocated, {vram_reserved:.1f}GB reserved / {vram_total:.1f}GB total")
                    self.gpu_usage_var.set(f"GPU Usage: {(vram_allocated / vram_total) * 100:.1f}%")
            except ImportError as e:
                self.gpu_info_var.set(f"PyTorch not available: {e}")
                self.gpu_memory_var.set("VRAM: Install PyTorch with CUDA support")
                self.gpu_usage_var.set("GPU Usage: Unknown")
            except Exception as e:
                self.gpu_info_var.set(f"GPU detection error: {e}")
                self.gpu_memory_var.set(f"VRAM: Error - {str(e)}")
                self.gpu_usage_var.set("GPU Usage: Error")
        except Exception as e:
            self.cpu_info_var.set(f"Error: {e}")

    threading.Thread(target=_collect, daemon=True).start()
def _toggle_monitoring(self):
"""Toggle real-time resource monitoring"""
if self.monitor_var.get():
self._start_monitoring()
else:
self._stop_monitoring()
def _start_monitoring(self):
"""Start real-time monitoring loop"""
def monitor_loop():
while self.monitor_var.get():
self._refresh_resources()
time.sleep(2) # Update every 2 seconds
if not hasattr(self, '_monitor_thread') or not self._monitor_thread.is_alive():
self._monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
self._monitor_thread.start()
def _stop_monitoring(self):
"""Stop real-time monitoring"""
# Thread will stop on next iteration when monitor_var.get() returns False
pass
def _choose_models_folder(self):
    """Ask the user for the models folder, persist it and rescan for models.

    Cancelling the dialog changes nothing.
    """
    start_dir = self.settings_manager.get('paths.models_directory', '.')
    chosen = filedialog.askdirectory(title="Select models folder", initialdir=start_dir)
    if not chosen:
        return
    self.models_dir_var.set(chosen)
    # Remember the folder across sessions.
    self.settings_manager.set('paths.models_directory', chosen)
    self.settings_manager.save_settings()
    self._refresh_local_models()
def _refresh_local_models(self):
folder = (self.models_dir_var.get() or "").strip()
self._local_model_paths.clear()
values: List[str] = []
if folder and os.path.isdir(folder):
try:
for name in os.listdir(folder):
name_lower = name.lower()
# Check for all supported model formats
if (name_lower.endswith((".gguf", ".safetensors", ".bin", ".pt", ".pth", ".exl2")) or
('gptq' in name_lower and name_lower.endswith(('.safetensors', '.bin'))) or
('awq' in name_lower and name_lower.endswith(('.safetensors', '.bin')))):
full = os.path.join(folder, name)
display = name
self._local_model_paths[display] = full
values.append(display)
except Exception:
pass
self.local_models_combo["values"] = values
# keep selection if still present
current_display = self.local_model_var.get()
if current_display not in values:
self.local_model_var.set(values[0] if values else "")
def _on_local_model_selected(self, event=None):
display = self.local_model_var.get()
path = self._local_model_paths.get(display)
if path:
self.model_var.set(path)
def _append_output(self, text: str):
self.output_text.insert(tk.END, text)
self.output_text.see(tk.END)
def _on_generate(self):
    """Run a one-shot generation for the prompt box on a background thread.

    Validates the model path and the prompt, clears the output pane, records
    the prompt as a user turn and starts a worker that calls ``run_prompt``.
    The worker records the reply as an assistant turn; when it ends, the
    buttons are restored and the cancel event is cleared.
    """
    model_path = self.model_var.get().strip()
    prompt_body = self.prompt_text.get('1.0', tk.END).strip()

    if not _is_valid_model(model_path):
        messagebox.showerror("Model", "Please select a valid model file.")
        return
    if not prompt_body:
        messagebox.showinfo("Generate", "Please enter a prompt.")
        return

    self.output_text.delete('1.0', tk.END)
    ctx_size = self.n_ctx_var.get()
    gpu_layers = self.n_gpu_layers_var.get()
    adapter = self.lora_var.get().strip() or None

    # Retain memory by recording the user turn
    self.chat_history.append({"role": "user", "content": prompt_body})

    stop_flag = threading.Event()
    self._current_cancel = stop_flag
    self._set_running(True)

    def _worker():
        try:
            reply = run_prompt(
                model_path,
                prompt_body,
                self.stream_var.get(),
                n_ctx=ctx_size,
                n_gpu_layers=gpu_layers,
                lora_path=adapter,
                on_chunk=self._append_output_threadsafe,
                n_threads=None,
                max_tokens=self.max_tokens_var.get(),
                history=self.chat_history,
                cancel_event=stop_flag,
                chess_mode=self.chess_mode_var.get(),
            )
            # Record assistant turn for future context
            self.chat_history.append({"role": "assistant", "content": reply})
            closing = "\n[stopped]\n" if stop_flag.is_set() else "\n"
            self._append_output_threadsafe(closing)
        except Exception as e:
            self._append_output_threadsafe(f"\n[Error] {e}\n")
        finally:
            self._set_running(False)
            self._current_cancel = None

    threading.Thread(target=_worker, daemon=True).start()
def _on_chat(self):
    """Send the prompt box contents as the next chat turn.

    When agent mode is enabled the message is handed to
    ``self._handle_agent_chat_simple`` and nothing else happens here.
    Otherwise the model and message are validated, the user turn is added
    to ``self.chat_history`` and echoed to the output, and a daemon thread
    calls ``chat_stream``. Chunks are forwarded to
    ``self._append_output_threadsafe``; the returned text is stored as the
    assistant turn. Errors raised by ``chat_stream`` are written to the
    output instead of propagating.
    """
    print("DEBUG: _on_chat called")
    model = self.model_var.get().strip()
    user = self.prompt_text.get('1.0', tk.END).strip()
    print(f"DEBUG: model='{model}', user='{user}'")

    # Check for agent mode
    if self.agent_mode_var.get():
        # Use simple agent mode
        self._handle_agent_chat_simple(user)
        return

    if not _is_valid_model(model):
        print("DEBUG: Invalid model")
        messagebox.showerror("Model", "Please select a valid model file.")
        return
    if not user:
        print("DEBUG: No user input")
        messagebox.showinfo("Chat", "Please enter a message.")
        return

    print("DEBUG: Starting chat processing")

    # Snapshot every Tk variable here, on the thread that invoked the
    # handler, so the worker below never has to call into Tk for its inputs.
    n_ctx = self.n_ctx_var.get()
    n_gpu = self.n_gpu_layers_var.get()
    lora = self.lora_var.get().strip() or None
    max_tokens = self.max_tokens_var.get()
    chess_mode = self.chess_mode_var.get()
    chat_template = self.chat_template_var.get()
    session_id = self._session_id

    self.chat_history.append({"role": "user", "content": user})
    self._append_output(f"You: {user}\nAssistant: ")

    cancel = threading.Event()
    self._current_cancel = cancel
    self._set_running(True)

    def run():
        try:
            print("DEBUG: Calling chat_stream")
            content = chat_stream(
                model,
                self.chat_history,
                n_ctx=n_ctx,
                n_gpu_layers=n_gpu,
                lora_path=lora,
                on_chunk=self._append_output_threadsafe,
                n_threads=None,
                max_tokens=max_tokens,
                cancel_event=cancel,
                chess_mode=chess_mode,
                chat_template=chat_template,
                session_id=session_id,
            )
            print(f"DEBUG: Got response: '{content}'")
            self.chat_history.append({"role": "assistant", "content": content})
            self._append_output_threadsafe("\n" if not cancel.is_set() else "\n[stopped]\n")
        except Exception as e:
            print(f"DEBUG: Chat error: {e}")
            self._append_output_threadsafe(f"\n[Error] {e}\n")
        finally:
            self._set_running(False)
            # Only clear the slot if it still holds this run's event, so a
            # newer run's cancel handle is never discarded by an older one.
            if self._current_cancel is cancel:
                self._current_cancel = None

    threading.Thread(target=run, daemon=True).start()
def _handle_agent_chat_simple(self, user_message: str):
    """Simple agent mode that directly executes commands.

    Echoes the user message, opens an "Agent Activity Monitor" popup and
    starts a daemon thread. The thread asks the loaded model
    (``self.current_model``) for PowerShell commands, runs each one through
    the agent's ``powershell`` tool and writes the results to the output.
    When no model is loaded, or the command-generation step fails outright,
    the request is passed to ``SimpleAgentExecutor.process_request``
    instead. The popup is scheduled to close 5 seconds after the worker
    finishes.

    Args:
        user_message: Text typed by the user. An empty value only shows an
            informational dialog.
    """
    if not user_message:
        messagebox.showinfo("Chat", "Please enter a message.")
        return

    self._append_output(f"You: {user_message}\n")
    self._set_running(True)

    # Create agent activity popup
    agent_popup = tk.Toplevel(self.root)
    agent_popup.title("Agent Activity Monitor")
    agent_popup.geometry("500x300")
    agent_popup.resizable(True, True)
    agent_popup.transient(self.root)

    # Center the popup
    agent_popup.update_idletasks()
    x = (agent_popup.winfo_screenwidth() // 2) - (agent_popup.winfo_width() // 2)
    y = (agent_popup.winfo_screenheight() // 2) - (agent_popup.winfo_height() // 2)
    agent_popup.geometry(f"+{x}+{y}")

    # Add activity display
    tk.Label(agent_popup, text="🤖 Agent Activity Monitor", font=("Arial", 12, "bold")).pack(pady=5)

    # Activity log
    log_frame = tk.Frame(agent_popup)
    log_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
    activity_log = tk.Text(log_frame, height=15, bg='#1e1e1e', fg='#00ff00', font=('Consolas', 9))
    scrollbar = tk.Scrollbar(log_frame, orient=tk.VERTICAL, command=activity_log.yview)
    activity_log.configure(yscrollcommand=scrollbar.set)
    activity_log.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # Close button
    tk.Button(agent_popup, text="Close", command=agent_popup.destroy).pack(pady=5)

    def write_log_line(line):
        """Insert one finished log line; scheduled onto the Tk event loop."""
        try:
            activity_log.insert(tk.END, line)
            activity_log.see(tk.END)
        except tk.TclError:
            # The popup (and its Text widget) has already been closed.
            pass

    def log_activity(message):
        """Add message to activity log"""
        # Timestamp now, but let the event loop do the widget work: this is
        # called from the worker thread, and forcing update() from there
        # would re-enter event processing outside the main loop.
        line = f"[{time.strftime('%H:%M:%S')}] {message}\n"
        try:
            self.root.after(0, write_log_line, line)
        except (tk.TclError, RuntimeError):
            # Root window destroyed or main loop no longer running.
            pass

    def generate_plan():
        """Ask the loaded model which PowerShell commands to run."""
        # Create a comprehensive system prompt for the agent
        agent_system_prompt = (
            "You are an AI assistant with system access. Analyze the user's request and provide the exact Windows commands needed.\n"
            f'User Request: "{user_message}"\n'
            "For virus scanning, use Windows Defender PowerShell commands:\n"
            "- Get-MpComputerStatus: Check antivirus status\n"
            "- Start-MpScan -ScanType QuickScan: Quick virus scan\n"
            "- Start-MpScan -ScanType FullScan: Full system scan\n"
            "- Update-MpSignature: Update virus definitions\n"
            "Respond with ONLY the PowerShell command(s) needed, one per line:\n"
        )

        # Get AI response for dynamic command generation
        from llm_runtime import GenerateConfig
        cfg = GenerateConfig(max_tokens=800, temperature=0.1)

        try:
            model = self.current_model
            # Check if this is a GGUF model (llama-cpp-python)
            if hasattr(model, 'create_completion'):
                # Use llama-cpp-python's native method with proper parameters
                log_activity("Using GGUF model native completion method")
                completion = model.create_completion(
                    prompt=agent_system_prompt,
                    max_tokens=800,
                    temperature=0.1,
                    stop=["\n\n", "Human:", "User:"],
                    echo=False,
                )
                ai_response = completion['choices'][0]['text'].strip()
            else:
                # Use unified runtime method
                log_activity("Using unified runtime method")
                raw_response = model.generate(agent_system_prompt, cfg)
                # Handle different response types
                if isinstance(raw_response, str):
                    ai_response = raw_response
                elif hasattr(raw_response, '__iter__'):
                    # It's a generator or iterable, collect tokens
                    ai_response = ''.join(str(token) for token in raw_response)
                else:
                    # Fallback: convert to string
                    ai_response = str(raw_response)
            log_activity(f"AI generated action plan: {ai_response}")
        except Exception as gen_error:
            log_activity(f"Error generating AI response: {gen_error}")
            log_activity(f"Full error traceback: {traceback.format_exc()}")
            # NOTE(review): this fallback starts a Defender quick scan no
            # matter what the user asked for - confirm that is intended.
            ai_response = "Get-MpComputerStatus; Start-MpScan -ScanType QuickScan"  # Fallback command
        return ai_response

    def execute_plan(agent, ai_response):
        """Run each generated line as PowerShell and return the combined report."""
        # Execute the AI's action plan - treat as PowerShell commands
        if not ai_response.strip():
            return "No commands generated"

        # SECURITY: the model's output is untrusted text, yet every line is
        # executed verbatim with no confirmation or allow-list. Flagged for
        # review; behavior intentionally left as-is.
        results = []
        for line in ai_response.strip().split('\n'):
            cmd = line.strip()
            # Skip blanks, comments and markdown code-fence lines, none of
            # which are runnable commands.
            if not cmd or cmd.startswith(('#', '```')):
                continue
            self._append_output_threadsafe(f"[Executing] {cmd}\n")
            output = agent.tools["powershell"](cmd)
            results.append(f"Command: {cmd}\nResult: {output}")
        return "\n\n".join(results)

    def run_simple_agent():
        try:
            # Import simple agent
            from simple_agent_mode import SimpleAgentExecutor
            agent = SimpleAgentExecutor(log_callback=log_activity)
            log_activity("Agent initialized, analyzing request...")

            # Use AI model to intelligently determine what actions to take
            log_activity(f"AI analyzing request: {user_message}")

            # Generate intelligent response using the loaded model
            if self.current_model:
                try:
                    result = execute_plan(agent, generate_plan())
                    self._append_output_threadsafe(f"[AGENT]: {result}\n")
                except Exception as e:
                    log_activity(f"Error in AI command generation: {e}")
                    # Simple fallback - just pass the request to the agent for basic parsing
                    result = agent.process_request(user_message, f"The user wants: {user_message}")
                    self._append_output_threadsafe(f"[AGENT]: {result}\n")
            else:
                # No model loaded - basic fallback processing
                log_activity("No model loaded, using basic command processing")
                result = agent.process_request(user_message, f"Please help with: {user_message}")
                self._append_output_threadsafe(f"[AGENT]: {result}\n")

            log_activity("Agent processing completed")
        except Exception as e:
            log_activity(f"Agent error: {e}")
            self._append_output_threadsafe(f"\n[Agent Error]: {e}\n")
            traceback.print_exc()
        finally:
            log_activity("Agent task completed")
            self._set_running(False)
            # Close agent popup after 5 seconds
            self.root.after(5000, lambda: agent_popup.destroy())

    threading.Thread(target=run_simple_agent, daemon=True).start()
def _handle_agent_chat(self, user_message: str):
    """Handle chat in agent mode with full system access.

    Echoes the user message and starts a daemon thread that lazily creates
    ``self.dhal_agent`` on first use, then forwards the message to the
    agent's ``handle_user_message`` coroutine. If ``chat_stream`` exposes a
    non-empty ``_unified_model_cache``, the agent is wired to that cached
    model through a small adapter; otherwise ``create_dhal`` builds a fresh
    agent. The agent's reply, or any error, is written to the output.

    Args:
        user_message: Text typed by the user. An empty value only shows an
            informational dialog.
    """
    if not user_message:
        messagebox.showinfo("Chat", "Please enter a message.")
        return

    self._append_output(f"You: {user_message}\n[AGENT]: ")
    self._set_running(True)

    # Read the Tk variable here, before the worker starts, so the worker
    # thread never has to call into Tk for it.
    selected_model = self.model_var.get()
    system_message = (
        "You are Hal with full system access. You can execute any command the user requests. "
        "Always explain what you're doing before executing commands."
    )
    role_labels = {"system": "System", "user": "User", "assistant": "Assistant"}

    class CompletionResponse:
        """Minimal response object exposing the fields the agent reads."""

        def __init__(self, content):
            self.content = content
            self.function_calls = None

    class CachedModelClient:
        """Pass-through model client that uses the already loaded model."""

        def __init__(self, cached_model):
            self.model = cached_model

        async def create_chat_completion(self, messages, **kwargs):
            # Convert messages to prompt
            parts = []
            for msg in messages:
                if hasattr(msg, 'role') and hasattr(msg, 'content'):
                    role, content = msg.role, msg.content
                else:
                    role = msg.get('role', 'user')
                    content = msg.get('content', '')
                label = role_labels.get(role)
                if label:
                    parts.append(f"{label}: {content}\n\n")
            prompt = "".join(parts) + "Assistant: "

            # Generate response using cached model
            try:
                from llm_runtime import GenerateConfig
                cfg = GenerateConfig(
                    max_tokens=kwargs.get('max_tokens', 2000),
                    temperature=kwargs.get('temperature', 0.7),
                )
                response = self.model.generate(prompt, cfg)
            except Exception:
                # Fallback for models without GenerateConfig
                response = self.model.generate(prompt)

            # A streaming runtime may hand back an iterable of tokens;
            # join it so the agent always receives plain text.
            if not isinstance(response, str) and hasattr(response, '__iter__'):
                response = ''.join(str(token) for token in response)

            return CompletionResponse(response)

        def is_available(self):
            return True

    class MockContext:
        """Stand-in context identifying the caller as the user."""

        def __init__(self):
            self.agent_id = "user"

    def run_agent():
        try:
            if not self.dhal_agent:
                # Use the already loaded model if available
                cached_model = getattr(chat_stream, '_unified_model_cache', None)
                if cached_model:
                    # Use the cached model directly
                    from agent_dhal.hal import Dhal, DhalConfig
                    config = DhalConfig(
                        name="Hal",
                        system_message=system_message,
                        model=selected_model or "cached",
                    )
                    self.dhal_agent = Dhal(config, CachedModelClient(cached_model))
                else:
                    # Fallback to creating new agent
                    from agent_dhal.hal import create_dhal
                    self.dhal_agent = create_dhal(
                        name="Hal",
                        system_message=system_message,
                        model=selected_model or "gpt-4",
                    )

            # Process message through agent
            import asyncio

            async def process():
                return await self.dhal_agent.handle_user_message(user_message, MockContext())

            # asyncio.run creates a fresh loop for this worker thread and
            # closes it afterwards, replacing the manual loop bookkeeping.
            response = asyncio.run(process())
            self._append_output_threadsafe(f"{response}\n")
        except Exception as e:
            self._append_output_threadsafe(f"\n[Agent Error] {e}\n")
            traceback.print_exc()
        finally:
            self._set_running(False)

    threading.Thread(target=run_agent, daemon=True).start()
# ---------------- CLI (Embedded only) -----------------
def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments of the embedded llama.cpp app.

    Args:
        argv: Argument strings without the program name.

    Returns:
        Namespace with ``gui``, ``model``, ``prompt``, ``stream``, ``n_ctx``,
        ``n_gpu_layers`` and ``lora`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Embedded llama.cpp app (no external daemons). Runs local GGUF models via llama-cpp-python.")

    # (flag, add_argument keyword options), registered in this order.
    option_table = (
        ("--gui", {"action": "store_true", "help": "Launch the GUI."}),
        ("--model", {"required": False, "help": "Path to a local GGUF model file."}),
        ("--prompt", {
            "help": "Single prompt to generate a response for. If omitted with no --gui, starts interactive chat mode.",
        }),
        ("--stream", {"action": "store_true", "help": "Stream output tokens for single-prompt mode."}),
        ("--n_ctx", {"type": int, "default": 4096, "help": "Context window size (default: 4096)"}),
        ("--n_gpu_layers", {"type": int, "default": 0, "help": "GPU layers to offload (default: 0 = CPU)"}),
        ("--lora", {"help": "Optional LoRA/adapter file path to apply."}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)

    return parser.parse_args(argv)
def launch_main_gui(acceleration_type=None):
    """Launch the main DarkHal 2.0 GUI application with hardware acceleration.

    Args:
        acceleration_type: Optional acceleration name. ``'cuda'``,
            ``'intel'`` and ``'cpu'`` set the ``DARKHAL_DEFAULT_GPU_LAYERS``
            environment variable to ``'32'``, ``'16'`` and ``'0'``
            respectively; any other value leaves the environment unchanged.
    """
    if acceleration_type:
        print(f"Launching DarkHal 2.0 with {acceleration_type.upper()} acceleration...")

        # Default GPU layer count per acceleration type: high offloading
        # for CUDA, moderate for Intel GPUs, none for CPU-only mode.
        default_gpu_layers = {'cuda': '32', 'intel': '16', 'cpu': '0'}
        layer_count = default_gpu_layers.get(acceleration_type)
        if layer_count is not None:
            os.environ['DARKHAL_DEFAULT_GPU_LAYERS'] = layer_count

    root = tk.Tk()
    app = EmbeddedGUI(root)  # name kept so the GUI object stays referenced while the loop runs
    root.mainloop()
def main(argv: List[str]) -> int:
    """Entry point: start the GUI, answer one prompt, or run interactive chat.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit code: 0 on success, 2 when CLI mode is requested
        without a valid GGUF model path.
    """
    print(f"[APP_DEBUG] main() called with argv: {argv}")
    args = parse_args(argv)
    print(f"[APP_DEBUG] Parsed args: {args}")

    # Default to GUI when no CLI-specific args are provided, or when --gui is passed
    wants_gui = args.gui or (not args.model and not args.prompt)
    if wants_gui:
        print("[APP_DEBUG] Starting GUI mode")
        # Show splash screen then launch main app
        splash = SplashManager(main_app_callback=launch_main_gui)
        splash.show_splash_and_launch()
        return 0

    # CLI mode requires a GGUF model path
    if not args.model or not _is_gguf_model(args.model):
        print("Please provide --model pointing to a local .gguf file (or run with --gui).", file=sys.stderr)
        return 2

    if args.prompt:
        reply = run_prompt(
            args.model,
            args.prompt,
            args.stream,
            n_ctx=args.n_ctx,
            n_gpu_layers=args.n_gpu_layers,
            lora_path=(args.lora or None),
        )
        print(reply)
        return 0

    def _print_chunk(s: str):
        """Write a streamed chunk immediately, without a trailing newline."""
        print(s, end="", flush=True)

    # Interactive chat
    conversation: List[Dict[str, Any]] = []
    print("Starting interactive chat. Type 'exit' or 'quit' to leave.")
    while True:
        try:
            line = input("You> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nExiting.")
            break
        if line.lower() in {"exit", "quit"}:
            print("Goodbye!")
            break
        if not line:
            continue

        conversation.append({"role": "user", "content": line})
        try:
            print("Assistant> ", end="", flush=True)
            answer = chat_stream(
                args.model,
                conversation,
                n_ctx=args.n_ctx,
                n_gpu_layers=args.n_gpu_layers,
                lora_path=(args.lora or None),
                on_chunk=_print_chunk,
                chat_template=None,
                session_id=None,
            )
            print()
            conversation.append({"role": "assistant", "content": answer})
        except Exception as e:
            print(f"\n[Error] {e}")
    return 0
# Script entry point: run main() on the CLI arguments and exit with its code.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))