#!/usr/bin/env python3
"""
MAC OS X INSTALL:
    pip3 install torch==2.1.1 torchvision torchaudio transformers==4.48.0 accelerate==0.28.0
(You must use these exact versions; newer versions have a numerical-instability bug on MPS chips.)

Interactive model evaluation script for pretraining experiments.
Automatically discovers and loads all models with /hf subdirectories.
"""
import os
import warnings

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

# Optional substring filter; when set, only model directories containing this string are loaded.
MODEL_NAME_FILTER = None


class ModelEvaluator:
    def __init__(self):
        self.models = {}
        self.tokenizers = {}
        self.pipelines = {}
        self.model_names = []

    def discover_models(self):
        """Discover all models with /hf subdirectories."""
        print("šŸ” Discovering models with /hf subdirectories...")

        # Find all directories in the current working directory that contain an /hf subdirectory
        hf_dirs = []
        for item in os.listdir('.'):
            if os.path.isdir(item) and os.path.exists(os.path.join(item, 'hf')):
                if MODEL_NAME_FILTER is None or MODEL_NAME_FILTER in item:
                    hf_dirs.append(item)

        if not hf_dirs:
            print("āŒ No models with /hf subdirectories found!")
            return []

        print(f"āœ… Found {len(hf_dirs)} models:")
        for model_dir in hf_dirs:
            print(f"  - {model_dir}")

        return hf_dirs

    def load_model(self, model_dir):
        """Load a single model and its tokenizer."""
        try:
            hf_path = os.path.join(model_dir, 'hf')
            print(f"šŸ”„ Loading {model_dir}...")

            # Load tokenizer
            tokenizer = AutoTokenizer.from_pretrained(hf_path)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # Load model in fp16 and place it on the best available device
            model = AutoModelForCausalLM.from_pretrained(
                hf_path,
                device_map=None,
                torch_dtype=torch.float16,
                trust_remote_code=True
            )
            if torch.cuda.is_available():
                model.to("cuda:0")
            elif torch.backends.mps.is_available():
                model.to("mps")

            # Both chat and base models use the text-generation pipeline; chat/SFT models
            # are simply fed chat-formatted messages at generation time, which makes the
            # pipeline apply the tokenizer's chat template.
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                device_map="auto",
                torch_dtype=torch.float16
            )
            if "chat" in model_dir.lower() or "sft" in model_dir.lower():
                print("  šŸ”„ Using chat-formatted prompts for chat/SFT model")
            else:
                print("  šŸ”„ Using plain text-generation prompts")

            self.models[model_dir] = model
            self.tokenizers[model_dir] = tokenizer
            self.pipelines[model_dir] = pipe
            self.model_names.append(model_dir)

            print(f"  āœ… {model_dir} loaded successfully")
            return True

        except Exception as e:
            print(f"  āŒ Failed to load {model_dir}: {str(e)}")
            return False

    def load_all_models(self):
        """Load all discovered models."""
        hf_dirs = self.discover_models()
        if not hf_dirs:
            return False

        print("\nšŸš€ Loading models...")
        successful_loads = 0
        for model_dir in hf_dirs:
            if self.load_model(model_dir):
                successful_loads += 1

        print(f"\nšŸ“Š Loaded {successful_loads}/{len(hf_dirs)} models successfully")
        return successful_loads > 0

    def generate_response(self, model_name, prompt, max_new_tokens=256):
        """Generate a response from a specific model."""
        try:
            pipe = self.pipelines[model_name]

            # Chat/SFT models get chat-formatted input so the chat template is applied
            if "chat" in model_name.lower() or "sft" in model_name.lower():
                chat_input = [{"role": "user", "content": prompt}]
                outputs = pipe(
                    chat_input,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1,
                    pad_token_id=self.tokenizers[model_name].eos_token_id
                )
                # Extract the assistant's response from the chat-formatted output
                if outputs and len(outputs) > 0:
                    # The pipeline returns the full conversation (list of messages);
                    # pull out the assistant's last reply.
                    conversation = outputs[0]['generated_text']
                    if isinstance(conversation, list) and len(conversation) > 1:
                        # Find the last assistant message
                        for message in reversed(conversation):
                            if message.get('role') == 'assistant':
                                return message.get('content', 'No response generated')
                        # If no assistant message is found, return the last message's content
                        return conversation[-1].get('content', 'No response generated')
                    else:
                        return str(conversation)
                else:
                    return "No response generated"
            else:
                # Base models get the raw prompt; return only the newly generated text
                outputs = pipe(
                    prompt,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizers[model_name].eos_token_id,
                    return_full_text=False
                )
                return outputs[0]['generated_text']

        except Exception as e:
            return f"āŒ Generation failed: {str(e)}"

    def evaluate_prompt(self, prompt):
        """Evaluate a prompt across all loaded models."""
        print(f"\nšŸŽÆ Evaluating prompt: '{prompt}'")
        print("=" * 80)

        for model_name in self.model_names:
            print(f"\nšŸ¤– {model_name}:")
            print("-" * 40)

            response = self.generate_response(model_name, prompt)
            print(response)
            print("\n" + "=" * 80)

    def print_help(self):
        """Print the interactive command reference."""
        print("\nšŸŽ® Interactive Evaluation Mode")
        print("Commands:")
        print("  - Type your prompt to evaluate all models")
        print("  - Type 'quit' or 'exit' to end")
        print("  - Type 'help' for this message")
        print("  - Type 'models' to list loaded models")
        print("  - Type 'clear' to clear screen")
        print("\nšŸ’” Note: Models with 'chat' or 'sft' in their name are prompted in chat format;")
        print("   other models receive plain text-generation prompts.")

    def interactive_loop(self):
        """Main interactive evaluation loop."""
        self.print_help()

        while True:
            try:
                user_input = input("\nšŸ’¬ Enter prompt (or command): ").strip()

                if not user_input:
                    continue

                if user_input.lower() in ['quit', 'exit', 'q']:
                    print("šŸ‘‹ Goodbye!")
                    break
                elif user_input.lower() == 'help':
                    self.print_help()
                elif user_input.lower() == 'models':
                    print(f"\nšŸ“‹ Loaded models ({len(self.model_names)}):")
                    for i, model_name in enumerate(self.model_names, 1):
                        print(f"  {i}. {model_name}")
                elif user_input.lower() == 'clear':
                    os.system('clear' if os.name == 'posix' else 'cls')
                else:
                    self.evaluate_prompt(user_input)

            except KeyboardInterrupt:
                print("\n\nšŸ‘‹ Goodbye!")
                break
            except Exception as e:
                print(f"āŒ Error: {str(e)}")


def main():
    print("šŸš€ Model Evaluation Script")
    print("=" * 50)

    evaluator = ModelEvaluator()

    # Load all models
    if not evaluator.load_all_models():
        print("āŒ No models could be loaded. Exiting.")
        return

    # Start the interactive loop
    evaluator.interactive_loop()


if __name__ == "__main__":
    main()
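
# ---------------------------------------------------------------------------
# Programmatic usage (a minimal sketch, not part of the interactive flow).
# The checkpoint directory name "my_run_chat" below is a hypothetical
# placeholder; it assumes the current working directory contains a
# my_run_chat/hf/ checkpoint exported in Hugging Face format.
#
#     evaluator = ModelEvaluator()
#     if evaluator.load_all_models():
#         print(evaluator.generate_response("my_run_chat", "Explain beam search."))
# ---------------------------------------------------------------------------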