OxbridgeEconomics
added the hot entity retrieve route and handling logic
d055f1a
"""FastAPI application entry point."""
import json
import logging
import importlib
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from routes import category, summary, keyword, lda, entity # pylint: disable=import-error
class Config: # pylint: disable=too-few-public-methods
"""
Configuration class for the FastAPI application.
This class contains configuration settings for the application, such as
the database URL and other environment-specific settings.
"""
def __init__(self):
self.scheduler_api_enabled = True
self.jobs = self.load_jobs()
def load_jobs(self):
"""
Loads scheduled jobs from a JSON file.
Returns:
dict: A dictionary containing the scheduled jobs.
"""
try:
with open('jobs.json', 'r', encoding='utf-8') as jobs_file:
jobs_data = json.load(jobs_file)
# Return the jobs list if jobs.json has {"jobs": [...]} structure
if isinstance(jobs_data, dict) and 'jobs' in jobs_data:
return jobs_data['jobs']
# Return as-is if it's already a list
if isinstance(jobs_data, list):
return jobs_data
logging.warning("Invalid jobs.json format")
return []
except (FileNotFoundError, json.JSONDecodeError) as e:
logging.error("Error loading jobs: %s", e)
return []
#Global scheduler instance
scheduler = AsyncIOScheduler()
config = Config()
@asynccontextmanager
async def lifespan(_app: FastAPI):
"""
Lifespan context manager for the FastAPI application.
This function initializes the scheduler and sets up the application
lifespan context.
"""
#startup
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
)
logging.getLogger().setLevel(logging.ERROR)
#start scheduler anf add jobs
scheduler.start()
await setup_scheduled_jobs()
logging.info("Application startup complete")
yield
#shutdown
# Shutdown
logging.info("Shutting down scheduler...")
scheduler.shutdown()
logging.info("Application shutdown complete")
async def setup_scheduled_jobs():
"""
Set up scheduled jobs based on the configuration.
This function iterates through the jobs defined in the configuration
and schedules them using the APScheduler.
"""
if not config.jobs:
logging.info("No jobs to schedule")
return
for job_config in config.jobs:
try:
#get the function reference
func_ref = get_function_reference(job_config['func'])
if not func_ref:
logging.error("Function %s not found", job_config['func'])
continue
#Create the trigger
trigger = CronTrigger(
hour=job_config.get('hour', '*'),
minute=job_config.get('minute', '*'),
second=job_config.get('second', '0')
)
#add the job to the scheduler
scheduler.add_job(
func=func_ref,
trigger=trigger,
id=job_config['id'],
replace_existing=True
)
logging.info("Scheduled job: %s", job_config['id'])
except (FileNotFoundError, json.JSONDecodeError, ImportError, AttributeError) as e:
logging.error("Failed to schedule job %s: %s", job_config['id'], e)
def get_function_reference(func_string):
"""
Converts a string representation of a function to an actual function reference.
Supports formats like:
- "module.function"
- "package.module.function"
- "package.module:function"
Args:
func_string (str): String representation of the function
Returns:
callable: The function reference, or None if not found
"""
try:
# Handle both ":" and "." as separators
if ':' in func_string:
module_path, func_name = func_string.rsplit(':', 1)
elif '.' in func_string:
parts = func_string.split('.')
func_name = parts[-1]
module_path = '.'.join(parts[:-1])
else:
logging.error("Invalid function string format: %s", func_string)
return None
# Import the module
module = importlib.import_module(module_path)
# Get the function
func = getattr(module, func_name)
return func
except (ImportError, AttributeError) as e:
logging.error("Could not import function %s: %s", func_string, e)
return None
#Create FastAPI app instance
app = FastAPI(docs_url="/", lifespan=lifespan)
#include CORS middleware
origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials = True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(category.router)
app.include_router(summary.router)
app.include_router(keyword.router)
app.include_router(lda.router)
app.include_router(entity.router)
@app.get("/_health")
def health():
"""
Returns the health status of the application.
:return: A string "OK" indicating the health status.
"""
return "OK"
@app.get("/scheduler/status")
async def scheduler_status():
"""Get scheduler status and job information."""
if not config.scheduler_api_enabled:
return JSONResponse({"error": "Scheduler API disabled"}, status_code=403)
jobs = []
for job in scheduler.get_jobs():
jobs.append({
"id": job.id,
"name": job.name,
"func": str(job.func),
"trigger": str(job.trigger),
"next_run": job.next_run_time.isoformat() if job.next_run_time else None
})
return {
"scheduler_running": scheduler.running,
"job_count": len(jobs),
"jobs": jobs
}