# MarkNawar's picture
# Upload folder using huggingface_hub
# a6bfba7 verified
import httpx
import time
from typing import Dict, List, Optional, Any, Union
import json
from dataclasses import asdict
import asyncio
from datetime import datetime
from itertools import islice
from .types import (
StackOverflowQuestion,
StackOverflowAnswer,
StackOverflowComment,
SearchResult,
SearchResultComments
)
from .env import (
MAX_REQUEST_PER_WINDOW,
RATE_LIMIT_WINDOW_MS,
RETRY_AFTER_MS
)
# Base URL of the Stack Exchange REST API, version 2.3 (Stack Overflow site).
STACKOVERFLOW_API = "https://api.stackexchange.com/2.3"
BATCH_SIZE = 100 # API limit for batch requests
class StackExchangeAPI:
    """Async client for the Stack Exchange 2.3 API (Stack Overflow site).

    Layers a local sliding-window rate limiter on top of the server's own
    throttling, retries on HTTP 429, and batches ID-based lookups up to the
    API's 100-ID-per-request cap.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create a client.

        Args:
            api_key (Optional[str]): Stack Apps key; raises the server-side
                request quota when supplied.
        """
        self.api_key = api_key
        # Millisecond timestamps of recent requests, pruned to the window.
        self.request_timestamps: List[float] = []
        self.client = httpx.AsyncClient(timeout=30.0)

    async def close(self):
        """Dispose of the underlying HTTP client."""
        await self.client.aclose()

    def _check_rate_limit(self) -> bool:
        """Claim a request slot in the local sliding window.

        Returns:
            bool: True when the request may proceed (the slot is recorded);
            False when the window is already full.
        """
        now = time.time() * 1000
        # Keep only timestamps still inside the rate-limit window.
        self.request_timestamps = [
            ts for ts in self.request_timestamps
            if now - ts < RATE_LIMIT_WINDOW_MS
        ]
        if len(self.request_timestamps) >= MAX_REQUEST_PER_WINDOW:
            return False
        self.request_timestamps.append(now)
        return True

    async def _with_rate_limit(self, func, *args, retries=3, attempts=10, **kwargs):
        """Execute a function with rate limiting.

        Args:
            func: Awaitable callable to execute under rate limiting.
            retries (int, optional): Retries after an API 429 error.
                Defaults to 3.
            attempts (int, optional): Retries after hitting the local rate
                limit. Defaults to 10.

        Raises:
            Exception: When maximum local rate-limiting attempts are exceeded.
            httpx.HTTPStatusError: The original error once 429 retries are
                exhausted, or immediately for any other HTTP error status.

        Returns:
            Any: Result from the function.
        """
        if retries is None:
            # Callers may forward an explicit None; fall back to the default.
            retries = 3
        # Iterative retry loop (was recursion) — identical semantics, but deep
        # retry chains cannot grow the call stack.
        while True:
            if attempts <= 0:
                raise Exception("Maximum rate limiting attempts exceeded")
            if not self._check_rate_limit():
                print("Rate limit exceeded, waiting before retry")
                await asyncio.sleep(RETRY_AFTER_MS / 1000)
                attempts -= 1
                continue
            try:
                return await func(*args, **kwargs)
            except httpx.HTTPStatusError as e:
                if retries > 0 and e.response.status_code == 429:
                    print("Rate limit hit (429), retrying after delay...")
                    await asyncio.sleep(RETRY_AFTER_MS / 1000)
                    retries -= 1
                    continue
                raise

    def _with_key(self, params: Dict[str, str]) -> Dict[str, str]:
        """Attach the API key to *params* (in place) when one is configured."""
        if self.api_key:
            params["key"] = self.api_key
        return params

    async def _get_json(
        self,
        url: str,
        params: Dict[str, str],
        retries: Optional[int] = 3
    ) -> Dict[str, Any]:
        """GET *url* under rate limiting and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: For non-2xx responses (after 429 retries).
        """
        async def _do_fetch():
            response = await self.client.get(url, params=params)
            response.raise_for_status()
            return response.json()
        return await self._with_rate_limit(_do_fetch, retries=retries)

    @staticmethod
    def _parse_question(question_data: Dict[str, Any]) -> StackOverflowQuestion:
        """Build a StackOverflowQuestion from a raw API question item."""
        return StackOverflowQuestion(
            question_id=question_data.get("question_id"),
            title=question_data.get("title", ""),
            body=question_data.get("body", ""),
            score=question_data.get("score", 0),
            answer_count=question_data.get("answer_count", 0),
            is_answered=question_data.get("is_answered", False),
            accepted_answer_id=question_data.get("accepted_answer_id"),
            creation_date=question_data.get("creation_date", 0),
            last_activity_date=question_data.get("last_activity_date", 0),
            view_count=question_data.get("view_count", 0),
            tags=question_data.get("tags", []),
            link=question_data.get("link", ""),
            # The API marks closed questions by the presence of closed_date.
            is_closed=question_data.get("closed_date") is not None,
            owner=question_data.get("owner")
        )

    async def fetch_batch_answers(self, question_ids: List[int]) -> Dict[int, List[StackOverflowAnswer]]:
        """Fetch answers for multiple questions in batched API calls.

        Args:
            question_ids (List[int]): List of Stack Overflow question IDs.

        Returns:
            Dict[int, List[StackOverflowAnswer]]: Mapping of question ID to
            its answers; questions with no answers are absent from the dict.
        """
        if not question_ids:
            return {}
        result: Dict[int, List[StackOverflowAnswer]] = {}
        # Process in batches of BATCH_SIZE (API limit on IDs per request).
        for i in range(0, len(question_ids), BATCH_SIZE):
            batch = question_ids[i:i + BATCH_SIZE]
            ids_string = ";".join(str(qid) for qid in batch)
            params = self._with_key({
                "site": "stackoverflow",
                "sort": "votes",
                "order": "desc",
                "filter": "withbody",
                # NOTE(review): only the first page (100 items) per batch is
                # fetched — answers beyond that are silently dropped.
                "pagesize": "100"
            })
            data = await self._get_json(
                f"{STACKOVERFLOW_API}/questions/{ids_string}/answers",
                params
            )
            for answer_data in data.get("items", []):
                question_id = answer_data.get("question_id")
                answer = StackOverflowAnswer(
                    answer_id=answer_data.get("answer_id"),
                    question_id=question_id,
                    score=answer_data.get("score", 0),
                    is_accepted=answer_data.get("is_accepted", False),
                    body=answer_data.get("body", ""),
                    creation_date=answer_data.get("creation_date", 0),
                    last_activity_date=answer_data.get("last_activity_date", 0),
                    link=answer_data.get("link", ""),
                    owner=answer_data.get("owner")
                )
                result.setdefault(question_id, []).append(answer)
        return result

    async def fetch_batch_comments(self, post_ids: List[int]) -> Dict[int, List[StackOverflowComment]]:
        """Fetch comments for multiple posts in batched API calls.

        Args:
            post_ids (List[int]): Stack Overflow post IDs (questions or answers).

        Returns:
            Dict[int, List[StackOverflowComment]]: Mapping of post ID to its
            comments; posts with no comments are absent from the dict.
        """
        if not post_ids:
            return {}
        result: Dict[int, List[StackOverflowComment]] = {}
        # Process in batches of BATCH_SIZE (API limit on IDs per request).
        for i in range(0, len(post_ids), BATCH_SIZE):
            batch = post_ids[i:i + BATCH_SIZE]
            ids_string = ";".join(str(pid) for pid in batch)
            params = self._with_key({
                "site": "stackoverflow",
                "sort": "votes",
                "order": "desc",
                "filter": "withbody",
                # NOTE(review): only the first page (100 items) per batch is
                # fetched — comments beyond that are silently dropped.
                "pagesize": "100"
            })
            data = await self._get_json(
                f"{STACKOVERFLOW_API}/posts/{ids_string}/comments",
                params
            )
            for comment_data in data.get("items", []):
                post_id = comment_data.get("post_id")
                comment = StackOverflowComment(
                    comment_id=comment_data.get("comment_id"),
                    post_id=post_id,
                    score=comment_data.get("score", 0),
                    body=comment_data.get("body", ""),
                    creation_date=comment_data.get("creation_date", 0),
                    owner=comment_data.get("owner")
                )
                result.setdefault(post_id, []).append(comment)
        return result

    async def advanced_search(
        self,
        query: Optional[str] = None,
        tags: Optional[List[str]] = None,
        excluded_tags: Optional[List[str]] = None,
        min_score: Optional[int] = None,
        title: Optional[str] = None,
        body: Optional[str] = None,
        answers: Optional[int] = None,
        has_accepted_answer: Optional[bool] = None,
        views: Optional[int] = None,
        url: Optional[str] = None,
        user_id: Optional[int] = None,
        is_closed: Optional[bool] = None,
        is_wiki: Optional[bool] = None,
        is_migrated: Optional[bool] = None,
        has_notice: Optional[bool] = None,
        from_date: Optional[datetime] = None,
        to_date: Optional[datetime] = None,
        sort_by: Optional[str] = "votes",
        limit: Optional[int] = 5,
        include_comments: bool = False,
        retries: Optional[int] = 3
    ) -> List[SearchResult]:
        """Advanced search for Stack Overflow questions with many filter options.

        Args:
            query: Free-text query; remaining filters map 1:1 onto the
                /search/advanced parameters (None means "don't filter").
            min_score: Applied client-side after the API call.
            include_comments (bool): Also batch-fetch comments for every
                returned question and answer. Defaults to False.
            retries (Optional[int]): 429-retry budget for the search request.

        Returns:
            List[SearchResult]: Questions with their answers (and, when
            requested, comments).
        """
        params = self._with_key({
            "site": "stackoverflow",
            "sort": sort_by,
            "order": "desc",
            "filter": "withbody"
        })
        # Free-text / string filters: skipped when falsy (None or empty).
        text_filters = {
            "q": query,
            "title": title,
            "body": body,
            "url": url,
        }
        for name, value in text_filters.items():
            if value:
                params[name] = value
        if tags:
            params["tagged"] = ";".join(tags)
        if excluded_tags:
            params["nottagged"] = ";".join(excluded_tags)
        # Numeric filters: 0 is meaningful, so only skip None.
        if answers is not None:
            params["answers"] = str(answers)
        if views is not None:
            params["views"] = str(views)
        if user_id is not None:
            params["user"] = str(user_id)
        # Tri-state boolean filters (None = don't filter on this attribute).
        bool_filters = {
            "accepted": has_accepted_answer,
            "closed": is_closed,
            "wiki": is_wiki,
            "migrated": is_migrated,
            "notice": has_notice,
        }
        for name, value in bool_filters.items():
            if value is not None:
                params[name] = "true" if value else "false"
        # Date range is expressed as Unix timestamps.
        if from_date:
            params["fromdate"] = str(int(from_date.timestamp()))
        if to_date:
            params["todate"] = str(int(to_date.timestamp()))
        if limit:
            params["pagesize"] = str(limit)
        data = await self._get_json(
            f"{STACKOVERFLOW_API}/search/advanced", params, retries=retries
        )
        questions: List[StackOverflowQuestion] = []
        question_ids: List[int] = []
        for question_data in data.get("items", []):
            # min_score is filtered client-side; the API has no such parameter.
            if min_score is not None and question_data.get("score", 0) < min_score:
                continue
            question = self._parse_question(question_data)
            questions.append(question)
            question_ids.append(question.question_id)
        answers_by_question = await self.fetch_batch_answers(question_ids)
        all_comments: Dict[int, List[StackOverflowComment]] = {}
        if include_comments:
            # One comment fetch covering every question and answer post ID.
            all_post_ids = question_ids.copy()
            for question_answers in answers_by_question.values():
                all_post_ids.extend(a.answer_id for a in question_answers)
            all_comments = await self.fetch_batch_comments(all_post_ids)
        results: List[SearchResult] = []
        for question in questions:
            question_answers = answers_by_question.get(question.question_id, [])
            comments = None
            if include_comments:
                comments = SearchResultComments(
                    question=all_comments.get(question.question_id, []),
                    answers={
                        a.answer_id: all_comments.get(a.answer_id, [])
                        for a in question_answers
                    }
                )
            results.append(SearchResult(
                question=question,
                answers=question_answers,
                comments=comments
            ))
        return results

    async def search_by_query(
        self,
        query: str,
        tags: Optional[List[str]] = None,
        excluded_tags: Optional[List[str]] = None,
        min_score: Optional[int] = None,
        title: Optional[str] = None,
        body: Optional[str] = None,
        has_accepted_answer: Optional[bool] = None,
        answers: Optional[int] = None,
        sort_by: Optional[str] = "votes",
        limit: Optional[int] = 5,
        include_comments: bool = False,
        retries: Optional[int] = 3
    ) -> List[SearchResult]:
        """Search Stack Overflow for questions matching a query with additional filters.

        Thin convenience wrapper around :meth:`advanced_search`.
        """
        return await self.advanced_search(
            query=query,
            tags=tags,
            excluded_tags=excluded_tags,
            min_score=min_score,
            title=title,
            body=body,
            has_accepted_answer=has_accepted_answer,
            answers=answers,
            sort_by=sort_by,
            limit=limit,
            include_comments=include_comments,
            retries=retries
        )

    async def fetch_answers(self, question_id: int) -> List[StackOverflowAnswer]:
        """Fetch answers for a specific question.

        Note: This is kept for backward compatibility, but new code should
        use fetch_batch_answers for better performance.
        """
        answers_dict = await self.fetch_batch_answers([question_id])
        return answers_dict.get(question_id, [])

    async def fetch_comments(self, post_id: int) -> List[StackOverflowComment]:
        """Fetch comments for a specific post.

        Note: This is kept for backward compatibility, but new code should
        use fetch_batch_comments for better performance.
        """
        comments_dict = await self.fetch_batch_comments([post_id])
        return comments_dict.get(post_id, [])

    async def get_question(self, question_id: int, include_comments: bool = True) -> SearchResult:
        """Get a specific question by ID.

        Args:
            question_id (int): Stack Overflow question ID.
            include_comments (bool): Also fetch comments for the question and
                its answers. Defaults to True.

        Raises:
            ValueError: If no question with that ID exists.

        Returns:
            SearchResult: The question with its answers (and comments).
        """
        params = self._with_key({
            "site": "stackoverflow",
            "filter": "withbody"
        })
        data = await self._get_json(
            f"{STACKOVERFLOW_API}/questions/{question_id}", params
        )
        if not data.get("items"):
            raise ValueError(f"Question with ID {question_id} not found")
        question = self._parse_question(data["items"][0])
        answers = await self.fetch_answers(question.question_id)
        comments = None
        if include_comments:
            post_ids = [question.question_id] + [answer.answer_id for answer in answers]
            all_comments = await self.fetch_batch_comments(post_ids)
            comments = SearchResultComments(
                question=all_comments.get(question.question_id, []),
                answers={
                    answer.answer_id: all_comments.get(answer.answer_id, [])
                    for answer in answers
                }
            )
        return SearchResult(
            question=question,
            answers=answers,
            comments=comments
        )