# Local System Agent - MCP Client Integration # Phase 1: Foundation Setup with MCP Communication import asyncio import uuid import time import json import logging import psutil import subprocess from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, field from enum import Enum import httpx from pathlib import Path # Configuration and Data Models class OperationStatus(Enum): QUEUED = "queued" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class TaskPriority(Enum): LOW = "low" NORMAL = "normal" HIGH = "high" URGENT = "urgent" @dataclass class ProcessInfo: pid: int command: str status: str cpu_percent: float memory_mb: float start_time: datetime @dataclass class FileOperation: path: str operation: str # create, read, write, delete, move timestamp: datetime size_bytes: int permissions: str @dataclass class NetworkCall: url: str method: str status_code: int response_time_ms: float data_size_bytes: int timestamp: datetime @dataclass class ModelCall: model_id: str tokens_used: int cost_usd: float response_time_ms: float success: bool timestamp: datetime @dataclass class Operation: operation_id: str type: str status: OperationStatus description: str created_at: datetime started_at: Optional[datetime] = None updated_at: Optional[datetime] = None estimated_completion: Optional[datetime] = None progress_percentage: int = 0 priority: TaskPriority = TaskPriority.NORMAL spawned_processes: List[ProcessInfo] = field(default_factory=list) file_operations: List[FileOperation] = field(default_factory=list) network_calls: List[NetworkCall] = field(default_factory=list) model_calls: List[ModelCall] = field(default_factory=list) error_messages: List[str] = field(default_factory=list) warning_messages: List[str] = field(default_factory=list) info_messages: List[str] = field(default_factory=list) context: Dict[str, Any] = field(default_factory=dict) result: Optional[Any] = None @dataclass class ModelConfig: model_id: str name: str type: str # "ollama", "lmstudio", "openai_compatible" endpoint: str api_key: Optional[str] = None capabilities: List[str] = field(default_factory=list) cost_per_token: float = 0.0 max_tokens: int = 4096 temperature: float = 0.1 enabled: bool = True prompt_template: Optional[str] = None @dataclass class MCPServerConfig: server_id: str name: str command: str args: List[str] env_vars: Dict[str, str] = field(default_factory=dict) enabled: bool = True auto_restart: bool = True health_check_interval: int = 30 # MCP Client Implementation class MCPClient: def __init__(self, server_id: str, command: str, args: List[str], env_vars: Dict[str, str] = None): self.server_id = server_id self.command = command self.args = args self.env_vars = env_vars or {} self.process = None self.request_id = 0 self.logger = logging.getLogger(f'MCPClient-{server_id}') async def start(self): """Start the MCP server process""" try: # Handle different command types with proper paths if self.command == "npx": # npx is a PowerShell script, run it through PowerShell cmd = ["powershell", "-Command", "npx"] + self.args elif self.command == "uvx": # Use direct path to uvx executable cmd = ["C:\\Users\\bake\\AppData\\Local\\Programs\\Python\\Python312\\Scripts\\uvx.exe"] + self.args else: # Use command as-is for other cases cmd = [self.command] + self.args # Set up environment with current PATH plus any additional env vars import os env = os.environ.copy() env.update(self.env_vars) self.process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) # Initialize MCP connection await self._initialize_connection() self.logger.info(f"MCP server {self.server_id} started successfully") return True except Exception as e: self.logger.error(f"Failed to start MCP server {self.server_id}: {e}") return False async def _initialize_connection(self): """Initialize MCP connection with handshake""" # Send initialize request init_request = { "jsonrpc": "2.0", "id": self._next_request_id(), "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "clientInfo": { "name": "local-system-agent", "version": "1.0.0" } } } response = await self._send_request(init_request) if response.get("error"): raise Exception(f"MCP initialization failed: {response['error']}") # Send initialized notification initialized_notification = { "jsonrpc": "2.0", "method": "notifications/initialized" } await self._send_notification(initialized_notification) def _next_request_id(self): """Get next request ID""" self.request_id += 1 return self.request_id async def _send_request(self, request: dict) -> dict: """Send JSON-RPC request and wait for response""" if not self.process: raise Exception("MCP server not started") # Send request request_json = json.dumps(request) + "\n" self.process.stdin.write(request_json.encode()) await self.process.stdin.drain() # Read response response_line = await self.process.stdout.readline() response = json.loads(response_line.decode().strip()) return response async def _send_notification(self, notification: dict): """Send JSON-RPC notification (no response expected)""" if not self.process: raise Exception("MCP server not started") notification_json = json.dumps(notification) + "\n" self.process.stdin.write(notification_json.encode()) await self.process.stdin.drain() async def call_tool(self, tool_name: str, arguments: dict) -> dict: """Call a tool on the MCP server""" request = { "jsonrpc": "2.0", "id": self._next_request_id(), "method": "tools/call", "params": { "name": tool_name, "arguments": arguments } } response = await self._send_request(request) if response.get("error"): raise Exception(f"Tool call failed: {response['error']}") return response.get("result", {}) async def list_tools(self) -> List[dict]: """Get list of available tools""" request = { "jsonrpc": "2.0", "id": self._next_request_id(), "method": "tools/list" } response = await self._send_request(request) if response.get("error"): raise Exception(f"Failed to list tools: {response['error']}") return response.get("result", {}).get("tools", []) async def stop(self): """Stop the MCP server process""" if self.process: self.process.terminate() await self.process.wait() self.logger.info(f"MCP server {self.server_id} stopped") # Core Agent Class class LocalSystemAgent: def __init__(self): self.operations: Dict[str, Operation] = {} self.task_queue = asyncio.Queue() self.active_tasks: Dict[str, asyncio.Task] = {} self.models: Dict[str, ModelConfig] = {} self.mcp_servers: Dict[str, MCPServerConfig] = {} self.mcp_clients: Dict[str, MCPClient] = {} self.default_model = None self.running = False # Initialize logging self._setup_logging() # Setup baseline MCP servers self._setup_baseline_mcp_servers() self.logger.info("Local System Agent initialized") def _setup_logging(self): """Setup comprehensive logging system""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('agent.log'), logging.StreamHandler() ] ) self.logger = logging.getLogger('LocalSystemAgent') def _setup_baseline_mcp_servers(self): """Setup hardcoded baseline MCP server configurations""" # PostgreSQL MCP Server postgres_config = MCPServerConfig( server_id="postgres", name="PostgreSQL Database", command="npx", args=["-y", "@modelcontextprotocol/server-postgres", "postgresql://mcpuser:mcppoop123@bakecms.com:5432/postgres"], enabled=True, auto_restart=True ) self.mcp_servers["postgres"] = postgres_config # Neo4j MCP Server neo4j_config = MCPServerConfig( server_id="neo4j", name="Neo4j Memory Database", command="uvx", args=["mcp-neo4j-cypher@0.2.1"], env_vars={ "NEO4J_URI": "bolt://bakecms.com:7687", "NEO4J_USERNAME": "neo4j", "NEO4J_PASSWORD": "pooppoop", "NEO4J_DATABASE": "neo4j" }, enabled=True, auto_restart=True ) self.mcp_servers["neo4j"] = neo4j_config async def _init_mcp_servers(self): """Initialize and start MCP server connections""" for server_id, config in self.mcp_servers.items(): if not config.enabled: continue print(f"Starting MCP server: {server_id}") self.logger.info(f"Starting MCP server: {server_id}") client = MCPClient( server_id=server_id, command=config.command, args=config.args, env_vars=config.env_vars ) success = await client.start() if success: self.mcp_clients[server_id] = client print(f"✅ MCP server {server_id} started successfully") # List available tools try: tools = await client.list_tools() tool_names = [tool.get("name", "unknown") for tool in tools] print(f" Available tools: {', '.join(tool_names)}") self.logger.info(f"MCP server {server_id} tools: {tool_names}") except Exception as e: print(f" Warning: Could not list tools for {server_id}: {e}") else: print(f"❌ Failed to start MCP server: {server_id}") async def _init_database_tables(self): """Initialize database tables via MCP""" try: print("Creating database tables via MCP...") # Create agent_models table await self.postgres_query(''' CREATE TABLE IF NOT EXISTS agent_models ( model_id TEXT PRIMARY KEY, name TEXT NOT NULL, type TEXT NOT NULL, endpoint TEXT NOT NULL, api_key TEXT, capabilities JSONB DEFAULT '[]', cost_per_token REAL DEFAULT 0.0, max_tokens INTEGER DEFAULT 4096, temperature REAL DEFAULT 0.1, enabled BOOLEAN DEFAULT TRUE, prompt_template TEXT, created_at TIMESTAMP DEFAULT NOW() ) ''') # Create agent_mcp_servers table await self.postgres_query(''' CREATE TABLE IF NOT EXISTS agent_mcp_servers ( server_id TEXT PRIMARY KEY, name TEXT NOT NULL, command TEXT NOT NULL, args JSONB NOT NULL, env_vars JSONB DEFAULT '{}', enabled BOOLEAN DEFAULT TRUE, auto_restart BOOLEAN DEFAULT TRUE, health_check_interval INTEGER DEFAULT 30, created_at TIMESTAMP DEFAULT NOW() ) ''') # Create agent_operations table await self.postgres_query(''' CREATE TABLE IF NOT EXISTS agent_operations ( operation_id TEXT PRIMARY KEY, type TEXT NOT NULL, status TEXT NOT NULL, description TEXT NOT NULL, created_at TIMESTAMP NOT NULL, started_at TIMESTAMP, updated_at TIMESTAMP, estimated_completion TIMESTAMP, progress_percentage INTEGER DEFAULT 0, priority TEXT DEFAULT 'normal', context JSONB DEFAULT '{}', result JSONB, spawned_processes JSONB DEFAULT '[]', file_operations JSONB DEFAULT '[]', network_calls JSONB DEFAULT '[]', model_calls JSONB DEFAULT '[]', error_messages JSONB DEFAULT '[]', warning_messages JSONB DEFAULT '[]', info_messages JSONB DEFAULT '[]' ) ''') print("✅ Database tables created successfully") self.logger.info("Database tables initialized via MCP") except Exception as e: print(f"❌ Failed to create database tables: {e}") self.logger.error(f"Failed to create database tables: {e}") raise async def postgres_query(self, sql: str, params: List[Any] = None) -> List[dict]: """Execute PostgreSQL query via MCP""" if "postgres" not in self.mcp_clients: raise Exception("PostgreSQL MCP client not available") client = self.mcp_clients["postgres"] # The postgres MCP server expects 'query' tool with 'sql' parameter result = await client.call_tool("query", {"sql": sql}) return result.get("content", []) async def neo4j_query(self, cypher: str, params: dict = None) -> List[dict]: """Execute Neo4j Cypher query via MCP""" if "neo4j" not in self.mcp_clients: raise Exception("Neo4j MCP client not available") client = self.mcp_clients["neo4j"] # The neo4j MCP server expects different tool names # Let's first try to get the tools to see what's available tools = await client.list_tools() tool_names = [tool.get("name", "") for tool in tools] # Common neo4j MCP tool names if "read_neo4j_cypher" in tool_names: result = await client.call_tool("read_neo4j_cypher", { "query": cypher, "params": params or {} }) elif "cypher" in tool_names: result = await client.call_tool("cypher", { "query": cypher, "params": params or {} }) else: # Try the first available tool if tool_names: result = await client.call_tool(tool_names[0], { "query": cypher, "params": params or {} }) else: raise Exception("No Neo4j tools available") return result.get("content", []) async def _load_configuration(self): """Load models and MCP server configurations from database""" try: # Load models models_rows = await self.postgres_query("SELECT * FROM agent_models WHERE enabled = TRUE") for row in models_rows: model = ModelConfig( model_id=row['model_id'], name=row['name'], type=row['type'], endpoint=row['endpoint'], api_key=row.get('api_key'), capabilities=row.get('capabilities', []), cost_per_token=row.get('cost_per_token', 0.0), max_tokens=row.get('max_tokens', 4096), temperature=row.get('temperature', 0.1), enabled=row.get('enabled', True), prompt_template=row.get('prompt_template') ) self.models[model.model_id] = model # Set default model (first available Ollama model) ollama_models = [m for m in self.models.values() if m.type == "ollama"] if ollama_models: self.default_model = ollama_models[0].model_id self.logger.info(f"Default model set to: {self.default_model}") print(f"Loaded {len(self.models)} model configurations") except Exception as e: print(f"Note: Could not load existing configurations: {e}") self.logger.info(f"Could not load existing configurations (normal for first run): {e}") async def add_model(self, model_config: ModelConfig) -> bool: """Add a new model configuration via MCP""" try: await self.postgres_query(''' INSERT INTO agent_models (model_id, name, type, endpoint, api_key, capabilities, cost_per_token, max_tokens, temperature, enabled, prompt_template) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (model_id) DO UPDATE SET name = $2, type = $3, endpoint = $4, api_key = $5, capabilities = $6, cost_per_token = $7, max_tokens = $8, temperature = $9, enabled = $10, prompt_template = $11 ''', [ model_config.model_id, model_config.name, model_config.type, model_config.endpoint, model_config.api_key, json.dumps(model_config.capabilities), model_config.cost_per_token, model_config.max_tokens, model_config.temperature, model_config.enabled, model_config.prompt_template ]) self.models[model_config.model_id] = model_config self.logger.info(f"Added model: {model_config.model_id}") return True except Exception as e: self.logger.error(f"Failed to add model {model_config.model_id}: {e}") return False async def create_operation(self, operation_type: str, description: str, priority: TaskPriority = TaskPriority.NORMAL, context: Dict[str, Any] = None) -> str: """Create a new operation and add it to the queue""" operation_id = str(uuid.uuid4()) operation = Operation( operation_id=operation_id, type=operation_type, status=OperationStatus.QUEUED, description=description, created_at=datetime.now(), priority=priority, context=context or {} ) self.operations[operation_id] = operation await self.task_queue.put(operation) # Store in database via MCP try: await self.postgres_query(''' INSERT INTO agent_operations (operation_id, type, status, description, created_at, priority, context) VALUES ($1, $2, $3, $4, $5, $6, $7) ''', [ operation_id, operation_type, operation.status.value, description, operation.created_at.isoformat(), priority.value, json.dumps(context or {}) ]) except Exception as e: self.logger.warning(f"Could not store operation in database: {e}") self.logger.info(f"Created operation: {operation_id} - {description}") return operation_id async def get_operation_status(self, operation_id: str) -> Optional[Operation]: """Get the current status of an operation""" return self.operations.get(operation_id) async def _handle_test_task(self, operation: Operation): """Handle a simple test task""" operation.info_messages.append("Starting test task") operation.progress_percentage = 25 await asyncio.sleep(2) # Simulate work operation.info_messages.append("Test task in progress") operation.progress_percentage = 75 await asyncio.sleep(1) # Simulate more work # Test database operations try: # Test PostgreSQL via MCP version_result = await self.postgres_query("SELECT version()") pg_version = version_result[0].get('version', 'Unknown') if version_result else 'Unknown' operation.info_messages.append(f"PostgreSQL connection test: {pg_version[:50]}...") # Test Neo4j via MCP neo_result = await self.neo4j_query("RETURN 'Neo4j connection successful' as message") neo_message = neo_result[0].get('message', 'Test failed') if neo_result else 'Test failed' operation.info_messages.append(f"Neo4j connection test: {neo_message}") except Exception as e: operation.warning_messages.append(f"Database test warning: {e}") operation.result = { "message": "Test task completed successfully", "timestamp": datetime.now().isoformat(), "database_tests": "completed" } operation.info_messages.append("Test task completed") async def process_task_queue(self): """Main task processing loop""" while self.running: try: operation = await asyncio.wait_for(self.task_queue.get(), timeout=1.0) # Update operation status operation.status = OperationStatus.RUNNING operation.started_at = datetime.now() # Execute operation directly for testing try: if operation.type == "test_task": await self._handle_test_task(operation) operation.status = OperationStatus.COMPLETED operation.progress_percentage = 100 else: operation.error_messages.append(f"Unknown operation type: {operation.type}") operation.status = OperationStatus.FAILED except Exception as e: operation.status = OperationStatus.FAILED operation.error_messages.append(f"Execution failed: {str(e)}") self.logger.error(f"Operation {operation.operation_id} failed: {e}") self.logger.info(f"Completed operation: {operation.operation_id}") except asyncio.TimeoutError: continue except Exception as e: self.logger.error(f"Error in task queue processing: {e}") async def start(self): """Start the agent""" # Initialize MCP servers first await self._init_mcp_servers() # Initialize database tables await self._init_database_tables() # Load configuration await self._load_configuration() self.running = True self.logger.info("Starting Local System Agent") # Start task processing await self.process_task_queue() async def stop(self): """Stop the agent gracefully""" self.running = False self.logger.info("Stopping Local System Agent") # Cancel active tasks for task in self.active_tasks.values(): task.cancel() # Stop MCP clients for client in self.mcp_clients.values(): await client.stop() # Initialize default Ollama models async def setup_default_models(agent: LocalSystemAgent): """Setup default Ollama models""" default_models = [ ModelConfig( model_id="llama3.2:3b", name="Llama 3.2 3B", type="ollama", endpoint="http://localhost:11434", capabilities=["general", "coding", "analysis"] ), ModelConfig( model_id="qwen2.5:7b", name="Qwen 2.5 7B", type="ollama", endpoint="http://localhost:11434", capabilities=["general", "coding", "reasoning"] ), ModelConfig( model_id="llama3.2:1b", name="Llama 3.2 1B (Fast)", type="ollama", endpoint="http://localhost:11434", capabilities=["general", "quick_tasks"] ) ] for model in default_models: await agent.add_model(model) # Example usage and testing async def main(): try: print("=== Local System Agent - MCP Integration Test ===") print("Initializing Local System Agent...") # Initialize agent agent = LocalSystemAgent() print("Setting up default models...") # Setup default models await setup_default_models(agent) print("Creating test operation...") # Create a test operation operation_id = await agent.create_operation( "test_task", "Testing agent with MCP integration", TaskPriority.NORMAL, {"test_param": "mcp_integration_test"} ) print(f"Created operation: {operation_id}") # Start agent in background print("Starting agent task processing...") agent_task = asyncio.create_task(agent.start()) # Monitor operation progress print("Monitoring operation progress...") for i in range(15): # Extended timeout for MCP operations await asyncio.sleep(1) operation = await agent.get_operation_status(operation_id) if operation: print(f"Operation status: {operation.status.value}, Progress: {operation.progress_percentage}%") if operation.info_messages: for msg in operation.info_messages[-1:]: # Show latest message print(f" ℹ️ {msg}") if operation.warning_messages: for msg in operation.warning_messages[-1:]: print(f" ⚠️ {msg}") if operation.error_messages: for msg in operation.error_messages[-1:]: print(f" ❌ {msg}") if operation.status in [OperationStatus.COMPLETED, OperationStatus.FAILED]: print(f"Operation result: {operation.result}") break # Stop agent print("Stopping agent...") await agent.stop() print("✅ Agent stopped successfully") except Exception as e: print(f"❌ Error in main: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())