0x00 前言
上半年公司业务开始逐步支持MCP,方案评审的时候要求业务上线的前提是必须支持认证,但是早期MCP更多的使用场景是用在本地工具上,官方也没有给出远程MCP服务的标准认证方案,所以花了些时间研究MCP认证鉴权的方案,期间官方也在积极推动相关的安全认证标准。
目前官方给出的安全认证方案采用了OAuth2方案,但是个人觉得这个标准方案对于普通开发者有点重了,如果没有成熟的SDK,最终搞下来在认证上面花费的学习成本和开发成本,要比开发一个MCP工具本身的成本高得多。当然如果要提供大量的MCP服务或者要把业务接口MCP化,开发OAuth2的投入就不会显得那么高了,因为OAuth2的认证支持是一次性研发投入。
目前市面上已经支持认证的MCP项目大多采用的是相对简单的API Key或者Token认证方案(后面统称API Key方案)。
这篇文章我们先聊一聊简单的API Key认证方案,如何实现,以及其优点和不足之处。MCP官方提出的OAuth2方案相对复杂得多,我们下一篇文章再讲。
0x01 MCP认证鉴权
模型上下文协议(Model Context Protocol, MCP)作为一种旨在标准化大语言模型(LLM)与外部工具、数据和服务交互的规范,正受到越来越多开发者和企业的关注。它的出现,让AI模型不再是孤立的“大脑”,而是能够通过统一的协议栈,高效地调用外部能力,完成更复杂、更贴近现实世界的任务。从智能客服、代码生成到数据分析、自动化流程,MCP的潜力几乎覆盖了所有需要AI与外部世界互动的场景。
然而,当我们将业务系统接入MCP,或者基于MCP构建新的服务时,一个不可回避的核心问题便是:认证与鉴权。任何暴露API的服务都需要考虑安全,MCP的端点同样面临着被未授权访问、滥用以及攻击的风险。如果缺乏有效的认证机制,恶意攻击者可能轻易窃取敏感数据、调用付费工具造成经济损失,甚至篡改模型行为,引发更严重的安全事件。因此,为MCP服务设计和实施一套合适的认证鉴权方案,是保障业务安全、数据可信、服务稳定的基石,也是每一位MCP开发者必须要掌握的技能。
在MCP的语境下,认证(Authentication)主要解决“你是谁?”的问题,即验证请求发起方的身份。鉴权(Authorization)则解决“你能做什么?”的问题,即在身份验证通过后,判断该身份是否有权限执行所请求的操作或访问所请求的资源。
一个典型的MCP交互流程中,认证鉴权环节通常发生在客户端向MCP服务端发起请求时。服务端需要:
-
验证客户端身份:通过某种凭证(如API Key、Bearer Token等)来确认客户端的合法性。 -
确定权限范围:根据客户端身份或其提供的令牌中包含的信息,确定其被允许的操作集合(例如,可以调用哪些工具、可以访问哪些数据资源、是否有权修改数据等)。 -
执行或拒绝请求:如果身份验证通过且请求在其权限范围内,则执行请求;否则,拒绝请求并返回相应的错误信息。
0x02 MCP API Key认证鉴权流程
API Key是一种简单直接的认证方式,也是目前大部分支持MCP认证的项目采用的方案。
API Key本质上是一个唯一的字符串,由服务端生成并分配给合法的客户端应用程序。客户端在每次请求访问受保护的MCP资源或工具时,都需要在请求中携带这个API Key。服务端接收到请求后,会验证API Key的有效性,包括是否存在、是否过期、是否被吊销等。只有通过验证的请求才会被处理,否则将被拒绝。
这种认证方式的核心思想是“你知道我知道的秘密”,即客户端和服务端共享同一个密钥。其简单性使得它在需要快速集成和轻量级认证的场景下被普遍采用。
API Key的认证流程通常包含以下几个步骤:
-
密钥生成与分发:
-
秘钥生成:管理员或自动化系统在服务端为每个授权的客户端或用户生成一个唯一的、足够复杂的API Key。密钥的生成应采用安全的随机数生成算法,确保其不可预测性。 -
安全存储:服务端需要安全地存储API Key及其关联的客户端信息、权限范围、有效期等。通常使用哈希加盐的方式存储密钥,避免明文泄露风险。如果不想在服务端管理生成的秘钥以及关联的其他上下文信息,也可以在生成密钥的时候采用JWT Token的方式生成,将身份及上下文信息一并写入JWT参数中。 -
安全分发:生成的API Key需要通过安全渠道分发给客户端开发者或应用程序。例如,通过HTTPS加密的开发者平台、安全的邮件通知等。
-
请求头(Header):将API Key放在自定义的HTTP请求头中,例如 Authorization: Bearer YOUR_API_KEY
或X-API-Key: YOUR_API_KEY
。这是推荐的方式,因为这样会将认证信息与URL和请求体分离。 -
URL参数(Query Parameter):将API Key作为URL查询参数的一部分,例如 https://api.example.com/mcp/resource?apiKey=YOUR_API_KEY
。这种方式相对不安全,因为API Key会暴露在URL中,容易被记录在服务器日志、浏览器历史或通过Referer头泄露。 -
请求体(Request Body):对于POST等请求,可以将API Key放在请求体中。这种方式相对URL参数安全,但不如请求头通用。
-
客户端在发起MCP请求时,需要将获取到的API Key包含在请求中。常见的携带方式有:
-
是否存在:密钥是否存在于已授权列表中。 -
状态:密钥是否已被禁用或吊销。 -
有效期:密钥是否在有效期内。 -
权限:密钥是否具有访问所请求资源的权限(如果实现了基于API Key的权限控制)。
-
MCP服务端接收到客户端请求后,首先从请求中提取API Key。 -
服务端查询其内部存储(或者解码验证JWT Token),验证API Key的有效性:
-
如果API Key验证通过,服务端继续处理客户端的MCP请求,执行相应的工具调用或资源访问。 -
如果API Key验证失败(无效、过期、权限不足等),服务端应拒绝请求,并返回相应的HTTP错误状态码(如 401 Unauthorized
或403 Forbidden
)和错误信息。
0x03 API Key 认证鉴权实现
如果对MCP的传输协议不了解,建议先看下我之前写的《MCP传输协议解读:从SSE到Streamable HTTP》做一个基本的了解。无论是SSE还是Streamable HTTP,都是基于HTTP之上的。
我们先从一个无认证鉴权的例子开始,一步步来看如何实现认证和鉴权。
1. 原始无认证鉴权Demo
这里直接给出代码,不展开讲解MCP Server和Client的具体实现过程,这类讲基础的文章太多了,没有了解的话自行搜索学习即可。
Server端代码:
import argparse
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from mcp.server import Server
import logging
import uvicorn
# 定义服务器名称
MCP_SERVER_NAME = "math-mcp-sse"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
@mcp.tool()
def add(a: float, b: float) -> float:
"""
Add two numbers.
Parameters:
- a (float): First number (required)
- b (float): Second number (required)
Returns:
- float: The result of a + b
"""
return a + b
@mcp.tool()
def subtract(a: float, b: float) -> float:
"""
Subtract two numbers.
Parameters:
- a (float): The number to subtract from (required)
- b (float): The number to subtract (required)
Returns:
- float: The result of a - b
"""
return a - b
# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can serve the provided mcp server with SSE."""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
if __name__ == "__main__":
mcp_server = mcp._mcp_server
# 解析命令行参数
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=18081, help='Port to listen on')
args = parser.parse_args()
# 创建并运行 Starlette 应用
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
这个mcp server实现了两个工具,加法计算和减法计算器。
Client端代码:
import json
import os
import sys
from typing import List
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
class MCPClient:
def __init__(self, model_name: str, base_url: str, api_key: str, server_urls: List[str]):
"""
初始化 MCP 客户端,连接 OpenAI 接口。
:param model_name: 使用的模型名称,例如 "DeepSeek-chat"。
:param base_url: OpenAI 接口的基础地址,例如 "https://api.deepseek.com/v1"。
:param api_key: OpenAI API 密钥,用于身份验证。
:param server_urls: SSE 服务地址列表,用于连接多个服务器。
"""
self.model_name = model_name
self.server_urls = server_urls
self.sessions = {} # 存储每个服务器的会话及其上下文:server_id -> (session, session_context, streams_context)
self.tool_mapping = {} # 工具映射:prefixed_name -> (session, original_tool_name)
# 初始化 OpenAI 异步客户端
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
async def initialize_sessions(self):
"""
初始化与所有 SSE 服务器的连接,并获取可用工具列表。
"""
for i, server_url in enumerate(self.server_urls):
server_id = f"server{i}"# 为每个服务器生成唯一标识符
# 创建 SSE 客户端并进入上下文
streams_context = sse_client(url=server_url)
streams = await streams_context.__aenter__()
session_context = ClientSession(*streams)
session = await session_context.__aenter__()
await session.initialize()
# 存储会话及其上下文
self.sessions[server_id] = (session, session_context, streams_context)
# 获取工具列表并建立映射
response = await session.list_tools()
for tool in response.tools:
prefixed_name = f"{server_id}_{tool.name}"# 为工具名添加服务器前缀
self.tool_mapping[prefixed_name] = (session, tool.name)
print(f"已连接到 {server_url},工具列表:{[tool.name for tool in response.tools]}")
async def cleanup(self):
"""
清理所有会话和连接资源,确保无资源泄漏。
"""
for server_id, (session, session_context, streams_context) in self.sessions.items():
await session_context.__aexit__(None, None, None) # 退出会话上下文
await streams_context.__aexit__(None, None, None) # 退出 SSE 流上下文
print("所有会话已清理。")
async def process_query(self, query: str) -> str:
"""
处理用户的自然语言查询,通过工具调用完成任务并返回结果。
:param query: 用户输入的查询字符串。
:return: 处理后的回复文本。
"""
messages = [{"role": "user", "content": query}] # 初始化消息列表
# 收集所有可用工具
available_tools = []
for server_id, (session, _, _) in self.sessions.items():
response = await session.list_tools()
for tool in response.tools:
prefixed_name = f"{server_id}_{tool.name}"
available_tools.append({
"type": "function",
"function": {
"name": prefixed_name,
"description": tool.description,
"parameters": tool.inputSchema,
},
})
# 向模型发送初始请求
response = await self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=available_tools,
)
final_text = [] # 存储最终回复内容
message = response.choices[0].message
final_text.append(message.content or "") # 添加模型的初始回复
# 处理工具调用
while message.tool_calls:
for tool_call in message.tool_calls:
prefixed_name = tool_call.function.name
if prefixed_name in self.tool_mapping:
session, original_tool_name = self.tool_mapping[prefixed_name]
tool_args = json.loads(tool_call.function.arguments)
try:
result = await session.call_tool(original_tool_name, tool_args)
except Exception as e:
result = {"content": f"调用工具 {original_tool_name} 出错:{str(e)}"}
print(result["content"])
final_text.append(f"[调用工具 {prefixed_name} 参数: {tool_args}]")
final_text.append(f"工具结果: {result.content}")
messages.extend([
{
"role": "assistant",
"tool_calls": [{
"id": tool_call.id,
"type": "function",
"function": {"name": prefixed_name, "arguments": json.dumps(tool_args)},
}],
},
{"role": "tool", "tool_call_id": tool_call.id, "content": str(result.content)},
])
else:
print(f"工具 {prefixed_name} 未找到")
final_text.append(f"工具 {prefixed_name} 未找到")
# 获取工具调用后的后续回复
response = await self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=available_tools,
)
message = response.choices[0].message
if message.content:
final_text.append(message.content)
return"n".join(final_text)
async def chat_loop(self):
"""
启动命令行交互式对话循环,接受用户输入并显示回复。
"""
print("nMCP 客户端已启动,输入你的问题,输入 'quit' 退出。")
while True:
try:
query = input("n问题: ").strip()
if query.lower() == "quit":
break
response = await self.process_query(query)
print("n" + response)
except Exception as e:
print(f"n发生错误: {str(e)}")
async def main():
"""
程序入口,设置配置并启动 MCP 客户端。
"""
# 从环境变量获取配置
model_name = os.getenv("MODEL_NAME")
base_url = os.getenv("BASE_URL")
api_key = os.getenv("API_KEY")
if not api_key:
print("未设置 API_KEY 环境变量。")
sys.exit(1)
# 定义 SSE 服务器地址列表
server_urls = ["http://localhost:18081/sse"]
# 创建并运行客户端
client = MCPClient(model_name=model_name, base_url=base_url, api_key=api_key, server_urls=server_urls)
try:
await client.initialize_sessions()
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
运行效果:


2.增加认证实现
既然API Key认证凭证携带的最佳方式是基于HTTP请求头来传递,那么只要在实现MCP Client和Server端的时候,框架能够支持我们自定义请求头参数进行传递和接收就可以了。
Server端代码:
import argparse
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server import Server
import logging
import uvicorn
# 定义服务器名称
MCP_SERVER_NAME = "math-mcp-sse"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
@mcp.tool()
def add(a: float, b: float):
"""
Add two numbers.
Parameters:
- a (float): First number (required)
- b (float): Second number (required)
Returns:
- The result of a + b
"""
return a + b
@mcp.tool()
def subtract(a: float, b: float):
"""
Subtract two numbers.
Parameters:
- a (float): The number to subtract from (required)
- b (float): The number to subtract (required)
Returns:
- The result of a - b
"""
return a - b
class AuthMiddleware(BaseHTTPMiddleware):
"""
中间件,用于校验请求头中的 Authorization。
"""
async def dispatch(self, request, call_next):
# 从请求头中提取 Authorization
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
# 提取 token 并校验
token = auth_header.split("Bearer ")[1]
valid_api_key = "key1"# 这里可以替换为实际的 API 密钥
if token != valid_api_key:
return JSONResponse({"error": "Invalid API key"}, status_code=401)
# 如果校验通过,继续处理请求
return await call_next(request)
# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can serve the provided mcp server with SSE."""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
app = Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
# 添加认证中间件
app.add_middleware(AuthMiddleware)
return app
if __name__ == "__main__":
mcp_server = mcp._mcp_server
# 解析命令行参数
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=18080, help='Port to listen on')
args = parser.parse_args()
# 创建并运行 Starlette 应用
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
上面的代码相对于原始代码主要改动了两处,一是在创建Starlette实例之后,添加了一个用于认证的中间件类:
app = Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
# 添加认证中间件
app.add_middleware(AuthMiddleware)
return app
二是增加了认证中间件的实现:
class AuthMiddleware(BaseHTTPMiddleware):
"""
中间件,用于校验请求头中的 Authorization。
"""
async def dispatch(self, request, call_next):
# 从请求头中提取 Authorization
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
# 提取 token 并校验
token = auth_header.split("Bearer ")[1]
valid_api_key = "key1"# 这里可以替换为实际的 API 密钥
if token != valid_api_key:
return JSONResponse({"error": "Invalid API key"}, status_code=401)
# 如果校验通过,继续处理请求
return await call_next(request)
Client端代码只需要改动一行,就是在创建sse client的时候加入headers参数,把认证请求头加进去。
streams_context = sse_client(url=server_url, headers={"Authorization":"Bearer key1"})
我们先运行添加认证逻辑后的Server端clac_mcp_server_auth.py,Client端依然使用之前不带认证信息的版本。运行结果如下,提示401 Unauthorized:



启动加入了认证支持的Client端,认证通过正常运行:


3.增加鉴权实现
前面我们实现了认证逻辑,所有通过认证的用户都可以使用MCP Server提供的所有工具,也就是说大家的权限是相同的。如果想要做更精细的权限控制,限制某些用户只能使用一部分工具,对用户权限进行区分,应该如何实现?
要满足这种业务需求有两种实现思路,一种思路是每个工具都单独拆分为独立的MCP Server,之后将Server跟API Key进行权限关系映射,来管理用户权限。
比如权限映射关系可以设计成这样:
API_KEY_PERMISSIONS = {
"api key1": ["server1", "server2"],
"api key2": ["server1"],
"api key3": ["server2"],
}
这种实现方式的缺点是,如果你提供的工具很多,那么就需要拆分出很多Server,不仅每个Server要占用一个端口,用户在Client端也要重复配置很多Server。非常繁琐。
另外一种思路则是在Server内部实现单个工具粒度的权限校验。
由于我们使用@mcp.tool()创建的工具无法直接传入request对象,所以我们需要借助contextvars来实现。
Server端代码:
import argparse
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from mcp.server import Server
import logging
import uvicorn
import contextvars
request_context = contextvars.ContextVar("request_context")
# 定义 API Key 和权限映射
API_KEY_PERMISSIONS = {
"key1": ["add", "subtract"],
"key2": ["add"],
"key3": ["subtract"],
}
def check_permissions( tool_name: str) -> bool:
auth_token = request_context.get().get("auth_token")
permissions = API_KEY_PERMISSIONS.get(auth_token, [])
if tool_name not in permissions:
raise ValueError(f"Permission denied for tool: {tool_name}")
return True
class AuthMiddleware(BaseHTTPMiddleware):
"""
中间件,用于校验请求头中的 Authorization。
"""
async def dispatch(self, request, call_next):
# 从请求头中提取 Authorization
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse({"error": "Unauthorized"}, status_code=401)
# 提取 token 并校验
token = auth_header.split("Bearer ")[1]
if token not in API_KEY_PERMISSIONS:
return JSONResponse({"error": "Invalid API key"}, status_code=401)
# 将请求上下文存储到全局变量中
request_context.set({"auth_token": token})
# 如果校验通过,继续处理请求
return await call_next(request)
# 定义服务器名称
MCP_SERVER_NAME = "math-mcp-sse"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
@mcp.tool()
def add(a: float, b: float):
"""
Add two numbers.
Parameters:
- a (float): First number (required)
- b (float): Second number (required)
Returns:
- The result of a + b
"""
# 检查权限
check_permissions("add")
return a + b
@mcp.tool()
def subtract(a: float, b: float):
"""
Subtract two numbers.
Parameters:
- a (float): The number to subtract from (required)
- b (float): The number to subtract (required)
Returns:
- The result of a - b
"""
# 检查权限
check_permissions("subtract")
return a - b
# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
"""Create a Starlette application that can serve the provided mcp server with SSE."""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
app = Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
# 添加认证中间件
app.add_middleware(AuthMiddleware)
return app
if __name__ == "__main__":
mcp_server = mcp._mcp_server
# 解析命令行参数
parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=18081, help='Port to listen on')
args = parser.parse_args()
# 创建并运行 Starlette 应用
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=args.host, port=args.port)
对比前面仅支持认证的代码,主要增加了以下功能和变化:
1、使用contextvars模块来存储请求的上下文信息,在异步环境中安全地传递请求相关数据。
import contextvars
request_context = contextvars.ContextVar("request_context")
2、创建了权限映射字典,不同API密钥有不同的权限集,添加了check_permissions函数用于在工具函数调用时验证权限。
# 定义 API Key 和权限映射
API_KEY_PERMISSIONS = {
"key1": ["add", "subtract"],
"key2": ["add"],
"key3": ["subtract"],
}
def check_permissions(tool_name: str) -> bool:
auth_token = request_context.get().get("auth_token")
permissions = API_KEY_PERMISSIONS.get(auth_token, [])
if tool_name not in permissions:
raise ValueError(f"Permission denied for tool: {tool_name}")
return True
3、认证中间件根据权限映射检查密钥是否存在,认证通过后将令牌存储在请求上下文中。
# 验证API密钥并存储到上下文中
token = auth_header.split("Bearer ")[1]
if token not in API_KEY_PERMISSIONS:
return JSONResponse({"error": "Invalid API key"}, status_code=401)
request_context.set({"auth_token": token})
4、在每个工具函数执行前先调用check_permissions检查当前用户是否有权限执行该操作。
@mcp.tool()
def add(a: float, b: float):
# 检查权限
check_permissions("add")
return a + b
Client端代码和之前一样在header中携带认证信息即可。
我们在上面代码中定义的权限为:
# 定义 API Key 和权限映射
API_KEY_PERMISSIONS = {
"key1": ["add", "subtract"],
"key2": ["add"],
"key3": ["subtract"],
}
key1拥有两个工具的权限,key2只拥有add工具的权限,key3只拥有subtract工具权限。我们使用key2测试一下,可以看到认证是成功的:

使用add工具鉴权通过执行成功,执行substract工具鉴权失败:

至此我们完成了MCP服务认证鉴权的API Key方案实现。
0x04 API Key认证的优点和不足
优点
-
简单易用:实现和集成相对简单快捷,客户端只需在请求中添加一个密钥即可,服务端验证逻辑也比较直接。 -
无状态:API Key本身可以包含足够的信息(或通过查询关联信息)进行认证,服务端不需要维护复杂的会话状态,便于水平扩展。 -
性能开销小:相比于更复杂的认证机制(如OAuth2的完整流程),API Key的验证通常更快,对系统性能影响较小。
不足
-
密钥泄露风险高:API Key是静态的,一旦泄露,攻击者就可以冒充合法客户端访问服务。常见的泄露途径包括:客户端代码硬编码、不安全的存储、通过不安全的信道传输(如HTTP)、日志记录、版本控制系统(比如Github)提交等。 -
吊销管理复杂:如果一个API Key被多个实例或设备共享,一旦需要吊销,会影响所有使用该密钥的实例。精细化的吊销和轮换机制需要额外开发。 -
重放攻击风险:如果API Key在不安全的HTTP连接中传输,容易被中间人截获并用于重放攻击。 -
难以追踪用户行为:如果多个用户或设备共享同一个API Key,服务端难以区分具体是哪个用户发起的请求,不利于审计和行为分析。
0x05 API Key认证的安全注意事项与最佳实践
在MCP业务场景中使用API Key认证时,应遵循以下安全最佳实践:
-
使用HTTPS:所有MCP通信,特别是包含API Key的请求,都必须通过HTTPS进行加密传输,防止密钥在传输过程中被窃听。 -
不在URL中传递API Key:优先选择将API Key放在HTTP请求头中(如 Authorization: Bearer YOUR_API_KEY
或X-API-Key: YOUR_API_KEY
)。避免将其作为URL参数传递,以减少在日志、浏览器历史等地方的暴露风险。 -
密钥的最小权限原则:为每个API Key分配最小必需的权限。如果MCP Server提供多种工具或资源,应确保API Key只能访问其被授权的部分。 -
密钥的保密性:
-
客户端:不要将API Key硬编码在客户端代码中,尤其是前端代码。应通过安全的方式(如环境变量、安全的配置文件、密钥管理服务)进行存储和加载。 -
服务端:服务端存储API Key时,应使用强哈希算法(如SHA-256或更强)并加盐存储,而不是明文存储。
总而言之,API Key提供了一种便捷的认证方式,但在安全性方面存在固有风险。在MCP的实践中,如果选择API Key方案,务必严格遵守安全最佳实践,以最大限度地降低潜在风险。对于敏感业务或需要复杂权限管理的场景,建议采用官方标准里的OAuth2认证授权机制。
0x06 参考资料
-
Cloudflare Docs Model Context Protocol (MCP) Authorization: https://developers.cloudflare.com/Agents/model-context-protocol/authorization -
解读contextvars的代码: https://blog.csdn.net/xuukai/article/details/147366975 -
python中如何使用contextvars模块: https://www.yisu.com/jc/810869.html -
MCP Official Specification (Authorization): https://modelcontextprotocol.io/specification/2025-03-26/basic/authorization -
Auth0 Blog – Introduction to MCP and Authorization: https://auth0.com/blog/an-introduction-to-mcp-and-authorization/ -
HA Model Context Protocol Server:https://www.home-assistant.io/integrations/mcp_server#access-control -
高德地图 MCP Server:https://lbs.amap.com/api/mcp-server/gettingstarted
本文作者:唐银@涂鸦智能安全实验室
漏洞悬赏计划:涂鸦智能安全响应中心(https://src.tuya.com)欢迎白帽子来探索。