# Page-capture residue ("Spaces: Sleeping") -- not part of the program.
import os | |
import openai | |
import tiktoken | |
import re | |
from gitingest import ingest | |
import json | |
import datetime | |
import logging | |
import sys | |
import time | |
# Configure logging: request INFO-level output via the root logger's basic
# configuration, then create the module-level logger that the class and the
# command-line entry point below write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GitHubCompanion:
    """Chat assistant that answers questions about one GitHub repository.

    The first message containing a github.com URL triggers ingestion of that
    repository via ``gitingest``. The extracted text is trimmed to a token
    budget and embedded in a system prompt; every later message is sent,
    together with the running conversation, to an OpenAI-compatible endpoint
    at ``router.requesty.ai``.
    """

    # Owner keeps the original character set; the repository segment also
    # accepts "." so names such as "socket.io" or ".github" are not cut short.
    _GITHUB_URL_RE = re.compile(
        r"https?://github\.com/[a-zA-Z0-9_-]+/[a-zA-Z0-9_.-]+"
    )
    # System prompts above this size are trimmed ...
    _PROMPT_TOKEN_LIMIT = 1000000
    # ... down to roughly this size.
    _PROMPT_TOKEN_TARGET = 950000
    # Tokens kept free for the conversation when sizing repository content.
    _CONVERSATION_BUFFER = 100000

    def __init__(self, requesty_api_key=None):
        """Initialize the GitHub Companion chatbot.

        Args:
            requesty_api_key: Requesty API key. When omitted or empty, the
                ``REQUESTY_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is available from either source.
            Exception: Whatever ``openai.OpenAI`` raises is logged and
                re-raised unchanged.
        """
        self.requesty_api_key = requesty_api_key or os.environ.get("REQUESTY_API_KEY")
        if not self.requesty_api_key:
            raise ValueError("Requesty API key is required")

        # Only a masked preview is logged; the previous first-5/last-5 preview
        # exposed the entire key whenever it was 10 characters or shorter.
        logger.info(
            "Initializing with API key: %s", self._mask_key(self.requesty_api_key)
        )

        # Client initialization with minimal parameters
        try:
            self.client = openai.OpenAI(
                api_key=self.requesty_api_key,
                base_url="https://router.requesty.ai/v1",
            )
            logger.info("OpenAI client initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing OpenAI client: {e}")
            raise

        self.model = "google/gemini-2.0-flash-thinking-exp-01-21"
        self.conversation_history = []
        self.repo_info = None
        self.token_count = 0
        # Gemini has a limit of 1048576 tokens, but we need to leave room for
        # the conversation history too.
        self.max_tokens = 800000
        # OpenAI's cl100k_base encoding, used here to estimate token counts.
        self.encoding = tiktoken.get_encoding("cl100k_base")
        self.max_retries = 3
        self.retry_delay = 20  # seconds; multiplied by the attempt number

    @staticmethod
    def _mask_key(key):
        """Return a log-safe preview of *key* (at most its last 4 characters)."""
        return "****" + key[-4:] if len(key) > 8 else "****"

    @staticmethod
    def _find_github_url(text):
        """Return the first ``https://github.com/<owner>/<repo>`` URL in *text*.

        A trailing sentence period and a trailing ``.git`` suffix are removed.
        Returns ``None`` when no usable URL is present.
        """
        match = GitHubCompanion._GITHUB_URL_RE.search(text)
        if match is None:
            return None
        url = match.group(0).rstrip(".")
        if url.endswith(".git"):
            url = url[: -len(".git")]
        # Nothing left of the repository segment (e.g. ".../owner/...").
        if url.endswith("/"):
            return None
        return url

    def _encode(self, text):
        """Tokenize *text*, treating special-token markers as ordinary text.

        Repository files may literally contain strings such as
        ``<|endoftext|>``; with tiktoken's default settings ``encode`` raises
        ``ValueError`` on them, which would abort the whole ingestion.
        """
        return self.encoding.encode(text, disallowed_special=())

    def count_tokens(self, text):
        """Count the number of tokens in a text."""
        return len(self._encode(text))

    def extract_repo_info(self, github_url):
        """Extract repository information using gitingest.

        On success ``self.repo_info`` holds the combined summary, file tree
        and (possibly truncated) content, and ``self.token_count`` its size.

        Args:
            github_url: Repository URL passed straight to ``ingest``.

        Returns:
            ``True`` on success; ``False`` if anything raised (the error is
            printed, not propagated).
        """
        print(f"Extracting information from {github_url}...")
        try:
            summary, tree, content = ingest(github_url)

            # Tokenize the content once and reuse it for counting and cutting.
            content_token_list = self._encode(content)
            summary_tokens = self.count_tokens(summary)
            tree_tokens = self.count_tokens(tree)
            content_tokens = len(content_token_list)
            print(f"Token counts - Summary: {summary_tokens}, Tree: {tree_tokens}, Content: {content_tokens}")

            header = f"SUMMARY:\n{summary}\n\nFILE STRUCTURE:\n{tree}\n\nCONTENT:\n"
            header_tokens = self.count_tokens(header)

            conversation_buffer = self._CONVERSATION_BUFFER
            # Clamp at zero: a negative bound would slice from the END of the
            # token list instead of truncating it.
            max_content_tokens = max(
                0, self.max_tokens - header_tokens - conversation_buffer
            )

            if content_tokens > max_content_tokens:
                print(f"Warning: Content exceeds available token space. Truncating from {content_tokens} to {max_content_tokens} tokens.")
                content = self.encoding.decode(content_token_list[:max_content_tokens])

            repo_info = f"{header}{content}"
            token_count = self.count_tokens(repo_info)
            print(f"Repository information extracted. Token count: {token_count}")

            # Safety check: only reachable when the header alone is oversized.
            if token_count > self.max_tokens:
                print("Warning: Repository information still exceeds the token limit. Performing additional truncation.")
                hard_limit = max(0, self.max_tokens - conversation_buffer)
                repo_info = self.encoding.decode(self._encode(repo_info)[:hard_limit])
                token_count = self.count_tokens(repo_info)
                print(f"Final token count after truncation: {token_count}")

            self.repo_info = repo_info
            self.token_count = token_count
            return True
        except Exception as e:
            print(f"Error extracting repository information: {e}")
            return False

    def add_to_conversation(self, role, content):
        """Add a message to the conversation history."""
        self.conversation_history.append({"role": role, "content": content})

    def _trim_repo_info(self, budget):
        """Return ``self.repo_info`` cut down to about *budget* tokens.

        The summary/tree header is kept whole and only the content after the
        ``CONTENT:`` marker is shortened. If the marker is missing or the
        header alone exceeds the budget, the raw text is truncated instead.
        """
        marker = "\n\nCONTENT:\n"
        # partition() splits at the FIRST marker only, so blank lines in the
        # summary or a later "CONTENT:" inside a file cannot shift the split.
        head, found, content = self.repo_info.partition(marker)
        if found:
            header = head + marker
            content_budget = budget - self.count_tokens(header)
            if content_budget > 0:
                kept = self._encode(content)[:content_budget]
                return header + self.encoding.decode(kept)
        return self.encoding.decode(self._encode(self.repo_info)[: max(0, budget)])

    def create_system_prompt(self):
        """Create the system prompt with repository information.

        Returns:
            The instruction text followed by ``self.repo_info``; the
            repository part is trimmed when the combined size would exceed
            the prompt token limit.
        """
        current_date = datetime.datetime.now().strftime("%Y-%m-%d")
        base_prompt = (
            f"You are GitHub Navigator, an AI assistant specialized in helping users with GitHub repositories. "
            f"Today is {current_date}. "
            f"You have been provided with information about a GitHub repository. "
            f"Use this information to help the user understand and work with this repository. "
            f"Be concise, accurate, and helpful. If asked questions about the repository content, "
            f"refer to the provided information to give accurate answers."
        )
        base_prompt_tokens = self.count_tokens(base_prompt)
        repo_info_tokens = self.count_tokens(self.repo_info)
        print(f"System prompt base tokens: {base_prompt_tokens}, Repo info tokens: {repo_info_tokens}")

        total_tokens = base_prompt_tokens + repo_info_tokens
        if total_tokens <= self._PROMPT_TOKEN_LIMIT:
            # Not too large: return the full system prompt.
            return f"{base_prompt}\n\n{self.repo_info}"

        print(f"Warning: System prompt would be too large ({total_tokens} tokens). Trimming repository information.")
        # 100 tokens of slack on top of the base prompt, as before.
        budget = self._PROMPT_TOKEN_TARGET - base_prompt_tokens - 100
        trimmed_repo_info = self._trim_repo_info(budget)

        final_system_prompt = f"{base_prompt}\n\n{trimmed_repo_info}"
        print(f"Final system prompt tokens: {self.count_tokens(final_system_prompt)}")
        return final_system_prompt

    def chat(self, user_message):
        """Process user message and generate a response.

        Until a repository has been loaded, the message must contain a GitHub
        URL; that repository is ingested and an introductory request is sent
        in place of the raw message.

        Returns:
            The assistant's reply, or a plain-text notice when no URL was
            found or ingestion failed.
        """
        if not self.repo_info:
            github_url = self._find_github_url(user_message)
            if github_url is None:
                return "Please provide a valid GitHub repository URL to get started."
            if not self.extract_repo_info(github_url):
                return "I had trouble extracting information from that repository. Please check the URL and try again."
            self.add_to_conversation("system", self.create_system_prompt())
            self.add_to_conversation("user", f"I want to work with the repository at {github_url}. Please help me understand it.")
            return self.generate_response()

        self.add_to_conversation("user", user_message)
        return self.generate_response()

    def generate_response(self):
        """Generate a response using the Requesty API with retry logic.

        Rate-limit errors are retried up to ``self.max_retries`` times with a
        linearly growing delay; any other error is returned as text.

        Returns:
            The assistant's reply (also appended to the history), or an error
            message string.
        """
        # The history does not change between retries, so build the payload
        # once: most recent system message first, then every user/assistant
        # turn in order.
        system_messages = [
            msg for msg in self.conversation_history if msg["role"] == "system"
        ]
        messages = system_messages[-1:] + [
            msg
            for msg in self.conversation_history
            if msg["role"] in ("user", "assistant")
        ]

        for attempt in range(1, self.max_retries + 1):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                )
                assistant_response = response.choices[0].message.content
                self.add_to_conversation("assistant", assistant_response)
                return assistant_response
            except openai.RateLimitError as e:
                wait_time = self.retry_delay * attempt
                error_msg = f"Rate limit exceeded. Retrying in {wait_time} seconds... (Attempt {attempt}/{self.max_retries})"
                print(error_msg)
                if attempt < self.max_retries:
                    time.sleep(wait_time)
                else:
                    return f"I'm currently experiencing high demand. Please try again later. Error: {e}"
            except openai.APIError as e:
                error_msg = f"Requesty API error: {e}"
                print(error_msg)
                # Check for token limit error
                if "input token count" in str(e) and "exceeds the maximum" in str(e):
                    return "The repository is too large to process in one request. Please try a smaller repository or ask specific questions about particular parts of the codebase."
                return error_msg
            except Exception as e:
                error_msg = f"Unexpected error: {e}"
                print(error_msg)
                return error_msg

    def save_conversation(self, filename="conversation.json"):
        """Save the current conversation to a file (errors are printed)."""
        try:
            # Explicit encoding so the file does not depend on the platform
            # default.
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(self.conversation_history, f, indent=2)
            print(f"Conversation saved to {filename}")
        except Exception as e:
            print(f"Error saving conversation: {e}")

    def load_conversation(self, filename="conversation.json"):
        """Load a conversation from a file (errors are printed).

        The current history is replaced only when the file holds a JSON list.
        """
        try:
            with open(filename, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            if not isinstance(loaded, list):
                # Anything else would crash later when messages are iterated.
                print(f"Error loading conversation: expected a list of messages in {filename}.")
                return
            self.conversation_history = loaded
            print(f"Conversation loaded from {filename}")
        except FileNotFoundError:
            print(f"File {filename} not found.")
        except json.JSONDecodeError:
            print(f"Error decoding JSON from {filename}.")
        except Exception as e:
            print(f"Error loading conversation: {e}")
# Command-line interface: an interactive loop around GitHubCompanion.
# The conversation is saved to the default file on exit.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="GitHub Navigator Chatbot")
    parser.add_argument("--api-key", help="Requesty API Key (or set REQUESTY_API_KEY environment variable)")
    parser.add_argument("--load", help="Load conversation from file")
    args = parser.parse_args()

    try:
        # Check for API key in command line args first, then environment
        api_key = args.api_key
        if not api_key:
            api_key = os.environ.get("REQUESTY_API_KEY")
            if api_key:
                # Log only where the key came from; no part of the secret is
                # written to the log.
                logger.info("Using API key from environment")
            else:
                print("Error: Requesty API key not configured. Please provide an API key.")
                print("Usage: python github_companion.py --api-key YOUR_API_KEY")
                print("   or set the REQUESTY_API_KEY environment variable")
                sys.exit(1)

        # Initialize the companion with the API key
        companion = GitHubCompanion(requesty_api_key=api_key)
        if args.load:
            companion.load_conversation(args.load)

        print("GitHub Companion Bot - Your AI assistant for GitHub repositories")
        print("Enter a GitHub repository URL to begin, or type 'exit' to quit")

        while True:
            try:
                user_input = input("\nYou: ")
                if user_input.lower() in ["exit", "quit", "bye"]:
                    print("Saving conversation...")
                    companion.save_conversation()
                    print("Goodbye!")
                    break
                response = companion.chat(user_input)
                print(f"\nGitHub Companion: {response}")
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed (Ctrl-D or piped input ran out).
                # It must end the session here; if it reached the generic
                # handler below, input() would fail again on every iteration
                # and the loop would never terminate.
                print("\nSaving conversation and exiting...")
                companion.save_conversation()
                print("Goodbye!")
                break
            except Exception as e:
                # Best effort: report the problem and keep the session alive.
                print(f"Error processing input: {e}")
    except Exception as e:
        # sys.exit() above raises SystemExit, which is not an Exception and
        # therefore passes through this handler untouched.
        logger.error(f"Error initializing GitHub Companion: {e}")
        print(f"Error initializing GitHub Companion: {e}")
        print("Please check your dependencies and API key configuration.")
        sys.exit(1)