dtka's picture
fix: resolve syntax errors and improve code formatting
458da34
import os
import requests
import gradio as gr
import anthropic
import yaml
import hashlib
import json
import io
import matplotlib.pyplot as plt
import seaborn as sns
from dotenv import load_dotenv
from datetime import datetime
from itertools import combinations
from collections import defaultdict
load_dotenv()
# Anthropic API Setup
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
# ---- Agent Discovery Logic ----
AGENT_ICONS = {
"Climate Sensor": "🌦️",
"Policy Modeler": "πŸ“œ",
"Economic Forecast": "πŸ’Ή",
"Media Monitor": "πŸ“°",
"Public Health": "πŸ₯",
"NGO Matcher": "🀝"
}
CLAUDE_MODEL="claude-sonnet-4-20250514"
RETIREMENT_THRESHOLD = 1 # Agent appears in fewer than this many swarms
def should_spawn_hybrid(threshold=3, log_path="swarm_log.jsonl", registry_path="agents_registry.json"):
from collections import Counter
import itertools
try:
with open(log_path, "r", encoding="utf-8") as f:
lines = [json.loads(line) for line in f if line.strip()]
except:
return None, None
pair_counts = Counter()
for entry in lines:
agents = sorted(entry.get("agents", []))
for a, b in itertools.combinations(agents, 2):
pair_counts[(a, b)] += 1
try:
with open(registry_path, "r", encoding="utf-8") as f:
registry = json.load(f)
hybrid_origins = [tuple(sorted(agent.get("origin", []))) for agent in registry.get("agents", []) if agent.get("status") == "prototype"]
except:
hybrid_origins = []
for (a, b), count in pair_counts.items():
if count >= threshold and (a, b) not in hybrid_origins:
return a, b
return None, None
def spawn_hybrid_agent(agent_a, agent_b, registry_path="agents_registry.json"):
import uuid
hybrid_name = f"Hybrid_{uuid.uuid4().hex[:6]}"
hybrid_description = f"Hybrid of {agent_a} and {agent_b}, designed through observed co-usage."
hybrid_icon = "🧬"
new_agent = {
"name": hybrid_name,
"description": hybrid_description,
"status": "prototype",
"origin": [agent_a, agent_b],
"icon": hybrid_icon
}
try:
with open(registry_path, "r", encoding="utf-8") as f:
data = json.load(f)
except:
data = {"agents": []}
data["agents"].append(new_agent)
with open(registry_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
return hybrid_name
pair_counts = Counter()
for entry in lines:
agents = sorted(entry.get("agents", []))
for a, b in itertools.combinations(agents, 2):
pair_counts[(a, b)] += 1
try:
with open(registry_path, "r", encoding="utf-8") as f:
registry = json.load(f)
hybrid_origins = [tuple(sorted(agent.get("origin", []))) for agent in registry.get("agents", []) if agent.get("status") == "prototype"]
except:
hybrid_origins = []
for (a, b), count in pair_counts.items():
if count >= threshold and (a, b) not in hybrid_origins:
return a, b
return None, None
# ---- Utility: Deprecate Stale Agents ----
def deprecate_low_usage_agents(log_path="swarm_log.jsonl", registry_path="agents_registry.json"):
usage_counter = defaultdict(int)
try:
with open(log_path, "r", encoding="utf-8") as f:
for line in f:
entry = json.loads(line)
for agent in entry.get("agents", []):
usage_counter[agent] += 1
if not os.path.exists(registry_path):
print("No registry found to update.")
return
with open(registry_path, 'r') as f:
registry = json.load(f)
modified = False
for agent in registry.get("agents", []):
if agent.get("status") == "active" and usage_counter[agent["name"]] < RETIREMENT_THRESHOLD:
agent["status"] = "deprecated"
modified = True
if modified:
with open(registry_path, 'w') as f:
json.dump(registry, f, indent=2)
print("Stale agents deprecated.")
else:
print("No agents met deprecation criteria.")
except Exception as e:
print(f"Error during agent deprecation: {e}")
# ---- Swarm Self-Assembly ----
def analyze_intent_and_select_swarm(user_input, registry_path="agents_registry.json"):
try:
with open(registry_path, 'r') as f:
registry = json.load(f)
except Exception as e:
return [], f"Failed to load registry: {str(e)}"
try:
available_agents = "\n".join([f"- {a['name']}: {a.get('description', '')}" for a in registry['agents'] if a.get('status') == 'active'])
messages = [
{"role": "system", "content": "You're a swarm selector. Given a task, you select a subset of agents best suited to it."},
{"role": "user", "content": f"Task: {user_input}\nAvailable agents:\n{available_agents}"},
]
response = anthropic_client.messages.create(
model=CLAUDE_MODEL,
max_tokens=256,
messages=messages
)
selected_names = []
for agent in registry['agents']:
if agent['name'].lower() in response.content[0].text.lower():
selected_names.append(agent['name'])
return selected_names, None
except Exception as e:
return [], f"Error selecting swarm: {str(e)}"
# ---- Swarm Execution & Aggregation ----
def route_to_swarm_and_aggregate(user_input, selected_agents, registry_path="agents_registry.json", log_path="swarm_log.jsonl"):
try:
with open(registry_path, 'r') as f:
registry = json.load(f)
except Exception as e:
return f"Failed to load registry: {str(e)}"
responses = []
for agent in registry['agents']:
if agent['name'] in selected_agents:
try:
resp = requests.post(
agent['endpoint'],
json={"input": user_input},
timeout=agent.get('timeout_seconds', 30)
)
resp.raise_for_status()
out = resp.json().get("output", "No output.")
responses.append(f"[{agent['name']}]\n{out}\n")
except Exception as e:
responses.append(f"[{agent['name']}] Error: {str(e)}")
# Log swarm usage
try:
with open(log_path, "a", encoding="utf-8") as logf:
json.dump({
"timestamp": datetime.utcnow().isoformat(),
"input": user_input,
"agents": selected_agents
}, logf)
logf.write("\n")
except:
pass
return "\n---\n".join(responses)
def fetch_registry():
# Load from local file first, fall back to remote if not found
local_registry = "agents_registry.json"
if os.path.exists(local_registry):
print("Loading agents from local registry file")
try:
with open(local_registry, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error loading local registry: {e}")
# Fall back to remote registry
remote_url = "https://huggingface.co/spaces/Agents-MCP-Hackathon/collective-intelligence-orchestrator/resolve/main/agents_registry.json"
print(f"Fetching agents from remote registry: {remote_url}")
try:
res = requests.get(remote_url, timeout=5)
res.raise_for_status()
return res.json()
except Exception as e:
print(f"Error fetching remote registry: {e}")
return None
def fetch_agent_yaml(space_url):
try:
res = requests.get(f"{space_url}/agent.yaml")
return yaml.safe_load(res.text)
except:
return None
def compute_agent_embedding(text):
# Handle None or empty input
if not text:
return 0
# Simulated embedding using a hash β€” replace with real embedding logic if desired
return int(hashlib.md5(text.encode()).hexdigest(), 16) % 10000
def discover_agents_from_registry():
registry = fetch_registry()
if not registry or 'agents' not in registry:
print("No valid registry data found")
return [], [], []
tools = []
cards = []
index = []
for agent in registry['agents']:
if not agent.get('status') == 'active':
continue
try:
# Use metadata from registry or fetch from agent if needed
name = agent.get('name', 'Unnamed Agent')
description = agent.get('description', 'No description available')
endpoint = agent.get('endpoint')
icon = agent.get('icon', '')
# Create tool function using the agent's endpoint
def tool_func(input_text, agent_endpoint=endpoint, agent_name=name):
try:
resp = requests.post(
agent_endpoint,
json={"input": input_text},
timeout=agent.get('timeout_seconds', 30)
)
resp.raise_for_status()
return resp.json().get("output", "No structured output.")
except Exception as e:
print(f"Error calling {agent_name} agent: {str(e)}")
return f"Error: Failed to call {agent_name} agent - {str(e)}"
# Compute embedding for semantic search
emb = compute_agent_embedding(f"{name} {description} {' '.join(agent.get('tags', []))}")
# Add to tools with all necessary metadata
tools.append((
name,
icon,
description,
tool_func,
emb,
agent.get('categories', []),
agent.get('capabilities', [])
))
# Update index for semantic search
index.append((emb, name))
# Add card for UI
cards.append((icon, name, description, agent.get('categories', [])))
except Exception as e:
print(f"Error processing agent {agent.get('name', 'unknown')}: {str(e)}")
continue
print(f"Successfully loaded {len(tools)} agents from registry")
return tools, cards, index
# ---- Heatmap Generator ----
def generate_swarm_heatmap(log_path="swarm_log.jsonl"):
from itertools import combinations
from collections import defaultdict
import pandas as pd
agent_pairs = defaultdict(int)
try:
with open(log_path, "r", encoding="utf-8") as f:
for line in f:
entry = json.loads(line)
agents = entry.get("agents", [])
for a, b in combinations(sorted(agents), 2):
agent_pairs[(a, b)] += 1
all_agents = sorted({agent for pair in agent_pairs for agent in pair})
matrix = pd.DataFrame(0, index=all_agents, columns=all_agents)
for (a, b), count in agent_pairs.items():
matrix.at[a, b] = count
matrix.at[b, a] = count
# Create heatmap figure
plt.figure(figsize=(8, 6))
sns.heatmap(matrix, annot=True, fmt="d", cmap="YlGnBu", linewidths=0.5)
plt.title("Swarm Agent Co-occurrence")
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
buf = io.BytesIO()
plt.savefig(buf, format="png")
plt.close()
buf.seek(0)
return buf
except Exception as e:
print(f"Error generating heatmap: {e}")
return None
# ---- Agent Breeder ----
def get_top_swarm_pairs(log_path="swarm_log.jsonl", top_n=1):
counter = defaultdict(int)
with open(log_path, "r", encoding="utf-8") as f:
for line in f:
entry = json.loads(line)
agents = entry.get("agents", [])
for a, b in combinations(sorted(agents), 2):
counter[(a, b)] += 1
return sorted(counter.items(), key=lambda x: x[1], reverse=True)[:top_n]
def breed_hybrid_agent():
top_pairs = get_top_swarm_pairs()
if not top_pairs:
return "No agent pairs to breed."
(agent1, agent2), _ = top_pairs[0]
hybrid_name = f"Hybrid_{agent1.split('-')[-1]}_{agent2.split('-')[-1]}"
hybrid_prompt = f"This agent combines the capabilities of {agent1} and {agent2} to solve complex tasks."
hybrid_metadata = {
"name": hybrid_name,
"description": hybrid_prompt,
"origin": [agent1, agent2],
"status": "prototype",
"generated_at": datetime.utcnow().isoformat(),
"endpoint": f"https://huggingface.co/spaces/Agents-MCP-Hackathon/collective-intelligence-orchestrator/resolve/main/hybrids/{hybrid_name}/serve",
"icon": "🧬",
"categories": ["hybrid"],
"capabilities": ["emergent-analysis"]
}
os.makedirs(f"hybrids/{hybrid_name}", exist_ok=True)
# Save metadata
with open(f"hybrids/{hybrid_name}/{hybrid_name}.json", "w") as f:
json.dump(hybrid_metadata, f, indent=2)
# Generate agent.yaml
agent_yaml = {
"name": hybrid_name,
"description": hybrid_prompt,
"author": "Orchestrator",
"tags": ["hybrid", "generated"],
"capabilities": ["emergent-analysis"],
"timeout_seconds": 30
}
with open(f"hybrids/{hybrid_name}/agent.yaml", "w") as f:
yaml.dump(agent_yaml, f)
# Generate app.py
app_code = f"""import gradio as gr\n\ndef respond(input):\n return \"[Hybrid Agent: {hybrid_name}]\nResponding with insight from merged origins: {agent1} + {agent2}\"\n\niface = gr.Interface(fn=respond, inputs=\"text\", outputs=\"text\", title=\"{hybrid_name}\")\n\niface.launch(mcp_server=True)"""
with open(f"hybrids/{hybrid_name}/app.py", "w") as f:
f.write(app_code)
# Generate README
with open(f"hybrids/{hybrid_name}/README.md", "w") as f:
f.write(f"""# {hybrid_name}
This hybrid agent was auto-generated by combining:
- `{agent1}`
- `{agent2}`
## Description
{hybrid_prompt}
## Status
Prototype
Generated at {datetime.utcnow().isoformat()}
""")
# Auto-update registry
registry_path = "agents_registry.json"
try:
if os.path.exists(registry_path):
with open(registry_path, 'r') as f:
registry = json.load(f)
else:
registry = {"agents": []}
registry["agents"].append(hybrid_metadata)
with open(registry_path, 'w') as f:
json.dump(registry, f, indent=2)
except Exception as e:
print(f"Failed to update registry: {e}")
return f"Hybrid agent '{hybrid_name}' scaffolded and registered."
# ---- Claude Orchestrator ----
def match_agents_by_vector(input_text, tools, index):
if not tools or not index:
return []
input_emb = compute_agent_embedding(input_text)
# Calculate similarity scores for all agents
scored_agents = []
for i, (emb, name) in enumerate(index):
# Simple similarity based on vector distance
similarity = 1 / (1 + abs(emb - input_emb))
scored_agents.append((similarity, tools[i]))
# Sort by similarity score (descending)
scored_agents.sort(reverse=True, key=lambda x: x[0])
# Return top matching agents (above threshold or top 3)
threshold = 0.3 # Adjust based on your needs
return [agent for score, agent in scored_agents if score > threshold][:5] # Limit to top 5 matches
def claude_conductor(message, history, tools=None, index=None):
if tools is None:
tools = []
if index is None:
index = []
selected_tools = match_agents_by_vector(message, tools, index)
tools_description = "\n".join(
f"- {icon} {name}: {desc} (Categories: {', '.join(categories) if categories else 'None'})"
for name, icon, desc, _, _, categories, _ in selected_tools
) if selected_tools else "No relevant tools matched."
# Format the conversation history for Claude
conversation = []
for user_msg, bot_msg in history:
if user_msg:
conversation.append({"role": "user", "content": user_msg})
if bot_msg:
conversation.append({"role": "assistant", "content": bot_msg})
# Add the current user message
conversation.append({"role": "user", "content": message})
# Create system prompt
system_prompt = f"""You are the conductor of a Collective Intelligence Swarmβ€”a coordinated network of AI agents including both foundational agents and auto-generated hybrid prototypes.
Each agent specializes in real-world crisis domains (climate, public health, media monitoring, etc.) and has capabilities such as forecasting, summarization, cross-domain linking, or anomaly detection.
You must analyze the user's problem and determine which agents, or combination of agents, are best suited for the task.
Give preference to agents whose capabilities align with the user's request.
You may also draw on emergent-hybrid agents created from frequent co-occurrence patterns.
Here are the currently active tools:
{tools_description}
Respond clearly and concisely with your synthesis of the swarm’s outputs."""
# Load top co-occurring agents for swarm awareness
top_swarm_pairs = get_top_swarm_pairs(top_n=3)
co_usage_info = "\n".join(
f"- {a} + {b}: used together {count} times"
for (a, b), count in top_swarm_pairs
)
if co_usage_info:
system_prompt += f"\n\nHistorical synergy data:\n{co_usage_info}"
try:
# Call Claude API
response = anthropic_client.messages.create(
model=CLAUDE_MODEL,
max_tokens=1000,
system=system_prompt,
messages=conversation,
temperature=0.7
)
if response.content and len(response.content) > 0:
output_text = response.content[0].text
else:
output_text = "I couldn't generate a response. Please try again."
# Log swarm agent usage
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"input": message,
"agents": [name for name, _, _, _, _, _, _ in selected_tools],
"response": output_text
}
with open("swarm_log.jsonl", "a", encoding="utf-8") as log_file:
log_file.write(json.dumps(log_entry) + "\n")
return output_text
except Exception as e:
print(f"Error calling Claude API: {str(e)}")
return f"An error occurred while processing your request: {str(e)}"
def mcp_entry_point(input_text: str) -> str:
return claude_conductor(message=input_text, history=[], tools=tools, index=index)
# ---- Launch Gradio ChatInterface ----
if __name__ == "__main__":
print("Starting application...")
tools, cards, index = discover_agents_from_registry()
print(f"Discovered {len(tools)} tools, {len(cards)} cards, {len(index)} index entries")
if not tools:
print("WARNING: No tools discovered. The UI may not display correctly.")
with gr.Blocks(
theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan"),
title="Collective Intelligence Orchestrator"
) as demo:
gr.Markdown("""
# 🧠 Collective Intelligence Orchestrator
_Activate a living swarm of AI agents._
Enter a real-world scenario (e.g., natural disaster, policy failure, humanitarian crisis), and let the orchestrator dynamically coordinate a swarm response using multiple autonomous MCP agents.
**Author**: [@dtka](https://huggingface.co/dtka)
**Project Docs**: [HF Repo README](https://huggingface.co/spaces/Agents-MCP-Hackathon/collective-intelligence-orchestrator/resolve/main/README.md)
**Hackathon**: [Hugging Face MCP Hackathon](https://huggingface.co/Agents-MCP-Hackathon)
""")
with gr.Tab("Chat with Swarm"):
with gr.Row():
with gr.Column(scale=1):
with gr.Accordion("Agents Details"):
gr.Markdown("### 🧩 Available Agents")
if not cards:
gr.Markdown("⚠️ No agents discovered. Please check agents_registry.json or try again later.")
for icon, name, desc, categories in cards:
categories_html = f"<br><span style='font-size: 0.8em; color: #666;'><i>Categories: {', '.join(categories) if categories else 'General'}</i></span>" if categories else ""
gr.Markdown(
f"<b>{icon} {name}</b><br>"
f"<span style='font-size: 0.9em;'>{desc}</span>"
f"{categories_html}",
render=True,
elem_id="agent-card"
)
with gr.Column(scale=2):
# Create the chat interface with explicit buttons
with gr.Row():
with gr.Column(scale=8):
# Chatbot display
chatbot = gr.Chatbot(
height=500,
show_copy_button=True,
show_label=False,
container=True,
bubble_full_width=True,
placeholder="Start a conversation...",
elem_id="chatbot"
)
# Input area with buttons
with gr.Row():
msg = gr.Textbox(
placeholder="Describe a crisis or scenario...",
container=False,
scale=8,
min_width=200,
show_label=False
)
submit_btn = gr.Button("Send to Swarm", variant="primary", scale=1)
stop_btn = gr.Button("Stop", variant="stop", scale=1, visible=False)
# Additional buttons
with gr.Row():
clear_btn = gr.Button("Clear Chat")
retry_btn = gr.Button("Retry")
# Format messages for the chat interface
def format_messages(history):
formatted = []
for user_msg, bot_msg in history:
if user_msg:
formatted.append((user_msg, None))
if bot_msg is not None:
if formatted and formatted[-1][1] is None:
formatted[-1] = (formatted[-1][0], bot_msg)
else:
formatted.append((None, bot_msg))
return formatted
# Set up button click handlers
def user(user_message, history):
if not user_message.strip():
return "", history
return "", history + [[user_message, None]]
def bot(history):
if not history or not history[-1][0]:
return history
# Get the current message and previous conversation
current_message = history[-1][0]
prev_messages = history[:-1]
# Format history for Claude
formatted_history = []
for user_msg, bot_msg in prev_messages:
if user_msg:
formatted_history.append((user_msg, bot_msg or ""))
# Get response from Claude
try:
bot_message = claude_conductor(current_message, formatted_history, tools, index)
history[-1][1] = bot_message
except Exception as e:
print(f"Error in bot response: {e}")
history[-1][1] = "Sorry, I encountered an error. Please try again."
return history
# Message submission handler
def process_message(user_message, history):
if not user_message.strip():
return "", history
history = history + [[user_message, None]]
return "", history
# Connect UI elements
submit_event = msg.submit(
process_message,
[msg, chatbot],
[msg, chatbot],
queue=False
).then(
bot,
chatbot,
chatbot
)
submit_btn.click(
process_message,
[msg, chatbot],
[msg, chatbot],
queue=False
).then(
bot,
chatbot,
chatbot
)
# Clear chat button
clear_btn.click(lambda: [], None, chatbot, queue=False)
# Retry button
def retry_last(history):
if not history:
return history
if history[-1][1] is not None:
history[-1][1] = None
return history
retry_btn.click(
retry_last,
chatbot,
chatbot,
queue=False
).then(
bot,
chatbot,
chatbot
)
# Add Heatmap Tab
with gr.Tab("Agent Swarm Heatmap"):
with gr.Row():
with gr.Column():
heatmap_btn = gr.Button("Show Heatmap")
heatmap_output = gr.Image(type="pil", label="Agent Interaction Heatmap")
def show_heatmap():
heatmap = generate_swarm_heatmap()
if heatmap is None:
return None
from PIL import Image
return Image.open(heatmap)
heatmap_btn.click(show_heatmap, outputs=heatmap_output)
gr.Markdown("""
### How to read this heatmap:
- The heatmap shows how often different agents interact together
- Darker cells indicate more frequent interactions between agents
- The diagonal shows self-interactions (typically not used)
- Use this to understand which agents work together most frequently
""")
# Add Evolution Dashboard Tab
with gr.Tab("🧬 Agent Evolution Dashboard"):
dashboard_md = gr.Markdown("""### Agent Registry Summary
Click **Refresh** to see the latest agent status.
Click **Retire Stale Agents** to deprecate unused tools.
""")
evolution_display = gr.Markdown("Loading...", elem_id="evolution_status")
refresh_btn = gr.Button("πŸ”„ Refresh Dashboard")
retire_btn = gr.Button("πŸ›‘ Retire Stale Agents")
def list_agents_by_status():
try:
with open("agents_registry.json", "r", encoding="utf-8") as f:
data = json.load(f)
active, prototype, deprecated = [], [], []
for agent in data.get("agents", []):
name = agent.get("name", "Unnamed")
status = agent.get("status", "unknown")
origin = ", ".join(agent.get("origin", [])) if "origin" in agent else "β€”"
icon = agent.get("icon", "")
card = f"- {icon} **{name}** (origin: {origin})"
if status == "active":
active.append(card)
elif status == "prototype":
prototype.append(card)
elif status == "deprecated":
deprecated.append(card)
def format_group(title, items):
header = f"### {title}\n"
content = "\n".join(items) if items else "_No agents found._"
return header + content
return (
format_group("🟒 Active Agents", active) + "\n\n"
+ format_group("πŸ§ͺ Prototypes (Hybrids)", prototype) + "\n\n"
+ format_group("πŸ›‘ Deprecated Agents", deprecated)
)
except Exception as e:
return f"Error loading registry: {e}"
# Spawn Hybrid Agent Button
spawn_btn = gr.Button("🧬 Spawn Hybrid Agent")
def handle_spawn_hybrid():
a, b = should_spawn_hybrid()
if a and b:
name = spawn_hybrid_agent(a, b)
return f"βœ… Spawned new hybrid: {name}\n\n" + list_agents_by_status()
else:
return "⚠️ No suitable agent pair found for hybridization.\n\n" + list_agents_by_status()
spawn_btn.click(fn=handle_spawn_hybrid, outputs=evolution_display)
# Refresh Button
refresh_btn.click(list_agents_by_status, outputs=evolution_display)
# Retire Button
retire_btn.click(
fn=lambda: (deprecate_low_usage_agents(), list_agents_by_status())[1],
outputs=evolution_display
)
list_agents_by_status()
# Add Documentation Tab
with gr.Tab("πŸ“š Documentation"):
def load_readme():
try:
with open('README.md', 'r', encoding='utf-8') as f:
content = f.read()
# Remove the frontmatter if it exists
if content.startswith('---'):
content = content.split('---', 2)[-1].strip()
return content
except Exception as e:
return f"# Error loading documentation\n\nCould not load README.md: {str(e)}"
gr.Markdown(load_readme())
mcp_interface = gr.Interface(
fn=mcp_entry_point,
inputs=gr.Textbox(label="User Input"),
outputs=gr.Textbox(label="Claude Orchestrator Response"),
title="Collective Intelligence MCP API",
description="Send a crisis query to swarm-coordinated Claude agents"
)
demo.launch(mcp_server=True)