|
import gradio as gr |
|
import os |
|
from src.processing.gemini_processor import GeminiProcessor |
|
from src.analysis.coverage_generator import CoverageGenerator |
|
from src.analysis.creative_analyzer import CreativeAnalyzer |
|
from src.analysis.analysis_post_processor import AnalysisPostProcessor |
|
from pathlib import Path |
|
import logging |
|
|
|
class ConsoleOutput: |
|
def __init__(self): |
|
self.messages = [] |
|
|
|
def write(self, message): |
|
self.messages.append(str(message)) |
|
return "\n".join(self.messages) |
|
|
|
def get_output(self): |
|
return "\n".join(self.messages) |
|
|
|
class GradioHandler(logging.Handler): |
|
def emit(self, record): |
|
msg = self.format(record) |
|
self.console.write(msg) |
|
|
|
def setup_logging(console_output): |
|
logging.basicConfig(level=logging.DEBUG) |
|
logger = logging.getLogger() |
|
logger.handlers = [] |
|
handler = GradioHandler() |
|
handler.console = console_output |
|
logger.addHandler(handler) |
|
return logger |
|
|
|
def process_screenplay(pdf_file, progress=gr.Progress()): |
|
if pdf_file is None: |
|
raise gr.Error("Please upload a PDF file") |
|
|
|
console = ConsoleOutput() |
|
logger = setup_logging(console) |
|
|
|
try: |
|
processor = GeminiProcessor() |
|
progress(0.5, desc="Processing screenplay...") |
|
cleaned_path = Path("cleaned_screenplay_long.txt") |
|
success = processor.process_screenplay(pdf_file.name, str(cleaned_path)) |
|
|
|
if not success: |
|
raise gr.Error("Failed to process screenplay") |
|
|
|
with open(cleaned_path, 'r') as f: |
|
cleaned_text = f.read() |
|
|
|
progress(1.0, desc="Complete!") |
|
return [cleaned_text, gr.update(interactive=True, variant="primary"), |
|
gr.update(interactive=True, variant="primary"), |
|
gr.update(interactive=False, variant="secondary"), |
|
console.get_output()] |
|
|
|
except Exception as e: |
|
error_msg = f"Error: {str(e)}" |
|
console.write(error_msg) |
|
raise gr.Error(error_msg) |
|
|
|
def generate_coverage(progress=gr.Progress()): |
|
console = ConsoleOutput() |
|
logger = setup_logging(console) |
|
|
|
try: |
|
coverage_gen = CoverageGenerator() |
|
progress(0.5, desc="Generating coverage...") |
|
cleaned_path = Path("cleaned_screenplay_long.txt") |
|
success = coverage_gen.generate_coverage(cleaned_path) |
|
|
|
if not success: |
|
raise gr.Error("Failed to generate coverage") |
|
|
|
with open(Path("coverage.txt"), 'r') as f: |
|
coverage = f.read() |
|
|
|
progress(1.0, desc="Complete!") |
|
return [coverage, console.get_output()] |
|
|
|
except Exception as e: |
|
error_msg = f"Error: {str(e)}" |
|
console.write(error_msg) |
|
raise gr.Error(error_msg) |
|
|
|
def analyze_screenplay(progress=gr.Progress()): |
|
console = ConsoleOutput() |
|
logger = setup_logging(console) |
|
|
|
try: |
|
analyzer = CreativeAnalyzer() |
|
progress(0.5, desc="Performing creative analysis...") |
|
cleaned_path = Path("cleaned_screenplay_long.txt") |
|
success = analyzer.analyze_screenplay(cleaned_path) |
|
|
|
if not success: |
|
raise gr.Error("Failed to generate creative analysis") |
|
|
|
with open(Path("creative_analysis.txt"), 'r') as f: |
|
analysis = f.read() |
|
|
|
progress(1.0, desc="Complete!") |
|
return [analysis, gr.update(interactive=True, variant="primary"), console.get_output()] |
|
|
|
except Exception as e: |
|
error_msg = f"Error: {str(e)}" |
|
console.write(error_msg) |
|
raise gr.Error(error_msg) |
|
|
|
def post_process_analysis(progress=gr.Progress()): |
|
console = ConsoleOutput() |
|
logger = setup_logging(console) |
|
|
|
try: |
|
post_processor = AnalysisPostProcessor() |
|
progress(0.5, desc="Post-processing analysis...") |
|
input_path = Path("creative_analysis.txt") |
|
output_path = Path("cleaned_creative_analysis.txt") |
|
success = post_processor.process_analysis(str(input_path), str(output_path)) |
|
|
|
if not success: |
|
raise gr.Error("Failed to post-process analysis") |
|
|
|
with open(output_path, 'r') as f: |
|
analysis = f.read() |
|
|
|
progress(1.0, desc="Complete!") |
|
return [analysis, console.get_output()] |
|
|
|
except Exception as e: |
|
error_msg = f"Error: {str(e)}" |
|
console.write(error_msg) |
|
raise gr.Error(error_msg) |
|
|
|
with gr.Blocks(title="Screenplay Coverage Generator") as demo: |
|
gr.Markdown("# Screenplay Coverage Generator") |
|
|
|
with gr.Row(): |
|
file_input = gr.File(label="Upload Screenplay PDF", file_types=[".pdf"]) |
|
|
|
with gr.Row(): |
|
process_btn = gr.Button("Process Screenplay", variant="primary") |
|
coverage_btn = gr.Button("Generate Coverage", interactive=False) |
|
analysis_btn = gr.Button("Creative Analysis", interactive=False) |
|
post_process_btn = gr.Button("Post-Process Analysis", interactive=False) |
|
|
|
with gr.Row(): |
|
console = gr.Textbox(label="Console Output", lines=10, max_lines=30, autoscroll=True, show_copy_button=True) |
|
|
|
with gr.Tabs(): |
|
with gr.TabItem("Cleaned Screenplay"): |
|
cleaned_output = gr.Textbox(label="Cleaned Screenplay", lines=10, show_copy_button=True) |
|
with gr.TabItem("Coverage"): |
|
coverage_output = gr.Textbox(label="Coverage Document", lines=10, show_copy_button=True) |
|
with gr.TabItem("Creative Analysis"): |
|
analysis_output = gr.Textbox(label="Creative Analysis", lines=10, show_copy_button=True) |
|
with gr.TabItem("Post-Processed Analysis"): |
|
post_process_output = gr.Textbox(label="Post-Processed Analysis", lines=10, show_copy_button=True) |
|
|
|
process_btn.click( |
|
fn=process_screenplay, |
|
inputs=[file_input], |
|
outputs=[cleaned_output, coverage_btn, analysis_btn, post_process_btn, console] |
|
) |
|
|
|
coverage_btn.click( |
|
fn=generate_coverage, |
|
outputs=[coverage_output, console] |
|
) |
|
|
|
analysis_btn.click( |
|
fn=analyze_screenplay, |
|
outputs=[analysis_output, post_process_btn, console] |
|
) |
|
|
|
post_process_btn.click( |
|
fn=post_process_analysis, |
|
outputs=[post_process_output, console] |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |