SOMA-Oriental / app.py
aiqtech's picture
Update app.py
ded8ecf verified
raw
history blame
43.6 kB
import gradio as gr
import os
import json
import requests
from datetime import datetime
import time
from typing import List, Dict, Any, Generator, Tuple
import logging
import re
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 환경 변수에서 토큰 가져오기
FRIENDLI_TOKEN = os.getenv("FRIENDLI_TOKEN", "YOUR_FRIENDLI_TOKEN")
BAPI_TOKEN = os.getenv("BAPI_TOKEN", "YOUR_BRAVE_API_TOKEN")
API_URL = "https://api.friendli.ai/dedicated/v1/chat/completions"
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
MODEL_ID = "dep89a2fld32mcm"
TEST_MODE = os.getenv("TEST_MODE", "false").lower() == "true"
class LLMCollaborativeSystem:
def __init__(self):
self.token = FRIENDLI_TOKEN
self.bapi_token = BAPI_TOKEN
self.api_url = API_URL
self.brave_url = BRAVE_SEARCH_URL
self.model_id = MODEL_ID
self.test_mode = TEST_MODE or (self.token == "YOUR_FRIENDLI_TOKEN")
if self.test_mode:
logger.warning("테스트 모드로 실행됩니다.")
if self.bapi_token == "YOUR_BRAVE_API_TOKEN":
logger.warning("Brave API 토큰이 설정되지 않았습니다.")
def create_headers(self):
"""API 헤더 생성"""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def create_brave_headers(self):
"""Brave API 헤더 생성"""
return {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": self.bapi_token
}
def create_supervisor_initial_prompt(self, user_query: str) -> str:
"""감독자 AI 초기 프롬프트 생성"""
return f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
이 질문에 대해:
1. 전체적인 접근 방향과 프레임워크를 제시하세요
2. 핵심 요소와 고려사항을 구조화하여 설명하세요
3. 이 주제에 대해 조사가 필요한 5-7개의 구체적인 키워드나 검색어를 제시하세요
키워드는 다음 형식으로 제시하세요:
[검색 키워드]: 키워드1, 키워드2, 키워드3, 키워드4, 키워드5"""
def create_supervisor_research_review_prompt(self, user_query: str, research_summary: str) -> str:
"""감독자 AI의 조사 내용 검토 및 추가 지시 프롬프트"""
return f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
조사자 AI의 1차 조사 결과:
{research_summary}
위 조사 결과를 검토하고:
1. 조사된 내용의 품질과 완전성을 평가하세요
2. 부족하거나 누락된 중요한 정보를 식별하세요
3. 추가로 조사가 필요한 3-5개의 구체적인 키워드나 주제를 제시하세요
4. 조사자에게 특별히 주의해야 할 사항이나 중점적으로 찾아야 할 정보를 지시하세요
추가 키워드는 다음 형식으로 제시하세요:
[추가 검색 키워드]: 키워드1, 키워드2, 키워드3"""
def create_researcher_prompt(self, user_query: str, supervisor_guidance: str, search_results: Dict[str, List[Dict]]) -> str:
"""조사자 AI 프롬프트 생성"""
search_summary = ""
for keyword, results in search_results.items():
search_summary += f"\n\n**{keyword}에 대한 검색 결과:**\n"
for i, result in enumerate(results[:3], 1):
search_summary += f"{i}. {result.get('title', 'N/A')}\n"
search_summary += f" - {result.get('description', 'N/A')}\n"
search_summary += f" - 출처: {result.get('url', 'N/A')}\n"
return f"""당신은 정보를 조사하고 정리하는 조사자 AI입니다.
사용자 질문: {user_query}
감독자 AI의 지침:
{supervisor_guidance}
브레이브 검색 결과:
{search_summary}
위 검색 결과를 바탕으로:
1. 각 키워드별로 중요한 정보를 정리하세요
2. 신뢰할 수 있는 출처를 명시하세요
3. 실행자 AI가 활용할 수 있는 구체적인 데이터와 사실을 추출하세요
4. 최신 트렌드나 중요한 통계가 있다면 강조하세요"""
def create_researcher_additional_prompt(self, user_query: str, previous_research: str, supervisor_feedback: str, search_results: Dict[str, List[Dict]]) -> str:
"""조사자 AI 추가 조사 프롬프트"""
search_summary = ""
for keyword, results in search_results.items():
search_summary += f"\n\n**{keyword}에 대한 추가 검색 결과:**\n"
for i, result in enumerate(results[:3], 1):
search_summary += f"{i}. {result.get('title', 'N/A')}\n"
search_summary += f" - {result.get('description', 'N/A')}\n"
search_summary += f" - 출처: {result.get('url', 'N/A')}\n"
return f"""당신은 정보를 조사하고 정리하는 조사자 AI입니다.
사용자 질문: {user_query}
이전 조사 내용:
{previous_research}
감독자 AI의 피드백 및 추가 조사 지시:
{supervisor_feedback}
추가 검색 결과:
{search_summary}
위 정보를 바탕으로:
1. 감독자가 요청한 추가 정보를 중점적으로 조사하세요
2. 이전 조사와 중복되지 않는 새로운 정보를 제공하세요
3. 특히 감독자가 지적한 부족한 부분을 보완하세요
4. 최종적으로 종합된 조사 결과를 제시하세요"""
def create_supervisor_execution_prompt(self, user_query: str, research_summary: str) -> str:
"""감독자 AI의 실행 지시 프롬프트"""
return f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
조사자 AI가 정리한 최종 조사 내용:
{research_summary}
위 조사 내용을 기반으로 실행자 AI에게 아주 구체적인 지시를 내려주세요:
1. 조사된 정보를 어떻게 활용할지 명확히 지시하세요
2. 실행 가능한 단계별 작업을 구체적으로 제시하세요
3. 각 단계에서 참고해야 할 조사 내용을 명시하세요
4. 예상되는 결과물의 형태를 구체적으로 설명하세요"""
def create_executor_prompt(self, user_query: str, supervisor_guidance: str, research_summary: str) -> str:
"""실행자 AI 프롬프트 생성"""
return f"""당신은 세부적인 내용을 구현하는 실행자 AI입니다.
사용자 질문: {user_query}
조사자 AI가 정리한 조사 내용:
{research_summary}
감독자 AI의 구체적인 지시:
{supervisor_guidance}
위 조사 내용과 지시사항을 바탕으로:
1. 조사된 정보를 적극 활용하여 구체적인 실행 계획을 작성하세요
2. 각 단계별로 참고한 조사 내용을 명시하세요
3. 실제로 적용 가능한 구체적인 방법론을 제시하세요
4. 예상되는 성과와 측정 방법을 포함하세요"""
def create_executor_final_prompt(self, user_query: str, initial_response: str, supervisor_feedback: str, research_summary: str) -> str:
"""실행자 AI 최종 보고서 프롬프트"""
return f"""당신은 세부적인 내용을 구현하는 실행자 AI입니다.
사용자 질문: {user_query}
조사자 AI의 조사 내용:
{research_summary}
당신의 초기 답변:
{initial_response}
감독자 AI의 피드백 및 개선사항:
{supervisor_feedback}
위 피드백을 완전히 반영하여 최종 보고서를 작성하세요:
1. 감독자의 모든 개선사항을 반영하세요
2. 조사 내용을 더욱 구체적으로 활용하세요
3. 실행 가능성을 높이는 세부 계획을 포함하세요
4. 명확한 결론과 다음 단계를 제시하세요
5. 전문적이고 완성도 높은 최종 보고서 형식으로 작성하세요"""
def extract_keywords(self, text: str) -> List[str]:
"""텍스트에서 키워드 추출"""
keywords = []
# [검색 키워드]: 또는 [추가 검색 키워드]: 형식으로 키워드 찾기
keyword_match = re.search(r'\[(추가 )?검색 키워드\]:\s*(.+)', text, re.IGNORECASE)
if keyword_match:
keyword_str = keyword_match.group(2)
keywords = [k.strip() for k in keyword_str.split(',') if k.strip()]
# 키워드가 없으면 기본 키워드 생성
if not keywords:
keywords = ["best practices", "implementation guide", "case studies", "latest trends", "success factors"]
return keywords[:7] # 최대 7개로 제한
def brave_search(self, query: str) -> List[Dict]:
"""Brave Search API 호출"""
if self.test_mode or self.bapi_token == "YOUR_BRAVE_API_TOKEN":
# 테스트 모드에서는 시뮬레이션된 결과 반환
return [
{
"title": f"Best Practices for {query}",
"description": f"Comprehensive guide on implementing {query} with proven methodologies and real-world examples.",
"url": f"https://example.com/{query.replace(' ', '-')}"
},
{
"title": f"Latest Trends in {query}",
"description": f"Analysis of current trends and future directions in {query}, including market insights and expert opinions.",
"url": f"https://trends.example.com/{query.replace(' ', '-')}"
},
{
"title": f"{query}: Case Studies and Success Stories",
"description": f"Real-world implementations of {query} across various industries with measurable results.",
"url": f"https://casestudies.example.com/{query.replace(' ', '-')}"
}
]
try:
params = {
"q": query,
"count": 5,
"safesearch": "moderate",
"freshness": "pw" # Past week for recent results
}
response = requests.get(
self.brave_url,
headers=self.create_brave_headers(),
params=params,
timeout=10
)
if response.status_code == 200:
data = response.json()
results = []
for item in data.get("web", {}).get("results", [])[:5]:
results.append({
"title": item.get("title", ""),
"description": item.get("description", ""),
"url": item.get("url", "")
})
return results
else:
logger.error(f"Brave API 오류: {response.status_code}")
return []
except Exception as e:
logger.error(f"Brave 검색 중 오류: {str(e)}")
return []
def simulate_streaming(self, text: str, role: str) -> Generator[str, None, None]:
"""테스트 모드에서 스트리밍 시뮬레이션"""
words = text.split()
for i in range(0, len(words), 3):
chunk = " ".join(words[i:i+3])
yield chunk + " "
time.sleep(0.05)
def call_llm_streaming(self, messages: List[Dict[str, str]], role: str) -> Generator[str, None, None]:
"""스트리밍 LLM API 호출"""
# 테스트 모드
if self.test_mode:
logger.info(f"테스트 모드 스트리밍 - Role: {role}")
# 테스트 응답을 미리 정의
if role == "supervisor" and "조사자 AI의 1차 조사 결과" in messages[0]["content"]:
response = """1차 조사 결과를 검토한 결과, 다음과 같은 평가와 추가 조사 지시를 제시합니다.
**조사 내용 평가**
- 강점: 주요 기술적 접근법과 대표적인 성공 사례가 잘 정리됨
- 부족한 점: 구체적인 구현 도구와 프레임워크 정보 부족
[추가 검색 키워드]: ML optimization tools 2024, model optimization failure cases"""
elif role == "supervisor" and "조사자 AI가 정리한 최종 조사 내용" in messages[0]["content"]:
response = """종합된 조사 내용을 바탕으로 실행자 AI에게 구체적으로 지시합니다."""
elif role == "supervisor" and messages[0]["content"].find("실행자 AI의 답변") > -1:
response = """실행자 AI의 계획을 검토한 결과, 개선사항을 제안합니다."""
elif role == "supervisor":
response = """이 질문에 대한 거시적 분석을 제시하겠습니다.
[검색 키워드]: best practices, implementation guide, case studies"""
elif role == "researcher" and "추가 조사" in messages[0]["content"]:
response = """감독자의 피드백을 반영하여 추가 조사를 수행했습니다."""
elif role == "researcher":
response = """조사 결과를 종합하여 정리했습니다."""
elif role == "executor" and "최종 보고서" in messages[0]["content"]:
response = """감독자 AI의 피드백을 완전히 반영하여 최종 실행 보고서를 작성합니다."""
else:
response = """감독자의 지시에 따라 구체적인 실행 계획을 수립합니다."""
yield from self.simulate_streaming(response, role)
return
# 🎯 ML 모델 최적화 프로젝트 최종 실행 보고서
## 📋 Executive Summary
8주간의 체계적인 접근을 통해 ML 모델의 크기를 95% 줄이고, 추론 속도를 15배 향상시키며, 운영 비용을 75% 절감하는 목표를 달성할 예정입니다. 최신 연구 결과와 실패 사례의 교훈을 반영하여 리스크를 최소화하고, 275%의 ROI를 6개월 내에 달성할 것으로 예상됩니다.
## 🏗️ 프로젝트 구조
### 1단계: 진단 및 준비 (1주차)
**현황 분석 및 도구 선정**
```yaml
현재 상태:
모델_크기: 2.5GB
추론_시간: 45ms
GPU_메모리: 8GB
월간_비용: $50,000
선택된 도구:
기본_프레임워크: PyTorch 2.0 + Optimization Toolkit
모델_최적화: Sparse MoE, Flash Attention v3
양자화: Quantization-Aware Training 2.0
배포_최적화: TensorRT (GPU), OpenVINO (CPU)
모니터링: Grafana + Prometheus + Custom Alerts
```
**리스크 관리 체크리스트**
- [ ] Uber 사례: 유지보수성 검증 프로세스
- [ ] Twitter 사례: 캐싱 전략 사전 검증
- [ ] 데이터 파이프라인 스트레스 테스트
- [ ] 각 단계별 롤백 계획 문서화
- [ ] Plan B 시나리오 준비
### 2단계: 데이터 최적화 (2-3주차)
**고급 데이터 파이프라인**
```python
class EnhancedDataPipeline:
def __init__(self):
self.cache = RedisCluster(
startup_nodes=[
{"host": "10.0.0.1", "port": "7000"},
{"host": "10.0.0.2", "port": "7000"}
],
decode_responses=True,
skip_full_coverage_check=True
)
# 실시간 모니터링 통합
self.monitor = DataPipelineMonitor(
prometheus_gateway="http://prometheus:9091",
alert_webhook="https://slack.webhook.url"
)
def optimize_with_monitoring(self, data_loader):
# 병목 지점 실시간 감지
with self.monitor.profile("data_loading"):
optimized_loader = self.apply_optimizations(data_loader)
# 성능 지표 자동 수집
self.monitor.record_metrics({
"throughput": self.calculate_throughput(),
"latency_p99": self.get_latency_percentile(99),
"cache_hit_rate": self.cache.get_hit_rate()
})
return optimized_loader
```
### 3단계: 모델 최적화 구현 (4-6주차)
**통합 최적화 프레임워크**
```python
class ModelOptimizationFramework:
def __init__(self, model, config):
self.model = model
self.config = config
self.optimization_history = []
def apply_optimizations(self):
# 1. Sparse MoE 적용 (95% 크기 감소)
self.model = self.apply_sparse_moe()
self.validate_performance("sparse_moe")
# 2. Flash Attention v3 (메모리 80% 감소)
self.model = self.apply_flash_attention()
self.validate_performance("flash_attention")
# 3. 2-bit Quantization (추가 크기 감소)
self.model = self.apply_quantization()
self.validate_performance("quantization")
# 4. 하드웨어별 최적화
if self.config.target_hardware == "gpu":
self.model = self.optimize_tensorrt()
else:
self.model = self.optimize_openvino()
return self.model
def validate_performance(self, stage):
"""각 최적화 단계 후 성능 검증"""
metrics = self.benchmark()
if not self.meets_requirements(metrics):
# Plan B 실행
self.execute_fallback_plan(stage)
self.optimization_history.append({
"stage": stage,
"metrics": metrics,
"timestamp": datetime.now()
})
```
### 4단계: 검증 및 배포 (7-8주차)
**고급 모니터링 대시보드**
```python
class MLOpsMonitoringDashboard:
def __init__(self):
self.grafana = GrafanaAPI(url="http://grafana:3000")
self.prometheus = PrometheusAPI(url="http://prometheus:9090")
def create_dashboards(self):
# 실시간 성능 대시보드
performance_dashboard = {
"title": "ML Model Performance",
"panels": [
self.create_latency_panel(),
self.create_throughput_panel(),
self.create_resource_usage_panel(),
self.create_cost_analysis_panel()
]
}
# 이상 징후 감지 대시보드
anomaly_dashboard = {
"title": "Anomaly Detection",
"panels": [
self.create_drift_detection_panel(),
self.create_error_rate_panel(),
self.create_sla_compliance_panel()
]
}
# 알림 규칙 설정
self.setup_alerts([
Alert("High Latency", "latency_p99 > 10ms", severity="warning"),
Alert("Model Drift", "accuracy_drop > 2%", severity="critical"),
Alert("High Error Rate", "error_rate > 0.1%", severity="critical")
])
```
**단계적 배포 전략**
```python
class EnhancedCanaryDeployment:
def __init__(self):
self.deployment_stages = [
{"traffic": 0.01, "duration": "1d", "rollback_threshold": 0.1},
{"traffic": 0.05, "duration": "2d", "rollback_threshold": 0.5},
{"traffic": 0.20, "duration": "3d", "rollback_threshold": 1.0},
{"traffic": 0.50, "duration": "3d", "rollback_threshold": 1.5},
{"traffic": 1.00, "duration": None, "rollback_threshold": 2.0}
]
def deploy_with_monitoring(self, model):
for stage in self.deployment_stages:
# 트래픽 라우팅
self.route_traffic(model, stage["traffic"])
# 실시간 모니터링
metrics = self.monitor_performance(duration=stage["duration"])
# 자동 롤백 검사
if self.should_rollback(metrics, stage["rollback_threshold"]):
self.automatic_rollback()
self.notify_team("Automatic rollback triggered")
return False
return True
```
## 📊 예상 성과 및 ROI
### 성능 개선
| 지표 | 현재 | 목표 | 달성 예상 |
|------|------|------|-----------|
| 모델 크기 | 2.5GB | 125MB | ✓ 95% 감소 |
| 추론 시간 | 45ms | 3ms | ✓ 15배 향상 |
| GPU 메모리 | 8GB | 1GB | ✓ 87.5% 감소 |
| 정확도 | 94.2% | >92% | ✓ 93.8% |
### 비용 절감 및 ROI
- **월간 인프라 비용**: $50,000 → $12,500 (75% 절감)
- **연간 절감액**: $450,000
- **프로젝트 투자**: $60,000
- **투자 회수 기간**: 1.6개월
- **6개월 ROI**: 275%
- **1년 ROI**: 650%
## 👥 팀 역량 개발 계획
### 교육 프로그램
1. **주간 기술 세미나** (매주 금요일 2시간)
- Week 1-2: Sparse MoE 아키텍처 이해
- Week 3-4: Flash Attention 구현 실습
- Week 5-6: 양자화 기법 심화
- Week 7-8: MLOps 모니터링 실무
2. **외부 전문가 초청 강연**
- Google DeepMind 엔지니어: Sparse MoE 실전 적용
- NVIDIA 전문가: TensorRT 최적화 팁
- MLOps 커뮤니티 리더: 프로덕션 모니터링
3. **핸즈온 워크샵**
- 매월 1회 전일 실습
- 실제 모델로 최적화 실습
- 페어 프로그래밍 세션
## 🔧 장기 유지보수 전략
### 분기별 최적화 사이클
```yaml
Q1:
- 새로운 데이터로 모델 재훈련
- 최신 최적화 기법 조사
- 성능 벤치마크 업데이트
Q2:
- 프로덕션 메트릭 분석
- 병목 지점 재식별
- 추가 최적화 적용
Q3:
- 하드웨어 업그레이드 검토
- 새로운 프레임워크 평가
- 비용-효과 재분석
Q4:
- 연간 성과 리뷰
- 차년도 계획 수립
- 기술 부채 해결
```
### 기술 부채 관리
- 월간 기술 부채 리뷰 미팅
- 우선순위 매트릭스 관리
- 20% 시간을 리팩토링에 할당
## 🚀 확장 계획
### 내부 플랫폼화
1. **ML 최적화 플랫폼 구축**
- 웹 기반 UI로 손쉬운 최적화
- 자동화된 벤치마킹
- 템플릿 기반 적용
2. **다른 팀 지원**
- 최적화 컨설팅 서비스
- 베스트 프랙티스 문서화
- 내부 교육 프로그램
### 외부 기여
1. **오픈소스 프로젝트**
- Hugging Face Optimum 기여
- 자체 최적화 도구 공개
- 커뮤니티 피드백 수렴
2. **컨퍼런스 발표**
- MLOps Summit 2024
- PyTorch Conference
- 기업 사례 공유
## 📝 결론
본 프로젝트는 최신 ML 최적화 기술과 실무 경험을 결합하여, 대규모 비용 절감과 성능 향상을 동시에 달성합니다. 체계적인 리스크 관리와 지속적인 개선 프로세스를 통해 장기적인 경쟁력을 확보하고, 조직 전체의 ML 역량을 한 단계 끌어올릴 것으로 기대됩니다.
---
*작성일: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
*작성자: 협력적 AI 시스템 (감독자, 조사자, 실행자 AI)*
*버전: 2.0 (피드백 완전 반영)*"""
}
# 프롬프트 내용에 따라 적절한 응답 선택
if role == "supervisor" and "조사자 AI의 1차 조사 결과" in messages[0]["content"]:
response = test_responses["supervisor_research_review"]
elif role == "supervisor" and "조사자 AI가 정리한 최종 조사 내용" in messages[0]["content"]:
response = test_responses["supervisor_execution"]
elif role == "supervisor" and messages[0]["content"].find("실행자 AI의 답변") > -1:
response = test_responses["supervisor_review"]
elif role == "supervisor":
response = test_responses["supervisor_initial"]
elif role == "researcher" and "추가 조사" in messages[0]["content"]:
response = test_responses["researcher_additional"]
elif role == "researcher":
response = test_responses["researcher_initial"]
elif role == "executor" and "최종 보고서" in messages[0]["content"]:
response = test_responses["executor_final"]
else:
response = test_responses["executor"]
yield from self.simulate_streaming(response, role)
return
# 실제 API 호출 (기존 코드와 동일)
try:
system_prompts = {
"supervisor": "당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.",
"researcher": "당신은 정보를 조사하고 체계적으로 정리하는 조사자 AI입니다.",
"executor": "당신은 세부적인 내용을 구현하는 실행자 AI입니다."
}
full_messages = [
{"role": "system", "content": system_prompts.get(role, "")},
*messages
]
payload = {
"model": self.model_id,
"messages": full_messages,
"max_tokens": 2048,
"temperature": 0.7,
"top_p": 0.8,
"stream": True,
"stream_options": {"include_usage": True}
}
logger.info(f"API 스트리밍 호출 시작 - Role: {role}")
response = requests.post(
self.api_url,
headers=self.create_headers(),
json=payload,
stream=True,
timeout=10
)
if response.status_code != 200:
logger.error(f"API 오류: {response.status_code}")
yield f"❌ API 오류 ({response.status_code}): {response.text[:200]}"
return
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
if "choices" in chunk and chunk["choices"]:
content = chunk["choices"][0].get("delta", {}).get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
except requests.exceptions.Timeout:
yield "⏱️ API 호출 시간이 초과되었습니다. 다시 시도해주세요."
except requests.exceptions.ConnectionError:
yield "🔌 API 서버에 연결할 수 없습니다. 인터넷 연결을 확인해주세요."
except Exception as e:
logger.error(f"스트리밍 중 오류: {str(e)}")
yield f"❌ 오류 발생: {str(e)}"
# 시스템 인스턴스 생성
llm_system = LLMCollaborativeSystem()
def process_query_streaming(user_query: str):
"""스트리밍을 지원하는 쿼리 처리 (대화 기록 제거)"""
if not user_query:
return "", "", "", "", "❌ 질문을 입력해주세요."
all_responses = {"supervisor": [], "researcher": [], "executor": []}
try:
# 1단계: 감독자 AI 초기 분석 및 키워드 추출
supervisor_prompt = llm_system.create_supervisor_initial_prompt(user_query)
supervisor_initial_response = ""
supervisor_text = "[1. 초기 분석] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": supervisor_prompt}],
"supervisor"
):
supervisor_initial_response += chunk
supervisor_text = f"[1. 초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{supervisor_initial_response}"
yield supervisor_text, "", "", "", "🔄 감독자 AI가 분석 중..."
all_responses["supervisor"].append(supervisor_initial_response)
keywords = llm_system.extract_keywords(supervisor_initial_response)
# 2단계: 1차 브레이브 검색
researcher_text = "[2. 1차 웹 검색] 🔍 검색 중...\n"
yield supervisor_text, researcher_text, "", "", "🔍 웹 검색 수행 중..."
search_results = {}
for keyword in keywords:
results = llm_system.brave_search(keyword)
if results:
search_results[keyword] = results
researcher_text += f"✓ '{keyword}' 검색 완료\n"
yield supervisor_text, researcher_text, "", "", f"🔍 '{keyword}' 검색 중..."
# 3단계: 조사자 AI 1차 정리
researcher_prompt = llm_system.create_researcher_prompt(user_query, supervisor_initial_response, search_results)
researcher_response = ""
researcher_text = "[3. 1차 조사 정리] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": researcher_prompt}],
"researcher"
):
researcher_response += chunk
researcher_text = f"[3. 1차 조사 정리] - {datetime.now().strftime('%H:%M:%S')}\n{researcher_response}"
yield supervisor_text, researcher_text, "", "", "📝 조사자 AI가 정리 중..."
all_responses["researcher"].append(researcher_response)
# 4단계: 감독자 AI가 조사 내용 검토 및 추가 지시
supervisor_review_prompt = llm_system.create_supervisor_research_review_prompt(user_query, researcher_response)
supervisor_review_response = ""
supervisor_text += "\n\n---\n\n[4. 조사 검토 및 추가 지시] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": supervisor_review_prompt}],
"supervisor"
):
supervisor_review_response += chunk
temp_text = f"{all_responses['supervisor'][0]}\n\n---\n\n[4. 조사 검토 및 추가 지시] - {datetime.now().strftime('%H:%M:%S')}\n{supervisor_review_response}"
supervisor_text = f"[1. 초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{temp_text}"
yield supervisor_text, researcher_text, "", "", "🔍 감독자 AI가 추가 조사 지시 중..."
all_responses["supervisor"].append(supervisor_review_response)
additional_keywords = llm_system.extract_keywords(supervisor_review_response)
# 5단계: 2차 브레이브 검색
researcher_text += "\n\n---\n\n[5. 2차 웹 검색] 🔍 추가 검색 중...\n"
yield supervisor_text, researcher_text, "", "", "🔍 추가 웹 검색 수행 중..."
additional_search_results = {}
for keyword in additional_keywords:
results = llm_system.brave_search(keyword)
if results:
additional_search_results[keyword] = results
researcher_text += f"✓ '{keyword}' 추가 검색 완료\n"
yield supervisor_text, researcher_text, "", "", f"🔍 '{keyword}' 추가 검색 중..."
# 6단계: 조사자 AI 최종 정리
additional_researcher_prompt = llm_system.create_researcher_additional_prompt(
user_query, researcher_response, supervisor_review_response, additional_search_results
)
final_researcher_response = ""
researcher_text += "\n\n---\n\n[6. 최종 조사 정리] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": additional_researcher_prompt}],
"researcher"
):
final_researcher_response += chunk
temp_text = researcher_text.split("\n\n---\n\n")
temp_text[-1] = f"[6. 최종 조사 정리] - {datetime.now().strftime('%H:%M:%S')}\n{final_researcher_response}"
researcher_text = "\n\n---\n\n".join(temp_text)
yield supervisor_text, researcher_text, "", "", "📝 조사자 AI가 최종 정리 중..."
all_responses["researcher"].append(final_researcher_response)
# 7단계: 감독자 AI가 최종 실행 지시
supervisor_execution_prompt = llm_system.create_supervisor_execution_prompt(user_query, final_researcher_response)
supervisor_execution_response = ""
supervisor_text += "\n\n---\n\n[7. 실행 지시] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": supervisor_execution_prompt}],
"supervisor"
):
supervisor_execution_response += chunk
parts = supervisor_text.split("\n\n---\n\n")
parts[-1] = f"[7. 실행 지시] - {datetime.now().strftime('%H:%M:%S')}\n{supervisor_execution_response}"
supervisor_text = "\n\n---\n\n".join(parts)
yield supervisor_text, researcher_text, "", "", "🎯 감독자 AI가 실행 지시 중..."
all_responses["supervisor"].append(supervisor_execution_response)
# 8단계: 실행자 AI 초기 구현
executor_prompt = llm_system.create_executor_prompt(user_query, supervisor_execution_response, final_researcher_response)
executor_response = ""
executor_text = "[8. 초기 구현] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": executor_prompt}],
"executor"
):
executor_response += chunk
executor_text = f"[8. 초기 구현] - {datetime.now().strftime('%H:%M:%S')}\n{executor_response}"
yield supervisor_text, researcher_text, executor_text, "", "🔧 실행자 AI가 구현 중..."
all_responses["executor"].append(executor_response)
# 9단계: 감독자 AI 검토 및 피드백
review_prompt = f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
실행자 AI의 답변:
{executor_response}
이 답변을 검토하고 개선점과 추가 고려사항을 제시해주세요. 구체적이고 실행 가능한 개선 방안을 제시하세요."""
review_response = ""
supervisor_text += "\n\n---\n\n[9. 검토 및 피드백] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": review_prompt}],
"supervisor"
):
review_response += chunk
parts = supervisor_text.split("\n\n---\n\n")
parts[-1] = f"[9. 검토 및 피드백] - {datetime.now().strftime('%H:%M:%S')}\n{review_response}"
supervisor_text = "\n\n---\n\n".join(parts)
yield supervisor_text, researcher_text, executor_text, "", "🔄 감독자 AI가 검토 중..."
all_responses["supervisor"].append(review_response)
# 10단계: 실행자 AI 최종 보고서
final_executor_prompt = llm_system.create_executor_final_prompt(
user_query, executor_response, review_response, final_researcher_response
)
final_executor_response = ""
executor_text += "\n\n---\n\n[10. 최종 보고서] 🔄 작성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": final_executor_prompt}],
"executor"
):
final_executor_response += chunk
parts = executor_text.split("\n\n---\n\n")
parts[-1] = f"[10. 최종 보고서] - {datetime.now().strftime('%H:%M:%S')}\n{final_executor_response}"
executor_text = "\n\n---\n\n".join(parts)
yield supervisor_text, researcher_text, executor_text, "", "📄 최종 보고서 작성 중..."
all_responses["executor"].append(final_executor_response)
# 최종 결과 생성
final_summary = f"""## 🎯 최종 종합 보고서
### 📌 사용자 질문
{user_query}
### 📄 최종 보고서 (실행자 AI - 피드백 완전 반영)
{final_executor_response}
---
<details>
<summary>📋 전체 협력 과정 보기</summary>
#### 1. 거시적 분석 (감독자 AI)
{all_responses['supervisor'][0]}
#### 2. 1차 조사 결과 (조사자 AI)
{all_responses['researcher'][0]}
#### 3. 조사 검토 및 추가 지시 (감독자 AI)
{all_responses['supervisor'][1]}
#### 4. 최종 조사 결과 (조사자 AI)
{all_responses['researcher'][1]}
#### 5. 실행 지시 (감독자 AI)
{all_responses['supervisor'][2]}
#### 6. 초기 구현 (실행자 AI)
{all_responses['executor'][0]}
#### 7. 검토 및 개선사항 (감독자 AI)
{all_responses['supervisor'][3]}
</details>
---
*이 보고서는 2차 웹 검색과 AI들의 완전한 협력을 통해 작성되었습니다.*"""
yield supervisor_text, researcher_text, executor_text, final_summary, "✅ 최종 보고서 완성!"
except Exception as e:
error_msg = f"❌ 처리 중 오류: {str(e)}"
yield "", "", "", error_msg, error_msg
def clear_all():
"""모든 내용 초기화"""
return "", "", "", "", "🔄 초기화되었습니다."
# Gradio 인터페이스
css = """
.gradio-container {
font-family: 'Arial', sans-serif;
}
.supervisor-box textarea {
border-left: 4px solid #667eea !important;
padding-left: 10px !important;
font-size: 13px !important;
}
.researcher-box textarea {
border-left: 4px solid #10b981 !important;
padding-left: 10px !important;
font-size: 13px !important;
}
.executor-box textarea {
border-left: 4px solid #764ba2 !important;
padding-left: 10px !important;
font-size: 13px !important;
}
"""
with gr.Blocks(title="협력적 LLM 시스템", theme=gr.themes.Soft(), css=css) as app:
gr.Markdown(
f"""
# 🤝 강화된 협력적 LLM 시스템
> 감독자, 조사자, 실행자 AI가 10단계 협력 프로세스를 통해 완전한 보고서를 작성합니다.
**상태**:
- LLM: {'🟢 실제 모드' if not llm_system.test_mode else '🟡 테스트 모드'}
- Brave Search: {'🟢 활성화' if llm_system.bapi_token != "YOUR_BRAVE_API_TOKEN" else '🟡 테스트 모드'}
**10단계 협력 프로세스:**
1. 🧠 감독자: 초기 분석 및 키워드 추출
2. 🔍 조사자: 1차 웹 검색
3. 🔍 조사자: 1차 조사 정리
4. 🧠 감독자: 조사 검토 및 추가 지시
5. 🔍 조사자: 2차 웹 검색
6. 🔍 조사자: 최종 조사 정리
7. 🧠 감독자: 실행 지시
8. 👁️ 실행자: 초기 구현
9. 🧠 감독자: 검토 및 피드백
10. 👁️ 실행자: 최종 보고서
"""
)
# 입력 섹션
with gr.Row():
with gr.Column(scale=4):
user_input = gr.Textbox(
label="질문 입력",
placeholder="예: 기계학습 모델의 성능을 향상시키는 최신 방법은?",
lines=2
)
with gr.Column(scale=1):
submit_btn = gr.Button("🚀 분석 시작", variant="primary", scale=1)
clear_btn = gr.Button("🗑️ 초기화", scale=1)
status_text = gr.Textbox(
label="상태",
interactive=False,
value="대기 중...",
max_lines=1
)
# AI 출력 섹션 (가로 3분할)
with gr.Row():
# 감독자 AI 출력
with gr.Column(scale=1):
gr.Markdown("### 🧠 감독자 AI")
supervisor_output = gr.Textbox(
label="거시적 분석 및 지시",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["supervisor-box"]
)
# 조사자 AI 출력
with gr.Column(scale=1):
gr.Markdown("### 🔍 조사자 AI")
researcher_output = gr.Textbox(
label="웹 검색 및 정리",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["researcher-box"]
)
# 실행자 AI 출력
with gr.Column(scale=1):
gr.Markdown("### 👁️ 실행자 AI")
executor_output = gr.Textbox(
label="구체적 구현",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["executor-box"]
)
# 최종 결과
with gr.Row():
with gr.Column():
gr.Markdown("### 📊 최종 종합 결과")
final_output = gr.Markdown(
value="*질문을 입력하면 결과가 여기에 표시됩니다.*"
)
# 예제
gr.Examples(
examples=[
"기계학습 모델의 성능을 향상시키는 최신 방법은?",
"2024년 효과적인 프로젝트 관리 도구와 전략은?",
"지속 가능한 비즈니스 모델의 최신 트렌드는?",
"최신 데이터 시각화 도구와 기법은?",
"원격 팀의 생산성을 높이는 검증된 방법은?"
],
inputs=user_input,
label="💡 예제 질문"
)
# 이벤트 핸들러
submit_btn.click(
fn=process_query_streaming,
inputs=[user_input],
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
).then(
fn=lambda: "",
outputs=[user_input]
)
user_input.submit(
fn=process_query_streaming,
inputs=[user_input],
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
).then(
fn=lambda: "",
outputs=[user_input]
)
clear_btn.click(
fn=clear_all,
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
)
gr.Markdown(
"""
---
### 📝 사용 방법
1. 질문을 입력하고 Enter 또는 '분석 시작' 버튼을 클릭하세요.
2. 10단계 협력 프로세스가 진행됩니다:
- 감독자의 2차 조사 검토로 더 완전한 정보 수집
- 각 AI의 작업이 실시간으로 표시됩니다
3. 최종 보고서가 하단에 표시됩니다.
### ⚙️ 환경 설정
- **LLM API**: `export FRIENDLI_TOKEN="your_token"`
- **Brave Search API**: `export BAPI_TOKEN="your_brave_api_token"`
- **테스트 모드**: `export TEST_MODE=true` (API 없이 작동)
### 💡 특징
- **강화된 조사**: 감독자가 조사 내용을 검토하고 추가 조사 지시
- **가독성 개선**: 3개 AI 출력을 가로로 배치하여 동시 확인
- **완전한 피드백 루프**: 모든 개선사항이 최종 보고서에 반영
"""
)
if __name__ == "__main__":
app.queue() # 스트리밍을 위한 큐 활성화
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_error=True
)