MCP 协议详解:给 AI Agent 装上标准接口

2024 年底 Anthropic 发布了 MCP(Model Context Protocol),2025 年它迅速成为 AI 工程化的核心基础设施。Claude、Cursor、Windsurf 等主流工具全面支持,被称为 AI 集成的”USB-C 接口”。

为什么需要 MCP

在 MCP 之前,每个 AI 应用都要自己实现工具集成:

应用 A → 自定义 Prometheus 集成
应用 B → 自定义 Prometheus 集成(重复造轮子)
应用 C → 自定义 Prometheus 集成(再次重复)

MCP 统一了这个过程:

应用 A ─┐
应用 B ─┼→ MCP Client → MCP Server → Prometheus
应用 C ─┘

一次实现,所有支持 MCP 的 AI 工具都能复用。

核心架构

MCP 采用 Client-Server 架构:

┌─────────────────┐         ┌──────────────────┐
│   MCP Client    │◄───────►│   MCP Server     │
│  (Claude/Cursor)│  JSON-  │  (你实现的工具)   │
└─────────────────┘  RPC    └──────────────────┘

                              ┌──────┴──────┐
                              │  实际系统    │
                              │ (K8s/Prom)  │
                              └─────────────┘

MCP Server 暴露三类能力:

  • Tools:可执行的操作(执行命令、查询数据)
  • Resources:可读取的数据源(文件、数据库)
  • Prompts:预定义的提示模板

与 Function Calling 的区别

特性Function CallingMCP
标准化各家格式不同统一协议
复用性绑定特定模型跨模型通用
传输方式HTTP/APIstdio / SSE
工具发现手动定义自动发现

实战:构建 Kubernetes MCP Server

用 Python 实现一个能查询 K8s 集群状态的 MCP Server:

# k8s_mcp_server.py
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import subprocess
import json

server = Server("k8s-ops")

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="get_pods",
            description="获取指定命名空间的 Pod 列表和状态",
            inputSchema={
                "type": "object",
                "properties": {
                    "namespace": {
                        "type": "string",
                        "description": "命名空间,默认 default",
                        "default": "default"
                    }
                }
            }
        ),
        Tool(
            name="describe_pod",
            description="查看 Pod 详细信息和事件,用于故障排查",
            inputSchema={
                "type": "object",
                "properties": {
                    "pod_name": {"type": "string"},
                    "namespace": {"type": "string", "default": "default"}
                },
                "required": ["pod_name"]
            }
        ),
        Tool(
            name="get_pod_logs",
            description="获取 Pod 日志",
            inputSchema={
                "type": "object",
                "properties": {
                    "pod_name": {"type": "string"},
                    "namespace": {"type": "string", "default": "default"},
                    "lines": {"type": "integer", "default": 50}
                },
                "required": ["pod_name"]
            }
        ),
        Tool(
            name="get_node_status",
            description="查看集群节点状态和资源使用情况",
            inputSchema={"type": "object", "properties": {}}
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        if name == "get_pods":
            ns = arguments.get("namespace", "default")
            result = subprocess.run(
                ["kubectl", "get", "pods", "-n", ns, "-o", "wide"],
                capture_output=True, text=True, timeout=10
            )
            return [TextContent(type="text", text=result.stdout or result.stderr)]

        elif name == "describe_pod":
            pod = arguments["pod_name"]
            ns = arguments.get("namespace", "default")
            result = subprocess.run(
                ["kubectl", "describe", "pod", pod, "-n", ns],
                capture_output=True, text=True, timeout=10
            )
            return [TextContent(type="text", text=result.stdout[-4000:])]

        elif name == "get_pod_logs":
            pod = arguments["pod_name"]
            ns = arguments.get("namespace", "default")
            lines = arguments.get("lines", 50)
            result = subprocess.run(
                ["kubectl", "logs", pod, "-n", ns, f"--tail={lines}"],
                capture_output=True, text=True, timeout=10
            )
            return [TextContent(type="text", text=result.stdout or result.stderr)]

        elif name == "get_node_status":
            result = subprocess.run(
                ["kubectl", "get", "nodes", "-o", "wide"],
                capture_output=True, text=True, timeout=10
            )
            top = subprocess.run(
                ["kubectl", "top", "nodes"],
                capture_output=True, text=True, timeout=10
            )
            return [TextContent(type="text",
                text=f"节点状态:\n{result.stdout}\n\n资源使用:\n{top.stdout}")]

    except Exception as e:
        return [TextContent(type="text", text=f"执行失败:{str(e)}")]

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream,
                        server.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

配置到 Claude Desktop

// ~/Library/Application Support/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "k8s-ops": {
      "command": "python",
      "args": ["/path/to/k8s_mcp_server.py"],
      "env": {
        "KUBECONFIG": "/path/to/.kube/config"
      }
    }
  }
}

配置后,你可以直接用自然语言操作集群:

“帮我查一下 production 命名空间里有没有 CrashLoopBackOff 的 Pod,如果有,看看它的日志是什么错误”

Claude 会自动调用 get_podsget_pod_logs,给出诊断结果。

安全注意事项

MCP Server 拥有执行命令的能力,生产环境需要:

  1. 只读权限:kubectl 使用只读 ServiceAccount,禁止 delete/patch 操作
  2. 命名空间隔离:限制只能访问特定命名空间
  3. 审计日志:记录所有工具调用
  4. 超时控制:防止长时间阻塞

MCP 正在重塑 AI 与系统集成的方式,运维工程师掌握 MCP Server 开发,就能把任何内部系统变成 AI 可操作的工具。

← 返回文章列表