"""Gradio front end for the screenplay coverage pipeline.

The UI exposes five buttons, each wired to one handler in this module:
process the uploaded PDF, generate coverage, run the creative analysis,
post-process that analysis, and generate a market analysis.  Each handler
reads its result from a fixed file name in the current working directory.
"""

import logging
import os
from pathlib import Path

import gradio as gr

from src.analysis.analysis_post_processor import AnalysisPostProcessor
from src.analysis.coverage_generator import CoverageGenerator
from src.analysis.creative_analyzer import CreativeAnalyzer
from src.analysis.market_analyzer import MarketAnalyzer
from src.processing.gemini_processor import GeminiProcessor

# Fixed artifact locations (relative to the working directory) shared by the
# handlers below.  NOTE(review): the coverage, creative-analysis and
# market-analysis files are only *read* here; presumably the corresponding
# analyzer writes them -- confirm against the src.analysis modules.
CLEANED_SCREENPLAY_PATH = Path("cleaned_screenplay_long.txt")
COVERAGE_PATH = Path("coverage.txt")
CREATIVE_ANALYSIS_PATH = Path("creative_analysis.txt")
CLEANED_CREATIVE_ANALYSIS_PATH = Path("cleaned_creative_analysis.txt")
MARKET_ANALYSIS_PATH = Path("market_analysis.txt")


class ConsoleOutput:
    """In-memory sink that accumulates messages for the console textbox."""

    def __init__(self):
        self.messages = []

    def write(self, message):
        """Append ``message`` (coerced to ``str``) and return the full text so far."""
        self.messages.append(str(message))
        return self.get_output()

    def get_output(self):
        """Return every message written so far, joined by newlines."""
        return "\n".join(self.messages)


class GradioHandler(logging.Handler):
    """Logging handler that forwards formatted records to a ``ConsoleOutput``.

    The sink may be passed to the constructor or assigned to the ``console``
    attribute after construction.
    """

    def __init__(self, console=None, level=logging.NOTSET):
        super().__init__(level)
        self.console = console

    def emit(self, record):
        """Format ``record`` and write it to the attached console."""
        try:
            self.console.write(self.format(record))
        except Exception:
            # Standard logging.Handler convention: a failing sink must not
            # raise into the code that issued the log call.
            self.handleError(record)


def setup_logging(console_output):
    """Route all root-logger output to ``console_output`` and return the root logger.

    Every handler already attached to the root logger is removed first, so
    after this call the ``GradioHandler`` created here is the only one.  The
    change is process-wide because it is made on the root logger.
    """
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
    logger.addHandler(GradioHandler(console_output))
    return logger


def _read_text(path):
    """Return the full contents of the text file at ``path``."""
    with open(path, "r") as f:
        return f.read()


def _run_stage(
    progress,
    *,
    worker_factory,
    desc,
    run,
    failure,
    result_path,
    extra_updates=(),
):
    """Run one pipeline stage and build the output list for its click handler.

    Args:
        progress: The ``gr.Progress`` tracker received by the calling handler.
        worker_factory: Zero-argument callable that builds the stage's worker.
        desc: Progress description shown while the stage is running.
        run: Callable taking the worker; a falsy return value means failure.
        failure: Message for the ``gr.Error`` raised when ``run`` reports failure.
        result_path: File read back after a successful run.
        extra_updates: Component updates placed between the file update and
            the console text in the returned list.

    Returns:
        ``[result text, file update, *extra_updates, console text]``.

    Raises:
        gr.Error: With ``failure`` when ``run`` returns a falsy value, or with
            ``"Error: <details>"`` when any other exception escapes the stage.
    """
    console = ConsoleOutput()
    setup_logging(console)
    try:
        worker = worker_factory()
        progress(0.5, desc=desc)
        if not run(worker):
            raise gr.Error(failure)
        text = _read_text(result_path)
        progress(1.0, desc="Complete!")
        return [
            text,
            gr.update(value=str(result_path), visible=True),
            *extra_updates,
            console.get_output(),
        ]
    except gr.Error:
        # Already a user-facing error; re-wrapping it would show
        # "Error: Failed to ..." instead of the intended message.
        raise
    except Exception as exc:
        error_msg = f"Error: {str(exc)}"
        console.write(error_msg)
        raise gr.Error(error_msg) from exc


# NOTE: ``progress=gr.Progress()`` as a default argument is how Gradio is asked
# to inject a progress tracker; it is intentional, not a mutable-default bug.
def process_screenplay(pdf_file, progress=gr.Progress()):
    """Process the uploaded PDF and enable the next pipeline buttons.

    Args:
        pdf_file: Value of the upload component; its ``name`` attribute is
            passed to the processor as the PDF path.
        progress: Gradio progress tracker.

    Returns:
        Seven outputs: cleaned text, download-file update, updates for the
        coverage, analysis, post-process and market buttons, console text.

    Raises:
        gr.Error: If no file was uploaded or processing fails.
    """
    if pdf_file is None:
        raise gr.Error("Please upload a PDF file")

    return _run_stage(
        progress,
        worker_factory=GeminiProcessor,
        desc="Processing screenplay...",
        run=lambda processor: processor.process_screenplay(
            pdf_file.name, str(CLEANED_SCREENPLAY_PATH)
        ),
        failure="Failed to process screenplay",
        result_path=CLEANED_SCREENPLAY_PATH,
        extra_updates=(
            # Coverage and creative analysis become available ...
            gr.update(interactive=True, variant="primary"),
            gr.update(interactive=True, variant="primary"),
            # ... while the later stages are reset to disabled.
            gr.update(interactive=False, variant="secondary"),
            gr.update(interactive=False, variant="secondary"),
        ),
    )


def generate_coverage(progress=gr.Progress()):
    """Generate coverage from the cleaned screenplay.

    Returns:
        Three outputs: coverage text, download-file update, console text.

    Raises:
        gr.Error: If coverage generation fails.
    """
    return _run_stage(
        progress,
        worker_factory=CoverageGenerator,
        desc="Generating coverage...",
        run=lambda generator: generator.generate_coverage(CLEANED_SCREENPLAY_PATH),
        failure="Failed to generate coverage",
        result_path=COVERAGE_PATH,
    )


def analyze_screenplay(progress=gr.Progress()):
    """Run the creative analysis on the cleaned screenplay.

    Returns:
        Four outputs: analysis text, download-file update, an update enabling
        the post-process button, console text.

    Raises:
        gr.Error: If the analysis fails.
    """
    return _run_stage(
        progress,
        worker_factory=CreativeAnalyzer,
        desc="Performing creative analysis...",
        run=lambda analyzer: analyzer.analyze_screenplay(CLEANED_SCREENPLAY_PATH),
        failure="Failed to generate creative analysis",
        result_path=CREATIVE_ANALYSIS_PATH,
        extra_updates=(gr.update(interactive=True, variant="primary"),),
    )


def post_process_analysis(progress=gr.Progress()):
    """Post-process the creative analysis into its cleaned form.

    Returns:
        Four outputs: cleaned analysis text, download-file update, an update
        enabling the market-analysis button, console text.

    Raises:
        gr.Error: If post-processing fails.
    """
    return _run_stage(
        progress,
        worker_factory=AnalysisPostProcessor,
        desc="Post-processing analysis...",
        run=lambda post_processor: post_processor.process_analysis(
            str(CREATIVE_ANALYSIS_PATH), str(CLEANED_CREATIVE_ANALYSIS_PATH)
        ),
        failure="Failed to post-process analysis",
        result_path=CLEANED_CREATIVE_ANALYSIS_PATH,
        extra_updates=(gr.update(interactive=True, variant="primary"),),
    )


def generate_market_analysis(progress=gr.Progress()):
    """Generate the market analysis from coverage and the cleaned analysis.

    Returns:
        Three outputs: market analysis text, download-file update, console text.

    Raises:
        gr.Error: If market analysis generation fails.
    """
    return _run_stage(
        progress,
        worker_factory=MarketAnalyzer,
        desc="Generating market analysis...",
        run=lambda market_analyzer: market_analyzer.generate_market_analysis(
            COVERAGE_PATH, CLEANED_CREATIVE_ANALYSIS_PATH
        ),
        failure="Failed to generate market analysis",
        result_path=MARKET_ANALYSIS_PATH,
    )


with gr.Blocks(title="Screenplay Coverage Generator") as demo:
    gr.Markdown("# Screenplay Coverage Generator")

    with gr.Row():
        file_input = gr.File(label="Upload Screenplay PDF", file_types=[".pdf"])

    # Only the first button starts enabled; the handlers enable the rest.
    with gr.Row():
        process_btn = gr.Button("Process Screenplay", variant="primary")
        coverage_btn = gr.Button("Generate Coverage", interactive=False)
        analysis_btn = gr.Button("Creative Analysis", interactive=False)
        post_process_btn = gr.Button("Post-Process Analysis", interactive=False)
        market_btn = gr.Button("Market Analysis", interactive=False)

    with gr.Row():
        console = gr.Textbox(
            label="Console Output",
            lines=10,
            max_lines=30,
            autoscroll=True,
            show_copy_button=True,
        )

    with gr.Tabs():
        with gr.TabItem("Cleaned Screenplay"):
            with gr.Row():
                cleaned_output = gr.Textbox(
                    label="Cleaned Screenplay", lines=10, show_copy_button=True
                )
            with gr.Row():
                cleaned_file = gr.File(
                    label="Download Cleaned Screenplay",
                    interactive=True,
                    visible=False,
                )

        with gr.TabItem("Coverage"):
            with gr.Row():
                coverage_output = gr.Textbox(
                    label="Coverage Document", lines=10, show_copy_button=True
                )
            with gr.Row():
                coverage_file = gr.File(
                    label="Download Coverage", interactive=True, visible=False
                )

        with gr.TabItem("Creative Analysis"):
            with gr.Row():
                analysis_output = gr.Textbox(
                    label="Creative Analysis", lines=10, show_copy_button=True
                )
            with gr.Row():
                analysis_file = gr.File(
                    label="Download Creative Analysis",
                    interactive=True,
                    visible=False,
                )

        with gr.TabItem("Post-Processed Analysis"):
            with gr.Row():
                post_process_output = gr.Textbox(
                    label="Post-Processed Analysis", lines=10, show_copy_button=True
                )
            with gr.Row():
                post_process_file = gr.File(
                    label="Download Post-Processed Analysis",
                    interactive=True,
                    visible=False,
                )

        with gr.TabItem("Market Analysis"):
            with gr.Row():
                market_output = gr.Textbox(
                    label="Market Analysis", lines=10, show_copy_button=True
                )
            with gr.Row():
                market_file = gr.File(
                    label="Download Market Analysis",
                    interactive=True,
                    visible=False,
                )

    # The order of each ``outputs`` list must match the list the handler returns.
    process_btn.click(
        fn=process_screenplay,
        inputs=[file_input],
        outputs=[
            cleaned_output,
            cleaned_file,
            coverage_btn,
            analysis_btn,
            post_process_btn,
            market_btn,
            console,
        ],
    )
    coverage_btn.click(
        fn=generate_coverage,
        outputs=[coverage_output, coverage_file, console],
    )
    analysis_btn.click(
        fn=analyze_screenplay,
        outputs=[analysis_output, analysis_file, post_process_btn, console],
    )
    post_process_btn.click(
        fn=post_process_analysis,
        outputs=[post_process_output, post_process_file, market_btn, console],
    )
    market_btn.click(
        fn=generate_market_analysis,
        outputs=[market_output, market_file, console],
    )

if __name__ == "__main__":
    demo.launch()