聚美智数


阿里百炼MCP对接

<p>[TOC]</p> <h2>Java 对接</h2> <ul> <li>需要idk17或者jdk17以上</li> <li>此代码支持SSE 和 StreamableHttp 协议</li> </ul> <h3>引入 maven 包,包的版本跟随spring-ai-alibaba-bom,可自行调整</h3> <pre><code class="language-xml">&amp;lt;dependencyManagement&amp;gt; &amp;lt;dependencies&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.springframework.boot&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;spring-boot-dependencies&amp;lt;/artifactId&amp;gt; &amp;lt;version&amp;gt;${spring.boot.version}&amp;lt;/version&amp;gt; &amp;lt;type&amp;gt;pom&amp;lt;/type&amp;gt; &amp;lt;scope&amp;gt;import&amp;lt;/scope&amp;gt; &amp;lt;/dependency&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;com.alibaba.cloud.ai&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;spring-ai-alibaba-bom&amp;lt;/artifactId&amp;gt; &amp;lt;version&amp;gt;1.0.0.2&amp;lt;/version&amp;gt; &amp;lt;type&amp;gt;pom&amp;lt;/type&amp;gt; &amp;lt;scope&amp;gt;import&amp;lt;/scope&amp;gt; &amp;lt;/dependency&amp;gt; &amp;lt;/dependencies&amp;gt; &amp;lt;/dependencyManagement&amp;gt; &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;com.alibaba.cloud.ai&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;spring-ai-alibaba-starter-dashscope&amp;lt;/artifactId&amp;gt; &amp;lt;/dependency&amp;gt;</code></pre> <h3>Java Demo</h3> <pre><code class="language-java">import java.net.http.HttpRequest; import java.util.Map; import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; import io.modelcontextprotocol.spec.McpSchema; public class BailianSseClient { public static void main(String[] args) { String baseUrl = &amp;quot;https://dashscope.aliyuncs.com/api/v1/mcps/*****&amp;quot;;//从百炼平台复制的url String apiKey = &amp;quot;Bearer api-key&amp;quot;;//创建的自己的api key HttpRequest.Builder request = HttpRequest.newBuilder().header(&amp;quot;Authorization&amp;quot;,apiKey) .header(&amp;quot;Content-Type&amp;quot;,&amp;quot;application/json&amp;quot;); HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(baseUrl) .requestBuilder(request) .build(); //初始化 McpSyncClient client = McpClient.sync(transport).build(); client.initialize(); client.ping(); // 列出并展示可用的工具 McpSchema.ListToolsResult toolsList = client.listTools(); System.out.println(&amp;quot;可用工具 = &amp;quot; + toolsList); //调用工具 McpSchema.CallToolResult weatherForecastResult = client.callTool(new McpSchema.CallToolRequest(&amp;quot;get_ip_info&amp;quot;, Map.of(&amp;quot;ip&amp;quot;, &amp;quot;127.0.0.1&amp;quot;))); System.out.println(&amp;quot;返回结果: &amp;quot; + weatherForecastResult.content()); client.closeGracefully(); } }</code></pre> <h2>Python 对接</h2> <ul> <li>此代码支持SSE 和 StreamableHttp 协议</li> </ul> <h3>安装依赖</h3> <pre><code class="language-python">pip install asyncio mcp</code></pre> <h3>Python Demo</h3> <h4>SSE 协议</h4> <pre><code class="language-python">import asyncio from mcp import ClientSession from mcp.client.sse import sse_client async def _run_session(read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() # 所有的tools tools = await session.list_tools() print(tools) # 调用tool arguments = { &amp;#039;ip&amp;#039;: &amp;#039;127.0.0.1&amp;#039; } results = await session.call_tool(name=&amp;#039;get_ip_info&amp;#039;, arguments=arguments) print(results) return results async def _main(url: str, headers: dict): async with sse_client( url=url, headers=headers ) as (read_stream, write_stream): await _run_session(read_stream, write_stream) if __name__ == &amp;#039;__main__&amp;#039;: # 百炼创建的apikey api_key = &amp;#039;Bearer api-key&amp;#039; # 从百炼中复制的url base_url = &amp;#039;https://dashscope.aliyuncs.com/api/v1/mcps/****&amp;#039; headers = { &amp;quot;Authorization&amp;quot;: api_key } asyncio.run(_main(url=base_url, headers=headers))</code></pre> <h4>StreamableHttp 协议</h4> <pre><code class="language-python">import asyncio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def _run_session(read_stream, write_stream): async with ClientSession(read_stream, write_stream) as session: await session.initialize() # 所有的tools tools = await session.list_tools() print(tools) # 调用tool arguments = { &amp;#039;ip&amp;#039;: &amp;#039;127.0.0.1&amp;#039; } results = await session.call_tool(name=&amp;#039;get_ip_info&amp;#039;, arguments=arguments) print(results) return results async def _main( url: str, headers: dict): async with streamablehttp_client( url=url, headers=headers ) as (read_stream, write_stream, get_session_id): await _run_session(read_stream, write_stream) if __name__ == &amp;#039;__main__&amp;#039;: # 百炼创建的apikey api_key = &amp;#039;Bearer api-key&amp;#039; # 从百炼中复制的url base_url = &amp;#039;https://dashscope.aliyuncs.com/api/v1/mcps/****&amp;#039; headers = { &amp;quot;Authorization&amp;quot;: api_key } asyncio.run(_main(url=base_url, headers=headers))</code></pre>

页面列表

ITEM_HTML