# resumate/functions/github.py
"""
github.py
Functions for retrieving information from GitHub profiles and repositories.
"""
import re
import json
import logging
from typing import List, Dict, Optional
from pathlib import Path
import requests
# pylint: disable=broad-exception-caught
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_github_repositories(github_url: str) -> Dict:
    """
    Fetch the public repositories of a GitHub profile.

    Args:
        github_url (str): Profile URL (e.g. https://github.com/username)
            or a bare username.

    Returns:
        dict: On success, a dict with "status": "success", a
        "repositories" list of processed repo dicts, a "metadata" dict
        (username, total_repos, public_repos, profile_url) and a
        human-readable "message". On failure, a dict with
        "status": "error" and a "message" describing the problem.
    """
    if not github_url or not github_url.strip():
        return {"status": "error", "message": "No GitHub URL provided"}

    try:
        username = _extract_github_username(github_url)
        if not username:
            return {"status": "error", "message": "Invalid GitHub URL format"}

        logger.info("Fetching repositories for GitHub user: %s", username)

        # Confirm the user exists (and grab profile-level counts) first.
        user_result = _get_github_user_info(username)
        if user_result["status"] != "success":
            return user_result

        repos_result = _get_user_repositories(username)
        if repos_result["status"] != "success":
            return repos_result

        cleaned = _process_repository_data(repos_result["data"])

        result = {
            "status": "success",
            "repositories": cleaned,
            "metadata": {
                "username": username,
                "total_repos": user_result["data"].get("public_repos", 0),
                "public_repos": len(cleaned),
                "profile_url": github_url
            },
            "message": f"Successfully retrieved {len(cleaned)} repositories"
        }

        # Persist a copy for later pipeline stages; failure here is non-fatal.
        try:
            out_dir = Path(__file__).parent.parent / "data" / "github_repos"
            out_dir.mkdir(parents=True, exist_ok=True)
            out_path = out_dir / "github_repos.json"
            with open(out_path, 'w', encoding='utf-8') as handle:
                json.dump(result, handle, indent=2, ensure_ascii=False)
            logger.info("GitHub repositories saved to %s", out_path)
        except Exception as save_error:
            logger.warning("Failed to save GitHub repositories to file: %s", str(save_error))

        return result

    except Exception as e:
        logger.error("Error retrieving GitHub repositories: %s", str(e))
        return {
            "status": "error",
            "message": f"Failed to retrieve GitHub repositories: {str(e)}"
        }
def _extract_github_username(github_url: str) -> Optional[str]:
    """
    Extract the username from a GitHub profile URL or bare username.

    Handles https://github.com/username (with or without trailing slash
    or extra path segments), bare usernames, and URLs carrying a query
    string or fragment (e.g. ...?tab=repositories), which are stripped
    before matching.

    Args:
        github_url (str): GitHub profile URL or bare username.

    Returns:
        Optional[str]: Username if one can be extracted and looks valid
        (alphanumeric/hyphen/underscore, max 39 chars), None otherwise.
    """
    try:
        # Drop any fragment and query string, then trailing slashes,
        # so "https://github.com/user?tab=repos" parses correctly.
        url = github_url.strip().split('#', 1)[0].split('?', 1)[0].rstrip('/')

        # Handle various GitHub URL formats
        patterns = [
            r'github\.com/([^/]+)/?$',   # https://github.com/username
            r'github\.com/([^/]+)/.*',   # https://github.com/username/anything
            r'^([a-zA-Z0-9\-_]+)$'       # Just a bare username
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                username = match.group(1)
                # GitHub usernames are limited to 39 characters.
                if re.match(r'^[a-zA-Z0-9\-_]+$', username) and len(username) <= 39:
                    return username
        return None
    except Exception as e:
        logger.warning("Error extracting username from URL %s: %s", github_url, str(e))
        return None
def _get_github_user_info(username: str) -> Dict:
    """
    Look up a GitHub user's profile via the public REST API.

    Args:
        username (str): GitHub username.

    Returns:
        dict: {"status": "success", "data": <profile json>} on success,
        or {"status": "error", "message": ...} for 404/403/other HTTP
        failures and network errors.
    """
    endpoint = f"https://api.github.com/users/{username}"
    request_headers = {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "Resumate-App/1.0"
    }
    try:
        response = requests.get(endpoint, headers=request_headers, timeout=10)
    except requests.RequestException as e:
        logger.error("Network error fetching user info: %s", str(e))
        return {"status": "error", "message": f"Network error: {str(e)}"}

    code = response.status_code
    if code == 404:
        return {"status": "error", "message": f"GitHub user '{username}' not found"}
    if code == 403:
        return {"status": "error", "message": "GitHub API rate limit exceeded"}
    if code != 200:
        return {"status": "error", "message": f"GitHub API error: {code}"}
    return {"status": "success", "data": response.json()}
def _get_user_repositories(username: str) -> Dict:
    """
    Fetch all public repositories for a user, following pagination.

    Pages through the GitHub API at 100 repositories per page (the API
    maximum), most recently updated first, stopping when a short or
    empty page is returned or after 10 pages (1000 repos) as a safety
    cap against runaway pagination.

    Args:
        username (str): GitHub username.

    Returns:
        dict: {"status": "success", "data": [<repo json>, ...]} on
        success, or {"status": "error", "message": ...} on HTTP or
        network failure.
    """
    endpoint = f"https://api.github.com/users/{username}/repos"
    request_headers = {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "Resumate-App/1.0"
    }
    per_page = 100  # Maximum allowed by the GitHub API
    collected = []
    try:
        for page in range(1, 11):  # hard cap: 10 pages / 1000 repos
            params = {
                "type": "public",
                "sort": "updated",
                "direction": "desc",
                "per_page": per_page,
                "page": page
            }
            response = requests.get(endpoint, headers=request_headers,
                                    params=params, timeout=10)
            if response.status_code != 200:
                return {"status": "error", "message": f"GitHub API error: {response.status_code}"}

            batch = response.json()
            collected.extend(batch)
            # A short (or empty) page means we've reached the end.
            if len(batch) < per_page:
                break
        return {"status": "success", "data": collected}
    except requests.RequestException as e:
        logger.error("Network error fetching repositories: %s", str(e))
        return {"status": "error", "message": f"Network error: {str(e)}"}
def _process_repository_data(repos: List[Dict]) -> List[Dict]:
    """
    Process and clean raw GitHub repository data for easier consumption.

    Skips forks that have no stars of their own (assumed to be
    unmodified copies) and normalizes nullable API fields (description,
    language, topics, timestamps, ...) to empty-but-typed defaults so
    downstream formatting code never receives None.

    Args:
        repos (List[Dict]): Raw repository objects from the GitHub API.

    Returns:
        List[Dict]: Cleaned repository dicts with a fixed set of keys.
    """
    processed = []
    for repo in repos:
        # Skip forks unless they have picked up stars of their own.
        if repo.get("fork", False) and not repo.get("stargazers_count"):
            continue
        # `or` coalesces explicit JSON nulls (e.g. "description": null),
        # which dict.get's default alone does not catch because the key
        # is present with a None value.
        processed.append({
            "name": repo.get("name") or "",
            "description": repo.get("description") or "",
            "language": repo.get("language") or "",
            "stars": repo.get("stargazers_count") or 0,
            "forks": repo.get("forks_count") or 0,
            "updated_at": repo.get("updated_at") or "",
            "created_at": repo.get("created_at") or "",
            "html_url": repo.get("html_url") or "",
            "topics": repo.get("topics") or [],
            "size": repo.get("size") or 0,
            "is_fork": repo.get("fork", False),
            "default_branch": repo.get("default_branch") or "main",
            "has_issues": repo.get("has_issues", False),
            "has_wiki": repo.get("has_wiki", False),
            "has_pages": repo.get("has_pages", False)
        })
    return processed
def format_repositories_for_llm(github_result: Dict) -> str:
    """
    Render the output of get_github_repositories as plain text for an LLM.

    Args:
        github_result (dict): Result dict from get_github_repositories.

    Returns:
        str: Human-readable summary of up to 20 repositories, or a short
        message when retrieval failed or no repositories were found.
    """
    if github_result.get("status") != "success":
        return "GitHub repositories could not be retrieved: " + \
            f"{github_result.get('message', 'Unknown error')}"

    repositories = github_result.get("repositories", [])
    metadata = github_result.get("metadata", {})
    if not repositories:
        return f"No public repositories found for {metadata.get('username', 'user')}"

    lines = [
        "=== GITHUB REPOSITORIES ===\n",
        f"Profile: {metadata.get('profile_url', 'N/A')}",
        f"Username: {metadata.get('username', 'N/A')}",
        f"Public Repositories: {len(repositories)}\n"
    ]

    # Keep the LLM context small: show only the top 20 repositories.
    for index, repo in enumerate(repositories[:20], start=1):
        lines.append(f"[REPOSITORY {index}]")
        lines.append(f"Name: {repo['name']}")
        lines.append(f"URL: {repo['html_url']}")
        if repo['description']:
            lines.append(f"Description: {repo['description']}")
        if repo['language']:
            lines.append(f"Primary Language: {repo['language']}")
        if repo['topics']:
            lines.append(f"Topics: {', '.join(repo['topics'][:5])}")  # at most 5 topics
        lines.append(f"Stars: {repo['stars']} | Forks: {repo['forks']}")
        lines.append(f"Last Updated: {repo['updated_at'][:10]}")  # date portion only
        lines.append("")  # blank line between repositories

    if len(repositories) > 20:
        lines.append(f"... and {len(repositories) - 20} more repositories")

    lines.append("\n=== END GITHUB REPOSITORIES ===")
    return '\n'.join(lines)
def get_repository_details(repo_url: str) -> Dict:
    """
    Retrieve detailed information about a single GitHub repository.

    Combines the core repository record with optional enrichments —
    languages (as percentages), README text, root file listing, recent
    releases and top contributors — then writes the combined result to
    data/github_repos/ as JSON.

    Args:
        repo_url (str): Repository URL, e.g. https://github.com/user/repo.

    Returns:
        dict: On success, {"status": "success", "repository": {...},
        "message": ...} where the repository dict holds name, full_name,
        description, language breakdown, star/fork/watcher counts,
        timestamps, URLs, topics, license, readme, file_structure,
        releases, contributors and status flags. On failure,
        {"status": "error", "message": ...}.
    """
    if not repo_url or not repo_url.strip():
        return {"status": "error", "message": "No repository URL provided"}

    try:
        owner, repo_name = _extract_repo_info(repo_url)
        if not owner or not repo_name:
            return {"status": "error", "message": "Invalid GitHub repository URL format"}

        logger.info("Fetching detailed information for repository: %s/%s", owner, repo_name)

        core = _get_repository_info(owner, repo_name)
        if core["status"] != "success":
            return core
        repo_data = core["data"]

        # Optional enrichments: each fetch is best-effort; a failed one
        # is simply omitted and its field falls back to an empty default.
        fetchers = {
            "languages": _get_repository_languages,
            "readme": _get_repository_readme,
            "file_structure": _get_repository_contents,
            "releases": _get_repository_releases,
            "contributors": _get_repository_contributors,
        }
        extras = {}
        for key, fetch in fetchers.items():
            outcome = fetch(owner, repo_name)
            if outcome["status"] == "success":
                extras[key] = outcome["data"]

        details = {
            "name": repo_data.get("name", ""),
            "full_name": repo_data.get("full_name", ""),
            "description": repo_data.get("description", ""),
            "language": repo_data.get("language", ""),
            "languages": extras.get("languages", {}),
            "stars": repo_data.get("stargazers_count", 0),
            "forks": repo_data.get("forks_count", 0),
            "watchers": repo_data.get("watchers_count", 0),
            "size": repo_data.get("size", 0),
            "created_at": repo_data.get("created_at", ""),
            "updated_at": repo_data.get("updated_at", ""),
            "pushed_at": repo_data.get("pushed_at", ""),
            "html_url": repo_data.get("html_url", ""),
            "clone_url": repo_data.get("clone_url", ""),
            "ssh_url": repo_data.get("ssh_url", ""),
            "topics": repo_data.get("topics", []),
            "license": repo_data.get("license", {}),
            "readme": extras.get("readme", ""),
            "file_structure": extras.get("file_structure", []),
            "releases": extras.get("releases", []),
            "contributors": extras.get("contributors", []),
            "is_fork": repo_data.get("fork", False),
            "is_archived": repo_data.get("archived", False),
            "is_private": repo_data.get("private", False),
            "default_branch": repo_data.get("default_branch", "main"),
            "open_issues": repo_data.get("open_issues_count", 0),
            "has_issues": repo_data.get("has_issues", False),
            "has_wiki": repo_data.get("has_wiki", False),
            "has_pages": repo_data.get("has_pages", False),
            "has_projects": repo_data.get("has_projects", False),
            "visibility": repo_data.get("visibility", "public")
        }

        result = {
            "status": "success",
            "repository": details,
            "message": f"Successfully retrieved details for {owner}/{repo_name}"
        }

        # Persist a copy; failure to write is logged but not fatal.
        try:
            out_dir = Path(__file__).parent.parent / "data" / "github_repos"
            out_dir.mkdir(parents=True, exist_ok=True)
            out_path = out_dir / f"repo_details_{owner}_{repo_name}.json"
            with open(out_path, 'w', encoding='utf-8') as handle:
                json.dump(result, handle, indent=2, ensure_ascii=False)
            logger.info("Repository details saved to %s", out_path)
        except Exception as save_error:
            logger.warning("Failed to save repository details to file: %s", str(save_error))

        return result

    except Exception as e:
        logger.error("Error retrieving repository details: %s", str(e))
        return {
            "status": "error",
            "message": f"Failed to retrieve repository details: {str(e)}"
        }
def _extract_repo_info(repo_url: str) -> tuple:
    """
    Extract (owner, repo_name) from a GitHub repository URL.

    Handles plain repo URLs, URLs with extra path segments (e.g.
    /tree/main), a trailing ".git" clone suffix, and URLs carrying a
    query string or fragment (e.g. ...?tab=readme), which are stripped
    before matching.

    Args:
        repo_url (str): GitHub repository URL.

    Returns:
        tuple: (owner, repo_name) if the URL is valid, (None, None)
        otherwise.
    """
    try:
        # Drop any fragment and query string, then trailing slashes,
        # so "https://github.com/o/r?tab=readme" parses correctly.
        url = repo_url.strip().split('#', 1)[0].split('?', 1)[0].rstrip('/')

        # Handle various GitHub repository URL formats
        patterns = [
            r'github\.com/([^/]+)/([^/]+)/?$',  # https://github.com/owner/repo
            r'github\.com/([^/]+)/([^/]+)/.*',  # https://github.com/owner/repo/anything
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                owner, repo_name = match.group(1), match.group(2)
                # Clone URLs end in ".git"; normalize that away.
                if repo_name.endswith('.git'):
                    repo_name = repo_name[:-4]
                if (re.match(r'^[a-zA-Z0-9\-_\.]+$', owner) and
                        re.match(r'^[a-zA-Z0-9\-_\.]+$', repo_name)):
                    return owner, repo_name
        return None, None
    except Exception as e:
        logger.warning("Error extracting repo info from URL %s: %s", repo_url, str(e))
        return None, None
def _get_repository_info(owner: str, repo_name: str) -> Dict:
    """Fetch the core repository record from the GitHub REST API.

    Returns {"status": "success", "data": <repo json>} on success, or an
    error dict for 404/403/other HTTP failures and network errors.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo_name}"
    request_headers = {
        "Accept": "application/vnd.github.v3+json",
        "User-Agent": "Resumate-App/1.0"
    }
    try:
        response = requests.get(endpoint, headers=request_headers, timeout=10)
    except requests.RequestException as e:
        logger.error("Network error fetching repository info: %s", str(e))
        return {"status": "error", "message": f"Network error: {str(e)}"}

    code = response.status_code
    if code == 404:
        return {"status": "error", "message": f"Repository '{owner}/{repo_name}' not found"}
    if code == 403:
        return {"status": "error", "message": "GitHub API rate limit exceeded"}
    if code != 200:
        return {"status": "error", "message": f"GitHub API error: {code}"}
    return {"status": "success", "data": response.json()}
def _get_repository_languages(owner: str, repo_name: str) -> Dict:
    """Get a repository's language breakdown as percentages.

    The GitHub API returns byte counts per language; these are converted
    to percentages rounded to one decimal place. A 200 response with no
    language data (e.g. an empty repository) yields an empty dict rather
    than an error, since the call itself succeeded.

    Returns {"status": "success", "data": {lang: pct}} on success, or an
    error dict on HTTP failure or exception.
    """
    try:
        url = f"https://api.github.com/repos/{owner}/{repo_name}/languages"
        headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Resumate-App/1.0"
        }
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            languages = response.json()
            total_bytes = sum(languages.values())
            if total_bytes > 0:
                language_percentages = {
                    lang: round((bytes_count / total_bytes) * 100, 1)
                    for lang, bytes_count in languages.items()
                }
                return {"status": "success", "data": language_percentages}
            # 200 with no detected languages is an empty repo, not a failure.
            return {"status": "success", "data": {}}
        return {"status": "error", "message": "Could not retrieve languages"}
    except Exception as e:
        logger.warning("Error fetching repository languages: %s", str(e))
        return {"status": "error", "message": str(e)}
def _get_repository_readme(owner: str, repo_name: str) -> Dict:
    """Fetch the raw README text of a repository, if it has one.

    Uses the README metadata endpoint to locate the raw download URL,
    then fetches that URL for the actual content. Returns
    {"status": "success", "data": <text>} or an error dict.
    """
    try:
        meta_url = f"https://api.github.com/repos/{owner}/{repo_name}/readme"
        request_headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Resumate-App/1.0"
        }
        meta_response = requests.get(meta_url, headers=request_headers, timeout=10)
        if meta_response.status_code == 200:
            download_url = meta_response.json().get("download_url")
            if download_url:
                raw_response = requests.get(download_url, timeout=10)
                if raw_response.status_code == 200:
                    return {"status": "success", "data": raw_response.text}
        return {"status": "error", "message": "README not found"}
    except Exception as e:
        logger.warning("Error fetching README: %s", str(e))
        return {"status": "error", "message": str(e)}
def _get_repository_contents(owner: str, repo_name: str, path: str = "") -> Dict:
    """List the names of files and directories at a path in a repository.

    Directory names get a trailing "/" and the listing is sorted with
    directories first, then case-insensitively by name. Returns
    {"status": "success", "data": [names]} or an error dict.
    """
    try:
        url = f"https://api.github.com/repos/{owner}/{repo_name}/contents/{path}"
        request_headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Resumate-App/1.0"
        }
        response = requests.get(url, headers=request_headers, timeout=10)
        if response.status_code != 200:
            return {"status": "error", "message": "Could not retrieve file structure"}

        entries = []
        for item in response.json():
            entry_name = item.get("name", "")
            if item.get("type") == "dir":
                entry_name += "/"
            entries.append(entry_name)
        # Directories (trailing "/") first, then case-insensitive by name.
        entries.sort(key=lambda entry: (not entry.endswith("/"), entry.lower()))
        return {"status": "success", "data": entries}
    except Exception as e:
        logger.warning("Error fetching repository contents: %s", str(e))
        return {"status": "error", "message": str(e)}
def _get_repository_releases(owner: str, repo_name: str) -> Dict:
    """Fetch up to the 10 most recent releases of a repository.

    Returns {"status": "success", "data": [release dicts]} with
    tag_name, name, published_at, prerelease and draft per release,
    or an error dict.
    """
    try:
        url = f"https://api.github.com/repos/{owner}/{repo_name}/releases"
        request_headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Resumate-App/1.0"
        }
        response = requests.get(url, headers=request_headers, timeout=10)
        if response.status_code != 200:
            return {"status": "error", "message": "Could not retrieve releases"}

        recent = [
            {
                "tag_name": release.get("tag_name", ""),
                "name": release.get("name", ""),
                "published_at": release.get("published_at", ""),
                "prerelease": release.get("prerelease", False),
                "draft": release.get("draft", False)
            }
            for release in response.json()[:10]  # 10 most recent only
        ]
        return {"status": "success", "data": recent}
    except Exception as e:
        logger.warning("Error fetching repository releases: %s", str(e))
        return {"status": "error", "message": str(e)}
def _get_repository_contributors(owner: str, repo_name: str) -> Dict:
    """Fetch up to the top 20 contributors of a repository.

    Returns {"status": "success", "data": [contributor dicts]} with
    login, contributions, html_url and type per contributor, or an
    error dict.
    """
    try:
        url = f"https://api.github.com/repos/{owner}/{repo_name}/contributors"
        request_headers = {
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Resumate-App/1.0"
        }
        response = requests.get(url, headers=request_headers, timeout=10)
        if response.status_code != 200:
            return {"status": "error", "message": "Could not retrieve contributors"}

        top_contributors = [
            {
                "login": contributor.get("login", ""),
                "contributions": contributor.get("contributions", 0),
                "html_url": contributor.get("html_url", ""),
                "type": contributor.get("type", "")
            }
            for contributor in response.json()[:20]  # top 20 only
        ]
        return {"status": "success", "data": top_contributors}
    except Exception as e:
        logger.warning("Error fetching repository contributors: %s", str(e))
        return {"status": "error", "message": str(e)}