# local-system-agent/local_system_agent.py
# 2025-05-31 16:56:33 -07:00

# 648 lines
# 25 KiB
# Python

# Local System Agent - Core Framework
# Phase 1: Foundation Setup
import asyncio
import uuid
import time
import json
import logging
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import httpx
from pathlib import Path
import asyncpg
from contextlib import asynccontextmanager
# Configuration and Data Models
class OperationStatus(Enum):
    """Lifecycle states of an operation.

    The string values are what gets written to the ``status`` column of the
    ``agent_operations`` table.
    """

    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class TaskPriority(Enum):
    """Priority labels that can be attached to an operation.

    The string values are what gets written to the ``priority`` column of the
    ``agent_operations`` table.
    """

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"
@dataclass
class ProcessInfo:
    """Record describing one process tracked for an operation."""

    pid: int
    command: str
    status: str
    cpu_percent: float
    memory_mb: float
    start_time: datetime
@dataclass
class FileOperation:
    """Record describing one file-system action tracked for an operation."""

    path: str
    operation: str  # create, read, write, delete, move
    timestamp: datetime
    size_bytes: int
    permissions: str
@dataclass
class NetworkCall:
    """Record describing one HTTP request tracked for an operation."""

    url: str
    method: str
    status_code: int
    response_time_ms: float
    data_size_bytes: int
    timestamp: datetime
@dataclass
class ModelCall:
    """Record of one model invocation: which model, usage, latency, outcome."""

    model_id: str
    tokens_used: int
    cost_usd: float
    response_time_ms: float
    success: bool
    timestamp: datetime
@dataclass
class Operation:
    """In-memory state of one unit of work handled by the agent.

    The field names mirror the columns of the ``agent_operations`` table.
    Every list/dict field uses ``default_factory`` so instances never share
    mutable state.
    """

    # Identity and lifecycle
    operation_id: str
    type: str
    status: OperationStatus
    description: str
    created_at: datetime
    started_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    estimated_completion: Optional[datetime] = None
    progress_percentage: int = 0
    priority: TaskPriority = TaskPriority.NORMAL
    # Activity recorded while the operation runs
    spawned_processes: List[ProcessInfo] = field(default_factory=list)
    file_operations: List[FileOperation] = field(default_factory=list)
    network_calls: List[NetworkCall] = field(default_factory=list)
    model_calls: List[ModelCall] = field(default_factory=list)
    # Human-readable log lines, grouped by severity
    error_messages: List[str] = field(default_factory=list)
    warning_messages: List[str] = field(default_factory=list)
    info_messages: List[str] = field(default_factory=list)
    # Caller-supplied input and the final output
    context: Dict[str, Any] = field(default_factory=dict)
    result: Optional[Any] = None
@dataclass
class ModelConfig:
    """Configuration of one language-model backend.

    The field names mirror the columns of the ``agent_models`` table.
    """

    model_id: str
    name: str
    type: str  # "ollama", "lmstudio", "openai_compatible"
    endpoint: str
    api_key: Optional[str] = None
    capabilities: List[str] = field(default_factory=list)
    cost_per_token: float = 0.0
    max_tokens: int = 4096
    temperature: float = 0.1
    enabled: bool = True
    prompt_template: Optional[str] = None
@dataclass
class MCPServerConfig:
    """Launch configuration of one MCP server.

    The field names mirror the columns of the ``agent_mcp_servers`` table.
    """

    server_id: str
    name: str
    command: str
    args: List[str]
    env_vars: Dict[str, str] = field(default_factory=dict)
    enabled: bool = True
    auto_restart: bool = True
    health_check_interval: int = 30
# Core Agent Class
class LocalSystemAgent:
    """Queue-driven agent that executes operations and persists their state.

    Operations are created with :meth:`create_operation`, picked up by
    :meth:`process_task_queue` and executed as asyncio tasks.  Model and MCP
    server configuration, as well as the state of every operation, is stored
    in PostgreSQL through an asyncpg connection pool.
    """

    # SECURITY NOTE(review): the default DSN below embeds a username and a
    # password and disables TLS.  It is left untouched only so existing
    # callers keep working; the credentials should be rotated and supplied
    # through configuration instead of source code.
    def __init__(self, db_url: str = "postgresql://mcpuser:mcppoop123@bakecms.com:5432/postgres?sslmode=disable"):
        """Create an idle agent; no database connection is opened here.

        Args:
            db_url: PostgreSQL DSN later handed to ``asyncpg.create_pool``.
        """
        self.db_url = db_url
        self.db_pool = None
        self.operations: Dict[str, Operation] = {}
        self.task_queue = asyncio.Queue()
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.models: Dict[str, ModelConfig] = {}
        self.mcp_servers: Dict[str, MCPServerConfig] = {}
        self.default_model = None
        self.running = False
        # Initialize logging
        self._setup_logging()
        self.logger.info("Local System Agent initialized")

    def _setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('agent.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger('LocalSystemAgent')

    def _redacted_db_url(self) -> str:
        """Return ``self.db_url`` with any password replaced by ``***``.

        Used for log and console output so credentials never reach
        ``agent.log`` or the terminal.
        """
        from urllib.parse import urlsplit, urlunsplit

        try:
            parts = urlsplit(self.db_url)
            password = parts.password
        except ValueError:
            return "<unparseable database url>"
        if not password:
            return self.db_url
        netloc = parts.netloc.replace(f":{password}@", ":***@", 1)
        return urlunsplit(parts._replace(netloc=netloc))

    @staticmethod
    def _json_default(value: Any) -> Any:
        """``json.dumps`` fallback that encodes datetimes and enum members."""
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, Enum):
            return value.value
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    @staticmethod
    def _decode_json(value: Any, fallback: Any) -> Any:
        """Decode a JSONB column value, returning ``fallback`` when it is empty.

        asyncpg hands json/jsonb columns back as text unless a type codec has
        been registered on the connection, so text is parsed here; values that
        are already decoded pass through unchanged.
        """
        if isinstance(value, (str, bytes)):
            value = json.loads(value)
        return value or fallback

    async def _init_database(self):
        """Open the connection pool and create the agent tables if missing.

        Raises:
            Exception: whatever the connection attempt or the DDL raised
                (including a timeout after 10 seconds); the error is logged
                and any partially opened pool is torn down first.
        """
        try:
            safe_url = self._redacted_db_url()
            print(f"Attempting to connect to database: {safe_url}")
            self.logger.info(f"Attempting to connect to database: {safe_url}")
            self.db_pool = await asyncio.wait_for(
                asyncpg.create_pool(self.db_url, min_size=2, max_size=10),
                timeout=10.0,
            )
            self.logger.info("Connected to PostgreSQL database")
            print("Connected to PostgreSQL database")
            async with self.db_pool.acquire() as conn:
                # Create agent_models table
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS agent_models (
                        model_id TEXT PRIMARY KEY,
                        name TEXT NOT NULL,
                        type TEXT NOT NULL,
                        endpoint TEXT NOT NULL,
                        api_key TEXT,
                        capabilities JSONB DEFAULT '[]',
                        cost_per_token REAL DEFAULT 0.0,
                        max_tokens INTEGER DEFAULT 4096,
                        temperature REAL DEFAULT 0.1,
                        enabled BOOLEAN DEFAULT TRUE,
                        prompt_template TEXT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                ''')
                # Create agent_mcp_servers table
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS agent_mcp_servers (
                        server_id TEXT PRIMARY KEY,
                        name TEXT NOT NULL,
                        command TEXT NOT NULL,
                        args JSONB NOT NULL,
                        env_vars JSONB DEFAULT '{}',
                        enabled BOOLEAN DEFAULT TRUE,
                        auto_restart BOOLEAN DEFAULT TRUE,
                        health_check_interval INTEGER DEFAULT 30,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                ''')
                # Create agent_operations table
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS agent_operations (
                        operation_id TEXT PRIMARY KEY,
                        type TEXT NOT NULL,
                        status TEXT NOT NULL,
                        description TEXT NOT NULL,
                        created_at TIMESTAMP NOT NULL,
                        started_at TIMESTAMP,
                        updated_at TIMESTAMP,
                        estimated_completion TIMESTAMP,
                        progress_percentage INTEGER DEFAULT 0,
                        priority TEXT DEFAULT 'normal',
                        context JSONB DEFAULT '{}',
                        result JSONB,
                        spawned_processes JSONB DEFAULT '[]',
                        file_operations JSONB DEFAULT '[]',
                        network_calls JSONB DEFAULT '[]',
                        model_calls JSONB DEFAULT '[]',
                        error_messages JSONB DEFAULT '[]',
                        warning_messages JSONB DEFAULT '[]',
                        info_messages JSONB DEFAULT '[]'
                    )
                ''')
            self.logger.info("Database tables initialized")
        except Exception as e:
            self.logger.error(f"Failed to initialize database: {e}")
            if self.db_pool is not None:
                # Do not leave a half-initialized pool behind: callers use
                # ``self.db_pool`` to decide whether initialization is needed.
                self.db_pool.terminate()
                self.db_pool = None
            raise

    async def _load_configuration(self):
        """Load enabled models and MCP servers from the database.

        Initializes the database first when no pool exists yet.  The first
        loaded model of type ``"ollama"`` becomes ``self.default_model``.
        """
        if not self.db_pool:
            await self._init_database()
        async with self.db_pool.acquire() as conn:
            # Load models
            models_rows = await conn.fetch("SELECT * FROM agent_models WHERE enabled = TRUE")
            for row in models_rows:
                model = ModelConfig(
                    model_id=row['model_id'],
                    name=row['name'],
                    type=row['type'],
                    endpoint=row['endpoint'],
                    api_key=row['api_key'],
                    capabilities=self._decode_json(row['capabilities'], []),
                    cost_per_token=row['cost_per_token'],
                    max_tokens=row['max_tokens'],
                    temperature=row['temperature'],
                    enabled=row['enabled'],
                    prompt_template=row['prompt_template'],
                )
                self.models[model.model_id] = model
            # Load MCP servers
            servers_rows = await conn.fetch("SELECT * FROM agent_mcp_servers WHERE enabled = TRUE")
            for row in servers_rows:
                server = MCPServerConfig(
                    server_id=row['server_id'],
                    name=row['name'],
                    command=row['command'],
                    args=self._decode_json(row['args'], []),
                    env_vars=self._decode_json(row['env_vars'], {}),
                    enabled=row['enabled'],
                    auto_restart=row['auto_restart'],
                    health_check_interval=row['health_check_interval'],
                )
                self.mcp_servers[server.server_id] = server
        # Set default model (first available Ollama model)
        ollama_models = [m for m in self.models.values() if m.type == "ollama"]
        if ollama_models:
            self.default_model = ollama_models[0].model_id
            self.logger.info(f"Default model set to: {self.default_model}")

    async def add_model(self, model_config: ModelConfig) -> bool:
        """Insert or update a model configuration and register it in memory.

        Returns:
            ``True`` on success; ``False`` when the database write failed
            (the error is logged, not raised).
        """
        try:
            async with self.db_pool.acquire() as conn:
                # asyncpg takes query arguments positionally (*args); passing
                # them as one tuple would bind the whole tuple to $1.
                await conn.execute(
                    '''
                    INSERT INTO agent_models
                    (model_id, name, type, endpoint, api_key, capabilities,
                     cost_per_token, max_tokens, temperature, enabled, prompt_template)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                    ON CONFLICT (model_id) DO UPDATE SET
                        name = $2, type = $3, endpoint = $4, api_key = $5,
                        capabilities = $6, cost_per_token = $7, max_tokens = $8,
                        temperature = $9, enabled = $10, prompt_template = $11
                    ''',
                    model_config.model_id, model_config.name, model_config.type,
                    model_config.endpoint, model_config.api_key,
                    json.dumps(model_config.capabilities),
                    model_config.cost_per_token, model_config.max_tokens,
                    model_config.temperature, model_config.enabled,
                    model_config.prompt_template,
                )
            self.models[model_config.model_id] = model_config
            self.logger.info(f"Added model: {model_config.model_id}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to add model {model_config.model_id}: {e}")
            return False

    async def create_operation(self, operation_type: str, description: str,
                               priority: TaskPriority = TaskPriority.NORMAL,
                               context: Optional[Dict[str, Any]] = None) -> str:
        """Create a new operation, persist it and add it to the queue.

        The row is written before the operation is enqueued so the queue
        consumer can never update an operation that is not stored yet, and a
        failed insert does not leave an orphan in the queue.

        Returns:
            The generated operation id (a UUID4 string).
        """
        operation_id = str(uuid.uuid4())
        operation = Operation(
            operation_id=operation_id,
            type=operation_type,
            status=OperationStatus.QUEUED,
            description=description,
            created_at=datetime.now(),
            priority=priority,
            context=context or {},
        )
        # Store in database
        async with self.db_pool.acquire() as conn:
            await conn.execute(
                '''
                INSERT INTO agent_operations
                (operation_id, type, status, description, created_at, priority, context)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ''',
                operation_id, operation_type, operation.status.value,
                description, operation.created_at,
                priority.value, json.dumps(operation.context),
            )
        self.operations[operation_id] = operation
        await self.task_queue.put(operation)
        self.logger.info(f"Created operation: {operation_id} - {description}")
        return operation_id

    async def get_operation_status(self, operation_id: str) -> Optional[Operation]:
        """Return the in-memory operation for ``operation_id``, or ``None``."""
        return self.operations.get(operation_id)

    async def update_operation(self, operation_id: str, **updates) -> bool:
        """Apply ``updates`` to an operation and write its state to the database.

        Keyword arguments that do not name an ``Operation`` attribute are
        ignored.  ``updated_at`` is always refreshed.

        Returns:
            ``False`` when the operation id is unknown, ``True`` otherwise.
        """
        if operation_id not in self.operations:
            return False
        operation = self.operations[operation_id]
        for key, value in updates.items():
            if hasattr(operation, key):
                setattr(operation, key, value)
        operation.updated_at = datetime.now()

        def to_json(value) -> str:
            # Tracked records carry datetimes, which plain json.dumps rejects.
            return json.dumps(value, default=self._json_default)

        def serialize_list(items):
            # Convert dataclass lists to plain dicts for JSON storage
            if not items:
                return []
            return [vars(item) if hasattr(item, '__dict__') else item for item in items]

        # Update database
        async with self.db_pool.acquire() as conn:
            await conn.execute(
                '''
                UPDATE agent_operations
                SET status = $1, updated_at = $2, progress_percentage = $3,
                    context = $4, result = $5, spawned_processes = $6,
                    file_operations = $7, network_calls = $8, model_calls = $9,
                    error_messages = $10, warning_messages = $11, info_messages = $12,
                    started_at = $13
                WHERE operation_id = $14
                ''',
                operation.status.value, operation.updated_at,
                operation.progress_percentage, to_json(operation.context),
                # ``is not None`` so falsy results ({} / 0 / "") are still stored
                to_json(operation.result) if operation.result is not None else None,
                to_json(serialize_list(operation.spawned_processes)),
                to_json(serialize_list(operation.file_operations)),
                to_json(serialize_list(operation.network_calls)),
                to_json(serialize_list(operation.model_calls)),
                to_json(operation.error_messages),
                to_json(operation.warning_messages),
                to_json(operation.info_messages),
                operation.started_at,
                operation_id,
            )
        return True

    async def call_model(self, model_id: str, prompt: str,
                         operation_id: Optional[str] = None) -> Dict[str, Any]:
        """Send ``prompt`` to a configured model and return its answer.

        Falls back to ``self.default_model`` when ``model_id`` is unknown.
        When ``operation_id`` names a known operation, a ``ModelCall`` record
        is appended to it for both successful and failed calls.

        Returns:
            Dict with ``response``, ``model_used``, ``tokens_used`` and
            ``response_time_ms``.

        Raises:
            ValueError: no usable model is configured.
            NotImplementedError: the model's type is not ``"ollama"``; only
                the Ollama API is implemented.
            Exception: any HTTP/transport error is re-raised after being
                recorded on the operation.
        """
        if model_id not in self.models:
            model_id = self.default_model
        if not model_id or model_id not in self.models:
            raise ValueError("No valid model available")
        model = self.models[model_id]
        if model.type != "ollama":
            # Fail loudly instead of falling through without a result.
            raise NotImplementedError(f"Model type not supported yet: {model.type}")

        operation = self.operations.get(operation_id) if operation_id else None
        start_time = time.monotonic()
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{model.endpoint}/api/generate",
                    json={
                        "model": model.model_id,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "temperature": model.temperature,
                            "num_predict": model.max_tokens,
                        },
                    },
                    timeout=60.0,
                )
                response.raise_for_status()
                result = response.json()
        except Exception as e:
            response_time = (time.monotonic() - start_time) * 1000
            if operation is not None:
                operation.model_calls.append(ModelCall(
                    model_id=model_id,
                    tokens_used=0,
                    cost_usd=0.0,
                    response_time_ms=response_time,
                    success=False,
                    timestamp=datetime.now(),
                ))
                operation.error_messages.append(f"Model call failed: {str(e)}")
            raise

        response_time = (time.monotonic() - start_time) * 1000
        tokens_used = result.get("eval_count", 0)
        # Track model usage
        if operation is not None:
            operation.model_calls.append(ModelCall(
                model_id=model_id,
                tokens_used=tokens_used,
                # 0.0 with the default cost_per_token (local Ollama is free)
                cost_usd=tokens_used * model.cost_per_token,
                response_time_ms=response_time,
                success=True,
                timestamp=datetime.now(),
            ))
        return {
            "response": result.get("response", ""),
            "model_used": model_id,
            "tokens_used": tokens_used,
            "response_time_ms": response_time,
        }

    async def process_task_queue(self):
        """Main loop: take queued operations and start a task for each.

        Runs until ``self.running`` is false.  The queue is polled with a one
        second timeout so the loop notices a stop request promptly.
        """
        while self.running:
            operation = None
            try:
                operation = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                # Update operation status
                operation.status = OperationStatus.RUNNING
                operation.started_at = datetime.now()
                await self.update_operation(operation.operation_id,
                                            status=OperationStatus.RUNNING,
                                            started_at=operation.started_at)
                # Create task for operation
                task = asyncio.create_task(self._execute_operation(operation))
                self.active_tasks[operation.operation_id] = task
                self.logger.info(f"Started processing operation: {operation.operation_id}")
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                self.logger.error(f"Error in task queue processing: {e}")
                if operation is not None:
                    # The operation was dequeued but never started; mark it
                    # failed so it does not appear to be running forever.
                    operation.status = OperationStatus.FAILED
                    operation.error_messages.append(f"Failed to start operation: {str(e)}")

    async def _execute_operation(self, operation: Operation):
        """Run the handler for ``operation.type`` and persist the final state.

        Handler exceptions mark the operation FAILED; cancellation marks it
        CANCELLED and is re-raised.  The final state is written and the task
        is removed from ``self.active_tasks`` in every case.
        """
        handlers = {
            "research_task": self._handle_research_task,
            "system_task": self._handle_system_task,
            "test_task": self._handle_test_task,
        }
        try:
            self.logger.info(f"Executing operation: {operation.operation_id} - {operation.description}")
            # This is where we'll implement the research-first decision loop
            # For now, we'll do a simple operation based on type
            handler = handlers.get(operation.type)
            if handler is None:
                operation.error_messages.append(f"Unknown operation type: {operation.type}")
                operation.status = OperationStatus.FAILED
            else:
                await handler(operation)
            if operation.status != OperationStatus.FAILED:
                operation.status = OperationStatus.COMPLETED
                operation.progress_percentage = 100
        except asyncio.CancelledError:
            # CancelledError is not an Exception subclass on Python 3.8+, so
            # it needs its own branch to record the outcome.
            operation.status = OperationStatus.CANCELLED
            raise
        except Exception as e:
            operation.status = OperationStatus.FAILED
            operation.error_messages.append(f"Execution failed: {str(e)}")
            self.logger.error(f"Operation {operation.operation_id} failed: {e}")
        finally:
            try:
                await self.update_operation(operation.operation_id,
                                            status=operation.status,
                                            progress_percentage=operation.progress_percentage)
            except Exception as e:
                # A failed write must not hide the operation's own outcome.
                self.logger.error(
                    f"Failed to persist final state of operation {operation.operation_id}: {e}")
            finally:
                # Clean up active task
                self.active_tasks.pop(operation.operation_id, None)

    async def _handle_test_task(self, operation: Operation):
        """Handle a simple test task that simulates work in two steps."""
        operation.info_messages.append("Starting test task")
        operation.progress_percentage = 25
        await self.update_operation(operation.operation_id, progress_percentage=25)
        await asyncio.sleep(2)  # Simulate work
        operation.info_messages.append("Test task in progress")
        operation.progress_percentage = 75
        await self.update_operation(operation.operation_id, progress_percentage=75)
        await asyncio.sleep(1)  # Simulate more work
        operation.result = {"message": "Test task completed successfully", "timestamp": datetime.now().isoformat()}
        operation.info_messages.append("Test task completed")

    async def _handle_research_task(self, operation: Operation):
        """Placeholder for research tasks; records that it is unimplemented."""
        # This will be implemented with SearxNG integration
        operation.info_messages.append("Research task handler not yet implemented")
        operation.result = {"status": "pending_implementation"}

    async def _handle_system_task(self, operation: Operation):
        """Placeholder for system tasks; records that it is unimplemented."""
        # This will be implemented with MCP client integration
        operation.info_messages.append("System task handler not yet implemented")
        operation.result = {"status": "pending_implementation"}

    async def start(self):
        """Initialize the database if needed, load config and run the queue loop.

        Does not return until the agent is stopped.
        """
        if not self.db_pool:
            await self._init_database()
        await self._load_configuration()
        self.running = True
        self.logger.info("Starting Local System Agent")
        # Start task processing
        await self.process_task_queue()

    async def stop(self):
        """Stop the agent: cancel running operations, then close the pool."""
        self.running = False
        self.logger.info("Stopping Local System Agent")
        # Cancel active tasks and wait for them to unwind, so their final
        # state can still be written before the pool is closed.
        pending = list(self.active_tasks.values())
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        # Close database pool
        if self.db_pool:
            await self.db_pool.close()
            self.db_pool = None
# Initialize default Ollama models
async def setup_default_models(agent: LocalSystemAgent):
    """Register the default Ollama models with ``agent``.

    Each model is stored through ``agent.add_model``; a model that could not
    be stored is reported with a warning instead of being skipped silently.
    """
    ollama_endpoint = "http://localhost:11434"
    default_models = [
        ModelConfig(
            model_id="llama3.2:3b",
            name="Llama 3.2 3B",
            type="ollama",
            endpoint=ollama_endpoint,
            capabilities=["general", "coding", "analysis"],
        ),
        ModelConfig(
            model_id="qwen2.5:7b",
            name="Qwen 2.5 7B",
            type="ollama",
            endpoint=ollama_endpoint,
            capabilities=["general", "coding", "reasoning"],
        ),
        ModelConfig(
            model_id="llama3.2:1b",
            name="Llama 3.2 1B (Fast)",
            type="ollama",
            endpoint=ollama_endpoint,
            capabilities=["general", "quick_tasks"],
        ),
    ]
    for model in default_models:
        # add_model reports failure through its return value, not an exception
        if not await agent.add_model(model):
            agent.logger.warning(f"Default model could not be registered: {model.model_id}")
# Example usage and testing
async def main():
    """Demo run: start an agent, execute one test operation, then shut down.

    The agent is stopped in ``finally`` so the database pool is released even
    when a step fails, and the background ``start()`` task is awaited so it is
    not abandoned with an unretrieved result.
    """
    agent = None
    agent_task = None
    try:
        print("Initializing Local System Agent...")
        # Initialize agent
        agent = LocalSystemAgent()
        print("Initializing database connection...")
        # Initialize database and load config
        await agent._init_database()
        print("Loading configuration...")
        await agent._load_configuration()
        print("Setting up default models...")
        # Setup default models
        await setup_default_models(agent)
        print("Creating test operation...")
        # Create a test operation
        operation_id = await agent.create_operation(
            "test_task",
            "Testing agent operation tracking",
            TaskPriority.NORMAL,
            {"test_param": "test_value"},
        )
        print(f"Created operation: {operation_id}")
        # Start agent in background
        print("Starting agent task processing...")
        agent_task = asyncio.create_task(agent.start())
        # Monitor operation progress (poll once per second, at most 10 times)
        print("Monitoring operation progress...")
        for _ in range(10):
            await asyncio.sleep(1)
            operation = await agent.get_operation_status(operation_id)
            if operation:
                print(f"Operation status: {operation.status.value}, Progress: {operation.progress_percentage}%")
                if operation.status in [OperationStatus.COMPLETED, OperationStatus.FAILED]:
                    print(f"Operation result: {operation.result}")
                    break
    except Exception as e:
        print(f"Error in main: {e}")
        import traceback
        traceback.print_exc()
    finally:
        if agent is not None:
            # Stop agent
            print("Stopping agent...")
            try:
                await agent.stop()
                if agent_task is not None:
                    # start() returns once its loop sees running == False;
                    # the timeout only guards against a stuck loop.
                    await asyncio.wait_for(agent_task, timeout=5.0)
                print("Agent stopped successfully")
            except Exception as e:
                print(f"Error while stopping agent: {e}")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())