Spaces:
Running
Running
import time | |
import schedule | |
import threading | |
import gradio as gr | |
from activate import trigger | |
from restart import restart | |
from config import * | |
from times import * | |
def run_schedule():
    """Drive the ``schedule`` job loop; used as the background thread target.

    Stamps the module-level ``START_TIME`` with the current time, calls
    ``trigger()`` once immediately, and then loops forever: execute any
    jobs that are due, then sleep for ``DELAY`` seconds. Never returns.
    """
    global START_TIME
    START_TIME = datetime.now()

    # One immediate run before entering the polling loop; the periodic
    # jobs registered with `schedule` only fire after their interval.
    trigger()

    # NOTE(review): DELAY is not defined in this file; presumably it is
    # supplied by one of the star imports (config/times) - confirm.
    while True:
        schedule.run_pending()
        time.sleep(DELAY)
def monitor(period=PERIOD):
    """Register the periodic jobs and start the scheduler thread.

    Args:
        period: Interval, in hours, between ``trigger`` runs. It is passed
            through ``int()`` before being handed to ``schedule``.

    Side effects:
        Prints a status line, registers ``trigger`` every ``period`` hours,
        starts ``run_schedule`` in a daemon thread, and registers
        ``restart`` every 47 hours.
    """
    print(f"Monitor is on and triggered every {period}h...")

    interval_hours = int(period)
    schedule.every(interval_hours).hours.do(trigger)

    # Daemon thread: it will not keep the process alive on its own.
    worker = threading.Thread(target=run_schedule, daemon=True)
    worker.start()

    # NOTE(review): the 47-hour restart interval is hard-coded; its
    # rationale is not visible in this file.
    schedule.every(47).hours.do(restart)
def tasklist():
    """Build a text report of every scheduled job plus the uptime line.

    Returns:
        One paragraph per job (interval, job function, last run, next run),
        each followed by a blank line, then a final "Has been running
        for ..." line. The uptime line is also printed to stdout.
    """
    uptime_line = f"Has been running for {time_diff(START_TIME, datetime.now())}"
    print(uptime_line)

    entries = []
    for job in schedule.get_jobs():
        # A job that has not run yet yields a falsy last-run value.
        previous = fix_datetime(job.last_run) or "never"
        upcoming = fix_datetime(job.next_run)
        # The "h" suffix is literal text, not derived from the job's unit.
        entries.append(
            f"Every {job.interval}h do {job.job_func} "
            f"(last run: {previous}, next run: {upcoming})\n\n"
        )

    return "".join(entries) + uptime_line
if __name__ == "__main__":
    # Register the jobs and start the scheduler thread before serving the UI.
    monitor()

    # Input-less Gradio interface: each call shows the tasklist() report.
    status_box = gr.TextArea(label="Current task details")
    app = gr.Interface(
        fn=tasklist,
        inputs=None,
        outputs=status_box,
        title="See current task status",
        flagging_mode="never",
    )
    app.launch()