import gradio as gr from huggingface_hub import HfApi, list_spaces import time from typing import List, Dict, Optional from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor, as_completed @dataclass class MCPSpace: id: str title: str author: str likes: int status: str url: str description: str sdk: str last_modified: str created_at: str class MCPSpaceFinder: def __init__(self): self.api = HfApi() self.all_mcp_spaces_cache = None # Cache for ALL MCP spaces self.running_spaces_cache = None # Separate cache for running/building spaces self.cache_timestamp = None self.cache_duration = 300 # 5 minutes cache def get_space_status(self, space_id: str) -> str: """Get the current runtime status of a space.""" try: runtime = self.api.get_space_runtime(space_id) if hasattr(runtime, 'stage'): return runtime.stage return "unknown" except Exception: return "error" def process_space_batch(self, spaces_batch) -> List[MCPSpace]: """Process a batch of spaces to get their runtime status.""" processed_spaces = [] for space in spaces_batch: try: space_id = space.id # Get space status -- an expensive operation (separate request per space) status = self.get_space_status(space_id) processed_space = MCPSpace( id=space_id, title=getattr(space, 'title', space_id.split('/')[-1]) or space_id.split('/')[-1], author=space.author, likes=getattr(space, 'likes', 0), status=status, url=f"https://huggingface.co/spaces/{space_id}", description=getattr(space, 'description', 'No description available') or 'No description available', sdk=getattr(space, 'sdk', 'unknown'), last_modified=str(getattr(space, 'last_modified', 'unknown')), created_at=str(getattr(space, 'created_at', 'unknown')) ) processed_spaces.append(processed_space) except Exception as e: print(f"Error processing space {getattr(space, 'id', 'unknown')}: {e}") continue return processed_spaces def find_all_mcp_spaces(self, check_status: bool = False) -> List[MCPSpace]: """ Find ALL MCP-enabled spaces, optionally checking their runtime status. Args: check_status: If True, fetch runtime status (slower). If False, return all spaces without status check. Returns: List of MCPSpace objects """ # Check cache first if not check_status and self.all_mcp_spaces_cache is not None: if self.cache_timestamp and time.time() - self.cache_timestamp < self.cache_duration: return self.all_mcp_spaces_cache if check_status and self.running_spaces_cache is not None: if self.cache_timestamp and time.time() - self.cache_timestamp < self.cache_duration: return self.running_spaces_cache print("Fetching ALL MCP spaces from HuggingFace Hub...") # Get ALL spaces with the mcp-server tag # Using limit=None or a very high limit to ensure we get everything try: # First try with no limit (gets all) spaces = list(list_spaces( filter="mcp-server", sort="likes", direction=-1, limit=None, # Get ALL spaces full=True )) except Exception as e: print(f"Failed with limit=None, trying with high limit: {e}") # Fallback to high limit if None doesn't work spaces = list(list_spaces( filter="mcp-server", sort="likes", direction=-1, limit=10000, # Very high limit to ensure we get all the relevant spaces full=True )) print(f"Found {len(spaces)} total spaces with mcp-server tag") if not check_status: # Quick mode: Don't check runtime status, just return basic info all_spaces = [] for space in spaces: try: processed_space = MCPSpace( id=space.id, title=getattr(space, 'title', space.id.split('/')[-1]) or space.id.split('/')[-1], author=space.author, likes=getattr(space, 'likes', 0), status="not_checked", # We're not checking status in quick mode url=f"https://huggingface.co/spaces/{space.id}", description=getattr(space, 'description', 'No description available') or 'No description available', sdk=getattr(space, 'sdk', 'unknown'), last_modified=str(getattr(space, 'last_modified', 'unknown')), created_at=str(getattr(space, 'created_at', 'unknown')) ) all_spaces.append(processed_space) except Exception as e: print(f"Error processing space: {e}") continue # Sort by likes all_spaces.sort(key=lambda x: x.likes, reverse=True) # Cache the results self.all_mcp_spaces_cache = all_spaces self.cache_timestamp = time.time() return all_spaces else: # Full mode: Check runtime status -- will be slow print("Checking runtime status for spaces...") # Process spaces in batches using ThreadPoolExecutor for status checking batch_size = 50 space_batches = [spaces[i:i + batch_size] for i in range(0, len(spaces), batch_size)] all_processed_spaces = [] with ThreadPoolExecutor(max_workers=5) as executor: future_to_batch = { executor.submit(self.process_space_batch, batch): batch for batch in space_batches } for future in as_completed(future_to_batch): try: batch_results = future.result() all_processed_spaces.extend(batch_results) except Exception as e: print(f"Error processing batch: {e}") # Filter to only include running or building spaces if desired running_building_spaces = [ space for space in all_processed_spaces if space.status in ["RUNNING", "RUNNING_BUILDING", "BUILDING"] ] # Sort by likes descending running_building_spaces.sort(key=lambda x: x.likes, reverse=True) # Debug: Count by status status_counts = {} for space in all_processed_spaces: status_counts[space.status] = status_counts.get(space.status, 0) + 1 print(f"Status breakdown: {status_counts}") print(f"Found {len(running_building_spaces)} running/building spaces out of {len(all_processed_spaces)} total") # Cache the results self.running_spaces_cache = running_building_spaces self.cache_timestamp = time.time() return running_building_spaces def find_mcp_spaces(self, limit: int = None, only_running: bool = False) -> List[MCPSpace]: """ Backward compatible method that can either: 1. Get ALL MCP spaces (default behavior, matching SA) 2. Get only running/building spaces (original FA behavior) Args: limit: Optional limit on number of spaces to return only_running: If True, only return running/building spaces (requires status check) """ if only_running: spaces = self.find_all_mcp_spaces(check_status=True) else: spaces = self.find_all_mcp_spaces(check_status=False) if limit and limit < len(spaces): return spaces[:limit] return spaces # Initialize the finder finder = MCPSpaceFinder() def get_mcp_spaces_list(): """Get the list of ALL MCP spaces for the dropdown.""" # Get ALL spaces, not just running ones (matching SA behavior) spaces = finder.find_mcp_spaces(only_running=False) # Create options for dropdown - format as username/spacename options = [] for space in spaces: label = space.id # This is already in format "username/spacename" options.append((label, space.id)) return options, len(spaces) def display_space_info(space_id): """Display detailed information about the selected space.""" if not space_id: return "Please select a space from the dropdown." # First check the all spaces cache spaces = finder.all_mcp_spaces_cache or finder.find_mcp_spaces(only_running=False) selected_space = next((s for s in spaces if s.id == space_id), None) if not selected_space: return f"Space {space_id} not found in cache." # Get fresh status if not already checked if selected_space.status == "not_checked": selected_space.status = finder.get_space_status(space_id) # Get status emoji and description status_emoji = "đŸŸĸ" if selected_space.status in ["RUNNING", "RUNNING_BUILDING"] else "🟡" if selected_space.status in ["BUILDING"] else "🔴" status_description = { "RUNNING": "Ready to use", "RUNNING_BUILDING": "Running (rebuilding)", "BUILDING": "Building - please wait", "STOPPED": "Stopped/Sleeping", "PAUSED": "Paused", "error": "Error getting status", "unknown": "Status unknown", "not_checked": "Status not checked" }.get(selected_space.status, selected_space.status) # Format the information info = f""" # {selected_space.title} **Author:** {selected_space.author} **Likes:** â¤ī¸ {selected_space.likes} **Status:** {status_emoji} {status_description} **SDK:** {selected_space.sdk} **Created:** {selected_space.created_at} **Last Modified:** {selected_space.last_modified} **URL:** [{selected_space.url}]({selected_space.url}) **Description:** {selected_space.description} --- ## 🔧 MCP Configuration ### For VSCode/Cursor/Claude Code (Recommended) ```json {{ "servers": {{ "{selected_space.id.replace('/', '-')}": {{ "url": "{selected_space.url}/gradio_api/mcp/sse" }} }} }} ``` ### For Claude Desktop ```json {{ "mcpServers": {{ "{selected_space.id.replace('/', '-')}": {{ "command": "npx", "args": [ "mcp-remote", "{selected_space.url}/gradio_api/mcp/sse" ] }} }} }} ``` ### Alternative: Use HF MCP Space Server ```json {{ "mcpServers": {{ "hf-spaces": {{ "command": "npx", "args": [ "-y", "@llmindset/mcp-hfspace", "{selected_space.id}" ] }} }} }} ``` --- **Note:** {status_description}{"" if selected_space.status in ["RUNNING", "RUNNING_BUILDING"] else " - The space may need to be started before use."} """.strip() return info def show_all_spaces(filter_running: bool = False): """ Display information about MCP spaces. Args: filter_running: If True, only show running/building spaces. If False, show all. """ spaces = finder.find_mcp_spaces(only_running=filter_running) total_spaces = len(spaces) # Create summary markdown filter_text = "Running/Building" if filter_running else "All" summary = f""" # 📊 {filter_text} MCP Servers Summary **Available MCP Servers:** {total_spaces} {"(running or building)" if filter_running else "(all statuses)"} **Sorted by:** Popularity (likes descending) Browse all MCP servers: [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server) --- """ # Create DataFrame data df_data = [] for i, space in enumerate(spaces, 1): if filter_running: status_emoji = "đŸŸĸ" if space.status == "RUNNING" else "🟡" if space.status == "RUNNING_BUILDING" else "đŸ”ļ" else: status_emoji = "❓" # Unknown status for non-filtered view desc_short = (space.description[:80] + "...") if len(space.description) > 80 else space.description df_data.append([ i, # Rank space.id, # Show as username/spacename format space.author, space.likes, f"{status_emoji} {space.status if filter_running else 'Not checked'}", desc_short, f"[🔗 Open]({space.url})" # Clickable link ]) return summary, df_data # Create the Gradio interface with toggle for showing all vs running spaces def create_interface(): with gr.Blocks(title="HuggingFace MCP Server Browser") as demo: gr.Markdown("# 🤖 HuggingFace MCP Server Browser") gr.Markdown("Discover **ALL Model Context Protocol (MCP)** servers on HuggingFace Spaces") with gr.Row(): with gr.Column(scale=3): # Get initial options and count initial_options, total_count = get_mcp_spaces_list() dropdown = gr.Dropdown( choices=initial_options, label=f"🤖 MCP Servers ({total_count} total available)", info="All MCP servers on HuggingFace, sorted by popularity", value=None ) with gr.Column(scale=1): refresh_btn = gr.Button("🔄 Refresh", variant="secondary") filter_toggle = gr.Checkbox(label="Show only running", value=False) summary_btn = gr.Button("📊 Show All", variant="primary") # Use Markdown for better formatting instead of Textbox output_md = gr.Markdown( value=f""" ## Welcome! Select an MCP server above to view configuration details. **Total MCP servers found:** {total_count} **Browse all:** [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server) â„šī¸ **Note:** This browser now shows ALL MCP spaces by default. Enable "Show only running" to filter for active spaces (slower). """, visible=True ) # Add DataFrame for clean table display output_df = gr.DataFrame( headers=["Rank", "Space ID", "Author", "Likes", "Status", "Description", "Link"], datatype=["number", "str", "str", "number", "str", "str", "markdown"], visible=False, wrap=True, ) # Event handlers def handle_dropdown_change(space_id): if space_id: info = display_space_info(space_id) return gr.Markdown(value=info, visible=True), gr.DataFrame(visible=False) return gr.Markdown(visible=True), gr.DataFrame(visible=False) def handle_show_all(filter_running): summary, df_data = show_all_spaces(filter_running) return gr.Markdown(value=summary, visible=True), gr.DataFrame(value=df_data, visible=True) def handle_refresh(filter_running): # Clear cache to force refresh finder.all_mcp_spaces_cache = None finder.running_spaces_cache = None finder.cache_timestamp = None if filter_running: # This will be slower as it checks status options, total_count = [(s.id, s.id) for s in finder.find_mcp_spaces(only_running=True)], len(finder.find_mcp_spaces(only_running=True)) label_text = f"🤖 Running MCP Servers ({total_count} available)" else: options, total_count = get_mcp_spaces_list() label_text = f"🤖 MCP Servers ({total_count} total available)" return ( gr.Dropdown(choices=options, value=None, label=label_text), gr.Markdown(value=f""" ## Welcome! Select an MCP server above to view configuration details. **MCP servers found:** {total_count} **Filter:** {"Running/Building only" if filter_running else "All spaces"} **Browse all:** [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server) """, visible=True), gr.DataFrame(visible=False) ) dropdown.change( fn=handle_dropdown_change, inputs=[dropdown], outputs=[output_md, output_df] ) summary_btn.click( fn=handle_show_all, inputs=[filter_toggle], outputs=[output_md, output_df] ) refresh_btn.click( fn=handle_refresh, inputs=[filter_toggle], outputs=[dropdown, output_md, output_df] ) filter_toggle.change( fn=handle_refresh, inputs=[filter_toggle], outputs=[dropdown, output_md, output_df] ) return demo # Create and launch the interface if __name__ == "__main__": demo = create_interface() demo.launch(mcp_server=True)