import os os.environ['MKL_THREADING_LAYER'] = 'GNU' import spaces import torch from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, StoppingCriteria, StoppingCriteriaList from .prompts import format_rag_prompt from .shared import generation_interrupt models = { "Qwen2.5-1.5b-Instruct": "qwen/qwen2.5-1.5b-instruct", "Qwen2.5-3b-Instruct": "qwen/qwen2.5-3b-instruct", "Llama-3.2-1b-Instruct": "meta-llama/llama-3.2-1b-instruct", "Llama-3.2-3b-Instruct": "meta-llama/llama-3.2-3b-instruct", #"Gemma-3-1b-it": "google/gemma-3-1b-it", #"Gemma-3-4b-it": "google/gemma-3-4b-it", "Gemma-2-2b-it": "google/gemma-2-2b-it", "Phi-4-mini-instruct": "microsoft/phi-4-mini-instruct", "Cogito-v1-preview-llama-3b": "deepcogito/cogito-v1-preview-llama-3b", "IBM Granite-3.3-2b-instruct": "ibm-granite/granite-3.3-2b-instruct", # #"Bitnet-b1.58-2B4T": "microsoft/bitnet-b1.58-2B-4T", # #"MiniCPM3-RAG-LoRA": "openbmb/MiniCPM3-RAG-LoRA", "Qwen3-0.6b": "qwen/qwen3-0.6b", "Qwen3-1.7b": "qwen/qwen3-1.7b", "Qwen3-4b": "qwen/qwen3-4b", "SmolLM2-1.7b-Instruct": "HuggingFaceTB/SmolLM2-1.7B-Instruct", "EXAONE-3.5-2.4B-instruct": "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct", "OLMo-2-1B-Instruct": "allenai/OLMo-2-0425-1B-Instruct", } tokenizer_cache = {} # List of model names for easy access model_names = list(models.keys()) # Custom stopping criteria that checks the interrupt flag class InterruptCriteria(StoppingCriteria): def __init__(self, interrupt_event): self.interrupt_event = interrupt_event def __call__(self, input_ids, scores, **kwargs): return self.interrupt_event.is_set() @spaces.GPU def generate_summaries(example, model_a_name, model_b_name): """ Generates summaries for the given example using the assigned models sequentially. """ if generation_interrupt.is_set(): return "", "" context_text = "" context_parts = [] if "full_contexts" in example and example["full_contexts"]: for i, ctx in enumerate(example["full_contexts"]): content = "" # Extract content from either dict or string if isinstance(ctx, dict) and "content" in ctx: content = ctx["content"] elif isinstance(ctx, str): content = ctx # Add document number if not already present if not content.strip().startswith("Document"): content = f"Document {i+1}:\n{content}" context_parts.append(content) context_text = "\n\n".join(context_parts) else: # Provide a graceful fallback instead of raising an error print("Warning: No full context found in the example, using empty context") context_text = "" question = example.get("question", "") if generation_interrupt.is_set(): return "", "" # Run model A summary_a = run_inference(models[model_a_name], context_text, question) if generation_interrupt.is_set(): return summary_a, "" # Run model B summary_b = run_inference(models[model_b_name], context_text, question) return summary_a, summary_b @spaces.GPU def run_inference(model_name, context, question): """ Run inference using the specified model. Returns the generated text or empty string if interrupted. """ # Check interrupt at the beginning if generation_interrupt.is_set(): return "" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") result = "" tokenizer_kwargs = { "add_generation_prompt": True, } # make sure qwen3 doesn't use thinking generation_kwargs = { "max_new_tokens": 512, } if "qwen3" in model_name.lower(): print(f"Recognized {model_name} as a Qwen3 model. Setting enable_thinking=False.") tokenizer_kwargs["enable_thinking"] = False try: if model_name in tokenizer_cache: tokenizer = tokenizer_cache[model_name] else: tokenizer = AutoTokenizer.from_pretrained( model_name, padding_side="left", token=True, kwargs=tokenizer_kwargs ) tokenizer_cache[model_name] = tokenizer accepts_sys = ( "System role not supported" not in tokenizer.chat_template if tokenizer.chat_template else False # Handle missing chat_template ) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token # Check interrupt before loading the model if generation_interrupt.is_set(): return "" pipe = pipeline( "text-generation", model=model_name, tokenizer=tokenizer, device_map='cuda', trust_remote_code=True, torch_dtype=torch.bfloat16, model_kwargs={ "attn_implementation": "eager", } ) text_input = format_rag_prompt(question, context, accepts_sys) if "Gemma-3".lower() not in model_name.lower(): formatted = pipe.tokenizer.apply_chat_template( text_input, tokenize=False, **tokenizer_kwargs, ) input_length = len(formatted) # Check interrupt before generation outputs = pipe(formatted, max_new_tokens=512, generation_kwargs={"skip_special_tokens": True}) #print(outputs[0]['generated_text']) result = outputs[0]['generated_text'][input_length:] else: # don't use apply chat template? I don't know why gemma keeps breaking result = pipe(text_input, max_new_tokens=512, generation_kwargs={"skip_special_tokens": True})[0]['generated_text'] result = result[0]['generated_text'][-1]['content'] except Exception as e: print(f"Error in inference for {model_name}: {e}") result = f"Error generating response: {str(e)[:200]}..." finally: # Clean up resources if torch.cuda.is_available(): torch.cuda.empty_cache() return result