Spaces:
Sleeping
Sleeping
| import tempfile | |
| import logging | |
| import os | |
| import asyncio | |
| import gc | |
| import psutil | |
| from moviepy.editor import * | |
| import edge_tts | |
| import gradio as gr | |
| from pydub import AudioSegment | |
# Logging configuration
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Bundled asset files
INTRO_VIDEO = "introvideo.mp4"
OUTRO_VIDEO = "outrovideo.mp4"
MUSIC_BG = "musicafondo.mp3"
EJEMPLO_VIDEO = "ejemplo.mp4"

# Limits
MAX_VIDEO_SIZE = 200 * 1024 * 1024  # Maximum input size in bytes (200 MB)

# Chunking configuration
SEGMENT_DURATION = 30  # Exact spacing between transitions (no overlap)
TRANSITION_DURATION = 1.5  # Length of the slide effect
PROCESSING_CHUNK = 120  # Process in 2-minute blocks to keep memory use down

# Fail fast at import time when a bundled asset is missing.
for required_file in (INTRO_VIDEO, OUTRO_VIDEO, MUSIC_BG, EJEMPLO_VIDEO):
    if os.path.exists(required_file):
        continue
    logging.error(f"Falta archivo necesario: {required_file}")
    raise FileNotFoundError(f"Falta: {required_file}")
def mostrar_uso_memoria():
    """Log the resident set size (RSS) of the current process, in MB."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    logging.info(f"Uso de memoria: {rss_bytes / 1024 / 1024:.2f} MB")
def eliminar_archivo_tiempo(ruta, delay=3600):
    """Schedule a best-effort deletion of ``ruta`` after ``delay`` seconds.

    The deletion runs on a background ``threading.Timer``. The timer is a
    daemon thread so a pending deletion never keeps the interpreter alive at
    shutdown; the trade-off is that a file whose timer has not fired when the
    process exits is left on disk.

    Args:
        ruta: Path of the file to delete.
        delay: Seconds to wait before deleting.

    Returns:
        The started ``Timer``, so callers may ``cancel()`` or ``join()`` it.
    """
    from threading import Timer

    def eliminar():
        # Thread entry point: log failures instead of letting them escape.
        try:
            if os.path.exists(ruta):
                os.remove(ruta)
                logging.info(f"Archivo eliminado: {ruta}")
        except Exception as e:
            logging.error(f"Error al eliminar {ruta}: {e}")

    temporizador = Timer(delay, eliminar)
    temporizador.daemon = True
    temporizador.start()
    return temporizador
def validar_video(video_path):
    """Return True when ``video_path`` fits the size limit and opens as a video.

    Any error (missing file, unreadable container, ...) is logged and
    reported as ``False`` rather than raised.
    """
    try:
        # Size gate first: it is cheap and avoids opening oversized files.
        size_bytes = os.path.getsize(video_path)
        if not size_bytes <= MAX_VIDEO_SIZE:
            logging.warning(f"El video excede el tama帽o m谩ximo: {size_bytes/1024/1024:.2f}MB > {MAX_VIDEO_SIZE/1024/1024}MB")
            return False
        # Opening the clip is the actual "is this a video?" check.
        probe = VideoFileClip(video_path)
        _ = probe.duration
        probe.close()
    except Exception as err:
        logging.error(f"El video no es v谩lido: {err}")
        return False
    return True
def convertir_video(video_path):
    """Re-encode ``video_path`` to H.264/AAC MP4 at its original resolution.

    A first pass uses CRF 28 / 96k audio. If the result still fails
    ``validar_video``, a second, stronger pass (CRF 32 / 64k audio) replaces
    it. The second result is not validated again.

    Args:
        video_path: Path of the source video.

    Returns:
        Path of a new temporary ``.mp4`` file; the caller owns and deletes it.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started or a file operation failed.
    """
    import subprocess

    def _recodificar(origen, destino, crf, audio_bitrate):
        # Argument list (no shell) so unusual characters in paths are safe.
        # "-f mp4" is explicit because the second-pass name ends in ".tmp",
        # from which ffmpeg cannot infer a container format.
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", origen,
                "-c:v", "libx264", "-crf", str(crf), "-preset", "ultrafast",
                "-c:a", "aac", "-b:a", audio_bitrate,
                "-f", "mp4", destino,
            ],
            check=True,
        )

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_converted:
        output_path = tmp_converted.name
    recompressed_path = f"{output_path}.tmp"
    try:
        # Convert to a more efficient format, keeping the original resolution.
        _recodificar(video_path, output_path, 28, "96k")
        # If it still does not pass validation, compress harder (same resolution).
        if not validar_video(output_path):
            _recodificar(output_path, recompressed_path, 32, "64k")
            os.replace(recompressed_path, output_path)
        return output_path
    except Exception as e:
        logging.error(f"Error al convertir el video: {e}")
        # Do not leave partial outputs behind on failure.
        for sobrante in (recompressed_path, output_path):
            try:
                if os.path.exists(sobrante):
                    os.remove(sobrante)
            except OSError:
                pass
        raise
async def generar_tts(texto, voz, duracion_total):
    """Synthesize ``texto`` with edge-tts and return it as an audio clip.

    The text is truncated to 1000 characters, and the resulting audio is
    trimmed so it never exceeds ``duracion_total``.

    Args:
        texto: Text to speak; must contain non-whitespace characters.
        voz: edge-tts voice name.
        duracion_total: Maximum clip duration.

    Returns:
        ``(clip, path)``: the audio clip and the temporary ``.mp3`` file that
        backs it. The caller closes the clip and deletes the file.

    Raises:
        ValueError: ``texto`` is empty, whitespace-only or ``None``.
        Exception: any synthesis or loading error is logged and re-raised;
            the temporary file is removed first.
    """
    tts_path = None
    tts_audio = None
    try:
        if not texto or not texto.strip():
            raise ValueError("El texto para TTS no puede estar vac铆o.")
        # Cap the text at 1000 characters so synthesis stays fast.
        if len(texto) > 1000:
            texto = texto[:1000]
            logging.info("Texto para TTS truncado a 1000 caracteres para optimizar rendimiento")
        logging.info(f"Generando TTS con voz: {voz}")
        communicate = edge_tts.Communicate(texto, voz)
        # Only reserve a unique name here; edge-tts writes the file itself.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_tts:
            tts_path = tmp_tts.name
        await communicate.save(tts_path)
        tts_audio = AudioFileClip(tts_path)
        if tts_audio.duration > duracion_total:
            tts_audio = tts_audio.subclip(0, duracion_total)
        return tts_audio, tts_path
    except Exception as e:
        logging.error(f"Fallo en TTS: {str(e)}")
        # The caller never receives the path on failure, so clean up here.
        if tts_audio is not None:
            try:
                tts_audio.close()
            except Exception:
                pass
        if tts_path is not None:
            try:
                os.remove(tts_path)
            except OSError:
                pass
        raise
def crear_musica_fondo(duracion_total):
    """Build a background-music clip covering ``duracion_total``.

    The bundled track is looped, cut to the target length, faded out over the
    last 1000 ms, exported to a temporary ``.mp3`` and loaded at 15% volume.

    Returns:
        ``(clip, path)``: the audio clip and the temporary file backing it.
    """
    pista = AudioSegment.from_mp3(MUSIC_BG)
    duracion_ms = int(duracion_total * 1000)
    # One extra repetition guarantees the loop is at least as long as needed.
    bucle = pista * (duracion_ms // len(pista) + 1)
    recorte = bucle[:duracion_ms].fade_out(1000)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_bg:
        ruta_bg = tmp_bg.name
        recorte.export(ruta_bg, format="mp3")
        clip_bg = AudioFileClip(ruta_bg).volumex(0.15)
        return clip_bg, ruta_bg
def create_slide_transition(clip1, clip2, duration=TRANSITION_DURATION):
    """Compose a transition from the end of ``clip1`` into the start of ``clip2``.

    The last ``duration`` of ``clip1`` fades out while the first ``duration``
    of ``clip2`` fades in and moves vertically from ``height`` to 0. The
    result uses ``clip1``'s frame size.
    """
    ancho, alto = clip1.size

    def posicion_entrante(t):
        # Vertical offset shrinks linearly from `alto` (t=0) to 0 (t=duration).
        return ('center', alto - (alto * (t/duration)))

    saliente = clip1.subclip(clip1.duration - duration)
    entrante = clip2.subclip(0, duration)
    capas = [
        saliente.fx(vfx.fadeout, duration),
        entrante.fx(vfx.fadein, duration).set_position(posicion_entrante),
    ]
    compuesto = CompositeVideoClip(capas, size=(ancho, alto))
    return compuesto.set_duration(duration)
def liberar_memoria(objetos_cerrar=None):
    """Close the given objects, run the garbage collector and log memory use.

    Args:
        objetos_cerrar: Optional iterable of objects exposing ``close()``.
            ``None`` entries are skipped. Closing is best-effort: a failing
            ``close()`` is logged at debug level and does not stop the rest.
    """
    for obj in objetos_cerrar or ():
        if obj is None:
            continue
        try:
            obj.close()
        except Exception as e:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate.
            logging.debug(f"No se pudo cerrar {type(obj).__name__}: {e}")
    # Force a garbage-collection pass.
    gc.collect()
    mostrar_uso_memoria()
def _ruta_temporal(sufijo):
    """Create an empty named temporary file and return its path (caller deletes it)."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=sufijo) as tmp:
        return tmp.name


def _escribir_mp4(clip, ruta):
    """Encode ``clip`` to ``ruta`` with the H.264/AAC settings shared by every intermediate file."""
    clip.write_videofile(
        ruta,
        codec="libx264",
        audio_codec="aac",
        preset="ultrafast",
        bitrate="1M",
        ffmpeg_params=["-crf", "28"],
        verbose=False,
    )


def _concatenar_segmentos(segmentos, salida):
    """Join already-encoded ``segmentos`` into ``salida`` with ffmpeg's concat demuxer (stream copy)."""
    import subprocess

    lista = _ruta_temporal(".txt")
    try:
        with open(lista, "w", encoding="utf-8") as f:
            f.writelines(f"file '{segmento}'\n" for segmento in segmentos)
        # Argument list (no shell); check=True turns an ffmpeg failure into an
        # exception instead of silently returning an empty output file.
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lista, "-c", "copy", salida],
            check=True,
        )
    finally:
        if os.path.exists(lista):
            os.remove(lista)


async def procesar_video(video_input, texto_tts, voz_seleccionada, progress=gr.Progress()):
    """Build the final video: intro + narrated, music-backed input + outro.

    Steps: validate (and, if needed, re-encode) the input; generate the TTS
    narration and background music; process the input in blocks of
    ``PROCESSING_CHUNK``, mixing audio and inserting a slide transition every
    ``SEGMENT_DURATION``; encode intro and outro; concatenate everything with
    ffmpeg. Apart from the TTS request, all work here is synchronous.

    Args:
        video_input: Path of the uploaded video.
        texto_tts: Narration text.
        voz_seleccionada: edge-tts voice name.
        progress: Gradio progress reporter.

    Returns:
        Path of the final ``.mp4``; it is scheduled for deletion after 1 hour.

    Raises:
        ValueError: no video was provided or its duration is not positive.
        Exception: any processing error is logged and re-raised after all
            temporary files and clips have been released.
    """
    temp_files = []
    segmentos_temp = []
    intro, outro, video_original = None, None, None
    tts_audio = bg_audio = None
    output_path = None
    completado = False
    try:
        mostrar_uso_memoria()
        logging.info("Iniciando procesamiento")
        progress(0, desc="Validando video")
        if not video_input:
            raise ValueError("Debes subir un video antes de procesar.")
        if not validar_video(video_input):
            progress(0.05, desc="Optimizando formato de video")
            video_input = convertir_video(video_input)
            temp_files.append(video_input)

        progress(0.1, desc="Preparando video")
        # Open the input only to read its metadata; the resolution is kept.
        video_original = VideoFileClip(video_input)
        duracion_video = video_original.duration
        original_size = video_original.size
        video_original.close()  # Close right away to free memory
        logging.info(f"Duraci贸n total del video: {duracion_video} segundos")
        logging.info(f"Resoluci贸n original: {original_size[0]}x{original_size[1]}")
        if duracion_video <= 0:
            raise ValueError("El video debe tener una duraci贸n mayor que cero.")

        progress(0.2, desc="Generando narraci贸n (TTS)")
        tts_audio, tts_path = await generar_tts(texto_tts, voz_seleccionada, duracion_video)
        temp_files.append(tts_path)

        progress(0.3, desc="Preparando m煤sica de fondo")
        bg_audio, bg_path = crear_musica_fondo(duracion_video)
        temp_files.append(bg_path)

        # Process block by block to bound memory use.
        num_chunks = int(duracion_video // PROCESSING_CHUNK) + (1 if duracion_video % PROCESSING_CHUNK > 0 else 0)
        logging.info(f"Procesando video en {num_chunks} bloques")
        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * PROCESSING_CHUNK
            chunk_end = min((chunk_idx + 1) * PROCESSING_CHUNK, duracion_video)
            progress(0.35 + (0.45 * chunk_idx / num_chunks),
                     desc=f"Procesando bloque {chunk_idx+1}/{num_chunks} ({chunk_start:.1f}s - {chunk_end:.1f}s)")

            # Load only the portion of the video needed for this block.
            chunk_video = VideoFileClip(video_input).subclip(chunk_start, chunk_end)
            chunk_processed = None
            try:
                # Narration may be shorter than the video: take only what overlaps.
                tts_chunk_end = min(chunk_end, tts_audio.duration)
                chunk_tts = None
                if chunk_start < tts_audio.duration:
                    chunk_tts = tts_audio.subclip(chunk_start, tts_chunk_end)
                chunk_bg = bg_audio.subclip(chunk_start, chunk_end)

                # Mix: music, original audio (50%), narration (85%).
                audio_chunks = [chunk_bg]
                if chunk_video.audio:
                    audio_chunks.append(chunk_video.audio.volumex(0.5))
                if chunk_tts:
                    audio_chunks.append(chunk_tts.volumex(0.85))
                chunk_audio_final = CompositeAudioClip(audio_chunks)
                chunk_video = chunk_video.set_audio(chunk_audio_final)

                # Insert transitions inside this block when it spans several segments.
                duracion_chunk = chunk_end - chunk_start
                if duracion_chunk > SEGMENT_DURATION:
                    segments_in_chunk = []
                    segments_count = int(duracion_chunk // SEGMENT_DURATION) + \
                        (1 if duracion_chunk % SEGMENT_DURATION > 0 else 0)
                    for i in range(segments_count):
                        seg_start = i * SEGMENT_DURATION
                        seg_end = min(seg_start + SEGMENT_DURATION, duracion_chunk)
                        segment = chunk_video.subclip(seg_start, seg_end)
                        if i == 0:
                            segments_in_chunk.append(segment)
                        else:
                            prev_segment = segments_in_chunk[-1]
                            transition = create_slide_transition(prev_segment, segment)
                            # The transition replaces the tail of the previous segment.
                            prev_end = prev_segment.duration - TRANSITION_DURATION
                            if prev_end > 0:
                                segments_in_chunk[-1] = prev_segment.subclip(0, prev_end)
                            segments_in_chunk.append(transition)
                            segments_in_chunk.append(segment)
                    chunk_processed = concatenate_videoclips(segments_in_chunk, method="compose")
                else:
                    chunk_processed = chunk_video

                # Register the file before encoding so a failed write is cleaned up too.
                chunk_path = _ruta_temporal(f"_chunk{chunk_idx}.mp4")
                segmentos_temp.append(chunk_path)
                _escribir_mp4(chunk_processed, chunk_path)
            finally:
                # Release this block's clips whether or not encoding succeeded.
                liberar_memoria([chunk_video, chunk_processed])

        # Free the audio clips before handling intro/outro.
        liberar_memoria([tts_audio, bg_audio])
        tts_audio = bg_audio = None

        # NOTE(review): intro/outro are re-encoded at their own resolution and
        # then joined with stream copy; presumably they must match the input's
        # resolution for the concatenated file to play correctly - confirm.
        progress(0.85, desc="Preparando intro y outro")
        intro = VideoFileClip(INTRO_VIDEO)
        intro_path = _ruta_temporal("_intro.mp4")
        segmentos_temp.insert(0, intro_path)  # Intro goes first
        _escribir_mp4(intro, intro_path)
        intro.close()

        outro = VideoFileClip(OUTRO_VIDEO)
        outro_path = _ruta_temporal("_outro.mp4")
        segmentos_temp.append(outro_path)  # Outro goes last
        _escribir_mp4(outro, outro_path)
        outro.close()

        # Join all segments with ffmpeg.
        progress(0.9, desc="Generando video final")
        output_path = _ruta_temporal(".mp4")
        _concatenar_segmentos(segmentos_temp, output_path)

        eliminar_archivo_tiempo(output_path, 3600)  # Delete after 1 hour
        progress(1.0, desc="隆Video listo!")
        logging.info(f"Video final guardado: {output_path}")
        mostrar_uso_memoria()
        completado = True
        return output_path
    except Exception as e:
        logging.error(f"Fallo general: {str(e)}")
        raise
    finally:
        try:
            liberar_memoria([video_original, intro, outro, tts_audio, bg_audio])
            # On failure the half-built output is of no use to anyone.
            if not completado and output_path is not None:
                temp_files.append(output_path)
            for file in temp_files:
                try:
                    if os.path.exists(file):
                        os.remove(file)
                except Exception as e:
                    logging.warning(f"Error limpiando {file}: {e}")
            for segment in segmentos_temp:
                try:
                    if os.path.exists(segment):
                        os.remove(segment)
                except Exception as e:
                    logging.warning(f"Error limpiando segmento {segment}: {e}")
        except Exception as e:
            logging.warning(f"Error al cerrar recursos: {str(e)}")
# Gradio interface
# NOTE(review): the nesting of the `with` blocks below and the leading
# whitespace inside the Markdown literal were reconstructed from a flattened
# copy of this file - confirm against the deployed layout.
with gr.Blocks() as demo:
    gr.Markdown("# Editor de Video con IA")
    with gr.Tab("Principal"):
        # Inputs: the video to edit, the narration text and the TTS voice.
        video_input = gr.Video(label="Subir video")
        texto_tts = gr.Textbox(
            label="Texto para TTS (m谩x. 1000 caracteres)",
            lines=3,
            placeholder="Escribe aqu铆 tu texto..."
        )
        voz_seleccionada = gr.Dropdown(
            label="Voz",
            choices=[
                "es-ES-AlvaroNeural", "es-MX-BeatrizNeural",
                "es-ES-ElviraNeural", "es-MX-JavierNeural",
                "es-AR-ElenaNeural", "es-AR-TomasNeural",
                "es-CL-CatalinaNeural", "es-CL-LorenzoNeural",
                "es-CO-SofiaNeural", "es-CO-GonzaloNeural",
                "es-PE-CamilaNeural", "es-PE-AlexNeural",
                "es-VE-MariaNeural", "es-VE-ManuelNeural",
                "es-US-AlonsoNeural", "es-US-PalomaNeural",
                "es-ES-AbrilNeural", "es-ES-DarioNeural",
                "es-ES-HelenaRUS", "es-ES-LauraNeural",
                "es-ES-PabloNeural", "es-ES-TriniNeural",
                "en-US-AriaNeural", "en-US-GuyNeural",
                "en-US-JennyNeural", "en-US-AmberNeural",
                "en-US-AnaNeural", "en-US-AshleyNeural",
                "en-US-BrandonNeural", "en-US-ChristopherNeural",
                "en-US-CoraNeural", "en-US-DavisNeural",
                "en-US-ElizabethNeural", "en-US-EricNeural",
                "en-US-GinaNeural", "en-US-JacobNeural",
                "en-US-JaneNeural", "en-US-JasonNeural",
                "en-US-MichelleNeural", "en-US-MonicaNeural",
                "en-US-SaraNeural", "en-US-SteffanNeural",
                "en-US-TonyNeural", "en-US-YaraNeural",
                "fr-FR-AlainNeural", "fr-FR-BrigitteNeural",
                "fr-FR-CelesteNeural", "fr-FR-ClaudeNeural",
                "fr-FR-CoralieNeural", "fr-FR-DeniseNeural",
                "fr-FR-EloiseNeural", "fr-FR-HenriNeural",
                "fr-FR-JacquelineNeural", "fr-FR-JeromeNeural",
                "fr-FR-JosephineNeural", "fr-FR-MauriceNeural",
                "fr-FR-YvesNeural", "fr-FR-YvetteNeural",
                "de-DE-AmalaNeural", "de-DE-BerndNeural",
                "de-DE-ChristophNeural", "de-DE-ConradNeural",
                "de-DE-ElkeNeural", "de-DE-GiselaNeural",
                "de-DE-KasperNeural", "de-DE-KatjaNeural",
                "de-DE-KillianNeural", "de-DE-KlarissaNeural",
                "de-DE-KlausNeural", "de-DE-LouisaNeural",
                "de-DE-MajaNeural", "de-DE-RalfNeural",
                "de-DE-TanjaNeural", "de-DE-ViktoriaNeural",
                "it-IT-BenignoNeural", "it-IT-CalimeroNeural",
                "it-IT-CataldoNeural", "it-IT-DiegoNeural",
                "it-IT-ElsaNeural", "it-IT-FabiolaNeural",
                "it-IT-GianniNeural", "it-IT-ImeldaNeural",
                "it-IT-IrmaNeural", "it-IT-IsabellaNeural",
                "it-IT-LisandroNeural", "it-IT-PalmiraNeural",
                "it-IT-PierinaNeural", "it-IT-RinaldoNeural",
                "ja-JP-AoiNeural", "ja-JP-DaichiNeural",
                "ja-JP-HarukaNeural", "ja-JP-KeitaNeural",
                "ja-JP-MayuNeural", "ja-JP-NanamiNeural",
                "ja-JP-NaokiNeural", "ja-JP-ShioriNeural"
            ],
            value="es-ES-AlvaroNeural"
        )
        procesar_btn = gr.Button("Generar Video (Modo Optimizado)")
        video_output = gr.Video(label="Video Procesado")
    # Collapsed-by-default example that pre-fills the video and text inputs.
    with gr.Accordion("Ejemplos de Uso", open=False):
        gr.Examples(
            examples=[[EJEMPLO_VIDEO, "隆Hola! Esto es una prueba. Suscr铆bete al canal."]],
            inputs=[video_input, texto_tts],
            label="Ejemplos"
        )
    # The button runs procesar_video and shows the returned file in video_output.
    procesar_btn.click(
        procesar_video,
        inputs=[video_input, texto_tts, voz_seleccionada],
        outputs=video_output
    )
    # User-facing notes describing the limits enforced by the code above.
    gr.Markdown("""
    ### 鈩癸笍 Notas importantes:
    - **Optimizaciones para Hugging Face Spaces:**
    - Procesamiento por bloques para videos largos
    - M谩ximo tama帽o de archivo: 200MB
    - Mantiene la resoluci贸n original del video
    - Texto TTS limitado a 1000 caracteres
    - Las transiciones ocurren cada 30 segundos
    - El video contiene intro y outro predefinidos
    - El archivo generado se elimina despu茅s de 1 hora
    - Para videos de alta calidad, considera usar este c贸digo localmente
    """)
if __name__ == "__main__":
    # psutil is imported unconditionally at the top of this file, so a missing
    # install already fails there; no runtime "pip install" fallback is
    # attempted here. Install psutil as a regular dependency instead.
    demo.queue().launch()