import time
import schedule
import threading
import gradio as gr
from activate import trigger
from restart import restart
from config import *
from times import *

# NOTE(review): `datetime`, `time_diff`, `fix_datetime`, `DELAY` and `PERIOD`
# are not defined in this file; they presumably arrive through the star
# imports above -- confirm against `config` / `times`.

# Interval, in hours, between scheduled `restart` calls.
# NOTE(review): the reason for 47 is not stated in this file (possibly to
# stay below a 48h limit) -- confirm.
_RESTART_EVERY_HOURS = 47

# Give START_TIME a value as soon as the module is loaded so that
# `tasklist()` cannot hit a NameError when it is called before the
# scheduler thread has had a chance to assign it. `run_schedule()` still
# overwrites it when the scheduler actually starts.
START_TIME = datetime.now()


def run_schedule() -> None:
    """Run the scheduler loop forever.

    Records the start time in the module-level ``START_TIME``, calls
    ``trigger()`` once immediately, then repeatedly runs any pending
    ``schedule`` jobs, sleeping ``DELAY`` seconds between polls.

    This function never returns; `monitor()` runs it in a daemon thread.
    """
    global START_TIME
    START_TIME = datetime.now()
    # Fire once right away instead of waiting for the first interval.
    trigger()
    while True:
        schedule.run_pending()
        time.sleep(DELAY)


def monitor(period=PERIOD) -> None:
    """Register the periodic jobs and start the background scheduler.

    Args:
        period: Interval in hours between ``trigger`` calls. It is passed
            through ``int()``, so any value ``int()`` accepts works.
    """
    print(f"Monitor is on and triggered every {period}h...")
    # Register every job *before* the scheduler thread starts, so the job
    # list is not being modified while `run_pending()` may be reading it.
    schedule.every(int(period)).hours.do(trigger)
    schedule.every(_RESTART_EVERY_HOURS).hours.do(restart)
    threading.Thread(target=run_schedule, daemon=True).start()


def tasklist() -> str:
    """Describe every scheduled job and how long the monitor has run.

    The running-time line is also printed to stdout.

    Returns:
        One paragraph per registered job (interval, job function, last and
        next run time) followed by the running-time line.
    """
    running4 = f"Has been running for {time_diff(START_TIME, datetime.now())}"
    print(running4)
    entries = []
    for job in schedule.get_jobs():
        # A falsy result from fix_datetime is reported as "never".
        last_run = fix_datetime(job.last_run) or "never"
        next_run = fix_datetime(job.next_run)
        entries.append(
            f"Every {job.interval}h do {job.job_func} (last run: {last_run}, next run: {next_run})\n\n"
        )
    return f"{''.join(entries)}{running4}"


if __name__ == "__main__":
    monitor()
    gr.Interface(
        title="See current task status",
        fn=tasklist,
        inputs=None,
        outputs=gr.TextArea(label="Current task details"),
        flagging_mode="never",
    ).launch()