local-system-agent/local_system_agent_v2.py
2025-05-31 16:56:33 -07:00

549 lines
20 KiB
Python

# Local System Agent - Core Framework
# Phase 1: Foundation Setup - Single Connection Version
import asyncio
import uuid
import time
import json
import logging
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import httpx
from pathlib import Path
import asyncpg
from contextlib import asynccontextmanager
# Configuration and Data Models
class OperationStatus(Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskPriority(Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
URGENT = "urgent"
@dataclass
class ProcessInfo:
pid: int
command: str
status: str
cpu_percent: float
memory_mb: float
start_time: datetime
@dataclass
class FileOperation:
path: str
operation: str # create, read, write, delete, move
timestamp: datetime
size_bytes: int
permissions: str
@dataclass
class NetworkCall:
url: str
method: str
status_code: int
response_time_ms: float
data_size_bytes: int
timestamp: datetime
@dataclass
class ModelCall:
model_id: str
tokens_used: int
cost_usd: float
response_time_ms: float
success: bool
timestamp: datetime
@dataclass
class Operation:
operation_id: str
type: str
status: OperationStatus
description: str
created_at: datetime
started_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
estimated_completion: Optional[datetime] = None
progress_percentage: int = 0
priority: TaskPriority = TaskPriority.NORMAL
spawned_processes: List[ProcessInfo] = field(default_factory=list)
file_operations: List[FileOperation] = field(default_factory=list)
network_calls: List[NetworkCall] = field(default_factory=list)
model_calls: List[ModelCall] = field(default_factory=list)
error_messages: List[str] = field(default_factory=list)
warning_messages: List[str] = field(default_factory=list)
info_messages: List[str] = field(default_factory=list)
context: Dict[str, Any] = field(default_factory=dict)
result: Optional[Any] = None
@dataclass
class ModelConfig:
model_id: str
name: str
type: str # "ollama", "lmstudio", "openai_compatible"
endpoint: str
api_key: Optional[str] = None
capabilities: List[str] = field(default_factory=list)
cost_per_token: float = 0.0
max_tokens: int = 4096
temperature: float = 0.1
enabled: bool = True
prompt_template: Optional[str] = None
@dataclass
class MCPServerConfig:
server_id: str
name: str
command: str
args: List[str]
env_vars: Dict[str, str] = field(default_factory=dict)
enabled: bool = True
auto_restart: bool = True
health_check_interval: int = 30
# Core Agent Class
class LocalSystemAgent:
def __init__(self, db_url: str = "postgresql://mcpuser:mcppoop123@bakecms.com:5432/postgres?sslmode=disable"):
self.db_url = db_url
self.db_conn = None # Single connection instead of pool
self.operations: Dict[str, Operation] = {}
self.task_queue = asyncio.Queue()
self.active_tasks: Dict[str, asyncio.Task] = {}
self.models: Dict[str, ModelConfig] = {}
self.mcp_servers: Dict[str, MCPServerConfig] = {}
self.default_model = None
self.running = False
# Initialize logging
self._setup_logging()
self.logger.info("Local System Agent initialized")
def _setup_logging(self):
"""Setup comprehensive logging system"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('agent.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('LocalSystemAgent')
async def _init_database(self):
"""Initialize PostgreSQL connection and create tables"""
try:
print(f"Attempting to connect to database: {self.db_url}")
self.logger.info(f"Attempting to connect to database: {self.db_url}")
# Try single connection instead of pool
self.db_conn = await asyncio.wait_for(
asyncpg.connect(self.db_url),
timeout=10.0
)
self.logger.info("Connected to PostgreSQL database")
print("Connected to PostgreSQL database")
# Test the connection
version = await self.db_conn.fetchval("SELECT version()")
print(f"Database version: {version}")
self.logger.info(f"Database version: {version}")
# Create agent_models table
await self.db_conn.execute('''
CREATE TABLE IF NOT EXISTS agent_models (
model_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
endpoint TEXT NOT NULL,
api_key TEXT,
capabilities JSONB DEFAULT '[]',
cost_per_token REAL DEFAULT 0.0,
max_tokens INTEGER DEFAULT 4096,
temperature REAL DEFAULT 0.1,
enabled BOOLEAN DEFAULT TRUE,
prompt_template TEXT,
created_at TIMESTAMP DEFAULT NOW()
)
''')
# Create agent_mcp_servers table
await self.db_conn.execute('''
CREATE TABLE IF NOT EXISTS agent_mcp_servers (
server_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
command TEXT NOT NULL,
args JSONB NOT NULL,
env_vars JSONB DEFAULT '{}',
enabled BOOLEAN DEFAULT TRUE,
auto_restart BOOLEAN DEFAULT TRUE,
health_check_interval INTEGER DEFAULT 30,
created_at TIMESTAMP DEFAULT NOW()
)
''')
# Create agent_operations table
await self.db_conn.execute('''
CREATE TABLE IF NOT EXISTS agent_operations (
operation_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
status TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
started_at TIMESTAMP,
updated_at TIMESTAMP,
estimated_completion TIMESTAMP,
progress_percentage INTEGER DEFAULT 0,
priority TEXT DEFAULT 'normal',
context JSONB DEFAULT '{}',
result JSONB,
spawned_processes JSONB DEFAULT '[]',
file_operations JSONB DEFAULT '[]',
network_calls JSONB DEFAULT '[]',
model_calls JSONB DEFAULT '[]',
error_messages JSONB DEFAULT '[]',
warning_messages JSONB DEFAULT '[]',
info_messages JSONB DEFAULT '[]'
)
''')
self.logger.info("Database tables initialized")
print("Database tables created successfully")
except Exception as e:
self.logger.error(f"Failed to initialize database: {e}")
print(f"Database initialization failed: {e}")
raise
async def _load_configuration(self):
"""Load models and MCP server configurations from database"""
if not self.db_conn:
await self._init_database()
# Load models
models_rows = await self.db_conn.fetch("SELECT * FROM agent_models WHERE enabled = TRUE")
for row in models_rows:
model = ModelConfig(
model_id=row['model_id'],
name=row['name'],
type=row['type'],
endpoint=row['endpoint'],
api_key=row['api_key'],
capabilities=row['capabilities'] or [],
cost_per_token=row['cost_per_token'],
max_tokens=row['max_tokens'],
temperature=row['temperature'],
enabled=row['enabled'],
prompt_template=row['prompt_template']
)
self.models[model.model_id] = model
# Load MCP servers
servers_rows = await self.db_conn.fetch("SELECT * FROM agent_mcp_servers WHERE enabled = TRUE")
for row in servers_rows:
server = MCPServerConfig(
server_id=row['server_id'],
name=row['name'],
command=row['command'],
args=row['args'] or [],
env_vars=row['env_vars'] or {},
enabled=row['enabled'],
auto_restart=row['auto_restart'],
health_check_interval=row['health_check_interval']
)
self.mcp_servers[server.server_id] = server
# Set default model (first available Ollama model)
ollama_models = [m for m in self.models.values() if m.type == "ollama"]
if ollama_models:
self.default_model = ollama_models[0].model_id
self.logger.info(f"Default model set to: {self.default_model}")
async def add_model(self, model_config: ModelConfig) -> bool:
"""Add a new model configuration"""
try:
await self.db_conn.execute('''
INSERT INTO agent_models
(model_id, name, type, endpoint, api_key, capabilities,
cost_per_token, max_tokens, temperature, enabled, prompt_template)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (model_id) DO UPDATE SET
name = $2, type = $3, endpoint = $4, api_key = $5,
capabilities = $6, cost_per_token = $7, max_tokens = $8,
temperature = $9, enabled = $10, prompt_template = $11
''', (
model_config.model_id, model_config.name, model_config.type,
model_config.endpoint, model_config.api_key,
json.dumps(model_config.capabilities),
model_config.cost_per_token, model_config.max_tokens,
model_config.temperature, model_config.enabled,
model_config.prompt_template
))
self.models[model_config.model_id] = model_config
self.logger.info(f"Added model: {model_config.model_id}")
return True
except Exception as e:
self.logger.error(f"Failed to add model {model_config.model_id}: {e}")
return False
async def create_operation(self, operation_type: str, description: str,
priority: TaskPriority = TaskPriority.NORMAL,
context: Dict[str, Any] = None) -> str:
"""Create a new operation and add it to the queue"""
operation_id = str(uuid.uuid4())
operation = Operation(
operation_id=operation_id,
type=operation_type,
status=OperationStatus.QUEUED,
description=description,
created_at=datetime.now(),
priority=priority,
context=context or {}
)
self.operations[operation_id] = operation
await self.task_queue.put(operation)
# Store in database
await self.db_conn.execute('''
INSERT INTO agent_operations
(operation_id, type, status, description, created_at, priority, context)
VALUES ($1, $2, $3, $4, $5, $6, $7)
''', (
operation_id, operation_type, operation.status.value,
description, operation.created_at,
priority.value, json.dumps(context or {})
))
self.logger.info(f"Created operation: {operation_id} - {description}")
return operation_id
async def get_operation_status(self, operation_id: str) -> Optional[Operation]:
"""Get the current status of an operation"""
return self.operations.get(operation_id)
async def update_operation(self, operation_id: str, **updates):
"""Update operation status and information"""
if operation_id not in self.operations:
return False
operation = self.operations[operation_id]
for key, value in updates.items():
if hasattr(operation, key):
setattr(operation, key, value)
operation.updated_at = datetime.now()
# Convert dataclass lists to JSON for storage
def serialize_list(items):
if not items:
return []
return [item.__dict__ if hasattr(item, '__dict__') else item for item in items]
# Update database
await self.db_conn.execute('''
UPDATE agent_operations
SET status = $1, updated_at = $2, progress_percentage = $3,
context = $4, result = $5, spawned_processes = $6,
file_operations = $7, network_calls = $8, model_calls = $9,
error_messages = $10, warning_messages = $11, info_messages = $12
WHERE operation_id = $13
''', (
operation.status.value, operation.updated_at,
operation.progress_percentage, json.dumps(operation.context),
json.dumps(operation.result) if operation.result else None,
json.dumps(serialize_list(operation.spawned_processes)),
json.dumps(serialize_list(operation.file_operations)),
json.dumps(serialize_list(operation.network_calls)),
json.dumps(serialize_list(operation.model_calls)),
json.dumps(operation.error_messages),
json.dumps(operation.warning_messages),
json.dumps(operation.info_messages),
operation_id
))
return True
async def _handle_test_task(self, operation: Operation):
"""Handle a simple test task"""
operation.info_messages.append("Starting test task")
operation.progress_percentage = 25
await self.update_operation(operation.operation_id, progress_percentage=25)
await asyncio.sleep(2) # Simulate work
operation.info_messages.append("Test task in progress")
operation.progress_percentage = 75
await self.update_operation(operation.operation_id, progress_percentage=75)
await asyncio.sleep(1) # Simulate more work
operation.result = {"message": "Test task completed successfully", "timestamp": datetime.now().isoformat()}
operation.info_messages.append("Test task completed")
async def process_task_queue(self):
"""Main task processing loop"""
while self.running:
try:
operation = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
# Update operation status
operation.status = OperationStatus.RUNNING
operation.started_at = datetime.now()
await self.update_operation(operation.operation_id,
status=OperationStatus.RUNNING,
started_at=operation.started_at)
# Execute operation directly for testing
try:
if operation.type == "test_task":
await self._handle_test_task(operation)
operation.status = OperationStatus.COMPLETED
operation.progress_percentage = 100
else:
operation.error_messages.append(f"Unknown operation type: {operation.type}")
operation.status = OperationStatus.FAILED
except Exception as e:
operation.status = OperationStatus.FAILED
operation.error_messages.append(f"Execution failed: {str(e)}")
self.logger.error(f"Operation {operation.operation_id} failed: {e}")
finally:
await self.update_operation(operation.operation_id,
status=operation.status,
progress_percentage=operation.progress_percentage)
self.logger.info(f"Completed operation: {operation.operation_id}")
except asyncio.TimeoutError:
continue
except Exception as e:
self.logger.error(f"Error in task queue processing: {e}")
async def start(self):
"""Start the agent"""
if not self.db_conn:
await self._init_database()
await self._load_configuration()
self.running = True
self.logger.info("Starting Local System Agent")
# Start task processing
await self.process_task_queue()
async def stop(self):
"""Stop the agent gracefully"""
self.running = False
self.logger.info("Stopping Local System Agent")
# Cancel active tasks
for task in self.active_tasks.values():
task.cancel()
# Close database connection
if self.db_conn:
await self.db_conn.close()
# Initialize default Ollama models
async def setup_default_models(agent: LocalSystemAgent):
"""Setup default Ollama models"""
default_models = [
ModelConfig(
model_id="llama3.2:3b",
name="Llama 3.2 3B",
type="ollama",
endpoint="http://localhost:11434",
capabilities=["general", "coding", "analysis"]
),
ModelConfig(
model_id="qwen2.5:7b",
name="Qwen 2.5 7B",
type="ollama",
endpoint="http://localhost:11434",
capabilities=["general", "coding", "reasoning"]
),
ModelConfig(
model_id="llama3.2:1b",
name="Llama 3.2 1B (Fast)",
type="ollama",
endpoint="http://localhost:11434",
capabilities=["general", "quick_tasks"]
)
]
for model in default_models:
await agent.add_model(model)
# Example usage and testing
async def main():
try:
print("Initializing Local System Agent...")
# Initialize agent
agent = LocalSystemAgent()
print("Initializing database connection...")
# Initialize database and load config
await agent._init_database()
print("Loading configuration...")
await agent._load_configuration()
print("Setting up default models...")
# Setup default models
await setup_default_models(agent)
print("Creating test operation...")
# Create a test operation
operation_id = await agent.create_operation(
"test_task",
"Testing agent operation tracking",
TaskPriority.NORMAL,
{"test_param": "test_value"}
)
print(f"Created operation: {operation_id}")
# Start agent in background
print("Starting agent task processing...")
agent_task = asyncio.create_task(agent.start())
# Monitor operation progress
print("Monitoring operation progress...")
for i in range(10):
await asyncio.sleep(1)
operation = await agent.get_operation_status(operation_id)
if operation:
print(f"Operation status: {operation.status.value}, Progress: {operation.progress_percentage}%")
if operation.status in [OperationStatus.COMPLETED, OperationStatus.FAILED]:
print(f"Operation result: {operation.result}")
break
# Stop agent
print("Stopping agent...")
await agent.stop()
print("Agent stopped successfully")
except Exception as e:
print(f"Error in main: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())