阿里百炼MCP对接
<p>[TOC]</p>
<h2>Java 对接</h2>
<ul>
<li>需要 JDK 17 或以上版本</li>
<li>此代码支持SSE 和 StreamableHttp 协议</li>
</ul>
<h3>引入 Maven 包,包的版本跟随 spring-ai-alibaba-bom,可自行调整</h3>
<pre><code class="language-xml">&lt;dependencyManagement&gt;
&lt;dependencies&gt;
&lt;!-- Spring Boot BOM; the spring.boot.version property is not defined in this snippet and must be set in the POM properties --&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-dependencies&lt;/artifactId&gt;
&lt;version&gt;${spring.boot.version}&lt;/version&gt;
&lt;type&gt;pom&lt;/type&gt;
&lt;scope&gt;import&lt;/scope&gt;
&lt;/dependency&gt;
&lt;!-- Spring AI Alibaba BOM; the starter declared below takes its version from here --&gt;
&lt;dependency&gt;
&lt;groupId&gt;com.alibaba.cloud.ai&lt;/groupId&gt;
&lt;artifactId&gt;spring-ai-alibaba-bom&lt;/artifactId&gt;
&lt;version&gt;1.0.0.2&lt;/version&gt;
&lt;type&gt;pom&lt;/type&gt;
&lt;scope&gt;import&lt;/scope&gt;
&lt;/dependency&gt;
&lt;/dependencies&gt;
&lt;/dependencyManagement&gt;
&lt;!-- DashScope starter; declared without a version element and placed inside the dependencies element of the POM --&gt;
&lt;dependency&gt;
&lt;groupId&gt;com.alibaba.cloud.ai&lt;/groupId&gt;
&lt;artifactId&gt;spring-ai-alibaba-starter-dashscope&lt;/artifactId&gt;
&lt;/dependency&gt;</code></pre>
<h3>Java Demo</h3>
<pre><code class="language-java">import java.net.http.HttpRequest;
import java.util.Map;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
public class BailianSseClient {
/**
 * Connects to a Bailian MCP server through the SSE transport, prints the
 * available tools and calls the get_ip_info tool once.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // MCP service URL copied from the Bailian platform
    String baseUrl = &quot;https://dashscope.aliyuncs.com/api/v1/mcps/*****&quot;;
    // Full Authorization header value: the Bearer prefix followed by your own API key
    String authorization = &quot;Bearer api-key&quot;;

    // Every HTTP request issued by the transport is built from this template
    HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
            .header(&quot;Authorization&quot;, authorization)
            .header(&quot;Content-Type&quot;, &quot;application/json&quot;);
    HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(baseUrl)
            .requestBuilder(requestBuilder)
            .build();

    McpSyncClient client = McpClient.sync(transport).build();
    try {
        // Initialize the client, then check the connection with a ping
        client.initialize();
        client.ping();

        // List and print the available tools
        McpSchema.ListToolsResult toolsList = client.listTools();
        System.out.println(&quot;可用工具 = &quot; + toolsList);

        // Call a tool by name with its arguments
        McpSchema.CallToolResult ipInfoResult = client.callTool(
                new McpSchema.CallToolRequest(&quot;get_ip_info&quot;, Map.of(&quot;ip&quot;, &quot;127.0.0.1&quot;)));
        System.out.println(&quot;返回结果: &quot; + ipInfoResult.content());
    } finally {
        // Close the client even when one of the calls above throws
        client.closeGracefully();
    }
}
}</code></pre>
<h2>Python 对接</h2>
<ul>
<li>此代码支持SSE 和 StreamableHttp 协议</li>
</ul>
<h3>安装依赖</h3>
<pre><code class="language-python"># asyncio is part of the Python standard library, so only the MCP SDK needs installing
pip install mcp</code></pre>
<h3>Python Demo</h3>
<h4>SSE 协议</h4>
<pre><code class="language-python">import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
async def _run_session(read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# 所有的tools
tools = await session.list_tools()
print(tools)
# 调用tool
arguments = {
&#039;ip&#039;: &#039;127.0.0.1&#039;
}
results = await session.call_tool(name=&#039;get_ip_info&#039;, arguments=arguments)
print(results)
return results
async def _main(url: str, headers: dict):
    # Connect with sse_client using the given URL and headers, then run the
    # demo session on the streams it yields.
    async with sse_client(
        url=url,
        headers=headers
    ) as (read_stream, write_stream):
        await _run_session(read_stream, write_stream)
if __name__ == &#039;__main__&#039;:
# 百炼创建的apikey
api_key = &#039;Bearer api-key&#039;
# 从百炼中复制的url
base_url = &#039;https://dashscope.aliyuncs.com/api/v1/mcps/****&#039;
headers = {
&quot;Authorization&quot;: api_key
}
asyncio.run(_main(url=base_url, headers=headers))</code></pre>
<h4>StreamableHttp 协议</h4>
<pre><code class="language-python">import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
async def _run_session(read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# 所有的tools
tools = await session.list_tools()
print(tools)
# 调用tool
arguments = {
&#039;ip&#039;: &#039;127.0.0.1&#039;
}
results = await session.call_tool(name=&#039;get_ip_info&#039;, arguments=arguments)
print(results)
return results
async def _main(url: str, headers: dict):
    # Connect with streamablehttp_client using the given URL and headers,
    # then run the demo session on the read/write streams it yields.
    async with streamablehttp_client(
        url=url,
        headers=headers
    ) as (read_stream, write_stream, _get_session_id):
        # The third yielded item is not used by this demo
        await _run_session(read_stream, write_stream)
if __name__ == &#039;__main__&#039;:
# 百炼创建的apikey
api_key = &#039;Bearer api-key&#039;
# 从百炼中复制的url
base_url = &#039;https://dashscope.aliyuncs.com/api/v1/mcps/****&#039;
headers = {
&quot;Authorization&quot;: api_key
}
asyncio.run(_main(url=base_url, headers=headers))</code></pre>