"""
Model Loader for the Tiny Conversational AI.

Handles model downloading, loading, and optimization for CPU inference.
Uses llama-cpp-python for maximum CPU performance with 4-bit quantization.
"""

import os
import sys
import time
import shutil
from pathlib import Path
from typing import Optional, Dict, Any, Callable
from dataclasses import dataclass
import threading

from config import Config, get_config
from utils import (
    get_logger,
    get_system_info,
    get_available_ram_gb,
    get_optimal_thread_count,
    check_system_requirements,
    ProgressBar,
    Timer,
    ensure_dir
)

logger = get_logger(__name__)


@dataclass
class ModelInfo:
    """Information about a loaded model."""

    name: str  # human-readable model name (model_config["name"])
    model_id: str  # key of the model in AVAILABLE_MODELS
    path: Path  # local path of the model file
    size_gb: float  # size as declared in the model config
    context_size: int  # context size as declared in the model config
    loaded: bool = False
    load_time_seconds: float = 0.0


class ModelLoader:
    """
    Handles loading and managing LLM models.

    Uses llama-cpp-python for efficient CPU inference with 4-bit quantization.
    """

    def __init__(self, config: Optional[Config] = None):
        """
        Create a loader bound to a configuration.

        Args:
            config: Configuration to use. Falls back to ``get_config()``
                when omitted (or falsy).
        """
        self.config = config or get_config()
        self.model = None
        self.model_info: Optional[ModelInfo] = None
        # Guards the ``_loading`` flag and the unload path.
        self._lock = threading.Lock()
        self._loading = False

        # Ensure models directory exists
        ensure_dir(self.config.paths.models_dir)

    def _get_model_path(self, model_id: str) -> Path:
        """
        Get the local path for a model.

        Raises:
            ValueError: If ``model_id`` is not in ``AVAILABLE_MODELS``.
        """
        model_config = self.config.model.AVAILABLE_MODELS.get(model_id)
        if not model_config:
            raise ValueError(f"Unknown model: {model_id}")
        return self.config.paths.models_dir / model_config["file"]

    def _is_model_downloaded(self, model_id: str) -> bool:
        """Check if a model is already downloaded."""
        return self._get_model_path(model_id).exists()

    def _download_model(
        self,
        model_id: str,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Path:
        """
        Download a model from Hugging Face Hub.

        Uses huggingface_hub's ``hf_hub_download`` with ``resume_download=True``.

        Args:
            model_id: Key of the model in ``AVAILABLE_MODELS``.
            progress_callback: Optional callback invoked with
                ``(progress, message)`` at the start (0.0) and end (1.0)
                of the download.

        Returns:
            The local path of the model file.

        Raises:
            ValueError: If ``model_id`` is unknown.
            ImportError: If huggingface_hub is not installed.
            FileNotFoundError: If the download call returned but the model
                file is not present at the expected location.
            Exception: Any other download error is logged and re-raised.
        """
        model_config = self.config.model.AVAILABLE_MODELS.get(model_id)
        if not model_config:
            raise ValueError(f"Unknown model: {model_id}")

        dest_path = self._get_model_path(model_id)

        if dest_path.exists():
            logger.info(f"Model already downloaded: {dest_path}")
            return dest_path

        logger.info(f"Downloading model: {model_config['name']}")
        logger.info(f"Repository: {model_config['repo']}")
        logger.info(f"File: {model_config['file']}")
        logger.info(f"Expected size: ~{model_config['size_gb']}GB")

        if progress_callback:
            progress_callback(0.0, f"Starting download of {model_config['name']}...")

        try:
            from huggingface_hub import hf_hub_download

            # Download with progress
            downloaded_path = Path(
                hf_hub_download(
                    repo_id=model_config["repo"],
                    filename=model_config["file"],
                    local_dir=self.config.paths.models_dir,
                    local_dir_use_symlinks=False,
                    resume_download=True,
                )
            )

            # Move to expected location if needed
            if downloaded_path != dest_path and downloaded_path.exists():
                shutil.move(str(downloaded_path), str(dest_path))

            # Never report success for a file that is not actually there;
            # otherwise the failure would only surface later, at load time.
            if not dest_path.exists():
                raise FileNotFoundError(
                    f"Download finished but model file is missing: {dest_path}"
                )

            if progress_callback:
                progress_callback(1.0, "Download complete!")

            logger.info(f"Model downloaded successfully: {dest_path}")
            return dest_path

        except ImportError:
            logger.error("huggingface_hub not installed. Please install it: pip install huggingface_hub")
            raise
        except Exception as e:
            logger.error(f"Download failed: {e}")
            raise

    def select_best_model(self) -> str:
        """
        Automatically select the best model based on available RAM.

        Models whose ``min_ram_gb`` fits into the currently available RAM are
        ranked by ``quality * speed``; the highest score wins (the first one
        in configuration order on ties).

        When no model fits, ``"tinyllama-1.1b"`` is returned if it is
        configured (or if no models are configured at all); otherwise the
        configured model with the lowest ``min_ram_gb`` is returned, so the
        result is a known model id whenever any model is configured.

        Returns:
            model_id of the selected model.
        """
        available_ram = get_available_ram_gb()
        logger.info(f"Available RAM: {available_ram:.1f}GB")

        available_models = self.config.model.AVAILABLE_MODELS

        # Keep only the models whose RAM requirement can be satisfied.
        suitable_models = [
            (model_id, model_config)
            for model_id, model_config in available_models.items()
            if model_config["min_ram_gb"] <= available_ram
        ]

        if not suitable_models:
            # Use smallest model as last resort
            logger.warning("Low RAM detected, using smallest model")
            fallback = "tinyllama-1.1b"
            if fallback in available_models or not available_models:
                return fallback
            # The historical default is not configured: pick the model with
            # the lowest RAM requirement instead of returning an unknown id.
            return min(
                available_models,
                key=lambda candidate: available_models[candidate]["min_ram_gb"],
            )

        # Pick the highest quality * speed score
        selected, _ = max(
            suitable_models,
            key=lambda item: item[1]["quality"] * item[1]["speed"],
        )
        logger.info(f"Selected model: {selected}")
        return selected

    def load(
        self,
        model_id: Optional[str] = None,
        auto_download: bool = True,
        progress_callback: Optional[Callable[[float, str], None]] = None
    ) -> Any:
        """
        Load a model for inference.

        Args:
            model_id: ID of the model to load. If None, auto-selects best model.
            auto_download: Whether to download the model if not present.
            progress_callback: Optional callback for progress updates (progress, message).

        Returns:
            The loaded model instance.

        Raises:
            RuntimeError: If another ``load()`` call is already in progress.
            ValueError: If ``model_id`` is unknown.
            FileNotFoundError: If the model file is missing and
                ``auto_download`` is False.
        """
        # Only the flag is protected by the lock; the (slow) load itself runs
        # outside of it so a concurrent caller fails fast instead of blocking.
        with self._lock:
            if self._loading:
                raise RuntimeError("Model is already being loaded")
            self._loading = True

        try:
            timer = Timer("Model loading")
            timer.start()

            # Auto-select model if not specified
            if model_id is None:
                model_id = self.select_best_model()

            model_config = self.config.model.AVAILABLE_MODELS.get(model_id)
            if not model_config:
                raise ValueError(f"Unknown model: {model_id}")

            model_path = self._get_model_path(model_id)

            # Download if needed
            if not model_path.exists():
                if auto_download:
                    if progress_callback:
                        progress_callback(0.0, "Downloading model...")
                    model_path = self._download_model(model_id, progress_callback)
                else:
                    raise FileNotFoundError(
                        f"Model not found: {model_path}\n"
                        f"Run with auto_download=True or download manually from: "
                        f"https://huggingface.co/{model_config['repo']}"
                    )

            # Check system requirements (use relaxed check - model will use virtual memory if needed)
            check_result = check_system_requirements(min(model_config["min_ram_gb"], 1.0))
            if not check_result["meets_requirements"]:
                for error in check_result["errors"]:
                    logger.warning(f"RAM warning (continuing anyway): {error}")
                # Don't raise - let it try to load, OS will use swap if needed

            for warning in check_result.get("warnings", []):
                logger.warning(warning)

            if progress_callback:
                progress_callback(0.5, "Loading model into memory...")

            # Load with llama-cpp-python
            self.model = self._load_llama_cpp(model_path, model_config)

            timer.stop()

            # Store model info
            self.model_info = ModelInfo(
                name=model_config["name"],
                model_id=model_id,
                path=model_path,
                size_gb=model_config["size_gb"],
                context_size=model_config["context_size"],
                loaded=True,
                load_time_seconds=timer.elapsed,
            )

            if progress_callback:
                progress_callback(1.0, f"Model loaded in {timer.elapsed:.1f}s")

            logger.info(f"Model loaded successfully in {timer.elapsed:.1f}s")
            return self.model

        finally:
            # Always clear the flag, even when loading failed.
            with self._lock:
                self._loading = False

    def _load_llama_cpp(self, model_path: Path, model_config: Dict[str, Any]) -> Any:
        """
        Load model using llama-cpp-python for optimal CPU performance.

        Args:
            model_path: Path of the model file to load.
            model_config: The model's entry from ``AVAILABLE_MODELS``.

        Returns:
            The constructed ``llama_cpp.Llama`` instance.

        Raises:
            ImportError: If llama-cpp-python is not installed.
        """
        try:
            from llama_cpp import Llama
        except ImportError:
            logger.error(
                "llama-cpp-python not installed. Please install it:\n"
                "pip install llama-cpp-python"
            )
            raise

        # Determine optimal settings; a configured value of 0 means "auto".
        n_threads = self.config.model.n_threads
        if n_threads == 0:
            n_threads = get_optimal_thread_count()

        # Never exceed the globally configured maximum context length.
        n_ctx = min(
            model_config.get("context_size", 4096),
            self.config.model.max_context_length
        )

        logger.info(f"Loading model: {model_path.name}")
        logger.info(f"Context size: {n_ctx}")
        logger.info(f"Threads: {n_threads}")
        logger.info(f"Batch size: {self.config.model.n_batch}")

        # Load the model
        model = Llama(
            model_path=str(model_path),
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_batch=self.config.model.n_batch,
            n_gpu_layers=self.config.model.n_gpu_layers,
            use_mmap=self.config.model.use_mmap,
            use_mlock=self.config.model.use_mlock,
            verbose=False,  # Reduce noise
        )

        return model

    def unload(self) -> None:
        """Unload the current model to free memory (no-op if none is loaded)."""
        with self._lock:
            if self.model is not None:
                # Rebind instead of ``del`` so the attribute never disappears,
                # even briefly, for a concurrent ``is_loaded()`` call.
                self.model = None
                self.model_info = None

                # Force garbage collection
                import gc
                gc.collect()

                logger.info("Model unloaded")

    def is_loaded(self) -> bool:
        """Check if a model is currently loaded."""
        return self.model is not None

    def get_model(self) -> Any:
        """
        Get the loaded model instance.

        Raises:
            RuntimeError: If no model is loaded.
        """
        if not self.is_loaded():
            raise RuntimeError("No model loaded. Call load() first.")
        return self.model

    def get_model_info(self) -> Optional[ModelInfo]:
        """Get information about the loaded model (None if nothing is loaded)."""
        return self.model_info

    def warmup(self, prompt: str = "Hello") -> float:
        """
        Warm up the model with a simple generation.

        Args:
            prompt: Prompt used for the throwaway generation.

        Returns:
            The warmup time in seconds.

        Raises:
            RuntimeError: If no model is loaded.
        """
        if not self.is_loaded():
            raise RuntimeError("No model loaded. Call load() first.")

        logger.info("Warming up model...")
        timer = Timer("Warmup")
        timer.start()

        # Generate a short response; the output itself is discarded.
        _ = self.model(
            prompt,
            max_tokens=10,
            temperature=0.7,
        )

        timer.stop()
        logger.info(f"Warmup complete in {timer.elapsed:.2f}s")
        return timer.elapsed

    def list_available_models(self) -> Dict[str, Dict[str, Any]]:
        """
        List all available models with their info.

        Returns:
            A mapping of model id to a copy of its configuration, extended
            with ``"downloaded"`` (bool) and ``"path"`` (str).
        """
        return {
            model_id: {
                **model_config,
                "downloaded": self._is_model_downloaded(model_id),
                "path": str(self._get_model_path(model_id)),
            }
            for model_id, model_config in self.config.model.AVAILABLE_MODELS.items()
        }

    def delete_model(self, model_id: str) -> bool:
        """
        Delete a downloaded model to free disk space.

        If the model is the one currently loaded, it is unloaded first.

        Returns:
            True if a file was deleted, False if the model was not downloaded.

        Raises:
            ValueError: If ``model_id`` is unknown.
        """
        model_path = self._get_model_path(model_id)
        if not model_path.exists():
            return False

        # Unload first if this is the currently loaded model
        if self.model_info and self.model_info.model_id == model_id:
            self.unload()

        model_path.unlink()
        logger.info(f"Deleted model: {model_path}")
        return True


# =============================================================================
# CONVENIENCE FUNCTIONS
# =============================================================================

_global_loader: Optional[ModelLoader] = None


def get_loader() -> ModelLoader:
    """Get the global model loader instance, creating it on first use."""
    global _global_loader
    if _global_loader is None:
        _global_loader = ModelLoader()
    return _global_loader


def load_model(
    model_id: Optional[str] = None,
    auto_download: bool = True
) -> Any:
    """Convenience function to load a model through the global loader."""
    return get_loader().load(model_id, auto_download)


def get_model() -> Any:
    """Get the currently loaded model from the global loader."""
    return get_loader().get_model()


if __name__ == "__main__":
    # Test model loading
    from utils import print_banner, print_system_status

    print_banner()
    print_system_status()

    loader = ModelLoader()

    print("\n📦 Available Models:")
    for model_id, info in loader.list_available_models().items():
        status = "✓ Downloaded" if info["downloaded"] else "○ Not downloaded"
        print(f" • {model_id}: {info['name']}")
        print(f" Size: {info['size_gb']}GB | Min RAM: {info['min_ram_gb']}GB | {status}")

    # Auto-select and load best model
    print("\n🚀 Loading model...")
    try:
        model = loader.load()
        print(f"\n✓ Model loaded: {loader.model_info.name}")
        print(f" Load time: {loader.model_info.load_time_seconds:.1f}s")

        # Warmup
        warmup_time = loader.warmup()
        print(f" Warmup time: {warmup_time:.2f}s")

        # Simple test
        print("\n📝 Test generation:")
        response = model(
            "User: Hello!\nAssistant:",
            max_tokens=50,
            temperature=0.7,
            stop=["User:", "\n\n"],
        )
        print(f"Response: {response['choices'][0]['text'].strip()}")

    except Exception as e:
        print(f"❌ Error: {e}")
        raise