"""FastAPI application entry point.""" import json import logging import importlib from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from routes import category, summary, keyword, lda, entity # pylint: disable=import-error class Config: # pylint: disable=too-few-public-methods """ Configuration class for the FastAPI application. This class contains configuration settings for the application, such as the database URL and other environment-specific settings. """ def __init__(self): self.scheduler_api_enabled = True self.jobs = self.load_jobs() def load_jobs(self): """ Loads scheduled jobs from a JSON file. Returns: dict: A dictionary containing the scheduled jobs. """ try: with open('jobs.json', 'r', encoding='utf-8') as jobs_file: jobs_data = json.load(jobs_file) # Return the jobs list if jobs.json has {"jobs": [...]} structure if isinstance(jobs_data, dict) and 'jobs' in jobs_data: return jobs_data['jobs'] # Return as-is if it's already a list if isinstance(jobs_data, list): return jobs_data logging.warning("Invalid jobs.json format") return [] except (FileNotFoundError, json.JSONDecodeError) as e: logging.error("Error loading jobs: %s", e) return [] #Global scheduler instance scheduler = AsyncIOScheduler() config = Config() @asynccontextmanager async def lifespan(_app: FastAPI): """ Lifespan context manager for the FastAPI application. This function initializes the scheduler and sets up the application lifespan context. """ #startup logging.basicConfig( format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s' ) logging.getLogger().setLevel(logging.ERROR) #start scheduler anf add jobs scheduler.start() await setup_scheduled_jobs() logging.info("Application startup complete") yield #shutdown # Shutdown logging.info("Shutting down scheduler...") scheduler.shutdown() logging.info("Application shutdown complete") async def setup_scheduled_jobs(): """ Set up scheduled jobs based on the configuration. This function iterates through the jobs defined in the configuration and schedules them using the APScheduler. """ if not config.jobs: logging.info("No jobs to schedule") return for job_config in config.jobs: try: #get the function reference func_ref = get_function_reference(job_config['func']) if not func_ref: logging.error("Function %s not found", job_config['func']) continue #Create the trigger trigger = CronTrigger( hour=job_config.get('hour', '*'), minute=job_config.get('minute', '*'), second=job_config.get('second', '0') ) #add the job to the scheduler scheduler.add_job( func=func_ref, trigger=trigger, id=job_config['id'], replace_existing=True ) logging.info("Scheduled job: %s", job_config['id']) except (FileNotFoundError, json.JSONDecodeError, ImportError, AttributeError) as e: logging.error("Failed to schedule job %s: %s", job_config['id'], e) def get_function_reference(func_string): """ Converts a string representation of a function to an actual function reference. Supports formats like: - "module.function" - "package.module.function" - "package.module:function" Args: func_string (str): String representation of the function Returns: callable: The function reference, or None if not found """ try: # Handle both ":" and "." as separators if ':' in func_string: module_path, func_name = func_string.rsplit(':', 1) elif '.' in func_string: parts = func_string.split('.') func_name = parts[-1] module_path = '.'.join(parts[:-1]) else: logging.error("Invalid function string format: %s", func_string) return None # Import the module module = importlib.import_module(module_path) # Get the function func = getattr(module, func_name) return func except (ImportError, AttributeError) as e: logging.error("Could not import function %s: %s", func_string, e) return None #Create FastAPI app instance app = FastAPI(docs_url="/", lifespan=lifespan) #include CORS middleware origins = [ "*" ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials = True, allow_methods=["*"], allow_headers=["*"], ) app.include_router(category.router) app.include_router(summary.router) app.include_router(keyword.router) app.include_router(lda.router) app.include_router(entity.router) @app.get("/_health") def health(): """ Returns the health status of the application. :return: A string "OK" indicating the health status. """ return "OK" @app.get("/scheduler/status") async def scheduler_status(): """Get scheduler status and job information.""" if not config.scheduler_api_enabled: return JSONResponse({"error": "Scheduler API disabled"}, status_code=403) jobs = [] for job in scheduler.get_jobs(): jobs.append({ "id": job.id, "name": job.name, "func": str(job.func), "trigger": str(job.trigger), "next_run": job.next_run_time.isoformat() if job.next_run_time else None }) return { "scheduler_running": scheduler.running, "job_count": len(jobs), "jobs": jobs }