Daniel Kliewer

Crafting Symbiotic Intelligence: Implementing MCP with OpenAI Responses API, Agents SDK, and Ollama

Theoretical Foundations and Architectural Vision

The integration of the Model Context Protocol (MCP) with OpenAI's Responses API and Agents SDK, all mediated through Ollama's local inference capabilities, represents a paradigm shift in autonomous agent construction. This implementation transcends conventional client-server architectures, establishing instead a distributed cognitive system with both local computational sovereignty and cloud-augmented capabilities. The following exposition presents both the conceptual framework and practical implementation details for advanced practitioners.

Prerequisites for Cognitive System Implementation

Before embarking on this architectural journey, ensure your development environment encompasses:

  • Python 3.10+ runtime environment
  • Working Ollama installation with at least one model pulled (a quick availability check follows this list)
  • OpenAI API credentials
  • Basic familiarity with asynchronous programming patterns
  • Understanding of agent-based system architectures
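
To verify the Ollama prerequisite before wiring anything together, a minimal availability check against its local API might look like this (the llama3 model name is only an example; substitute whatever you have pulled):

import httpx

# List the models the local Ollama daemon has pulled; Ollama serves its API
# on http://localhost:11434 by default.
response = httpx.get("http://localhost:11434/api/tags", timeout=10.0)
response.raise_for_status()

models = [m["name"] for m in response.json().get("models", [])]
print("Available local models:", models)  # the examples below assume e.g. "llama3"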

Implementation Architecture

1. Foundational Layer: Environment Configuration

# Install the required cognitive infrastructure
pip install openai openai-agents pydantic httpx

# Additional utilities for MCP configuration and servers
pip install fastapi uvicorn pyyaml python-dotenv

2. Ontological Framework: MCP Configuration

Create a comprehensive configuration file that defines the tool ontology available to your agent:

# mcp_config.yaml
mcp_servers:
  - name: "knowledge_retrieval"
    url: "http://localhost:8000"
  - name: "computational_tools"
    url: "http://localhost:8001"
  - name: "file_operations"
    url: "http://localhost:8002"

3. Cognitive Core: Custom Client Implementation

The central architectural challenge lies in creating a polymorphic client that maintains protocol compatibility with OpenAI's interfaces while redirecting computational work to local inference engines:

import json
import time
import httpx
from openai import OpenAI
from openai.types.chat import ChatCompletion, ChatCompletionMessage
from openai.types.chat.chat_completion import Choice

class HybridInferenceClient:
    """
    A cognitive architecture that presents an OpenAI-compatible interface
    while intelligently routing inference requests between Ollama and OpenAI.
    """
    
    def __init__(self, openai_api_key, ollama_base_url="http://localhost:11434", 
                 ollama_model="llama3", use_local_for_completion=True):
        self.openai_client = OpenAI(api_key=openai_api_key)
        self.ollama_base_url = ollama_base_url
        self.ollama_model = ollama_model
        self.use_local_for_completion = use_local_for_completion
        self.httpx_client = httpx.Client(timeout=60.0)
    
    def chat_completion(self, messages, model=None, **kwargs):
        """
        Polymorphic inference method that routes requests based on architectural policy.
        """
        if self.use_local_for_completion:
            return self._ollama_completion(messages, **kwargs)
        else:
            return self.openai_client.chat.completions.create(
                model=model or "gpt-4",
                messages=messages,
                **kwargs
            )
    
    def _ollama_completion(self, messages, **kwargs):
        """
        Local inference implementation utilizing Ollama's capabilities.
        """
        ollama_payload = {
            "model": self.ollama_model,
            "messages": messages,
            "stream": kwargs.get("stream", False)
        }
        
        response = self.httpx_client.post(
            f"{self.ollama_base_url}/api/chat",
            json=ollama_payload
        )
        
        if response.status_code != 200:
            raise Exception(f"Ollama inference error: {response.text}")
            
        result = response.json()
        
        # Transform Ollama response to OpenAI-compatible format
        return ChatCompletion(
            id=f"ollama-{self.ollama_model}-{hash(json.dumps(messages))}",
            choices=[
                Choice(
                    finish_reason="stop",
                    index=0,
                    message=ChatCompletionMessage(
                        content=result["message"]["content"],
                        role=result["message"]["role"]
                    )
                )
            ],
            created=int(time.time()),
            model=self.ollama_model,
            object="chat.completion"
        )
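
A quick usage sketch of the client, assuming Ollama is running locally with the llama3 model pulled and OPENAI_API_KEY is set in the environment:

import os

client = HybridInferenceClient(
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    ollama_model="llama3",
    use_local_for_completion=True  # flip to False to route through OpenAI instead
)

completion = client.chat_completion(
    messages=[{"role": "user", "content": "Summarize the Model Context Protocol in one sentence."}]
)
print(completion.choices[0].message.content)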

4. Integration with OpenAI Responses API and Agents SDK

Now, we implement the core agent architecture. The class below drives the Responses API directly through our hybrid inference client; the same tool-execution logic can also be exposed through the Agents SDK, as sketched after the class:

import json
from typing import Any, Dict, Optional

import httpx
import yaml

class ResponsesAgent:
    """
    Advanced agent architecture integrating OpenAI Responses API with MCP capabilities
    through a hybrid inference approach.
    """
    
    def __init__(self, client, mcp_config_path="mcp_config.yaml"):
        self.client = client
        self.mcp_config = self._load_mcp_config(mcp_config_path)
        
    def _load_mcp_config(self, config_path):
        """Load MCP server configurations from a YAML file"""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)
    
    async def create_response(self, user_query: str,
                              context: Optional[Dict[str, Any]] = None):
        """
        Create a response using the OpenAI Responses API, executing any
        requested tool calls against the configured MCP servers.
        """
        # Surface the MCP server inventory to the model through its instructions
        mcp_context = {
            "mcp_servers": self.mcp_config.get("mcp_servers", []),
            "additional_context": context or {}
        }
        instructions = (
            "You are an assistant with access to specialized tools. "
            f"Available MCP servers: {json.dumps(mcp_context)}"
        )

        # Create the initial response using the Responses API
        response = self.client.openai_client.responses.create(
            model="gpt-4o",
            instructions=instructions,
            input=user_query,
            tools=self._prepare_tool_definitions(),
        )

        # Collect any function calls the model emitted during response generation
        tool_calls = [item for item in response.output if item.type == "function_call"]
        if tool_calls:
            # Handle tool calls through MCP servers
            tool_results = await self._execute_mcp_tool_calls(tool_calls)

            # Create a follow-up response incorporating the tool results,
            # chained to the first response via previous_response_id
            final_response = self.client.openai_client.responses.create(
                model="gpt-4o",
                previous_response_id=response.id,
                input=[
                    {
                        "type": "function_call_output",
                        "call_id": result["call_id"],
                        "output": json.dumps(result.get("result", result.get("error")))
                    }
                    for result in tool_results
                ],
                tools=self._prepare_tool_definitions(),
            )
            return final_response

        return response
    
    def _prepare_tool_definitions(self):
        """
        Generate tool definitions in the Responses API function-tool format.
        """
        # This would typically involve querying each MCP server for its available
        # tools; for demonstration, we return a static set of definitions
        return [
            {
                "type": "function",
                "name": "fetch_information",
                "description": "Fetch information from external sources",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The information to search for"
                        }
                    },
                    "required": ["query"]
                }
            },
            {
                "type": "function",
                "name": "read_file",
                "description": "Read the contents of a file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "file_path": {
                            "type": "string",
                            "description": "Path to the file"
                        }
                    },
                    "required": ["file_path"]
                }
            }
        ]
    
    async def _execute_mcp_tool_calls(self, tool_calls):
        """
        Execute function calls emitted by the model against the appropriate MCP servers.
        """
        results = []
        for tool_call in tool_calls:
            # Determine which MCP server handles this tool
            server_info = self._find_mcp_server_for_tool(tool_call.name)
            if not server_info:
                results.append({
                    "call_id": tool_call.call_id,
                    "error": f"No MCP server found for tool: {tool_call.name}"
                })
                continue

            # Execute the tool call against the appropriate MCP server
            try:
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        f"{server_info['url']}/execute",
                        json={
                            "tool": tool_call.name,
                            "parameters": json.loads(tool_call.arguments)
                        }
                    )
                    response.raise_for_status()
                    results.append({
                        "call_id": tool_call.call_id,
                        "result": response.json()
                    })
            except Exception as e:
                results.append({
                    "call_id": tool_call.call_id,
                    "error": str(e)
                })

        return results
    
    def _find_mcp_server_for_tool(self, tool_name):
        """
        Find the appropriate MCP server for a given tool.
        In a real implementation, this would query each server for its capabilities.
        """
        # Simplified mapping logic - in practice, you would discover this dynamically
        tool_server_mapping = {
            "fetch_information": "knowledge_retrieval",
            "read_file": "file_operations"
        }
        
        server_name = tool_server_mapping.get(tool_name)
        if not server_name:
            return None
            
        for server in self.mcp_config.get("mcp_servers", []):
            if server["name"] == server_name:
                return server
                
        return None
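
For readers who prefer the Agents SDK's higher-level abstractions, the same MCP-backed tooling can be exposed as function tools. The sketch below is a minimal illustration, assuming the openai-agents package is installed, an OpenAI API key is configured, and the knowledge_retrieval server from mcp_config.yaml is running on port 8000:

import httpx
from agents import Agent, Runner, function_tool

@function_tool
def fetch_information(query: str) -> str:
    """Fetch information through the knowledge_retrieval MCP server."""
    response = httpx.post(
        "http://localhost:8000/execute",
        json={"tool": "fetch_information", "parameters": {"query": query}},
        timeout=30.0
    )
    response.raise_for_status()
    return response.text

mcp_agent = Agent(
    name="MCP Assistant",
    instructions="You are an assistant with access to specialized tools.",
    tools=[fetch_information]
)

result = Runner.run_sync(mcp_agent, "What is the Model Context Protocol?")
print(result.final_output)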

5. Implementing MCP Servers

To complete the architecture, implement MCP servers that provide tool functionality:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

class ToolRequest(BaseModel):
    tool: str
    parameters: dict

class KnowledgeRetrievalServer:
    """
    MCP server implementation for knowledge retrieval capabilities.
    """
    
    def __init__(self):
        self.app = FastAPI(title="Knowledge Retrieval MCP Server")
        self._setup_routes()
        
    def _setup_routes(self):
        @self.app.post("/execute")
        async def execute_tool(request: ToolRequest):
            if request.tool == "fetch_information":
                return await self._fetch_information(request.parameters.get("query"))
            raise HTTPException(status_code=404, detail=f"Tool not found: {request.tool}")
            
    async def _fetch_information(self, query):
        # In a real implementation, this would access knowledge bases or external APIs
        return {
            "status": "success",
            "data": f"Retrieved information about: {query}",
            "source": "simulated knowledge base"
        }
        
    def run(self, host="localhost", port=8000):
        uvicorn.run(self.app, host=host, port=port)

# Similar implementations would be created for the other MCP servers
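
As an illustration, here is a minimal sketch of the file_operations server, mirroring the structure above (the read_file behavior shown is an assumption of this article, not a prescribed MCP interface):

class FileOperationsServer:
    """
    MCP server implementation for file access capabilities.
    """

    def __init__(self):
        self.app = FastAPI(title="File Operations MCP Server")
        self._setup_routes()

    def _setup_routes(self):
        @self.app.post("/execute")
        async def execute_tool(request: ToolRequest):
            if request.tool == "read_file":
                return await self._read_file(request.parameters.get("file_path"))
            raise HTTPException(status_code=404, detail=f"Tool not found: {request.tool}")

    async def _read_file(self, file_path):
        # Return file contents, or a structured error if the path is unreadable
        try:
            with open(file_path, "r") as f:
                return {"status": "success", "data": f.read()}
        except OSError as e:
            return {"status": "error", "detail": str(e)}

    def run(self, host="localhost", port=8002):
        uvicorn.run(self.app, host=host, port=port)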

6. Main Application Implementation

Finally, bring everything together in a cohesive application:

import asyncio
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

async def main():
    # Initialize the hybrid client
    client = HybridInferenceClient(
        openai_api_key=os.getenv("OPENAI_API_KEY"),
        ollama_model="llama3",
        use_local_for_completion=True
    )
    
    # Initialize the agent
    agent = ResponsesAgent(client, mcp_config_path="mcp_config.yaml")
    
    # Execute a query
    response = await agent.create_response(
        "I need information about quantum computing and then save that information to a file called quantum_notes.txt"
    )
    
    print("Agent Response:")
    print(response.output_text)
    
    # Additional examples could demonstrate other capabilities

if __name__ == "__main__":
    # Launch MCP servers in separate processes before running the agent
    # For brevity, this step is omitted here; a minimal sketch follows this block
    
    # Run the main application
    asyncio.run(main())
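
One way the omitted launch step might look, using the standard-library multiprocessing module to start the example servers before the agent runs (a sketch that assumes the KnowledgeRetrievalServer and FileOperationsServer classes above live in the same project):

import multiprocessing
import time

def start_knowledge_server():
    KnowledgeRetrievalServer().run(port=8000)

def start_file_server():
    FileOperationsServer().run(port=8002)

if __name__ == "__main__":
    # Run each MCP server in its own process so they serve requests concurrently
    servers = [
        multiprocessing.Process(target=start_knowledge_server, daemon=True),
        multiprocessing.Process(target=start_file_server, daemon=True),
    ]
    for process in servers:
        process.start()

    # Give uvicorn a moment to bind its ports before the agent starts calling them
    time.sleep(2)

    asyncio.run(main())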

Theoretical Implications and Advanced Considerations

This architecture embodies several advanced AI system design principles:

  1. Computational Locality: By routing appropriate inference tasks to Ollama, the system maintains computational sovereignty while leveraging cloud capabilities when beneficial.

  2. Semantic Polymorphism: The client interface maintains compatibility with OpenAI's protocols while abstracting the underlying execution environment.

  3. Distributed Tool Ontology: MCP provides a standardized mechanism for discovering and invoking capabilities across a distributed system (a minimal discovery sketch follows this list).

  4. Contextual Reasoning: The integration with Responses API allows the agent to maintain coherent reasoning across multiple tool invocations.
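
The capability discovery referenced in point 3 could be approximated by giving each MCP server a /tools endpoint and building the tool-to-server map at startup. Everything below is illustrative; the /tools route is an assumption of this article's example servers, not part of any formal specification:

import httpx

async def discover_tools(mcp_servers: list[dict]) -> dict[str, dict]:
    """Query each MCP server for its tools and build a tool-name -> server map."""
    tool_map: dict[str, dict] = {}
    async with httpx.AsyncClient(timeout=10.0) as client:
        for server in mcp_servers:
            try:
                response = await client.get(f"{server['url']}/tools")
                response.raise_for_status()
                for tool_name in response.json().get("tools", []):
                    tool_map[tool_name] = server
            except httpx.HTTPError:
                # Skip servers that are unreachable or do not expose /tools
                continue
    return tool_map

# Such a map could replace the static tool_server_mapping used by
# _find_mcp_server_for_tool in the ResponsesAgent above.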

For production deployments, additional considerations would include:

  • Implementing robust error handling and retries
  • Adding authentication mechanisms to MCP servers
  • Developing dynamic tool discovery protocols
  • Creating a caching layer for frequently used inferences
  • Implementing a more sophisticated routing policy between local and cloud inference (a simple heuristic is sketched below)
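
As a starting point for that last item, the routing decision in HybridInferenceClient could move from a fixed flag to a small policy function. The heuristic below (prefer the hosted model when tool use is expected or the prompt is long, otherwise stay local) is purely illustrative:

def choose_backend(messages: list[dict], requires_tools: bool,
                   local_token_budget: int = 2000) -> str:
    """Return "openai" or "ollama" based on a simple routing heuristic."""
    # Rough token estimate: about four characters per token across all contents
    approx_tokens = sum(len(m.get("content", "")) for m in messages) // 4

    if requires_tools:
        # Tool calling is delegated to the hosted model in this setup
        return "openai"
    if approx_tokens > local_token_budget:
        # Long contexts may exceed what the local model handles comfortably
        return "openai"
    return "ollama"

# HybridInferenceClient.chat_completion could consult choose_backend(...)
# instead of the static use_local_for_completion flag.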

Conclusion: Toward Autonomous Cognitive Systems

The implementation detailed above represents not merely a technical integration but a philosophical approach to AI system design that values autonomy, interoperability, and extensibility. By combining the structured reasoning capabilities of the OpenAI Responses API with the tool-using capabilities of the Agents SDK, all while maintaining computational sovereignty through Ollama, we create a system that transcends the limitations of any individual component.

The resulting architecture provides a foundation for increasingly sophisticated autonomous agents capable of complex reasoning across distributed knowledge and computational resources—a significant step toward truly intelligent systems that can reason about and act upon the world in meaningful ways.