Muhammad Abdur Rahman Saad
commited on
Commit
·
625fba9
1
Parent(s):
21949e8
fix: add manual trigger to observe pipeline execution
Browse files- app/routes/lda.py +65 -1
app/routes/lda.py
CHANGED
@@ -1,9 +1,10 @@
|
|
1 |
"""This module defines the /lda route for the FastAPI application."""
|
2 |
|
3 |
import logging
|
4 |
-
from fastapi import APIRouter
|
5 |
from fastapi.responses import JSONResponse
|
6 |
from models.database import lda_collection # pylint: disable=import-error
|
|
|
7 |
|
8 |
# Configure logger
|
9 |
logger = logging.getLogger(__name__)
|
@@ -11,6 +12,29 @@ logger = logging.getLogger(__name__)
|
|
11 |
# Create FastAPI Router
|
12 |
router = APIRouter(prefix="/lda", tags=["lda"])
|
13 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
14 |
@router.get('/{filter_type}')
|
15 |
async def get_lda_results(filter_type: str):
|
16 |
"""
|
@@ -61,6 +85,46 @@ async def get_lda_results(filter_type: str):
|
|
61 |
'message': 'Failed to retrieve LDA results'
|
62 |
}, status_code=500)
|
63 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
64 |
|
65 |
@router.get('')
|
66 |
async def get_all_lda_results():
|
|
|
1 |
"""This module defines the /lda route for the FastAPI application."""
|
2 |
|
3 |
import logging
|
4 |
+
from fastapi import APIRouter, BackgroundTasks
|
5 |
from fastapi.responses import JSONResponse
|
6 |
from models.database import lda_collection # pylint: disable=import-error
|
7 |
+
from collectors.finfast.lda import collect as run_lda_pipeline # pylint: disable=import-error
|
8 |
|
9 |
# Configure logger
|
10 |
logger = logging.getLogger(__name__)
|
|
|
12 |
# Create FastAPI Router
|
13 |
router = APIRouter(prefix="/lda", tags=["lda"])
|
14 |
|
15 |
+
def run_lda_pipeline_background():
|
16 |
+
"""
|
17 |
+
Background task function to run the LDA pipeline.
|
18 |
+
This function runs the complete LDA analysis pipeline in the background.
|
19 |
+
"""
|
20 |
+
try:
|
21 |
+
logger.info("Starting LDA pipeline background execution")
|
22 |
+
results = run_lda_pipeline()
|
23 |
+
|
24 |
+
if results:
|
25 |
+
logger.info("LDA pipeline completed successfully. Processed %d time periods",
|
26 |
+
len(results))
|
27 |
+
for filter_type, analysis_results in results.items():
|
28 |
+
if analysis_results:
|
29 |
+
logger.info("LDA analysis completed for %s period", filter_type)
|
30 |
+
else:
|
31 |
+
logger.warning("LDA analysis failed for %s period", filter_type)
|
32 |
+
else:
|
33 |
+
logger.warning("LDA pipeline completed but no results were generated")
|
34 |
+
|
35 |
+
except Exception as e: # pylint: disable=broad-exception-caught
|
36 |
+
logger.error("Error during LDA pipeline background execution: %s", e)
|
37 |
+
|
38 |
@router.get('/{filter_type}')
|
39 |
async def get_lda_results(filter_type: str):
|
40 |
"""
|
|
|
85 |
'message': 'Failed to retrieve LDA results'
|
86 |
}, status_code=500)
|
87 |
|
88 |
+
@router.post('/trigger')
|
89 |
+
async def trigger_lda_pipeline(background_tasks: BackgroundTasks):
|
90 |
+
"""
|
91 |
+
Manual trigger endpoint for LDA pipeline execution.
|
92 |
+
|
93 |
+
This endpoint starts the LDA analysis pipeline in the background and returns
|
94 |
+
immediately with a success response. The pipeline runs asynchronously on the server.
|
95 |
+
|
96 |
+
Args:
|
97 |
+
background_tasks (BackgroundTasks): FastAPI background tasks handler
|
98 |
+
|
99 |
+
Returns:
|
100 |
+
JSONResponse: Immediate success response indicating the pipeline has started
|
101 |
+
"""
|
102 |
+
try:
|
103 |
+
logger.info("Received request for manual LDA pipeline trigger")
|
104 |
+
|
105 |
+
# Add the background task to run the LDA pipeline
|
106 |
+
background_tasks.add_task(run_lda_pipeline_background)
|
107 |
+
|
108 |
+
logger.info("LDA pipeline background task scheduled successfully")
|
109 |
+
|
110 |
+
return JSONResponse(content={
|
111 |
+
"status": "success",
|
112 |
+
"message": "LDA pipeline has been triggered and is running in the background",
|
113 |
+
"details": {
|
114 |
+
"operation": "LDA topic modeling and sentiment analysis pipeline",
|
115 |
+
"time_periods": ["daily", "week", "month"],
|
116 |
+
"execution": "background",
|
117 |
+
"note": "Pipeline may take several minutes to complete. Check logs for progress."
|
118 |
+
}
|
119 |
+
}, status_code=202) # 202 Accepted - request accepted for processing
|
120 |
+
|
121 |
+
except Exception as e: # pylint: disable=broad-exception-caught
|
122 |
+
logger.error("Error triggering LDA pipeline: %s", e)
|
123 |
+
return JSONResponse(content={
|
124 |
+
"status": "error",
|
125 |
+
"message": "Failed to trigger LDA pipeline",
|
126 |
+
"error": str(e)
|
127 |
+
}, status_code=500)
|
128 |
|
129 |
@router.get('')
|
130 |
async def get_all_lda_results():
|