Spaces:
Runtime error
Runtime error
from fastapi import FastAPI, HTTPException | |
from transformers import AutoTokenizer, T5ForConditionalGeneration | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.middleware.cors import CORSMiddleware | |
import json | |
import os | |
import logging | |
import time | |
import gc | |
import re | |
import psutil # Para monitorar uso de recursos | |
# Configura logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
app = FastAPI() | |
app.mount("/", StaticFiles(directory="static", html=True), name="static") | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Carrega questions.json | |
try: | |
with open("questions.json", "r", encoding="utf-8") as f: | |
examples = json.load(f) | |
logger.info("questions.json carregado com sucesso.") | |
except FileNotFoundError: | |
examples = [] | |
logger.warning("questions.json não encontrado, usando lista vazia.") | |
# Função para carregar modelo e tokenizer | |
def get_model(): | |
if not hasattr(get_model, "model_data"): | |
logger.info("Iniciando carregamento de modelo e tokenizer...") | |
start_time = time.time() | |
try: | |
tokenizer = AutoTokenizer.from_pretrained( | |
"unicamp-dl/ptt5-small-portuguese-vocab", | |
legacy=False, | |
clean_up_tokenization_spaces=True | |
) | |
logger.info(f"Tokenizer baixado e carregado em {time.time() - start_time:.2f} segundos.") | |
model = T5ForConditionalGeneration.from_pretrained( | |
"unicamp-dl/ptt5-small-portuguese-vocab" | |
) | |
logger.info(f"Modelo baixado e carregado em {time.time() - start_time:.2f} segundos.") | |
get_model.model_data = {"tokenizer": tokenizer, "model": model} | |
logger.info("Modelo e tokenizer armazenados com sucesso em model_data.") | |
except Exception as e: | |
logger.error(f"Erro ao carregar modelo ou tokenizer: {e}") | |
get_model.model_data = None | |
logger.debug(f"Retornando model_data: {get_model.model_data is not None}") | |
return get_model.model_data | |
def parse_model_output(response): | |
logger.debug(f"Saída bruta do modelo: {response}") | |
pattern = r"Enunciado clínico: (.*?)(?:\s*Alternativas: (.*?))?(?:\s*Gabarito: (.*?))?(?:\s*Explicação: (.*?))?" | |
match = re.match(pattern, response, re.DOTALL) | |
if match: | |
question = match.group(1).strip() if match.group(1) else response[:200] | |
options = [opt.strip() for opt in (match.group(2) or "").split(",") if opt.strip()] if match.group(2) else [] | |
answer = match.group(3).strip() if match.group(3) else "" | |
explanation = match.group(4).strip() if match.group(4) else "Sem explicação ou parsing incompleto" | |
if len(options) >= 4: | |
return { | |
"question": f"Enunciado clínico: {question}", | |
"options": [f"A) {options[0]}", f"B) {options[1]}", f"C) {options[2]}", f"D) {options[3]}"], | |
"answer": answer, | |
"explanation": explanation | |
} | |
logger.warning(f"Parsing falhou para: {response[:200]}") | |
# Fallback para tentar extrair algo útil | |
if "Enunciado clínico" in response: | |
return {"question": response[:200], "options": [], "answer": "", "explanation": "Formato parcial detectado"} | |
return {"question": response[:200] if len(response) > 200 else response, "options": [], "answer": "", "explanation": "Erro no parsing ou formato inválido"} | |
def generate_question_from_prompt(theme, difficulty, example_question=None): | |
model_data = get_model() | |
logger.debug(f"Verificando model_data: {model_data is not None}") | |
if not model_data or not model_data["tokenizer"] or not model_data["model"]: | |
logger.error("Modelo ou tokenizer não disponível.") | |
return {"question": "Erro: Modelo ou tokenizer não carregado.", "options": [], "answer": "", "explanation": "Por favor, verifique os logs."} | |
tokenizer = model_data["tokenizer"] | |
model = model_data["model"] | |
logger.info(f"Gerando questão com tema: {theme}, dificuldade: {difficulty}") | |
logger.debug(f"Uso de CPU: {psutil.cpu_percent()}%, Memória: {psutil.virtual_memory().percent}%") | |
if example_question: | |
example_text = example_question.get("question", "") + " " + ", ".join(example_question.get("options", [])) | |
prompt = f"Usando '{example_text[:100]}' como exemplo, gere uma NOVA questão curta sobre '{theme}', dificuldade '{difficulty}', estilo USP. Responda SOMENTE: 'Enunciado clínico: [texto]. Alternativas: A) [opção], B) [opção], C) [opção], D) [opção]. Gabarito: [letra]. Explicação: [texto].'" | |
else: | |
prompt = f"Gere uma NOVA questão curta sobre '{theme}', dificuldade '{difficulty}', estilo USP. Responda SOMENTE: 'Enunciado clínico: [texto]. Alternativas: A) [opção], B) [opção], C) [opção], D) [opção]. Gabarito: [letra]. Explicação: [texto].'" | |
try: | |
inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=512) | |
outputs = model.generate(**inputs, max_new_tokens=512, do_sample=True, temperature=0.7, top_p=0.9) | |
response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
logger.debug(f"Resposta bruta: {response}") | |
result = parse_model_output(response) | |
logger.debug(f"Questão processada: {result}") | |
gc.collect() | |
return result | |
except Exception as e: | |
logger.error(f"Erro na geração da questão: {e}") | |
gc.collect() | |
return {"question": f"Erro na geração: {e}", "options": [], "answer": "", "explanation": "Tente novamente."} | |
# Função para exibir perguntas do JSON e gerar adicionais | |
def generate_simulado(): | |
logger.info("Iniciando geração de simulado...") | |
# Exibe as 3 primeiras perguntas do JSON | |
max_json_questions = min(3, len(examples)) | |
for i in range(max_json_questions): | |
question_data = examples[i] | |
logger.info(f"Questão do JSON {i + 1}: {question_data['question']}") | |
for opt in question_data['options']: | |
logger.info(f" {opt}") | |
logger.info(f" Gabarito: {question_data['answer']}") | |
logger.info(f" Explicação: {question_data['explanation']}") | |
# Gera 3 perguntas adicionais com o modelo | |
for i in range(3): | |
logger.debug(f"Gerando pergunta adicional {i + 1}") | |
example = examples[i % len(examples)] if examples else None | |
question_data = generate_question_from_prompt("clinica medica", "medio", example) | |
logger.info(f"Questão Gerada {max_json_questions + i + 1}: {question_data['question']}") | |
for opt in question_data['options']: | |
logger.info(f" {opt}") | |
logger.info(f" Gabarito: {question_data['answer']}") | |
logger.info(f" Explicação: {question_data['explanation']}") | |
logger.info("Geração de simulado concluída.") | |
return {"simulado": examples[:3] + [generate_question_from_prompt("clinica medica", "medio") for _ in range(3)]} | |
# Força carregamento inicial | |
logger.info("Testando carregamento inicial do modelo...") | |
start_time = time.time() | |
model_data = get_model() | |
if model_data: | |
logger.info(f"Modelo e tokenizer inicializados em {time.time() - start_time:.2f} segundos.") | |
time.sleep(1) # Delay para estabilidade | |
generate_simulado() | |
else: | |
logger.error("Falha na inicialização do modelo.") | |
async def generate_question(theme: str, difficulty: str): | |
valid_difficulties = ["fácil", "médio", "difícil"] | |
if not theme or difficulty.lower() not in valid_difficulties: | |
raise HTTPException(status_code=400, detail="Tema inválido ou dificuldade deve ser 'fácil', 'médio' ou 'difícil'.") | |
example = examples[0] if examples else None | |
return generate_question_from_prompt(theme, difficulty, example) | |
async def get_simulado(num_questions: int = 6): # 3 do JSON + 3 geradas | |
simulado = examples[:min(3, len(examples))] # Até 3 do JSON | |
for _ in range(min(3, num_questions - len(simulado))): # Gera até 3 adicionais | |
example = examples[0] if examples else None | |
question_data = generate_question_from_prompt("clinica medica", "medio", example) | |
simulado.append(question_data) | |
return {"simulado": simulado} |