import os
import random
import base64
import requests
import tempfile
import shutil
import time
import numpy as np
import traceback
from typing import List, Tuple
from datetime import datetime, timedelta
from pathlib import Path
from io import BytesIO
from urllib.parse import urljoin
# Selenium 관련
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException, TimeoutException
# 이미지 처리
from PIL import Image, ImageDraw, ImageFont
# Gradio
import gradio as gr
# HuggingFace
from huggingface_hub import InferenceClient
from dotenv import load_dotenv
# HTML 파싱
from bs4 import BeautifulSoup
# 음성 및 비디오 처리
from gtts import gTTS
from moviepy.editor import (
VideoFileClip,
AudioFileClip,
ImageClip,
concatenate_videoclips
)
# 상단에 import 추가
import textwrap
# .env 파일에서 환경 변수 로드
load_dotenv()
# HuggingFace 인퍼런스 클라이언트 설정
hf_client = InferenceClient(
"CohereForAI/c4ai-command-r-plus-08-2024",
token=os.getenv("HF_TOKEN")
)
# 스크린샷 캐시 디렉토리 설정
CACHE_DIR = Path("screenshot_cache")
CACHE_DIR.mkdir(exist_ok=True)
# 전역 변수로 스크린샷 캐시 선언
SCREENSHOT_CACHE = {}
def get_cached_screenshot(url: str) -> str:
"""캐시된 스크린샷 가져오기 또는 새로 생성"""
try:
# URL을 안전한 파일명으로 변환
safe_filename = base64.urlsafe_b64encode(url.encode()).decode()
cache_file = CACHE_DIR / f"{safe_filename[:200]}.jpg" # PNG 대신 JPG 사용
if cache_file.exists():
try:
with Image.open(cache_file) as img:
buffered = BytesIO()
img.save(buffered, format="JPEG", quality=85, optimize=True)
return base64.b64encode(buffered.getvalue()).decode()
except Exception as e:
print(f"Cache read error for {url}: {e}")
if cache_file.exists():
cache_file.unlink()
return take_screenshot(url)
except Exception as e:
print(f"Screenshot cache error for {url}: {e}")
return ""
def take_screenshot(url: str) -> str:
"""웹사이트 스크린샷 촬영"""
if not url.startswith('http'):
url = f"https://{url}"
options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--window-size=1080,720')
driver = None
try:
driver = webdriver.Chrome(options=options)
driver.get(url)
# 페이지 로딩 대기
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 추가 대기 시간
time.sleep(3)
# 스크린샷 촬영 및 최적화
screenshot = driver.get_screenshot_as_png()
img = Image.open(BytesIO(screenshot))
# 이미지 크기 최적화
max_size = (800, 600)
img.thumbnail(max_size, Image.Resampling.LANCZOS)
# JPEG로 변환 및 최적화
if img.mode in ('RGBA', 'LA'):
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1])
img = background
# 캐시 저장
safe_filename = base64.urlsafe_b64encode(url.encode()).decode()
cache_file = CACHE_DIR / f"{safe_filename[:200]}.jpg"
img.save(cache_file, format="JPEG", quality=85, optimize=True)
# 반환용 이미지 생성
buffered = BytesIO()
img.save(buffered, format="JPEG", quality=85, optimize=True)
return base64.b64encode(buffered.getvalue()).decode()
except Exception as e:
print(f"Screenshot error for {url}: {e}")
return ""
finally:
if driver:
driver.quit()
def cleanup_cache():
"""캐시 정리"""
try:
current_time = time.time()
for cache_file in CACHE_DIR.glob("*.jpg"):
try:
# 24시간 이상 된 파일 또는 0바이트 파일 삭제
if (current_time - cache_file.stat().st_mtime > 86400) or cache_file.stat().st_size == 0:
cache_file.unlink()
except Exception as e:
print(f"Error cleaning cache file {cache_file}: {e}")
except Exception as e:
print(f"Cache cleanup error: {e}")
# 앱 시작 시 캐시 정리
cleanup_cache()
def calculate_rising_rate(created_date: str, rank: int) -> int:
"""AI Rising Rate 계산"""
# 생성일 기준 점수 계산
created = datetime.strptime(created_date.split('T')[0], '%Y-%m-%d')
today = datetime.now()
days_diff = (today - created).days
date_score = max(0, 300 - days_diff) # 최대 300점
# 순위 기준 점수 계산
rank_score = max(0, 600 - rank) # 최대 300점
# 총점 계산
total_score = date_score + rank_score
# 별 개수 계산 (0~5)
if total_score <= 200:
stars = 1
elif total_score <= 400:
stars = 2
elif total_score <= 600:
stars = 3
elif total_score <= 800:
stars = 4
else:
stars = 5
return stars
def get_popularity_grade(likes: int, stars: int) -> tuple:
"""AI Popularity Score 등급 계산"""
# 기본 점수 (likes)
base_score = min(likes, 10000) # 최대 10000점
# 별점 추가 점수 (별 하나당 500점)
star_score = stars * 1000
# 총점
total_score = base_score + star_score
# 등급 테이블 (18단계)
grades = [
(14500, "AAA+"), (14000, "AAA"), (13500, "AAA-"),
(13000, "AA+"), (12500, "AA"), (12000, "AA-"),
(11500, "A+"), (11000, "A"), (10000, "A-"),
(9000, "BBB+"), (8000, "BBB"), (7000, "BBB-"),
(6000, "BB+"), (5000, "BB"), (4000, "BB-"),
(3000, "B+"), (2000, "B"), (1000, "B-")
]
for threshold, grade in grades:
if total_score >= threshold:
return grade, total_score
return "B-", total_score
# get_card 함수 내의 hardware_info 부분을 다음으로 교체:
def get_rating_info(item: dict, index: int) -> str:
"""평가 정보 HTML 생성"""
created = item.get('createdAt', '').split('T')[0]
likes = int(str(item.get('likes', '0')).replace(',', ''))
# AI Rising Rate 계산
stars = calculate_rising_rate(created, index + 1)
star_html = "★" * stars + "☆" * (5 - stars) # 채워진 별과 빈 별 조합
# AI Popularity Score 계산
grade, score = get_popularity_grade(likes, stars)
# 등급별 색상 설정
grade_colors = {
'AAA': '#FFD700', 'AA': '#FFA500', 'A': '#FF4500',
'BBB': '#4169E1', 'BB': '#1E90FF', 'B': '#00BFFF'
}
grade_base = grade.rstrip('+-')
grade_color = grade_colors.get(grade_base, '#666666')
return f"""
AI Rising Rate:
{star_html}
AI Popularity Score:
{grade} ({score:,})
"""
def get_hardware_info(item: dict) -> tuple:
"""하드웨어 정보 추출"""
try:
# runtime 정보 확인
runtime = item.get('runtime', {})
# CPU 정보 처리
cpu_info = runtime.get('cpu', 'Standard')
# GPU 정보 처리
gpu_info = "None"
if runtime.get('accelerator') == "gpu":
gpu_type = runtime.get('gpu', {}).get('name', '')
gpu_memory = runtime.get('gpu', {}).get('memory', '')
if gpu_type:
gpu_info = f"{gpu_type}"
if gpu_memory:
gpu_info += f" ({gpu_memory}GB)"
# spaces decorator 확인
if '@spaces.GPU' in str(item.get('sdk_version', '')):
if gpu_info == "None":
gpu_info = "GPU Enabled"
# SDK 정보 처리
sdk = item.get('sdk', 'N/A')
print(f"Debug - Runtime Info: {runtime}") # 디버그 출력
print(f"Debug - GPU Info: {gpu_info}") # 디버그 출력
return cpu_info, gpu_info, sdk
except Exception as e:
print(f"Error parsing hardware info: {str(e)}")
return 'Standard', 'None', 'N/A'
def get_card(item: dict, index: int, card_type: str = "space") -> str:
"""통합 카드 HTML 생성"""
item_id = item.get('id', '')
author, title = item_id.split('/', 1)
likes = format(item.get('likes', 0), ',')
created = item.get('createdAt', '').split('T')[0]
# short_description 가져오기
short_description = item.get('cardData', {}).get('short_description', '')
# URL 정의
if card_type == "space":
url = f"https://huggingface.co/spaces/{item_id}"
elif card_type == "model":
url = f"https://huggingface.co/{item_id}"
else: # dataset
url = f"https://huggingface.co/datasets/{item_id}"
# 메타데이터 처리
tags = item.get('tags', [])
pipeline_tag = item.get('pipeline_tag', '')
license = item.get('license', '')
sdk = item.get('sdk', 'N/A')
# AI Rating 정보 가져오기
rating_info = get_rating_info(item, index)
# 카드 타입별 그라데이션 설정
if card_type == "space":
gradient_colors = """
rgba(255, 182, 193, 0.7), /* 파스텔 핑크 */
rgba(173, 216, 230, 0.7), /* 파스텔 블루 */
rgba(255, 218, 185, 0.7) /* 파스텔 피치 */
"""
bg_content = f"""
background-image: url(data:image/png;base64,{get_cached_screenshot(url) if get_cached_screenshot(url) else ''});
background-size: cover;
background-position: center;
"""
type_icon = "🎯"
type_label = "SPACE"
elif card_type == "model":
gradient_colors = """
rgba(110, 142, 251, 0.7), /* 모델 블루 */
rgba(130, 158, 251, 0.7),
rgba(150, 174, 251, 0.7)
"""
bg_content = f"""
background: linear-gradient(135deg, #6e8efb, #4a6cf7);
padding: 15px;
"""
type_icon = "🤖"
type_label = "MODEL"
else: # dataset
gradient_colors = """
rgba(255, 107, 107, 0.7), /* 데이터셋 레드 */
rgba(255, 127, 127, 0.7),
rgba(255, 147, 147, 0.7)
"""
bg_content = f"""
background: linear-gradient(135deg, #ff6b6b, #ff8787);
padding: 15px;
"""
type_icon = "📊"
type_label = "DATASET"
content_bg = f"""
background: linear-gradient(135deg, {gradient_colors});
backdrop-filter: blur(10px);
"""
# 태그 표시 (models와 datasets용)
tags_html = ""
if card_type != "space":
tags_html = f"""
{' '.join([f'''
#{tag}
''' for tag in tags[:5]])}
"""
# 카드 HTML 반환
return f"""
#{index + 1}
{type_icon} {type_label}
{tags_html}
{title}
{f'''
{short_description}
''' if card_type == "space" and short_description else ''}
👤 {author}
❤️ {likes}
📅 {created}
{rating_info}
"""
def get_trending_spaces(search_query="", sort_by="rank", progress=gr.Progress()) -> Tuple[str, str]:
"""트렌딩 스페이스 가져오기"""
url = "https://huggingface.co/api/spaces"
try:
progress(0, desc="Fetching spaces data...")
params = {
'full': 'true',
'limit': 24
}
response = requests.get(url, params=params)
response.raise_for_status()
spaces = response.json()
# 검색어로 필터링
if search_query:
spaces = [space for space in spaces if search_query.lower() in
(space.get('id', '') + ' ' + space.get('title', '')).lower()]
# 정렬
sort_by = sort_by.lower()
if sort_by == "rising_rate":
spaces.sort(key=lambda x: calculate_rising_rate(x.get('createdAt', ''), 0), reverse=True)
elif sort_by == "popularity":
spaces.sort(key=lambda x: get_popularity_grade(
int(str(x.get('likes', '0')).replace(',', '')),
calculate_rising_rate(x.get('createdAt', ''), 0))[1],
reverse=True)
progress(0.1, desc="Creating gallery...")
html_content = """
"""
for idx, space in enumerate(spaces):
html_content += get_card(space, idx, "space")
progress((0.1 + 0.9 * idx/len(spaces)), desc=f"Loading space {idx+1}/{len(spaces)}...")
html_content += "
"
progress(1.0, desc="Complete!")
return html_content, f"Found {len(spaces)} spaces"
except Exception as e:
error_html = f'Error: {str(e)}
'
return error_html, f"Error: {str(e)}"
def get_models(search_query="", sort_by="rank", progress=gr.Progress()) -> Tuple[str, str]:
"""인기 모델 가져오기"""
url = "https://huggingface.co/api/models"
try:
progress(0, desc="Fetching models data...")
params = {
'full': 'true',
'limit': 300
}
response = requests.get(url, params=params)
response.raise_for_status()
models = response.json()
# 검색어로 필터링
if search_query:
models = [model for model in models if search_query.lower() in
(model.get('id', '') + ' ' + model.get('title', '')).lower()]
# 정렬
sort_by = sort_by.lower()
if sort_by == "rising_rate":
models.sort(key=lambda x: calculate_rising_rate(x.get('createdAt', ''), 0), reverse=True)
elif sort_by == "popularity":
models.sort(key=lambda x: get_popularity_grade(
int(str(x.get('likes', '0')).replace(',', '')),
calculate_rising_rate(x.get('createdAt', ''), 0))[1],
reverse=True)
progress(0.1, desc="Creating gallery...")
html_content = """
"""
for idx, model in enumerate(models):
html_content += get_card(model, idx, "model")
progress((0.1 + 0.9 * idx/len(models)), desc=f"Loading model {idx+1}/{len(models)}...")
html_content += "
"
progress(1.0, desc="Complete!")
return html_content, f"Found {len(models)} models"
except Exception as e:
error_html = f'Error: {str(e)}
'
return error_html, f"Error: {str(e)}"
def get_datasets(search_query="", sort_by="rank", progress=gr.Progress()) -> Tuple[str, str]:
"""인기 데이터셋 가져오기"""
url = "https://huggingface.co/api/datasets"
try:
progress(0, desc="Fetching datasets data...")
params = {
'full': 'true',
'limit': 300
}
response = requests.get(url, params=params)
response.raise_for_status()
datasets = response.json()
# 검색어로 필터링
if search_query:
datasets = [dataset for dataset in datasets if search_query.lower() in
(dataset.get('id', '') + ' ' + dataset.get('title', '')).lower()]
# 정렬
sort_by = sort_by.lower()
if sort_by == "rising_rate":
datasets.sort(key=lambda x: calculate_rising_rate(x.get('createdAt', ''), 0), reverse=True)
elif sort_by == "popularity":
datasets.sort(key=lambda x: get_popularity_grade(
int(str(x.get('likes', '0')).replace(',', '')),
calculate_rising_rate(x.get('createdAt', ''), 0))[1],
reverse=True)
progress(0.1, desc="Creating gallery...")
html_content = """
"""
for idx, dataset in enumerate(datasets):
html_content += get_card(dataset, idx, "dataset")
progress((0.1 + 0.9 * idx/len(datasets)), desc=f"Loading dataset {idx+1}/{len(datasets)}...")
html_content += "
"
progress(1.0, desc="Complete!")
return html_content, f"Found {len(datasets)} datasets"
except Exception as e:
error_html = f'Error: {str(e)}
'
return error_html, f"Error: {str(e)}"
# 정렬 함수 추가
def sort_items(items, sort_by):
if sort_by == "rank":
return items # 이미 순위대로 정렬되어 있음
elif sort_by == "rising_rate":
return sorted(items, key=lambda x: calculate_rising_rate(x.get('createdAt', ''), 0), reverse=True)
elif sort_by == "popularity":
return sorted(items, key=lambda x: get_popularity_grade(int(str(x.get('likes', '0')).replace(',', '')),
calculate_rising_rate(x.get('createdAt', ''), 0))[1], reverse=True)
return items
# API 호출 함수 수정
def fetch_items(item_type, search_query="", sort_by="rank", limit=1000):
"""아이템 가져오기 (spaces/models/datasets)"""
base_url = f"https://huggingface.co/api/{item_type}"
params = {
'full': 'true',
'limit': limit,
'search': search_query
}
try:
response = requests.get(base_url, params=params)
response.raise_for_status()
items = response.json()
# 검색어로 필터링
if search_query:
items = [item for item in items if search_query.lower() in
(item.get('id', '') + item.get('title', '')).lower()]
# 정렬
items = sort_items(items, sort_by)
return items[:300] # 상위 300개만 반환
except Exception as e:
print(f"Error fetching items: {e}")
return []
def get_space_source(space_id: str) -> dict:
"""스페이스의 소스코드 가져오기"""
try:
headers = {"Authorization": f"Bearer {os.getenv('HF_TOKEN')}"}
files_to_try = [
'app.py',
'index.html',
'app.js',
'main.py',
'streamlit_app.py',
'gradio_ui.py'
]
source = {}
for file in files_to_try:
url = f"https://huggingface.co/spaces/{space_id}/raw/main/{file}"
response = requests.get(url, headers=headers)
if response.status_code == 200:
source[file] = response.text
return source if source else {"app.py": "", "index.html": ""}
except Exception as e:
print(f"Error fetching source for {space_id}: {str(e)}")
return {"app.py": "", "index.html": ""}
def analyze_spaces(progress=gr.Progress()):
"""스페이스 분석 및 HTML 생성"""
try:
url = "https://huggingface.co/api/spaces"
response = requests.get(url, params={'full': 'true', 'limit': 24})
response.raise_for_status()
spaces = response.json()[:24]
html_content = ""
for idx, space in enumerate(spaces):
progress((idx + 1) / 24, desc=f"분석 중... {idx+1}/24")
try:
# 스크린샷 처리
space_url = f"https://huggingface.co/spaces/{space['id']}"
screenshot_base64 = get_cached_screenshot(space_url)
# 텍스트 분석
project_name = space['id'].split('/')[-1]
source = get_space_source(space['id'])
source_code = source["app.py"] or source["index.html"]
prompt = f"""
다음 스페이스를 간단히 설명해주세요:
스페이스 이름: {project_name}
순위: {idx + 1}위
다음 형식으로 작성:
1. 순위와 이름 소개
2. 주요 기능 설명
3. 특징적인 장점
"""
messages = [
{"role": "system", "content": "간단명료한 설명을 제공하는 리포터입니다."},
{"role": "user", "content": prompt}
]
response = hf_client.chat_completion(
messages,
max_tokens=150,
temperature=0.7
)
analysis = response.choices[0].message.content.strip()
# HTML 카드 생성
html_content += f"""
Space #{idx + 1}
{f'

' if screenshot_base64 else ''}
"""
except Exception as e:
print(f"Error processing space {space['id']}: {e}")
html_content += f"""
Space #{idx + 1}
분석을 준비중입니다.
"""
html_content += "
"
return html_content
except Exception as e:
print(f"Analysis error: {e}")
return "분석 중 오류가 발생했습니다.
"
def analyze_top_spaces(progress=gr.Progress()) -> Tuple[str, str]:
"""상위 24개 스페이스 분석"""
try:
progress(0, desc="스페이스 데이터 가져오는 중...")
url = "https://huggingface.co/api/spaces"
response = requests.get(url, params={'full': 'true', 'limit': 24})
response.raise_for_status()
spaces = response.json()[:24]
# 상단 입력 박스와 기본 텍스트를 포함한 HTML 시작
html_content = """
"""
for idx, space in enumerate(spaces):
progress((idx + 1) / 24, desc=f"분석 중... {idx+1}/24")
try:
source = get_space_source(space['id'])
source_code = source["app.py"] or source["index.html"]
# 스페이스 ID에서 사용자명 제거하고 프로젝트명만 추출
project_name = space['id'].split('/')[-1]
prompt = f"""
다음 HuggingFace 스페이스를 유튜브 뉴스 리포트 형식으로 설명해주세요.
시작은 반드시 "오늘의 인기순위 {idx + 1}위인 {project_name}입니다."로 시작하고,
이어서 주요 기능, 특징, 활용방안을 2-3문장으로 자연스럽게 설명해주세요.
전체 길이는 3-4문장으로 제한하고, 설명은 뉴스 리포터처럼 명확하고 전문적으로 해주세요.
소스코드:
```
{source_code[:1500]}
```
"""
messages = [
{"role": "system", "content": "AI 기술 전문 뉴스 리포터입니다."},
{"role": "user", "content": prompt}
]
response = hf_client.chat_completion(
messages,
max_tokens=2000,
temperature=0.7
)
script = response.choices[0].message.content.strip()
html_content += f"""
"""
except Exception as e:
print(f"Error analyzing space {space['id']}: {e}")
html_content += f"""
순위 {idx + 1}위 분석 중 오류가 발생했습니다.
"""
html_content += "
"
return html_content, f"24개 스페이스 분석 완료"
except Exception as e:
error_msg = f"Error: {str(e)}"
return f"{error_msg}
", error_msg
def analyze_single_space(space: dict, source_code: str) -> str:
"""단일 스페이스 분석"""
try:
if not source_code:
return "소스코드를 가져올 수 없습니다."
prompt = f"""
다음 스페이스의 소스코드를 분석해주세요:
```
{source_code[:4000]}
```
다음 항목을 각각 한 줄로 요약해주세요:
1. 개요:
2. 요약:
3. 특징 및 장점:
4. 사용 대상:
5. 사용 방법:
6. 유사 서비스와의 차별점:
"""
messages = [
{"role": "system", "content": "소스코드 분석 전문가입니다."},
{"role": "user", "content": prompt}
]
response = hf_client.chat_completion(
messages,
max_tokens=3900,
temperature=0.3
)
return response.choices[0].message.content
except Exception as e:
return f"분석 중 오류 발생: {str(e)}"
def create_editable_space_analysis(progress=gr.Progress()) -> List[str]:
"""24개 스페이스 분석 텍스트 생성"""
try:
progress(0, desc="스페이스 데이터 가져오는 중...")
url = "https://huggingface.co/api/spaces"
response = requests.get(url, params={'full': 'true', 'limit': 24})
response.raise_for_status()
spaces = response.json()[:24]
analysis_texts = []
for idx, space in enumerate(spaces):
progress((idx + 1) / 24, desc=f"분석 중... {idx+1}/24")
try:
source = get_space_source(space['id'])
source_code = source["app.py"] or source["index.html"]
# 프로젝트명만 추출
project_name = space['id'].split('/')[-1]
prompt = f"""
다음 HuggingFace 스페이스를 분석하여 뉴스 리포트 형식으로 설명해주세요:
시작은 반드시 "오늘의 인기순위 {idx + 1}위인 {project_name}입니다."로 시작하고,
이어서 주요 기능, 특징, 활용방안을 자연스럽게 설명해주세요.
소스코드:
```
{source_code[:1500]}
```
"""
messages = [
{"role": "system", "content": "AI 기술 전문 뉴스 리포터입니다."},
{"role": "user", "content": prompt}
]
response = hf_client.chat_completion(
messages,
max_tokens=2000,
temperature=0.7
)
analysis_texts.append(response.choices[0].message.content.strip())
except Exception as e:
analysis_texts.append(f"오늘의 인기순위 {idx + 1}위인 {project_name}입니다.")
return analysis_texts
except Exception as e:
return [f"순위 {i+1}위 분석을 준비중입니다." for i in range(24)]
def generate_video(intro_text, analysis_html):
"""비디오 생성"""
try:
# HTML에서 텍스트 추출
soup = BeautifulSoup(analysis_html, 'html.parser')
texts = [intro_text] + [p.text for p in soup.find_all('p')]
temp_dir = tempfile.mkdtemp()
clips = []
# 각 텍스트에 대한 클립 생성
for idx, text in enumerate(texts):
if not text or len(text.strip()) == 0:
continue
# 이미지 생성
img = Image.new('RGB', (800, 600), (0, 0, 0))
draw = ImageDraw.Draw(img)
# 텍스트를 여러 줄로 나누어 그리기
lines = textwrap.wrap(text, width=40)
y = 40
for line in lines:
draw.text((40, y), line, fill=(255, 255, 255))
y += 30
# 음성 생성
tts = gTTS(text=text, lang='ko', slow=False)
audio_path = os.path.join(temp_dir, f"audio_{idx}.mp3")
tts.save(audio_path)
audio_clip = AudioFileClip(audio_path)
# 클립 생성
video_clip = ImageClip(np.array(img))
video_clip = video_clip.set_duration(audio_clip.duration)
video_clip = video_clip.set_audio(audio_clip)
clips.append(video_clip)
# 최종 영상 생성
final_clip = concatenate_videoclips(clips)
output_path = "output_video.mp4"
final_clip.write_videofile(
output_path,
fps=24,
codec='libx264',
audio_codec='aac'
)
return output_path
except Exception as e:
print(f"Video generation error: {e}")
traceback.print_exc()
return None
def create_interface():
with gr.Blocks(title="HuggingFace Trending Board", css="""
.search-sort-container {
background: linear-gradient(135deg, rgba(255,255,255,0.95), rgba(240,240,255,0.95));
border-radius: 15px;
padding: 20px;
margin: 10px 0;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
overflow: visible;
}
.search-box {
border: 2px solid #e1e1e1;
border-radius: 10px;
padding: 12px;
transition: all 0.3s ease;
background: linear-gradient(135deg, #ffffff, #f8f9ff);
width: 100%;
}
.search-box:focus {
border-color: #7b61ff;
box-shadow: 0 0 0 2px rgba(123,97,255,0.2);
background: linear-gradient(135deg, #ffffff, #f0f3ff);
}
.refresh-btn {
background: linear-gradient(135deg, #7b61ff, #6366f1);
color: white;
border: none;
padding: 10px 20px;
border-radius: 10px;
cursor: pointer;
transition: all 0.3s ease;
width: 120px;
height: 80px !important;
display: flex;
align-items: center;
justify-content: center;
margin-left: auto;
font-size: 1.2em !important;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
.refresh-btn:hover {
transform: translateY(-2px);
box-shadow: 0 6px 12px rgba(0,0,0,0.2);
background: linear-gradient(135deg, #8b71ff, #7376f1);
}
""") as interface:
gr.Markdown("""
# 🤗 HuggingFace Trending 24 NEWS
HuggingFace Trending Spaces Top 24 NEWS
""")
with gr.Tabs() as tabs:
# Spaces 탭
with gr.Tab("🎯 Trending Spaces"):
with gr.Row(elem_classes="search-sort-container"):
with gr.Column(scale=2):
spaces_search = gr.Textbox(
label="🔍 Search Spaces",
placeholder="Enter keywords to search...",
elem_classes="search-box"
)
with gr.Column(scale=2):
spaces_sort = gr.Radio(
choices=["rank", "rising_rate", "popularity"],
value="rank",
label="Sort by",
interactive=True
)
with gr.Column(scale=1):
spaces_refresh_btn = gr.Button(
"🔄 Refresh",
variant="primary",
elem_classes="refresh-btn"
)
spaces_gallery = gr.HTML()
spaces_status = gr.Markdown("Loading...")
# Models 탭
with gr.Tab("🤖 Trending Models"):
with gr.Row(elem_classes="search-sort-container"):
with gr.Column(scale=2):
models_search = gr.Textbox(
label="🔍 Search Models",
placeholder="Enter keywords to search...",
elem_classes="search-box"
)
with gr.Column(scale=2):
models_sort = gr.Radio(
choices=["rank", "rising_rate", "popularity"],
value="rank",
label="Sort by",
interactive=True
)
with gr.Column(scale=1):
models_refresh_btn = gr.Button(
"🔄 Refresh",
variant="primary",
elem_classes="refresh-btn"
)
models_gallery = gr.HTML()
models_status = gr.Markdown("Loading...")
# Datasets 탭
with gr.Tab("📊 Trending Datasets"):
with gr.Row(elem_classes="search-sort-container"):
with gr.Column(scale=2):
datasets_search = gr.Textbox(
label="🔍 Search Datasets",
placeholder="Enter keywords to search...",
elem_classes="search-box"
)
with gr.Column(scale=2):
datasets_sort = gr.Radio(
choices=["rank", "rising_rate", "popularity"],
value="rank",
label="Sort by",
interactive=True
)
with gr.Column(scale=1):
datasets_refresh_btn = gr.Button(
"🔄 Refresh",
variant="primary",
elem_classes="refresh-btn"
)
datasets_gallery = gr.HTML()
datasets_status = gr.Markdown("Loading...")
with gr.Tab("🔍 Top 24 Spaces Analysis"):
with gr.Row():
analysis_refresh_btn = gr.Button(
"🔄 Analyze All 24 Spaces",
variant="primary"
)
# 인트로 섹션
with gr.Row():
with gr.Column(scale=3):
intro_text = gr.Textbox(
value="안녕하세요. 매일 글로벌 최신 AI 인기 트렌드 서비스를 알아보는 '데일리 AI 트렌딩' 뉴스입니다.",
label="인트로 텍스트",
lines=4
)
# 분석 결과 컨테이너들
analysis_boxes = []
space_images = []
# Analysis 탭에서 이미지 컴포넌트 생성 부분 수정
for i in range(24):
with gr.Row():
with gr.Column(scale=3):
text_box = gr.Textbox(
label=f"Space #{i+1}",
lines=3,
interactive=True
)
analysis_boxes.append(text_box)
with gr.Column(scale=1):
img = gr.Image(
label=f"Screenshot #{i+1}",
type="filepath",
interactive=True, # 이미지 교체 가능하도록 설정
height=200, # 이미지 높이 설정
sources=["upload", "clipboard"] # 업로드와 클립보드 붙여넣기 허용
)
space_images.append(img)
# 비디오 생성 섹션
with gr.Row():
generate_btn = gr.Button(
"🎬 Generate Video",
variant="primary"
)
video_output = gr.Video(label="Generated Video")
# 이미지 업데이트 버튼
with gr.Row():
update_images_btn = gr.Button(
"🔄 Update Screenshots",
variant="secondary"
)
# 이미지 교체 이벤트 핸들러 추가
for img in space_images:
img.change(
fn=lambda x: x, # 간단한 패스스루 함수
inputs=[img],
outputs=[img]
)
# Event handlers
spaces_refresh_btn.click(
fn=get_trending_spaces,
inputs=[spaces_search, spaces_sort],
outputs=[spaces_gallery, spaces_status]
)
models_refresh_btn.click(
fn=get_models,
inputs=[models_search, models_sort],
outputs=[models_gallery, models_status]
)
datasets_refresh_btn.click(
fn=get_datasets,
inputs=[datasets_search, datasets_sort],
outputs=[datasets_gallery, datasets_status]
)
# Analysis 탭의 이벤트 핸들러들
analysis_refresh_btn.click(
fn=on_analyze,
outputs=analysis_boxes + space_images
)
generate_btn.click(
fn=on_generate_video,
inputs=[intro_text] + analysis_boxes,
outputs=video_output
)
update_images_btn.click(
fn=update_screenshots,
outputs=space_images
)
# 검색어 변경 시 자동 새로고침
spaces_search.change(
fn=get_trending_spaces,
inputs=[spaces_search, spaces_sort],
outputs=[spaces_gallery, spaces_status]
)
models_search.change(
fn=get_models,
inputs=[models_search, models_sort],
outputs=[models_gallery, models_status]
)
datasets_search.change(
fn=get_datasets,
inputs=[datasets_search, datasets_sort],
outputs=[datasets_gallery, datasets_status]
)
# 정렬 방식 변경 시 자동 새로고침
spaces_sort.change(
fn=get_trending_spaces,
inputs=[spaces_search, spaces_sort],
outputs=[spaces_gallery, spaces_status]
)
models_sort.change(
fn=get_models,
inputs=[models_search, models_sort],
outputs=[models_gallery, models_status]
)
datasets_sort.change(
fn=get_datasets,
inputs=[datasets_search, datasets_sort],
outputs=[datasets_gallery, datasets_status]
)
# 초기 데이터 로드
interface.load(
fn=get_trending_spaces,
inputs=[spaces_search, spaces_sort],
outputs=[spaces_gallery, spaces_status]
)
interface.load(
fn=get_models,
inputs=[models_search, models_sort],
outputs=[models_gallery, models_status]
)
interface.load(
fn=get_datasets,
inputs=[datasets_search, datasets_sort],
outputs=[datasets_gallery, datasets_status]
)
return interface
def on_analyze(progress=gr.Progress()):
"""분석 실행 및 텍스트박스/이미지 업데이트"""
try:
url = "https://huggingface.co/api/spaces"
response = requests.get(url, params={'full': 'true', 'limit': 24})
response.raise_for_status()
spaces = response.json()[:24]
text_results = []
image_results = []
temp_dir = Path("temp_screenshots")
temp_dir.mkdir(exist_ok=True)
for idx, space in enumerate(spaces):
progress((idx + 1) / 24, desc=f"분석 중... {idx+1}/24")
try:
# 스크린샷 처리
space_url = f"https://huggingface.co/spaces/{space['id']}"
screenshot_path = temp_dir / f"space_{idx:03d}.jpg"
# 이미지 저장
screenshot_base64 = get_cached_screenshot(space_url)
if screenshot_base64:
try:
img_data = base64.b64decode(screenshot_base64)
with open(screenshot_path, 'wb') as f:
f.write(img_data)
image_results.append(str(screenshot_path))
except Exception as e:
print(f"Screenshot save error: {e}")
image_results.append(None)
else:
image_results.append(None)
# 소스코드 가져오기
source = get_space_source(space['id'])
source_code = ""
# 소스코드 우선순위 설정
if source.get("app.py"):
source_code = source["app.py"]
elif source.get("streamlit_app.py"):
source_code = source["streamlit_app.py"]
elif source.get("gradio_ui.py"):
source_code = source["gradio_ui.py"]
elif source.get("main.py"):
source_code = source["main.py"]
elif source.get("index.html"):
source_code = source["index.html"]
if not source_code.strip():
text_results.append(f"오늘의 인기순위 {idx + 1}위 스페이스입니다. 소스코드를 확인할 수 없어 자세한 분석은 어렵습니다.")
continue
# 텍스트 분석
project_name = space['id'].split('/')[-1]
# 소스코드 분석을 위한 프롬프트
prompt = f"""
다음 HuggingFace 스페이스의 소스코드를 자세히 분석해주세요:
스페이스 정보:
- 이름: {project_name}
- 순위: {idx + 1}위
- URL: {space_url}
소스코드:
```python
{source_code[:2000]}
```
다음 내용을 분석하여 설명해주세요:
1. 이 스페이스가 사용하는 주요 라이브러리와 프레임워크
2. 구현된 핵심 기능과 작동 방식
3. 기술적 특징과 장점
응답 형식:
- 뉴스 리포터처럼 자연스럽게 설명
- 기술적 내용을 포함하되 이해하기 쉽게 설명
- 실제 소스코드에서 확인된 내용만 포함
-'인녕하세요'와 같은 인사와 '자기소개'는 포함하지말아라.
- 최대 7문장 이내로 간단명료하게 작성(반드시 완성된 문장이 아닌경우 출력하지말것)
"""
messages = [
{
"role": "system",
"content": "저는 소스코드를 분석하여 기술적 내용을 10세 아동도 이해할 수 있게 쉽게 설명하는 AI 기술 전문 리포터입니다."
},
{"role": "user", "content": prompt}
]
response = hf_client.chat_completion(
messages,
max_tokens=1000,
temperature=0.3
)
analysis = response.choices[0].message.content.strip()
# 분석 결과가 실제 소스코드 기반인지 확인
if "라이브러리" in analysis or "프레임워크" in analysis or "함수" in analysis:
text_results.append(analysis)
else:
# 재시도
prompt += "\n\n주의: 반드시 소스코드에서 확인된 실제 기술적 내용만 포함하여 설명해주세요."
messages[1]["content"] = prompt
response = hf_client.chat_completion(
messages,
max_tokens=300,
temperature=0.2
)
text_results.append(response.choices[0].message.content.strip())
except Exception as e:
print(f"Error processing space {space['id']}: {e}")
text_results.append(f"현재 {idx + 1}위 스페이스에 대한 분석을 준비중입니다.")
if len(image_results) <= idx:
image_results.append(None)
# 결과 개수 맞추기
while len(text_results) < 24:
text_results.append("분석을 준비중입니다.")
while len(image_results) < 24:
image_results.append(None)
return text_results + image_results
except Exception as e:
print(f"Analysis error: {e}")
return ["분석을 준비중입니다."] * 24 + [None] * 24
def on_generate_video(intro_text, *texts):
"""영상 생성"""
try:
temp_dir = tempfile.mkdtemp()
clips = []
# 인트로 처리 - intro.png 사용
try:
intro_img = Image.open("intro.png")
# 크기가 다른 경우 리사이즈
if intro_img.size != (800, 600):
intro_img = intro_img.resize((800, 600), Image.Resampling.LANCZOS)
except Exception as e:
print(f"Error loading intro image: {e}")
# intro.png가 없는 경우 검은 배경에 텍스트
intro_img = Image.new('RGB', (800, 600), (0, 0, 0))
draw = ImageDraw.Draw(intro_img)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
except:
font = ImageFont.load_default()
lines = textwrap.wrap(intro_text, width=40)
y = 40
for line in lines:
draw.text((40, y), line, fill=(255, 255, 255), font=font)
y += 30
# 인트로 클립 생성
intro_audio = gTTS(text=intro_text, lang='ko', slow=False)
intro_audio_path = os.path.join(temp_dir, "intro.mp3")
intro_audio.save(intro_audio_path)
intro_audio_clip = AudioFileClip(intro_audio_path)
intro_clip = ImageClip(np.array(intro_img))
intro_clip = intro_clip.set_duration(intro_audio_clip.duration)
intro_clip = intro_clip.set_audio(intro_audio_clip)
clips.append(intro_clip)
# 각 스페이스 처리
for idx, text in enumerate(texts):
if not text or len(str(text).strip()) == 0:
continue
# 스크린샷 이미지 가져오기
screenshot_path = f"temp_screenshots/space_{idx:03d}.jpg"
if os.path.exists(screenshot_path):
img = Image.open(screenshot_path)
# 이미지 크기 조정
img = img.resize((800, 600), Image.Resampling.LANCZOS)
else:
# 스크린샷이 없는 경우 검은 배경
img = Image.new('RGB', (800, 600), (0, 0, 0))
# 음성 생성
tts = gTTS(text=str(text), lang='ko', slow=False)
audio_path = os.path.join(temp_dir, f"audio_{idx}.mp3")
tts.save(audio_path)
audio_clip = AudioFileClip(audio_path)
# 클립 생성 (이미지 사용)
video_clip = ImageClip(np.array(img))
video_clip = video_clip.set_duration(audio_clip.duration)
video_clip = video_clip.set_audio(audio_clip)
clips.append(video_clip)
# 최종 영상 생성
final_clip = concatenate_videoclips(clips)
output_path = "output_video.mp4"
final_clip.write_videofile(
output_path,
fps=24,
codec='libx264',
audio_codec='aac'
)
return output_path
except Exception as e:
print(f"Video generation error: {e}")
traceback.print_exc()
return None
finally:
try:
if 'temp_dir' in locals():
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Cleanup error: {e}")
def update_screenshots():
"""스크린샷 일괄 업데이트"""
try:
url = "https://huggingface.co/api/spaces"
response = requests.get(url, params={'full': 'true', 'limit': 24})
spaces = response.json()[:24]
image_paths = []
temp_dir = Path("temp_screenshots")
temp_dir.mkdir(exist_ok=True)
for idx, space in enumerate(spaces):
try:
space_url = f"https://huggingface.co/spaces/{space['id']}"
time.sleep(5) # 충분한 로딩 시간
screenshot_base64 = get_cached_screenshot(space_url)
screenshot_path = temp_dir / f"space_{idx:03d}.jpg"
if screenshot_base64:
try:
img_data = base64.b64decode(screenshot_base64)
# 이미지 저장 및 최적화
with open(screenshot_path, 'wb') as f:
f.write(img_data)
# 이미지 크기 최적화
with Image.open(screenshot_path) as img:
img = img.resize((800, 600), Image.Resampling.LANCZOS)
img.save(screenshot_path, format="JPEG", quality=85, optimize=True)
image_paths.append(str(screenshot_path))
except Exception as e:
print(f"Screenshot save error: {e}")
image_paths.append(None)
else:
image_paths.append(None)
except Exception as e:
print(f"Error capturing screenshot for space {idx+1}: {e}")
image_paths.append(None)
return image_paths
except Exception as e:
print(f"Update screenshots error: {e}")
return [None] * 24
if __name__ == "__main__":
try:
CACHE_DIR.mkdir(exist_ok=True)
cleanup_cache()
demo = create_interface()
demo.launch(
share=True,
inbrowser=True,
show_api=False,
max_threads=4
)
except Exception as e:
print(f"Application error: {e}")