# llama_generator.py
import logging
import os

import torch

from typing import List, Dict, Any, Optional, Tuple
from config.config import GenerationConfig, ModelConfig
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from services.prompt_builder import LlamaPromptTemplate
from services.model_manager import ModelManager
from services.base_generator import BaseGenerator
from services.strategy import DefaultStrategy, MajorityVotingStrategy, BestOfN, BeamSearch, DVT, COT, ReAct

logger = logging.getLogger(__name__)
# Initialize Langfuse (tracing). NOTE: these credentials are hardcoded for
# this demo Space; in production they belong in secrets management, not code.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-04d2302a-aa5c-4870-9703-58ab64c3bcae"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-d34ea200-feec-428e-a621-784fce93a5af"
os.environ["LANGFUSE_HOST"] = "https://chris4k-langfuse-template-space.hf.space"  # 🇪🇺 EU region

try:
    langfuse = Langfuse()
except Exception:
    logger.warning("Langfuse offline; continuing without tracing.")
class LlamaGenerator(BaseGenerator):
    def __init__(
        self,
        llama_model_name: str,
        prm_model_path: str,
        device: Optional[str] = None,
        default_generation_config: Optional[GenerationConfig] = None,
        model_config: Optional[ModelConfig] = None,
        cache_size: int = 1000,
        max_batch_size: int = 32,
    ):
        logger.info("Initializing generator: llama=%s, prm=%s", llama_model_name, prm_model_path)
        self.model_manager = ModelManager()
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Initialize the tokenizer before the base-class constructor runs.
        self.tokenizer = self.model_manager.load_tokenizer(llama_model_name)
        super().__init__(
            llama_model_name,
            self.device,  # pass the resolved device rather than a possible None
            default_generation_config,
            model_config,
            cache_size,
            max_batch_size,
        )
        # Initialize models
        self.model_manager.load_model(
            "llama",
            llama_model_name,
            "llama",
            self.model_config,
        )
        self.model_manager.load_model(
            "prm",
            prm_model_path,
            "gguf",
            self.model_config,
        )

        # Assign the loaded models; fail fast if either is missing.
        self.model = self.model_manager.models.get("llama")
        if not self.model:
            raise ValueError(f"Failed to load model: {llama_model_name}")
        self.prm_model = self.model_manager.models.get("prm")
        if not self.prm_model:
            raise ValueError(f"Failed to load PRM model: {prm_model_path}")

        self.prompt_builder = LlamaPromptTemplate()
        self._init_strategies()
    def _init_strategies(self):
        self.strategies = {
            "default": DefaultStrategy(),
            "majority_voting": MajorityVotingStrategy(),
            "best_of_n": BestOfN(),
            "beam_search": BeamSearch(),
            "dvts": DVT(),
        }
    def _get_generation_kwargs(self, config: GenerationConfig) -> Dict[str, Any]:
        """Get generation kwargs based on config."""
        return {
            key: getattr(config, key)
            for key in [
                "max_new_tokens",
                "temperature",
                "top_p",
                "top_k",
                "repetition_penalty",
                "length_penalty",
                "do_sample",
            ]
            if hasattr(config, key)
        }
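    # Example (illustrative): if the config defines only max_new_tokens=256,
    # temperature=0.7, and do_sample=True, the method above returns
    # {"max_new_tokens": 256, "temperature": 0.7, "do_sample": True},
    # ready to splat into model.generate(**kwargs).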
    def generate_stream(self):
        raise NotImplementedError("Streaming generation is not implemented yet.")
    def generate(
        self,
        prompt: str,
        model_kwargs: Dict[str, Any],
        strategy: str = "default",
        **kwargs,
    ) -> str:
        """
        Generate text based on a given strategy.

        Args:
            prompt (str): Input prompt for text generation.
            model_kwargs (Dict[str, Any]): Additional arguments for model generation.
            strategy (str): The generation strategy to use (default: "default").
            **kwargs: Additional arguments passed to the strategy.

        Returns:
            str: Generated text response.

        Raises:
            ValueError: If the specified strategy is not available.
        """
        # Validate that the strategy exists
        if strategy not in self.strategies:
            raise ValueError(
                f"Unknown strategy: {strategy}. "
                f"Available strategies are: {list(self.strategies.keys())}"
            )

        # Extract `generator` from kwargs if it exists to prevent duplication
        kwargs.pop("generator", None)

        # Call the selected strategy with the provided arguments
        return self.strategies[strategy].generate(
            generator=self,             # the generator instance
            prompt=prompt,              # the input prompt
            model_kwargs=model_kwargs,  # arguments for the model
            **kwargs,                   # any additional strategy-specific arguments
        )
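    # Usage sketch (illustrative; strategy-specific kwargs such as num_samples
    # depend on the implementations in services.strategy):
    #   text = generator.generate(
    #       "Summarize attention in one sentence.",
    #       model_kwargs={"max_new_tokens": 128},
    #       strategy="best_of_n",
    #       num_samples=4,
    #   )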
    def generate_with_context(
        self,
        context: str,
        user_input: str,
        chat_history: List[Tuple[str, str]],
        model_kwargs: Dict[str, Any],
        max_history_turns: int = 3,
        strategy: str = "default",
        num_samples: int = 5,
        depth: int = 3,
        breadth: int = 2,
    ) -> str:
        """Generate a response using context and chat history.

        Args:
            context (str): Context for the conversation
            user_input (str): Current user input
            chat_history (List[Tuple[str, str]]): List of (user, assistant) message pairs
            model_kwargs (dict): Additional arguments for model.generate()
            max_history_turns (int): Maximum number of history turns to include
            strategy (str): Generation strategy
            num_samples (int): Number of samples for applicable strategies
            depth (int): Depth for DVTS strategy
            breadth (int): Breadth for DVTS strategy

        Returns:
            str: Generated response
        """
        prompt = self.prompt_builder.format(
            context,
            user_input,
            chat_history,
            max_history_turns,
        )
        # Note: `generate` supplies itself as the strategy's generator, so it
        # must not be passed again here.
        return self.generate(
            prompt=prompt,
            model_kwargs=model_kwargs,
            strategy=strategy,
            num_samples=num_samples,
            depth=depth,
            breadth=breadth,
        )
    def check_health(self):  # -> HealthStatus
        """Check the health status of the generator."""
        # TODO: report system resources and model status via self.health_check.
        return "Health check not implemented yet."