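# Quiz Generator: a Gradio app that splits a pasted document into topical
# segments with Gemini (via langchain-google-genai) and generates key concepts,
# a summary, and quiz questions for each segment. Results can be downloaded as
# JSON or plain text. Typically used as the entry point of a Hugging Face Space
# (e.g. saved as app.py).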
import re
import numpy as np
import json
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from langchain_google_genai import ChatGoogleGenerativeAI
import os
import gradio as gr
import time

# The tokenizer is used only to count tokens when deciding whether to split the document.
tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
# Note: numpy and this sentence-transformer model are loaded here but not used elsewhere in this file.
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
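
# clean_text: strip transcript speaker tags like "[speaker_1]" and collapse
# runs of whitespace before token counting and prompting.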
def clean_text(text):
    text = re.sub(r'\[speaker_\d+\]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
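
# split_text_by_tokens: if the cleaned text exceeds max_tokens, split it into
# two sentence-aligned halves of roughly equal token count. Note that the
# halves themselves are not re-checked against max_tokens, so very long
# documents may still yield parts above the limit.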
def split_text_by_tokens(text, max_tokens=8000):
    text = clean_text(text)
    tokens = tokenizer.encode(text)
    if len(tokens) <= max_tokens:
        return [text]
    split_point = len(tokens) // 2
    sentences = re.split(r'(?<=[.!?])\s+', text)
    first_half = []
    second_half = []
    current_tokens = 0
    in_second_half = False
    for sentence in sentences:
        sentence_tokens = len(tokenizer.encode(sentence))
        # Fill the first half up to the midpoint, then send every remaining
        # sentence to the second half so the original sentence order is preserved.
        if not in_second_half and current_tokens + sentence_tokens <= split_point:
            first_half.append(sentence)
            current_tokens += sentence_tokens
        else:
            in_second_half = True
            second_half.append(sentence)
    return [" ".join(first_half), " ".join(second_half)]
def analyze_segment_with_gemini(segment_text):
    llm = ChatGoogleGenerativeAI(
        model="gemini-1.5-flash",
        temperature=0.7,
        max_tokens=None,
        timeout=None,
        max_retries=3
    )
    prompt = f"""
Analyze the following text and split it into distinct topical segments:
1. Identify at most 10 segments (STRICT maximum)
2. For each segment/topic you identify:
   - Provide a SPECIFIC and UNIQUE topic name (3-5 words) that clearly differentiates it from other segments
   - List 3-5 key concepts discussed in that segment (be precise and avoid repetition between segments)
   - Write a brief summary of that segment (3-5 sentences)
   - Create 5 quiz questions based DIRECTLY on the content in that segment only

For each quiz question:
- Create one correct answer that comes DIRECTLY from the text
- Create two plausible but incorrect answers
- IMPORTANT: Ensure all answer options have similar length (± 3 words)
- Mark the correct answer by setting its "correct" field to true in the JSON structure below

Text:
{segment_text}

Format your response as JSON with the following structure:
{{
  "segments": [
    {{
      "topic_name": "Unique and Specific Topic Name",
      "key_concepts": ["concept1", "concept2", "concept3"],
      "summary": "Brief summary of this segment.",
      "quiz_questions": [
        {{
          "question": "Question text?",
          "options": [
            {{"text": "Option A", "correct": false}},
            {{"text": "Option B", "correct": true}},
            {{"text": "Option C", "correct": false}}
          ]
        }}
      ]
    }}
  ]
}}

IMPORTANT: Each segment must have a DISTINCT topic name that clearly differentiates it from others.
"""
    response = llm.invoke(prompt)
    response_text = response.content
    try:
        json_match = re.search(r'\{[\s\S]*\}', response_text)
        if json_match:
            return json.loads(json_match.group(0))
        else:
            return json.loads(response_text)
    except json.JSONDecodeError:
        return {
            "segments": [
                {
                    "topic_name": "JSON Parsing Error",
                    "key_concepts": ["Error in response format"],
                    "summary": "Could not parse the API response.",
                    "quiz_questions": []
                }
            ]
        }
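
# process_document_with_quiz: split the document if needed, analyze each part,
# and merge the resulting segments with sequential segment numbers. Parts that
# come back in an unexpected format get a fallback segment instead.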
def process_document_with_quiz(text):
    start_time = time.time()
    token_count = len(tokenizer.encode(text))
    print(f"[LOG] Total document tokens: {token_count}")
    if token_count > 8000:
        print("[LOG] Document exceeds 8000 tokens. Splitting into parts.")
        parts = split_text_by_tokens(text)
        print(f"[LOG] Document split into {len(parts)} parts")
        for i, part in enumerate(parts):
            part_tokens = len(tokenizer.encode(part))
            print(f"[LOG] Part {i+1} contains {part_tokens} tokens")
    else:
        print("[LOG] Document under 8000 tokens. Processing as a single part.")
        parts = [text]
    all_segments = []
    segment_counter = 1
    for i, part in enumerate(parts):
        part_start_time = time.time()
        print(f"[LOG] Processing part {i+1}...")
        analysis = analyze_segment_with_gemini(part)
        if "segments" in analysis:
            print(f"[LOG] Found {len(analysis['segments'])} segments in part {i+1}")
            for segment in analysis["segments"]:
                segment["segment_number"] = segment_counter
                all_segments.append(segment)
                print(f"[LOG] Segment {segment_counter}: {segment['topic_name']}")
                segment_counter += 1
        else:
            # Fallback if the response format is unexpected
            print(f"[LOG] Error: Unexpected format in part {i+1} analysis")
            fallback_segment = {
                "topic_name": f"Segment {segment_counter} Analysis",
                "key_concepts": ["Format error in analysis"],
                "summary": "Could not properly segment this part of the text.",
                "quiz_questions": [],
                "segment_number": segment_counter
            }
            all_segments.append(fallback_segment)
            print(f"[LOG] Added fallback segment {segment_counter}")
            segment_counter += 1
        part_time = time.time() - part_start_time
        print(f"[LOG] Part {i+1} processed in {part_time:.2f} seconds")
    total_time = time.time() - start_time
    print(f"[LOG] Total processing time: {total_time:.2f} seconds")
    print(f"[LOG] Generated {len(all_segments)} segments total")
    return all_segments
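
# format_quiz_for_display: render the segment list as plain text with key
# concepts, summaries, and lettered quiz options (correct answers marked ✓).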
def format_quiz_for_display(results):
    output = []
    for segment in results:
        topic = segment["topic_name"]
        segment_num = segment["segment_number"]
        output.append(f"\n\n{'='*40}")
        output.append(f"SEGMENT {segment_num}: {topic}")
        output.append(f"{'='*40}\n")
        output.append("KEY CONCEPTS:")
        for concept in segment["key_concepts"]:
            output.append(f"• {concept}")
        output.append("\nSUMMARY:")
        output.append(segment["summary"])
        output.append("\nQUIZ QUESTIONS:")
        for i, q in enumerate(segment["quiz_questions"]):
            output.append(f"\n{i+1}. {q['question']}")
            for j, option in enumerate(q['options']):
                letter = chr(ord('A') + j)
                correct_marker = " ✓" if option["correct"] else ""
                output.append(f"   {letter}. {option['text']}{correct_marker}")
    return "\n".join(output)
def save_results_as_json(results, filename="analysis_results.json"):
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    return filename

def save_results_as_txt(formatted_text, filename="analysis_results.txt"):
    with open(filename, "w", encoding="utf-8") as f:
        f.write(formatted_text)
    return filename
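
# analyze_document: Gradio callback. Sets GOOGLE_API_KEY from the UI field,
# runs the full pipeline, writes the JSON/TXT files, and prepends a short
# summary of the detected segments to the displayed output.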
def analyze_document(document_text, api_key):
    print("[LOG] Starting document analysis...")
    overall_start_time = time.time()
    os.environ["GOOGLE_API_KEY"] = api_key
    try:
        results = process_document_with_quiz(document_text)
        formatted_output = format_quiz_for_display(results)
        json_path = save_results_as_json(results)
        txt_path = save_results_as_txt(formatted_output)
        overall_time = time.time() - overall_start_time
        print(f"[LOG] Document analysis completed in {overall_time:.2f} seconds")
        topics_summary = "DOCUMENT ANALYSIS SUMMARY:\n"
        topics_summary += f"Total segments: {len(results)}\n"
        topics_summary += f"Processing time: {overall_time:.2f} seconds\n\n"
        topics_summary += "SEGMENTS:\n"
        for segment in results:
            topics_summary += f"- Segment {segment['segment_number']}: {segment['topic_name']}\n"
        formatted_output = topics_summary + "\n" + formatted_output
        return formatted_output, json_path, txt_path
    except Exception as e:
        error_msg = f"Error processing document: {str(e)}"
        print(f"[LOG] ERROR: {error_msg}")
        return error_msg, None, None
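
# Gradio UI: document text and API key on the left, results and download links
# on the right.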
with gr.Blocks(title="Quiz Generator") as app:
    gr.Markdown("# Quiz Generator")
    with gr.Row():
        with gr.Column():
            input_text = gr.Textbox(
                label="Input Document Text",
                placeholder="Paste your document text here...",
                lines=10
            )
            api_key = gr.Textbox(
                label="Gemini API Key",
                placeholder="Enter your Gemini API key",
                type="password"
            )
            analyze_btn = gr.Button("Analyze Document")
        with gr.Column():
            output_results = gr.Textbox(
                label="Analysis Results",
                lines=20
            )
            json_file_output = gr.File(label="Download JSON")
            txt_file_output = gr.File(label="Download TXT")
    analyze_btn.click(
        fn=analyze_document,
        inputs=[input_text, api_key],
        outputs=[output_results, json_file_output, txt_file_output]
    )

if __name__ == "__main__":
    app.launch()
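# To share the interface publicly when running on a local machine, launch can
# instead be called as app.launch(share=True).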