|
|
|
""" |
|
Stack Exchange API Query Tool |
|
|
|
This script allows you to directly call the Stack Exchange API with various parameters |
|
and see the results. It's useful for testing queries and seeing the raw results. |
|
|
|
Usage: |
|
python api_query.py search "python pandas dataframe" --tags python,pandas --min-score 10 |
|
python api_query.py question 12345 |
|
python api_query.py error "TypeError: cannot use a string pattern" --language python |
|
""" |
|
|
|
import os |
|
import sys |
|
import json |
|
import asyncio |
|
import argparse |
|
from dotenv import load_dotenv |
|
|
|
from stackoverflow_mcp.api import StackExchangeAPI |
|
from stackoverflow_mcp.formatter import format_response |
|
|
|
|
|
def setup_environment():
    """Populate the environment from the first dotenv file that exists.

    The current working directory is checked for ``.env`` and then
    ``.env.test``; only the first one found is handed to ``load_dotenv``.
    If neither file exists a warning is printed and nothing is loaded.
    """
    for candidate in (".env", ".env.test"):
        if os.path.exists(candidate):
            load_dotenv(candidate)
            return

    print("Warning: No .env or .env.test file found. Using default settings.")
|
|
|
|
|
async def run_search_query(api, args):
    """Run a free-text search and print the matching questions.

    Args:
        api: Client object exposing an awaitable ``search_by_query`` method.
        args: Parsed ``search`` sub-command namespace. Reads ``query``,
            ``tags``, ``excluded_tags``, ``title``, ``body``, ``min_score``,
            ``limit``, ``comments``, ``raw`` and ``format``.

    With ``args.raw`` set, a short per-result summary is printed; otherwise
    the results are passed to ``format_response`` and its output is printed.
    Any exception raised while querying or printing is caught and reported
    on stdout; nothing propagates to the caller.
    """

    def _split_csv(raw):
        # Trim whitespace and drop empty entries so input such as
        # "python, pandas," does not send " pandas" or "" as tags.
        if not raw:
            return None
        cleaned = [part.strip() for part in raw.split(",")]
        return [part for part in cleaned if part] or None

    tags = _split_csv(args.tags)
    excluded_tags = _split_csv(args.excluded_tags)

    print(f"\nRunning search query: '{args.query}'")
    if args.title:
        print(f"Running search with title containing: '{args.title}'")
    if args.body:
        print(f"Running search with body containing: '{args.body}'")
    print(f"Tags: {tags}")
    print(f"Excluded tags: {excluded_tags}")
    print(f"Min score: {args.min_score}")
    print(f"Limit: {args.limit}")
    print(f"Include comments: {args.comments}\n")

    try:
        results = await api.search_by_query(
            query=args.query,
            tags=tags,
            title=args.title,
            body=args.body,
            excluded_tags=excluded_tags,
            min_score=args.min_score,
            limit=args.limit,
            include_comments=args.comments,
        )

        print(f"Found {len(results)} results")

        if args.raw:
            for number, result in enumerate(results, start=1):
                print(f"\n--- Result {number} ---")
                print(f"Question ID: {result.question.question_id}")
                print(f"Title: {result.question.title}")
                print(f"Score: {result.question.score}")
                print(f"Tags: {result.question.tags}")
                print(f"Link: {result.question.link}")
                print(f"Answers: {len(result.answers)}")
                if result.comments:
                    print(f"Question comments: {len(result.comments.question)}")
        else:
            formatted = format_response(results, args.format)
            print(formatted)

    except Exception as e:
        # Best-effort CLI: report the failure instead of crashing with a traceback.
        print(f"Error during search: {str(e)}")
|
|
|
|
|
async def run_question_query(api, args):
    """Fetch a single question by ID and print it.

    Args:
        api: Client object exposing an awaitable ``get_question`` method.
        args: Parsed ``question`` sub-command namespace. Reads
            ``question_id``, ``comments``, ``raw`` and ``format``.

    With ``args.raw`` set, a short summary of the result is printed;
    otherwise the result is wrapped in a list, passed to ``format_response``
    and the returned text is printed. Any exception is caught and reported
    on stdout.
    """
    try:
        print(f"\nFetching question ID: {args.question_id}")
        print(f"Include comments: {args.comments}\n")

        result = await api.get_question(
            question_id=args.question_id,
            include_comments=args.comments,
        )

        if not args.raw:
            # Formatted path: the formatter expects a list of results.
            print(format_response([result], args.format))
            return

        question = result.question
        summary_fields = (
            ("Question ID", "question_id"),
            ("Title", "title"),
            ("Score", "score"),
            ("Tags", "tags"),
            ("Link", "link"),
        )
        for label, attribute in summary_fields:
            print(f"{label}: {getattr(question, attribute)}")
        print(f"Answers: {len(result.answers)}")

        if result.comments:
            print(f"Question comments: {len(result.comments.question)}")

    except Exception as exc:
        print(f"Error fetching question: {exc!s}")
|
|
|
|
|
async def run_error_query(api, args):
    """Search for an error message, optionally narrowed by language/technology tags.

    Args:
        api: Client object exposing an awaitable ``search_by_query`` method.
        args: Parsed ``error`` sub-command namespace. Reads ``error``,
            ``language``, ``technologies``, ``title``, ``body``,
            ``min_score``, ``limit``, ``comments``, ``raw`` and ``format``.

    The language and each technology are trimmed, lower-cased and combined
    into the tag list passed to the search; ``None`` is passed when no tag
    remains. Any exception is caught and reported on stdout.
    """
    technologies = None
    if args.technologies:
        # Trim whitespace and drop empty entries ("pandas, numpy," -> 2 items).
        stripped = [item.strip() for item in args.technologies.split(",")]
        technologies = [item for item in stripped if item] or None

    try:
        print(f"\nSearching for error: '{args.error}'")
        print(f"Language: {args.language}")
        print(f"Technologies: {technologies}")
        if args.title:
            print(f"Title containing: '{args.title}'")
        if args.body:
            print(f"Body containing: '{args.body}'")
        print(f"Min score: {args.min_score}")
        print(f"Limit: {args.limit}")
        print(f"Include comments: {args.comments}\n")

        candidates = []
        if args.language and args.language.strip():
            candidates.append(args.language.strip())
        if technologies:
            candidates.extend(technologies)
        # dict.fromkeys removes duplicates (e.g. language repeated in
        # technologies) while keeping first-seen order.
        tags = list(dict.fromkeys(name.lower() for name in candidates))

        results = await api.search_by_query(
            query=args.error,
            title=args.title,
            body=args.body,
            tags=tags if tags else None,
            min_score=args.min_score,
            limit=args.limit,
            include_comments=args.comments,
        )

        print(f"Found {len(results)} results")

        if args.raw:
            for number, result in enumerate(results, start=1):
                print(f"\n--- Result {number} ---")
                print(f"Question ID: {result.question.question_id}")
                print(f"Title: {result.question.title}")
                print(f"Score: {result.question.score}")
                print(f"Tags: {result.question.tags}")
                print(f"Link: {result.question.link}")
                print(f"Answers: {len(result.answers)}")
                if result.comments:
                    print(f"Question comments: {len(result.comments.question)}")
        else:
            formatted = format_response(results, args.format)
            print(formatted)

    except Exception as e:
        # Best-effort CLI: report the failure instead of crashing with a traceback.
        print(f"Error searching for error: {str(e)}")
|
|
|
|
|
async def main(argv=None):
    """Parse command-line arguments and run the selected query.

    Args:
        argv: Optional list of argument strings, without the program name.
            When ``None`` (the default) argparse reads ``sys.argv[1:]``,
            which is the original behaviour; passing a list lets the tool
            be driven programmatically.

    Returns:
        ``1`` when no sub-command was given or an exception escaped the
        selected handler, otherwise ``0``.
    """

    def _add_output_options(sub):
        # Options shared by every sub-command, added last so --help order is unchanged.
        sub.add_argument("--comments", action="store_true", help="Include comments")
        sub.add_argument("--format", choices=["markdown", "json"], default="markdown", help="Output format")
        sub.add_argument("--raw", action="store_true", help="Print raw data structure")

    parser = argparse.ArgumentParser(description="Stack Exchange API Query Tool")
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    search_parser = subparsers.add_parser("search", help="Search Stack Overflow")
    search_parser.add_argument("query", help="Search query")
    search_parser.add_argument("--tags", help="Comma-separated list of tags")
    search_parser.add_argument("--title", help="Word(s) that must appear in the question title")
    search_parser.add_argument("--body", help="Word(s) that must appear in the body of the question")
    search_parser.add_argument("--excluded-tags", help="Comma-separated list of tags to exclude")
    search_parser.add_argument("--min-score", type=int, default=0, help="Minimum score")
    search_parser.add_argument("--limit", type=int, default=5, help="Maximum number of results")
    _add_output_options(search_parser)

    question_parser = subparsers.add_parser("question", help="Get a specific question")
    question_parser.add_argument("question_id", type=int, help="Question ID")
    _add_output_options(question_parser)

    error_parser = subparsers.add_parser("error", help="Search for an error message")
    error_parser.add_argument("error", help="Error message")
    error_parser.add_argument("--title", help="Word(s) that must appear in the question title")
    error_parser.add_argument("--body", help="Word(s) that must appear in the body of the question")
    error_parser.add_argument("--language", help="Programming language")
    error_parser.add_argument("--technologies", help="Comma-separated list of technologies")
    error_parser.add_argument("--min-score", type=int, default=0, help="Minimum score")
    error_parser.add_argument("--limit", type=int, default=5, help="Maximum number of results")
    _add_output_options(error_parser)

    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        return 1

    setup_environment()

    api_key = os.getenv("STACK_EXCHANGE_API_KEY")
    if not api_key:
        print("Warning: No API key found. Requests may be rate limited.")

    api = StackExchangeAPI(api_key=api_key)

    # Built after the early return so the no-command path touches no handler.
    handlers = {
        "search": run_search_query,
        "question": run_question_query,
        "error": run_error_query,
    }

    try:
        handler = handlers.get(args.command)
        if handler is not None:
            await handler(api, args)
    except Exception as e:
        # NOTE: the handlers catch and report most failures themselves, so a
        # failed query can still end with exit status 0.
        print(f"Error: {str(e)}")
        return 1
    finally:
        # Always release the client, whether or not the query succeeded.
        await api.close()

    return 0
|
|
|
|
|
if __name__ == "__main__":
    # Run the async entry point and use its return value as the process exit status.
    raise SystemExit(asyncio.run(main()))