mcp-server / app.py
albertvillanova's picture
Add MCP server
fc6208e verified
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict
app = FastAPI()
class MCPMessage(BaseModel):
session: str
id: str
command: str
args: Dict[str, str]
@app.post("/mcp")
async def mcp_endpoint(message: MCPMessage, request: Request):
print(f"[MCP Server] Received: {message.command} | Args: {message.args}")
if message.command == "mcp-negotiate":
return {
"id": message.id,
"status": "ok",
"message": "Negotiation successful",
"session": message.session,
"version": message.args.get("version", "1.0"),
}
elif message.command == "mcp-message":
return {
"id": message.id,
"status": "ok",
"message": f"Delivered to {message.args.get('to')}",
"echo": message.args.get("body", ""),
}
elif message.command == "mcp-echo-upper":
original = message.args.get("body", "")
return {
"id": message.id,
"status": "ok",
"message": "Uppercase echo",
"original": original,
"upper": original.upper()
}
return JSONResponse(status_code=400, content={
"id": message.id,
"status": "error",
"message": f"Unknown command: {message.command}"
})
@app.get("/")
def root():
return {"message": "MCP Test Server is running"}