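Preface
Industry pain points
In e-commerce customer service, the problems of traditional human-agent systems fall into three groups. First, inconsistent processes lead to "same question, different answers": in branch-heavy scenarios such as refund approval, exception escalation, and cross-department collaboration, scripts and actions are hard to keep uniform over time. Second, extensibility and governance are weak: knowledge tends to live in documents or agent scripts, without versioning, approval, gradual rollout, or audit mechanisms, so changes are uncontrolled and problems are hard to trace. Third, efficiency and experience pull against each other: response times are unstable at peak, night-time and cross-time-zone coverage is thin, and handoffs to human agents are often rough, which puts pressure on first-contact resolution and satisfaction, especially during major promotions and after-sales peaks.
The way forward
With large language models, an agent gains stronger natural language understanding and generation. It can normalize users' varied phrasings into a common meaning and respond in wording closer to the user's intent, reducing the friction of "cannot understand the question, cannot answer accurately". A large model can also break a complex problem into multi-step instructions and drive tools to query, validate, approve, and record step by step, moving from "answering questions" to "getting things done". More importantly, once SOPs are made explicit as callable tools, the model's free-form generation is constrained within the enterprise's process boundaries: the experience is preserved while high-risk decisions are left to rules and tools, balancing intelligence with compliance.
MCP (Model Context Protocol) provides a standardized communication contract between models and external tools and resources: each tool's function, parameters, and return value are defined in a structured way, and the model can discover, select, and call these tools. In customer service, this means turning SOPs into atomic capabilities that are discoverable, callable, and composable, supporting a general closed loop of "ask, classify, fetch SOP, execute, write back, leave a trace". MCP's session and streaming capabilities give tool calls and result feedback a responsive interaction model and suit multi-step task orchestration. For engineering teams, a standard protocol lowers integration cost and makes it easier to bring existing systems (orders, billing, logistics, CRM) into the tool layer step by step, forming a controllable enterprise capability base.
Foundation for production
The value of Amazon AgentCore is that it brings these capabilities to a production-grade runtime. With the hosted Runtime, teams get image build, deployment, identity, and network governance out of the box, without building their own gateway and container infrastructure. Through integration with the container registry, the build service, and the parameter store, AgentCore supports building ARM64 images in the cloud, automatically creating or reusing the image repository and execution role, and managing deployed agents and tools as parameterized ARNs. Clients and agents only need to read a parameter to discover and switch between environments, which reduces operational complexity and change risk. On security, Runtime supports signature-based authentication, so the MCP streaming channel runs inside the enterprise IAM system. On developer experience, tool templates and scaffolding let teams validate a minimal version locally and then push it to the cloud in one step, managing the path from concept to production in a consistent way.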
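Business background
After rolling out customer service systems for several e-commerce businesses, we found that high-consistency steps such as refunds, billing, logistics, and warranty are where "same question, different answers" most often breaks the experience. The cause is not missing knowledge but the fact that process branches and approval constraints are not bound to the answer path as an executable mechanism. When SOPs live only in documents, scripts and actions cannot be rolled out gradually or audited end to end, and the risk grows quickly under peak traffic and cross-team collaboration.
We therefore set the goal as follows: abstract SOPs into callable "tool contracts", host them uniformly as containers in AgentCore Runtime, and expose them over MCP to agents and multi-touchpoint front ends (web agent desk, IM, IVR). On that basis, tools become discoverable, calls authenticated, the call path observable, and versions governable, so that customer service is usable, easy to use, and controllable.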
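Overall architecture
The architecture has four modules:
- MCP Server implements the SOP tools and the matching logic;
- AgentCore Runtime handles image build, deployment, identity, and networking;
- The MCP client establishes a streamable HTTP channel to the runtime, signed with SigV4;
- SSM Parameter Store persists the Agent ARN so that environments can be discovered and switched without code changes, keeping the move between development, staging, and production controlled and transparent.
A request originates from an agent or an external client, is signed with SigV4, enters the AgentCore entry point, and is routed to the running MCP Server, which executes the specific SOP tool. The server returns a structured result containing the SOP category, SOP key, and SOP content (the category, sop_key, and sop_content fields). This makes it easy for the agent to branch on the result and brings the whole path into observability and audit, which helps with tracing and optimization.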
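The result shapes returned by the server lend themselves to simple branching on the agent side. The following is a minimal sketch and not part of the original sample: the function name `next_action` and the action labels are hypothetical, and only the dictionary keys (`category`, `sop_key`, `sop_list`, `error`) come from the server code.

```python
from typing import Any, Dict


def next_action(result: Dict[str, Any]) -> str:
    """Map a structured SOP result to a hypothetical agent action label."""
    if "error" in result:
        # get_specific_sop reports unknown categories or keys this way
        return "retry_with_valid_key"
    if result.get("category") == "general":
        # No category matched: hand the conversation to a human agent
        return "handoff"
    if "sop_key" in result:
        # A specific SOP was hit: answer following its content
        return "answer_with_sop"
    if "sop_list" in result:
        # Only the category matched: ask the user to narrow it down
        return "disambiguate"
    return "handoff"


print(next_action({"category": "billing", "sop_key": "refund_policy", "sop_content": "..."}))
print(next_action({"category": "billing", "sop_list": ["refund_policy"], "sop_content": "..."}))
```

Checking `error` first keeps failure handling separate from the three success shapes, so a new result shape falls through to the conservative handoff default.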
Code analysis
customer_service_sop_server.py
from mcp.server.fastmcp import FastMCP
from starlette.responses import JSONResponse
import json
import os
import logging
from typing import Dict, List, Optional
# Setup logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize MCP server
mcp = FastMCP(host="0.0.0.0", stateless_http=True)
# In-memory SOP database (in a real scenario, this would be in a database)
# We're using a simple dictionary for demo purposes
sop_database = {
    "billing": {
        "refund_policy": "Customers are eligible for refunds within 30 days of purchase with valid receipt. Process through the billing system with approval for amounts over $50.",
        "payment_methods": "We accept Visa, MasterCard, American Express, and PayPal. For corporate accounts, we also accept wire transfers with NET-30 terms.",
        "subscription_cancellation": "To cancel a subscription, verify customer identity, process in billing portal, and send confirmation email. Offer retention options for high-value accounts."
    },
    "technical_support": {
        "password_reset": "Verify customer identity using security questions. Generate temporary password and send to registered email. Follow up after 24 hours if needed.",
        "account_lockout": "Check lockout reason in security logs. For suspicious activity, escalate to security team. For failed login attempts, verify identity and unlock with 24-hour monitoring.",
        "system_outage": "Acknowledge the issue, check status dashboard, inform about estimated recovery time. Create incident ticket if not already created. Update customers every 30 minutes."
    },
    "product": {
        "warranty_information": "Standard warranty is 12 months from purchase date. Premium products have 24-month coverage. Warranty covers manufacturing defects but not physical damage.",
        "returns_process": "Returns accepted within 14 days with original packaging. Issue RMA number, provide return shipping label, process refund within 5 business days of receiving returned item.",
        "product_compatibility": "Check product compatibility matrix in knowledge base. For custom setups, consult with technical specialist before providing definitive answer."
    }
}
@mcp.tool()
def get_sop_for_question(question: str) -> Dict:
    """
    Retrieves the appropriate Standard Operating Procedure (SOP) based on customer's question.
    Args:
        question (str): The customer's question or issue description
    Returns:
        Dict: A dictionary containing the SOP information
    """
    logger.info(f"Received question: {question}")
    # Convert question to lowercase for case-insensitive matching
    question_lower = question.lower()
    # Define keywords for each category
    keywords = {
        "billing": ["bill", "payment", "refund", "charge", "subscription", "cancel", "price"],
        "technical_support": ["password", "reset", "locked", "outage", "down", "error", "bug", "login"],
        "product": ["warranty", "return", "compatibility", "feature", "specification", "damaged"]
    }
    # Find matching categories based on keywords
    matching_categories = {}
    for category, words in keywords.items():
        score = sum(1 for word in words if word in question_lower)
        if score > 0:
            matching_categories[category] = score
    # If no matches found, return generic response
    if not matching_categories:
        logger.info("No matching SOP found, returning generic response")
        return {
            "category": "general",
            "sop_content": "I don't have a specific SOP for this query. Please gather the customer's specific issue details and transfer to the appropriate department."
        }
    # Get the category with highest match score
    best_category = max(matching_categories.items(), key=lambda x: x[1])[0]
    logger.info(f"Best matching category: {best_category}")
    # Find the best matching SOP in the category
    sops_in_category = sop_database[best_category]
    best_sop = None
    best_score = 0
    for sop_key, sop_content in sops_in_category.items():
        # Calculate simple relevance score by counting keyword occurrences
        score = sum(1 for word in sop_key.split("_") if word in question_lower)
        if score > best_score:
            best_score = score
            best_sop = sop_key
    # If we couldn't determine a specific SOP, return all SOPs for the category
    if not best_sop or best_score == 0:
        logger.info(f"No specific SOP found in category, returning all SOPs for {best_category}")
        return {
            "category": best_category,
            "sop_list": list(sops_in_category.keys()),
            "sop_content": "Multiple SOPs may apply to this query. Please select the most relevant one."
        }
    # Return the best matching SOP
    logger.info(f"Best matching SOP: {best_sop}")
    return {
        "category": best_category,
        "sop_key": best_sop,
        "sop_content": sops_in_category[best_sop]
    }
@mcp.tool()
def get_specific_sop(category: str, sop_key: str) -> Dict:
    """
    Retrieves a specific SOP by category and key.
    Args:
        category (str): The SOP category (e.g., "billing", "technical_support", "product")
        sop_key (str): The specific SOP key within the category
    Returns:
        Dict: A dictionary containing the SOP information
    """
    logger.info(f"Retrieving specific SOP: {category}/{sop_key}")
    if category not in sop_database:
        logger.warning(f"Category {category} not found")
        return {
            "error": "Category not found",
            "available_categories": list(sop_database.keys())
        }
    if sop_key not in sop_database[category]:
        logger.warning(f"SOP key {sop_key} not found in category {category}")
        return {
            "error": "SOP key not found",
            "available_sops": list(sop_database[category].keys())
        }
    return {
        "category": category,
        "sop_key": sop_key,
        "sop_content": sop_database[category][sop_key]
    }
@mcp.tool()
def list_sop_categories() -> List[str]:
    """
    Lists all available SOP categories.
    Returns:
        List[str]: A list of available SOP categories
    """
    logger.info("Listing all SOP categories")
    return list(sop_database.keys())
@mcp.tool()
def add_sop_entry(category: str, sop_key: str, sop_content: str) -> Dict:
    """
    Adds a new SOP entry to the database.
    Args:
        category (str): The SOP category
        sop_key (str): The specific SOP key
        sop_content (str): The SOP content/instructions
    Returns:
        Dict: A dictionary with the result of the operation
    """
    logger.info(f"Adding new SOP entry: {category}/{sop_key}")
    # Create category if it doesn't exist
    if category not in sop_database:
        sop_database[category] = {}
    # Add or update the SOP
    sop_database[category][sop_key] = sop_content
    return {
        "status": "success",
        "message": f"Added SOP entry: {category}/{sop_key}",
        "category": category,
        "sop_key": sop_key
    }
if __name__ == "__main__":
    logger.info("Starting Customer Service SOP MCP server on port 8000")
    mcp.run(transport="streamable-http")
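get_sop_for_question(question: str) → Dict
- Role: routing and matching for natural language questions. It determines the SOP category from the user's description and, where possible, returns the most relevant specific SOP.
- Input parameters:
  - question: the user's question or issue description, any natural language string.
- Core logic:
  - Lowercase the question for case-insensitive matching.
  - Maintain a mapping from category to keywords (billing, technical_support, product) and score each category by how many of its keywords appear in the question. Matching is by substring, so "bill" also matches "billing".
  - Select the highest-scoring category as best_category.
  - Within that category, score each SOP by how many underscore-separated words of its sop_key appear in the question, and select best_sop.
  - If no category matches, return a generic fallback answer; if a category matches but no specific SOP can be determined, return the list of all SOPs in that category as a safe fallback.
- Typical returns:
  - Specific SOP matched: { category, sop_key, sop_content }.
  - Category only: { category, sop_list, sop_content: "Multiple SOPs may apply…" }.
  - No category matched: { category: "general", sop_content: "I don't have a specific SOP…" }.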
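Because the sample scores keywords by substring containment (`word in question_lower`), a keyword such as "down" also matches "download". The sketch below contrasts that behavior with a whole-word variant. It is an illustration only: `substring_scores`, `token_scores`, and the trimmed `KEYWORDS` table are hypothetical names, not part of the original server.

```python
import re
from typing import Dict, List

# Trimmed copy of the keyword table, for illustration only
KEYWORDS: Dict[str, List[str]] = {
    "billing": ["bill", "payment", "refund", "charge"],
    "technical_support": ["password", "locked", "outage", "down"],
}


def substring_scores(question: str) -> Dict[str, int]:
    """Score categories the way the sample does: substring containment."""
    q = question.lower()
    return {c: sum(1 for w in ws if w in q) for c, ws in KEYWORDS.items()}


def token_scores(question: str) -> Dict[str, int]:
    """Alternative: count only keywords that appear as whole words."""
    tokens = set(re.findall(r"[a-z0-9]+", question.lower()))
    return {c: sum(1 for w in ws if w in tokens) for c, ws in KEYWORDS.items()}


question = "How do I download my invoice?"
print(substring_scores(question))
print(token_scores(question))
```

The trade-off runs both ways: whole-word matching avoids the "download" false positive, but it would no longer match "billing" against the keyword "bill", so the keyword table would need to list inflected forms explicitly.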
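get_specific_sop(category: str, sop_key: str) → Dict
- Role: exact read for a known category and key, ensuring the caller gets the authoritative SOP content.
- Input parameters:
  - category: the SOP category (for example "billing", "technical_support", "product").
  - sop_key: the specific SOP key within the category (underscore-separated, for example "refund_policy").
- Core logic:
  - Check that the category exists; if not, return an error together with available_categories.
  - Check that sop_key exists; if not, return an error together with available_sops.
  - On a match, return the SOP content.
- Typical returns:
  - Success: { category, sop_key, sop_content }.
  - Failure: { error: "Category not found", available_categories } or { error: "SOP key not found", available_sops }.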
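list_sop_categories() → List[str]
- Role: boundary discovery and UI guidance, exposing the currently available SOP categories to the caller.
- Input parameters: none.
- Core logic: return the top-level keys of the in-memory database.
- Returns: a list of category names (for example ["billing", "technical_support", "product"]).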
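add_sop_entry(category: str, sop_key: str, sop_content: str) → Dict
- Role: online incremental maintenance of SOPs, supporting emergency additions and small, incremental rollouts.
- Input parameters:
  - category: the category to write to (created automatically if it does not exist).
  - sop_key: the specific SOP key to write.
  - sop_content: the SOP text or operating instructions.
- Core logic:
  - If the category does not exist, create an empty dictionary for it.
  - Write or update the SOP content under sop_key. The write goes to the in-process dictionary only, so in this demo it is not persisted across restarts.
- Typical returns:
  - { status: "success", message, category, sop_key }.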
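The demo overwrites an entry in place, which leaves no history to audit or roll back. One possible direction, sketched under the assumption that every write should be kept as a new version, is an append-only store. `VersionedSopStore` and `SopVersion` are hypothetical names and are not part of the original sample.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Tuple


@dataclass
class SopVersion:
    content: str
    author: str
    created_at: str  # ISO 8601 timestamp in UTC


@dataclass
class VersionedSopStore:
    """Hypothetical append-only store: every write adds a new version."""
    _entries: Dict[Tuple[str, str], List[SopVersion]] = field(default_factory=dict)

    def put(self, category: str, sop_key: str, content: str, author: str) -> int:
        """Append a version and return its 1-based version number."""
        versions = self._entries.setdefault((category, sop_key), [])
        now = datetime.now(timezone.utc).isoformat()
        versions.append(SopVersion(content, author, now))
        return len(versions)

    def get(self, category: str, sop_key: str, version: int = 0) -> str:
        """Return the content of a version; 0 means the latest."""
        versions = self._entries[(category, sop_key)]
        return versions[version - 1].content if version else versions[-1].content


store = VersionedSopStore()
store.put("billing", "refund_policy", "Refunds within 30 days.", author="alice")
store.put("billing", "refund_policy", "Refunds within 45 days.", author="bob")
print(store.get("billing", "refund_policy"))             # latest version
print(store.get("billing", "refund_policy", version=1))  # first version
```

This keeps state in memory like the demo does; a production variant would back the same interface with a persistent database.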
streamable_http_sigv4.py (source: https://github.com/awslabs/amazon-bedrock-agentcore-samples/blob/7e0c819f90c5f2ee165a0f403db200b8b0477a11/01-tutorials/01-AgentCore-runtime/02-hosting-MCP-server/streamable_http_sigv4.py)
"""
StreamableHTTP Client Transport with AWS SigV4 Signing
This module extends the MCP StreamableHTTPTransport to add AWS SigV4 request signing
for authentication with MCP servers that authenticate using AWS IAM.
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Generator
import httpx
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from mcp.client.streamable_http import (
    GetSessionIdCallback,
    StreamableHTTPTransport,
    streamablehttp_client,
)
from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
from mcp.shared.message import SessionMessage
class SigV4HTTPXAuth(httpx.Auth):
    """HTTPX Auth class that signs requests with AWS SigV4."""

    def __init__(
        self,
        credentials: Credentials,
        service: str,
        region: str,
    ):
        self.credentials = credentials
        self.service = service
        self.region = region
        self.signer = SigV4Auth(credentials, service, region)

    def auth_flow(
        self, request: httpx.Request
    ) -> Generator[httpx.Request, httpx.Response, None]:
        """Signs the request with SigV4 and adds the signature to the request headers."""
        # Create an AWS request
        headers = dict(request.headers)
        # Header 'connection' = 'keep-alive' is not used in calculating the request
        # signature on the server-side, and results in a signature mismatch if included
        headers.pop("connection", None)  # Remove if present, ignore if not
        aws_request = AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content,
            headers=headers,
        )
        # Sign the request with SigV4
        self.signer.add_auth(aws_request)
        # Add the signature header to the original request
        request.headers.update(dict(aws_request.headers))
        yield request


class StreamableHTTPTransportWithSigV4(StreamableHTTPTransport):
    """
    Streamable HTTP client transport with AWS SigV4 signing support.
    This transport enables communication with MCP servers that authenticate using AWS IAM,
    such as servers behind a Lambda function URL or API Gateway.
    """

    def __init__(
        self,
        url: str,
        credentials: Credentials,
        service: str,
        region: str,
        headers: dict[str, str] | None = None,
        timeout: float | timedelta = 30,
        sse_read_timeout: float | timedelta = 60 * 5,
    ) -> None:
        """Initialize the StreamableHTTP transport with SigV4 signing.
        Args:
            url: The endpoint URL.
            credentials: AWS credentials for signing.
            service: AWS service name (e.g., 'lambda').
            region: AWS region (e.g., 'us-east-1').
            headers: Optional headers to include in requests.
            timeout: HTTP timeout for regular operations.
            sse_read_timeout: Timeout for SSE read operations.
        """
        # Initialize parent class with SigV4 auth handler
        super().__init__(
            url=url,
            headers=headers,
            timeout=timeout,
            sse_read_timeout=sse_read_timeout,
            auth=SigV4HTTPXAuth(credentials, service, region),
        )
        self.credentials = credentials
        self.service = service
        self.region = region


@asynccontextmanager
async def streamablehttp_client_with_sigv4(
    url: str,
    credentials: Credentials,
    service: str,
    region: str,
    headers: dict[str, str] | None = None,
    timeout: float | timedelta = 30,
    sse_read_timeout: float | timedelta = 60 * 5,
    terminate_on_close: bool = True,
    httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
) -> AsyncGenerator[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback,
    ],
    None,
]:
    """
    Client transport for Streamable HTTP with SigV4 auth.
    This transport enables communication with MCP servers that authenticate using AWS IAM,
    such as servers behind a Lambda function URL or API Gateway.
    Yields:
        Tuple containing:
            - read_stream: Stream for reading messages from the server
            - write_stream: Stream for sending messages to the server
            - get_session_id_callback: Function to retrieve the current session ID
    """
    async with streamablehttp_client(
        url=url,
        headers=headers,
        timeout=timeout,
        sse_read_timeout=sse_read_timeout,
        terminate_on_close=terminate_on_close,
        httpx_client_factory=httpx_client_factory,
        auth=SigV4HTTPXAuth(credentials, service, region),
    ) as result:
        yield result
requirements.txt
mcp>=1.10.0
boto3
bedrock-agentcore<=0.1.5
bedrock-agentcore-starter-toolkit==0.1.14
invoke_customer_service_sop_tools.py
import asyncio
import sys
import os
import logging
import boto3
from boto3.session import Session
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from streamable_http_sigv4 import streamablehttp_client_with_sigv4
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def create_streamable_http_transport_sigv4(
    mcp_url: str, service_name: str, region: str
):
    """
    Create a streamable HTTP transport with AWS SigV4 authentication.
    This function creates an MCP client transport that uses AWS Signature Version 4 (SigV4)
    to authenticate requests. This is necessary because standard MCP clients don't natively
    support AWS IAM authentication, and this bridges that gap.
    Args:
        mcp_url (str): The URL of the MCP gateway endpoint
        service_name (str): The AWS service name for SigV4 signing (typically "bedrock-agentcore")
        region (str): The AWS region where the gateway is deployed
    Returns:
        StreamableHTTPTransportWithSigV4: A transport instance configured for SigV4 auth
    """
    # Get AWS credentials from the current boto3 session
    # These credentials will be used to sign requests with SigV4
    session = boto3.Session()
    credentials = session.get_credentials()
    # Create and return the custom transport with SigV4 signing capability
    return streamablehttp_client_with_sigv4(
        url=mcp_url,
        credentials=credentials,
        service=service_name,
        region=region,
    )
def get_full_tools_list(client):
    """
    Retrieve the complete list of tools from an MCP client, handling pagination.
    MCP servers may return tools in paginated responses. This function handles the
    pagination automatically and returns all available tools in a single list.
    Args:
        client: An MCP client instance (from strands.tools.mcp.mcp_client.MCPClient)
    Returns:
        list: A complete list of all tools available from the MCP server
    """
    more_tools = True
    tools = []
    pagination_token = None
    # Loop until we've fetched all pages
    while more_tools:
        tmp_tools = client.list_tools_sync(pagination_token=pagination_token)
        tools.extend(tmp_tools)
        # Check if there are more pages to fetch
        if tmp_tools.pagination_token is None:
            # No more pages - we're done
            more_tools = False
        else:
            # More pages exist - prepare to fetch the next one
            more_tools = True
            pagination_token = tmp_tools.pagination_token
    return tools
async def main():
    boto_session = Session()
    region = boto_session.region_name
    print(f"Using AWS region: {region}")
    # Check if SSM parameter name is provided as command line argument
    if len(sys.argv) > 1:
        param_name = sys.argv[1]
    else:
        param_name = "/mcp_server/customer_service_sop/agent_arn"
    print(f"Using SSM parameter: {param_name}")
    ssm_client = boto3.client("ssm", region_name=region)
    agent_arn_response = ssm_client.get_parameter(
        Name=param_name
    )
    agent_arn = agent_arn_response["Parameter"]["Value"]
    print(f"Retrieved Agent ARN: {agent_arn}")
    if not agent_arn:
        print("❌ Error: Agent ARN not found")
        sys.exit(1)
    encoded_arn = agent_arn.replace(":", "%3A").replace("/", "%2F")
    mcp_url = f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
    try:
        async with create_streamable_http_transport_sigv4(
            mcp_url=mcp_url, service_name="bedrock-agentcore", region=region
        ) as (
            read_stream,
            write_stream,
            _,
        ):
            async with ClientSession(read_stream, write_stream) as session:
                print("\n🔄 Initializing MCP session...")
                await session.initialize()
                print("✓ MCP session initialized")
                print("\n🔄 Listing available tools...")
                tool_result = await session.list_tools()
                print("\n📋 Available Customer Service SOP Tools:")
                print("=" * 50)
                for tool in tool_result.tools:
                    print(f"🔧 {tool.name}: {tool.description}")
                print("\n🧪 Testing Customer Service SOP Tools:")
                print("=" * 50)
                # Test 1: List SOP categories
                try:
                    print("\n📊 Testing list_sop_categories()...")
                    categories_result = await session.call_tool(
                        name="list_sop_categories", arguments={}
                    )
                    print(f" Result: {categories_result.content[0].text}")
                except Exception as e:
                    print(f" Error: {e}")
                # Test 2: Get SOP for a billing question
                try:
                    print("\n💳 Testing get_sop_for_question(billing question)...")
                    question = "I want to know the refund policy for my recent purchase"
                    billing_result = await session.call_tool(
                        name="get_sop_for_question", arguments={"question": question}
                    )
                    print(f" Question: {question}")
                    print(f" Result: {billing_result.content[0].text}")
                except Exception as e:
                    print(f" Error: {e}")
                # Test 3: Get SOP for a technical support question
                try:
                    print("\n🔒 Testing get_sop_for_question(technical question)...")
                    question = "My account is locked and I can't log in"
                    tech_result = await session.call_tool(
                        name="get_sop_for_question", arguments={"question": question}
                    )
                    print(f" Question: {question}")
                    print(f" Result: {tech_result.content[0].text}")
                except Exception as e:
                    print(f" Error: {e}")
                # Test 4: Get specific SOP by category and key
                try:
                    print("\n📝 Testing get_specific_sop()...")
                    category = "product"
                    sop_key = "warranty_information"
                    specific_result = await session.call_tool(
                        name="get_specific_sop",
                        arguments={"category": category, "sop_key": sop_key}
                    )
                    print(f" Category: {category}, SOP Key: {sop_key}")
                    print(f" Result: {specific_result.content[0].text}")
                except Exception as e:
                    print(f" Error: {e}")
                # Test 5: Add a new SOP entry
                try:
                    print("\n➕ Testing add_sop_entry()...")
                    new_sop_result = await session.call_tool(
                        name="add_sop_entry",
                        arguments={
                            "category": "shipping",
                            "sop_key": "international_shipping",
                            "sop_content": "For international shipping inquiries, please check that the destination country is on our approved list. Standard shipping takes 7-14 business days. Express shipping (where available) takes 3-5 business days. All international orders over $100 receive free standard shipping."
                        }
                    )
                    print(f" Result: {new_sop_result.content[0].text}")
                    # Verify the newly added SOP by retrieving it
                    print("\n🔍 Verifying newly added SOP...")
                    verify_result = await session.call_tool(
                        name="get_specific_sop",
                        arguments={"category": "shipping", "sop_key": "international_shipping"}
                    )
                    print(f" Result: {verify_result.content[0].text}")
                except Exception as e:
                    print(f" Error: {e}")
                print("\n✅ Customer Service SOP tool testing completed!")
    except Exception as e:
        print(f"❌ Error connecting to MCP server: {e}")
        import traceback
        print("\n🔍 Full error traceback:")
        traceback.print_exc()
        sys.exit(1)
if __name__ == "__main__":
    asyncio.run(main())
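Main functions
- SigV4 signing: create_streamable_http_transport_sigv4 obtains AWS credentials from the boto3 session and builds a Streamable HTTP transport with SigV4 signing, bridging the gap that standard MCP clients do not natively support IAM authentication.
- Runtime discovery: reads the Agent ARN from SSM Parameter Store, URL-encodes it, and builds the Runtime invocation URL for the MCP server, avoiding hard-coding and supporting multi-environment switching.
- Session and tool enumeration: uses ClientSession to initialize the MCP session and calls list_tools to print the available tools, serving as a self-check and observation entry point.
- End-to-end regression: calls list_sop_categories, get_sop_for_question (one billing and one technical support example), get_specific_sop (exact read), and add_sop_entry (create), then calls get_specific_sop again to verify read-after-write, covering the core success paths.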
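The script encodes the ARN with two chained `replace` calls. An equivalent form uses the standard library's `urllib.parse.quote` with `safe=""`, which percent-encodes `:` and `/` as well. The sketch below assumes the URL shape shown in the script; the helper name `build_invocation_url` and the placeholder ARN (with a dummy account ID) are illustrative.

```python
from urllib.parse import quote


def build_invocation_url(agent_arn: str, region: str, qualifier: str = "DEFAULT") -> str:
    """Build the Runtime invocation URL in the same shape the script uses."""
    encoded_arn = quote(agent_arn, safe="")  # percent-encode ':' and '/' too
    return (
        f"https://bedrock-agentcore.{region}.amazonaws.com"
        f"/runtimes/{encoded_arn}/invocations?qualifier={qualifier}"
    )


# Placeholder ARN with a dummy account ID, for illustration only
arn = "arn:aws:bedrock-agentcore:us-west-2:123456789012:runtime/customer_service_sop-EXAMPLE"
print(build_invocation_url(arn, "us-west-2"))
```

Using `quote` also covers any other reserved characters that might appear in an identifier, which the two hard-coded replacements would miss.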
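Flow steps
- Read the current AWS region and the SSM parameter name (overridable from the command line, default /mcp_server/customer_service_sop/agent_arn), fetch the Agent ARN, URL-encode it, and build the invocation endpoint https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT.
- Open the signed streaming channel with create_streamable_http_transport_sigv4, enter the ClientSession context, call initialize, then call list_tools and print each tool's name and description to confirm that the tools are registered and exposed by the runtime.
- Test cases:
  - list_sop_categories: checks that the category boundary is exposed correctly.
  - get_sop_for_question("I want to know the refund policy…"): verifies classification as billing and a match on a specific SOP.
  - get_sop_for_question("My account is locked and I can't log in"): verifies classification as technical_support and a match.
  - get_specific_sop("product", "warranty_information"): verifies the exact-read and existence-check path.
  - add_sop_entry("shipping", "international_shipping", "…"): after the write, reads the entry back with get_specific_sop to verify read-after-write consistency.
- Error handling: each test block is wrapped in try/except and prints the error; when the outer block catches a connection error, it prints the traceback and exits with a non-zero status, which makes failures easy to locate in CI.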
customer_service_sop_server.ipynb
!pip install -U -r requirements.txt
from bedrock_agentcore_starter_toolkit import Runtime
from bedrock_agentcore_starter_toolkit.operations.runtime import destroy_bedrock_agentcore
from boto3.session import Session
from pathlib import Path
import os
boto_session = Session()
region = boto_session.region_name
agentcore_control_client = boto_session.client("bedrock-agentcore-control", region_name=region)
ssm_client = boto_session.client('ssm', region_name=region)
tool_name = "customer_service_sop"
print(f"Using AWS region: {region}")
required_files = ["customer_service_sop_server.py", "requirements.txt"]
for file in required_files:
    if not os.path.exists(file):
        raise FileNotFoundError(f"Required file {file} not found")
print("All required files found ✓")
agentcore_runtime = Runtime()
print("Configuring AgentCore Runtime...")
response = agentcore_runtime.configure(
    entrypoint="customer_service_sop_server.py",
    auto_create_execution_role=True,
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    protocol="MCP",
    agent_name=tool_name,
)
print("Configuration completed ✓")
print("Launching Customer Service SOP MCP server to AgentCore Runtime...")
print("This may take several minutes...")
launch_result = agentcore_runtime.launch()
print("Launch completed ✓")
print(f"Agent ARN: {launch_result.agent_arn}")
print(f"Agent ID: {launch_result.agent_id}")
agent_arn_response = ssm_client.put_parameter(
    Name='/mcp_server/customer_service_sop/agent_arn',
    Value=launch_result.agent_arn,
    Type='String',
    Description='Agent ARN for Customer Service SOP MCP server',
    Overwrite=True
)
print("✓ Agent ARN stored in Parameter Store")
print("\nConfiguration stored successfully!")
print(f"Agent ARN: {launch_result.agent_arn}")
!python invoke_customer_service_sop_tools.py
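Functional overview
- Dependency setup and region resolution: after installing dependencies, the notebook reads the AWS region of the current session so that later resource creation and API calls use the same region.
- Runtime configuration: through the Starter Toolkit's Runtime wrapper, the configuration is generated from the entry file customer_service_sop_server.py, requirements.txt, the protocol type MCP, and the agent name customer_service_sop. It includes the image build configuration and the switches for creating the execution role and ECR repository on demand.
- Cloud build and hosted release: launch triggers the cloud build and deployment, pushes the container image to ECR, creates the Runtime agent and endpoint in AgentCore Runtime, and returns the Agent ARN and Agent ID.
- Configuration registration and environment discovery: the returned Agent ARN is written to SSM Parameter Store (/mcp_server/customer_service_sop/agent_arn) as the single source of truth, so that clients and upper-layer agents can discover and switch between development, staging, and production without code changes.
- Post-deployment verification: at the end of the same flow, the notebook runs invoke_customer_service_sop_tools.py, which discovers the ARN from the parameter, builds the signed streamable HTTP call, enumerates the tools, and runs the list_sop_categories, get_sop_for_question, get_specific_sop, and add_sop_entry regression cases to verify read-after-write consistency and service availability.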
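Step breakdown
- Configuration (configure):
  - entrypoint points to the MCP Server code file;
  - auto_create_execution_role and auto_create_ecr, when enabled, prepare the required IAM role and image repository automatically;
  - protocol is set to MCP so that the code is hosted as an MCP Server;
  - agent_name is used for naming and resource identification.
- Release (launch): triggers a cloud build with CodeBuild (no local container engine required), pushes the image, deploys to Runtime, and returns the Agent ARN and ID.
- Parameter persistence (put_parameter): writes the Agent ARN to SSM as a String parameter with overwrite enabled, so that different environments and teams read the same value.
- One-step verification: runs the remote test script provided with the project.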
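The sample stores a single parameter, /mcp_server/customer_service_sop/agent_arn. For the multi-environment switching described above, one option is to add an environment segment to the parameter name. The layout below is an assumption for illustration, not something the sample defines, and `agent_arn_param` and `ALLOWED_ENVS` are hypothetical names.

```python
ALLOWED_ENVS = ("dev", "staging", "prod")


def agent_arn_param(env: str, tool_name: str = "customer_service_sop") -> str:
    """Return a per-environment SSM parameter name (hypothetical layout)."""
    if env not in ALLOWED_ENVS:
        raise ValueError(f"Unknown environment: {env!r}")
    return f"/mcp_server/{tool_name}/{env}/agent_arn"


for env in ALLOWED_ENVS:
    print(agent_arn_param(env))
```

Because invoke_customer_service_sop_tools.py accepts the parameter name as its first command-line argument, a name built this way could be passed to the script unchanged, provided the deployment step wrote the ARN under the same name.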
Run results
python invoke_customer_service_sop_tools.py
Using AWS region: us-west-2
Using SSM parameter: /mcp_server/customer_service_sop/agent_arn
2025-10-28 17:05:33,819 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
Retrieved Agent ARN: arn:aws:bedrock-agentcore:us-west-2:310850127430:runtime/customer_service_sop-QIAcnREytM
2025-10-28 17:05:35,223 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
🔄 Initializing MCP session...
2025-10-28 17:05:36,845 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
2025-10-28 17:05:36,846 - mcp.client.streamable_http - INFO - Received session ID: 59915786-4fca-4c1c-9c76-f7190d420eb6
2025-10-28 17:05:36,847 - mcp.client.streamable_http - INFO - Negotiated protocol version: 2025-06-18
✓ MCP session initialized
🔄 Listing available tools...
2025-10-28 17:05:37,992 - httpx - INFO - HTTP Request: GET https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 404 Not Found"
2025-10-28 17:05:38,105 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 202 Accepted"
2025-10-28 17:05:39,362 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
📋 Available Customer Service SOP Tools:
🔧 get_sop_for_question:
Retrieves the appropriate Standard Operating Procedure (SOP) based on customer's question.
Args:
question (str): The customer's question or issue description
Returns:
Dict: A dictionary containing the SOP information
🔧 get_specific_sop:
Retrieves a specific SOP by category and key.
Args:
category (str): The SOP category (e.g., "billing", "technical_support", "product")
sop_key (str): The specific SOP key within the category
Returns:
Dict: A dictionary containing the SOP information
🔧 list_sop_categories:
Lists all available SOP categories.
Returns:
List[str]: A list of available SOP categories
🔧 add_sop_entry:
Adds a new SOP entry to the database.
Args:
category (str): The SOP category
sop_key (str): The specific SOP key
sop_content (str): The SOP content/instructions
Returns:
Dict: A dictionary with the result of the operation
🧪 Testing Customer Service SOP Tools:
📊 Testing list_sop_categories()...
2025-10-28 17:05:40,567 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Result: billing
💳 Testing get_sop_for_question(billing question)...
2025-10-28 17:05:41,857 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Question: I want to know the refund policy for my recent purchase
Result: {
"category": "billing",
"sop_key": "refund_policy",
"sop_content": "Customers are eligible for refunds within 30 days of purchase with valid receipt. Process through the billing system with approval for amounts over $50."
}
🔒 Testing get_sop_for_question(technical question)...
2025-10-28 17:05:43,219 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Question: My account is locked and I can't log in
Result: {
"category": "technical_support",
"sop_key": "account_lockout",
"sop_content": "Check lockout reason in security logs. For suspicious activity, escalate to security team. For failed login attempts, verify identity and unlock with 24-hour monitoring."
}
📝 Testing get_specific_sop()...
2025-10-28 17:05:44,559 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Category: product, SOP Key: warranty_information
Result: {
"category": "product",
"sop_key": "warranty_information",
"sop_content": "Standard warranty is 12 months from purchase date. Premium products have 24-month coverage. Warranty covers manufacturing defects but not physical damage."
}
➕ Testing add_sop_entry()...
2025-10-28 17:05:45,837 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Result: {
"status": "success",
"message": "Added SOP entry: shipping/international_shipping",
"category": "shipping",
"sop_key": "international_shipping"
}
🔍 Verifying newly added SOP...
2025-10-28 17:05:47,157 - httpx - INFO - HTTP Request: POST https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A310850127430%3Aruntime%2Fcustomer_service_sop-QIAcnREytM/invocations?qualifier=DEFAULT "HTTP/1.1 200 OK"
Result: {
"category": "shipping",
"sop_key": "international_shipping",
"sop_content": "For international shipping inquiries, please check that the destination country is on our approved list. Standard shipping takes 7-14 business days. Express shipping (where available) takes 3-5 business days. All international orders over $100 receive free standard shipping."
}
✅ Customer Service SOP tool testing completed!
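The results show that the SOP MCP Server deployed on AgentCore Runtime established a session over the SigV4-signed streaming channel, enumerated its tools, and completed the end-to-end sequence of listing categories, classifying questions, exact read, create, and read-after-write verification. Behavior matches expectations for these cases and indicates that the service is available.
Session and protocol
- Session established: the initialization request returned 200, the session ID 59915786-… was received, and protocol version 2025-06-18 was negotiated.
Tool enumeration and contracts
- Complete tool list: get_sop_for_question, get_specific_sop, list_sop_categories, and add_sop_entry are all visible, each with a human-readable description and parameter contract. This meets the "discoverable and self-describing" requirement and helps the agent choose tools correctly.
End-to-end functional verification
- Category listing: the log shows list_sop_categories returning only billing. The test script prints only content[0].text, so at most the first content item is displayed, while the implementation returns every top-level key of the database. The call confirms that basic metadata is readable.
- Classification and match (billing): for the refund question, get_sop_for_question classifies it as billing, matches refund_policy, and returns the SOP content, showing that classification and specific SOP matching work.
- Classification and match (technical support): the account lockout question is classified as technical_support and matches account_lockout with the correct SOP guidance, further confirming the keyword scoring and second-stage matching.
- Exact read: get_specific_sop("product", "warranty_information") returns the warranty information; the exact-read and existence-check path works.
- Create and read-after-write: add_sop_entry writes shipping/international_shipping and returns success; the following get_specific_sop call reads the same entry with identical content.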
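The test script prints only `content[0].text`, which is why the list_sop_categories output shows a single value. A small helper that collects the text of every content item would display the full result. The sketch assumes the server returns each list element as a separate text content item, which the single-item output suggests; `TextItem` is a stand-in for the real MCP content type so that the example runs on its own, and `all_texts` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class TextItem:
    """Stand-in for an MCP text content item (anything with a .text attribute)."""
    text: str


def all_texts(content: List[Any]) -> List[str]:
    """Collect .text from every content item instead of only the first."""
    return [item.text for item in content if hasattr(item, "text")]


content = [TextItem("billing"), TextItem("technical_support"), TextItem("product")]
print(all_texts(content))
```

In the test script, the same helper could be applied to `categories_result.content` in place of indexing element 0.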
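Monitoring in the AWS console
Best practices
To keep a customer service agent like this stable, controllable, and efficient at enterprise scale, we recommend following these principles during design and rollout:
- Tools as contracts: every SOP tool should have clearly defined input parameters, output structure, and error returns. The model discovers and calls tools through their contracts rather than relying on template scripts or hard-coded logic.
- Small steps and gradual rollout: support online incremental maintenance through interfaces such as add_sop_entry, and combine AgentCore Runtime multi-version hosting with parameterized control to roll tools out gradually and roll them back safely. In this sample the SOP store is in memory, so production use needs a persistent backend.
- Security and compliance boundaries: SigV4 signing keeps every call under the control of the enterprise IAM system, and can be combined with CloudTrail and CloudWatch Logs for a traceable audit trail; sensitive operations must go through an approval toolchain or manual confirmation.
- Configuration as discovery: manage the Agent ARN and runtime parameters centrally in SSM Parameter Store to switch between development, staging, and production without code changes and to reduce manual configuration errors.
- Observability and traceability: for every MCP call, record the request context, matched category, SOP key, and returned content to support later optimization, attribution, and knowledge capture; error paths should return enough information to be diagnosed on their own.
- Reuse of standard templates: use the Runtime templates and scaffolding of the AgentCore Starter Toolkit to unify the development experience, keeping the same build and deployment approach from concept to production and lowering communication cost within the team.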
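As one way to act on the observability principle, each tool call could emit a single structured audit line. The following is a sketch only: the field names and the `audit_record` helper are illustrative, and where the line is shipped (for example to a log group) is left open.

```python
import json
import logging
import time
import uuid
from typing import Any, Dict

logger = logging.getLogger("sop_audit")


def audit_record(tool: str, arguments: Dict[str, Any], result: Dict[str, Any]) -> str:
    """Build one JSON audit line for a tool call (field names are illustrative)."""
    record = {
        "call_id": str(uuid.uuid4()),      # unique ID for correlating logs
        "timestamp": time.time(),          # seconds since the epoch
        "tool": tool,
        "arguments": arguments,
        "category": result.get("category"),
        "sop_key": result.get("sop_key"),
        "is_error": "error" in result,
    }
    return json.dumps(record, ensure_ascii=False, sort_keys=True)


line = audit_record(
    "get_specific_sop",
    {"category": "product", "sop_key": "warranty_information"},
    {"category": "product", "sop_key": "warranty_information", "sop_content": "..."},
)
logger.info(line)
print(line)
```

Emitting one JSON object per call keeps the log machine-readable, so matched categories and error rates can be aggregated later without parsing free text. If arguments may contain customer data, they would need to be redacted before logging.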
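Conclusion
In this post, we abstracted SOP processes into executable tools that conform to the MCP standard and ran them on the secure, managed, automated Runtime environment provided by Amazon AgentCore, moving the e-commerce customer service system from a "script knowledge base" to an "intelligent execution hub". This keeps the customer service agent consistent, accurate, and compliant across multi-turn interactions, and lets the enterprise achieve process governance, gradual rollout control, and end-to-end observability at lower cost. Looking ahead, as SOP modules are further integrated with CRM, OMS, and ERP systems, AgentCore can serve as the glue layer between AI capabilities and the enterprise application ecosystem, helping the customer service agent system keep evolving.
*The specific generative AI services from Amazon Web Services mentioned above are currently available in Amazon Web Services regions outside China. Cloud services in the Amazon Web Services China Regions are operated by NWCD and Sinnet; for details, refer to the official website for the China Regions.
About the authors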