import gradio as gr import subprocess import os from datetime import datetime, timedelta from apscheduler.schedulers.background import BackgroundScheduler import pytz # Für Zeitzonen-Management # --- Konfiguration --- OUTPUT_DIR = "recordings" # Erstelle das Verzeichnis, falls es nicht existiert os.makedirs(OUTPUT_DIR, exist_ok=True) # Initialisiere den Scheduler scheduler = BackgroundScheduler() scheduler.start() # --- Globale Variable für laufende Prozesse (optional, aber nützlich für Stop-Funktionalität) --- # Für dieses einfache Beispiel lassen wir das "Stoppen" über die Dauer von ffmpeg # Wenn du eine "Sofort Stoppen"-Funktion bräuchtest, müsstest du ffmpeg-Prozesse speichern und killen. # --- Funktionen für die Aufnahme und Planung --- def record_radio_stream(stream_url: str, output_filename: str, duration_seconds: int): """ Startet die Aufnahme eines Webradio-Streams mit ffmpeg. """ full_output_path = os.path.join(OUTPUT_DIR, output_filename) print(f"🎬 Starte Aufnahme von {stream_url} für {duration_seconds} Sekunden nach {full_output_path}...") # ffmpeg Kommando: # -i: Input URL # -c:a copy: Audio-Codec kopieren (kein Re-Encoding), das spart CPU und Zeit # -map 0:a: Stelle sicher, dass nur der Audio-Stream gemappt wird # -t: Dauer der Aufnahme in Sekunden command = [ "ffmpeg", "-i", stream_url, "-c:a", "copy", "-map", "0:a", "-t", str(duration_seconds), full_output_path ] try: # Führe den ffmpeg-Befehl aus # check=True wird eine CalledProcessError erzeugen, wenn ffmpeg fehlschlägt subprocess.run(command, check=True, capture_output=True) print(f"✅ Aufnahme abgeschlossen: {full_output_path}") return full_output_path except subprocess.CalledProcessError as e: print(f"❌ Fehler bei der Aufnahme von {stream_url}:") print(f"Stdout: {e.stdout.decode()}") print(f"Stderr: {e.stderr.decode()}") # Lösche unvollständige Datei, falls vorhanden if os.path.exists(full_output_path): os.remove(full_output_path) return None # Signalisiere Fehler def schedule_recording(stream_url: str, start_datetime_str: str, end_datetime_str: str): """ Plant eine Webradio-Aufnahme basierend auf Start- und Endzeit. Die Zeiten kommen als String von Gradio und müssen geparst werden. """ try: # Gradio gibt Datetime-Strings im Format 'YYYY-MM-DD HH:MM:SS' zurück # Konvertiere in datetime-Objekte start_datetime = datetime.fromisoformat(start_datetime_str) end_datetime = datetime.fromisoformat(end_datetime_str) # Optional: Zeitzone explizit setzen (z.B. UTC), wenn du sicherstellen willst, dass es unabhängig vom Server läuft # Du kannst hier auch die Zeitzone des Nutzers abfragen, wenn es komplizierter werden soll # Für Hugging Face Spaces ist UTC oft eine gute Wahl für den Server. # Wenn der Nutzer aber eine lokale Zeit eingibt, muss dies berücksichtigt werden. # Hier gehen wir davon aus, dass die eingegebene Zeit auf dem Server interpretiert wird. # Berechne die Dauer der Aufnahme in Sekunden duration = (end_datetime - start_datetime).total_seconds() if duration <= 0: return "❌ Fehler: Die Endzeit muss nach der Startzeit liegen." # Generiere einen eindeutigen Dateinamen timestamp = start_datetime.strftime("%Y%m%d_%H%M%S") output_filename = f"radio_recording_{timestamp}.mp3" # Füge den Job zum Scheduler hinzu # 'date' trigger bedeutet, dass der Job zu einem spezifischen Datum und Uhrzeit ausgeführt wird scheduler.add_job( record_radio_stream, 'date', run_date=start_datetime, args=[stream_url, output_filename, int(duration)] # Dauer als Integer übergeben ) # Zeige geplante Jobs an (optional, zur Fehlersuche) # for job in scheduler.get_jobs(): # print(f"Geplanter Job: {job.id} - Nächste Ausführung: {job.next_run_time}") return f"✅ Aufnahme von **{stream_url}** erfolgreich geplant.\nStart: **{start_datetime}** | Ende: **{end_datetime}**.\nDatei: **{output_filename}**\nBitte aktualisiere die Dateiliste, nachdem die Aufnahme abgeschlossen ist." except ValueError as e: return f"❌ Fehler beim Parsen der Daten/Uhrzeiten: {e}. Bitte stelle sicher, dass das Format korrekt ist." except Exception as e: return f"❌ Ein unerwarteter Fehler ist aufgetreten: {e}" def get_recorded_files(): """ Gibt eine Liste der Pfade zu allen aufgenommenen MP3-Dateien zurück. """ files = [os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR) if f.endswith(".mp3")] # Gradio gr.Files erwartet eine Liste von Pfaden. # Wenn keine Dateien da sind, gibt eine leere Liste zurück. return files # --- Gradio UI Definition --- with gr.Blocks() as demo: gr.Markdown("# 📻 Webradio Recorder") gr.Markdown( "Plane deine Webradio-Aufnahmen! Gib die Stream-URL, Start- und Endzeit an, " "und die App nimmt den Stream für dich auf. Die fertige Datei kannst du dann herunterladen." ) with gr.Tab("Aufnahme planen"): with gr.Row(): stream_url_input = gr.Textbox( label="Stream URL", placeholder="z.B. http://stream.laut.fm/meinstream (MP3- oder AAC-Stream)" ) with gr.Row(): start_datetime_input = gr.Datetime( label="Start Datum & Uhrzeit", value=datetime.now() + timedelta(minutes=1), # Voreinstellung: 1 Minute in der Zukunft info="Wähle das Datum und die Uhrzeit, wann die Aufnahme beginnen soll." ) end_datetime_input = gr.Datetime( label="End Datum & Uhrzeit", value=datetime.now() + timedelta(minutes=10), # Voreinstellung: 10 Minuten in der Zukunft info="Wähle das Datum und die Uhrzeit, wann die Aufnahme enden soll." ) schedule_button = gr.Button("▶️ Aufnahme planen", variant="primary") schedule_output_message = gr.Textbox(label="Status der Planung", interactive=False) schedule_button.click( fn=schedule_recording, inputs=[stream_url_input, start_datetime_input, end_datetime_input], outputs=schedule_output_message ) with gr.Tab("Verfügbare Aufnahmen"): gr.Markdown("Hier siehst du alle bisher aufgenommenen Dateien.") refresh_files_button = gr.Button("🔄 Aufnahmen aktualisieren", variant="secondary") # gr.Files ermöglicht das Herunterladen der Dateien downloadable_files = gr.Files(label="Deine Aufnahmen zum Herunterladen", type="file") # Wenn der "Aktualisieren"-Button geklickt wird, rufe die Funktion auf, um die Dateien zu listen refresh_files_button.click( fn=get_recorded_files, outputs=downloadable_files ) # Beim Laden der App auch die Dateien einmal anzeigen demo.load( fn=get_recorded_files, outputs=downloadable_files ) # --- App starten --- if __name__ == "__main__": demo.launch()