# Local System Agent - Core Framework
# Phase 1: Foundation Setup - Single Connection Version

import asyncio
import uuid
import time
import json
import logging
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import httpx
from pathlib import Path
import asyncpg
from contextlib import asynccontextmanager

# Configuration and Data Models
class OperationStatus(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class TaskPriority(Enum):
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

@dataclass
class ProcessInfo:
    pid: int
    command: str
    status: str
    cpu_percent: float
    memory_mb: float
    start_time: datetime

@dataclass
class FileOperation:
    path: str
    operation: str  # create, read, write, delete, move
    timestamp: datetime
    size_bytes: int
    permissions: str

@dataclass
class NetworkCall:
    url: str
    method: str
    status_code: int
    response_time_ms: float
    data_size_bytes: int
    timestamp: datetime

@dataclass
class ModelCall:
    model_id: str
    tokens_used: int
    cost_usd: float
    response_time_ms: float
    success: bool
    timestamp: datetime

@dataclass
class Operation:
    operation_id: str
    type: str
    status: OperationStatus
    description: str
    created_at: datetime
    started_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    estimated_completion: Optional[datetime] = None
    progress_percentage: int = 0
    priority: TaskPriority = TaskPriority.NORMAL
    spawned_processes: List[ProcessInfo] = field(default_factory=list)
    file_operations: List[FileOperation] = field(default_factory=list)
    network_calls: List[NetworkCall] = field(default_factory=list)
    model_calls: List[ModelCall] = field(default_factory=list)
    error_messages: List[str] = field(default_factory=list)
    warning_messages: List[str] = field(default_factory=list)
    info_messages: List[str] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    result: Optional[Any] = None

@dataclass
class ModelConfig:
    model_id: str
    name: str
    type: str  # "ollama", "lmstudio", "openai_compatible"
    endpoint: str
    api_key: Optional[str] = None
    capabilities: List[str] = field(default_factory=list)
    cost_per_token: float = 0.0
    max_tokens: int = 4096
    temperature: float = 0.1
    enabled: bool = True
    prompt_template: Optional[str] = None

@dataclass
class MCPServerConfig:
    server_id: str
    name: str
    command: str
    args: List[str]
    env_vars: Dict[str, str] = field(default_factory=dict)
    enabled: bool = True
    auto_restart: bool = True
    health_check_interval: int = 30

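# Illustrative example (not part of the agent's stored configuration): what an
# MCPServerConfig entry might look like for the MCP filesystem server launched
# via npx. The command, args, and exposed directory are assumptions for
# demonstration only; real entries are loaded from the agent_mcp_servers table.
EXAMPLE_FILESYSTEM_SERVER = MCPServerConfig(
    server_id="filesystem",
    name="Filesystem MCP Server",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    env_vars={},  # no extra environment needed in this example
)
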
# Core Agent Class
class LocalSystemAgent:
    def __init__(self, db_url: str = "postgresql://mcpuser:mcppoop123@bakecms.com:5432/postgres?sslmode=disable"):
        self.db_url = db_url
        self.db_conn = None  # Single connection instead of pool
        self.operations: Dict[str, Operation] = {}
        self.task_queue = asyncio.Queue()
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.models: Dict[str, ModelConfig] = {}
        self.mcp_servers: Dict[str, MCPServerConfig] = {}
        self.default_model = None
        self.running = False

        # Initialize logging
        self._setup_logging()

        self.logger.info("Local System Agent initialized")

    def _setup_logging(self):
        """Set up the logging system (file and console handlers)."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('agent.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('LocalSystemAgent')

    async def _init_database(self):
        """Initialize PostgreSQL connection and create tables"""
        try:
            print(f"Attempting to connect to database: {self.db_url}")
            self.logger.info(f"Attempting to connect to database: {self.db_url}")

            # Try single connection instead of pool
            self.db_conn = await asyncio.wait_for(
                asyncpg.connect(self.db_url),
                timeout=10.0
            )
            self.logger.info("Connected to PostgreSQL database")
            print("Connected to PostgreSQL database")

            # Test the connection
            version = await self.db_conn.fetchval("SELECT version()")
            print(f"Database version: {version}")
            self.logger.info(f"Database version: {version}")

            # Create agent_models table
            await self.db_conn.execute('''
                CREATE TABLE IF NOT EXISTS agent_models (
                    model_id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    type TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    api_key TEXT,
                    capabilities JSONB DEFAULT '[]',
                    cost_per_token REAL DEFAULT 0.0,
                    max_tokens INTEGER DEFAULT 4096,
                    temperature REAL DEFAULT 0.1,
                    enabled BOOLEAN DEFAULT TRUE,
                    prompt_template TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            ''')

            # Create agent_mcp_servers table
            await self.db_conn.execute('''
                CREATE TABLE IF NOT EXISTS agent_mcp_servers (
                    server_id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    command TEXT NOT NULL,
                    args JSONB NOT NULL,
                    env_vars JSONB DEFAULT '{}',
                    enabled BOOLEAN DEFAULT TRUE,
                    auto_restart BOOLEAN DEFAULT TRUE,
                    health_check_interval INTEGER DEFAULT 30,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            ''')

            # Create agent_operations table
            await self.db_conn.execute('''
                CREATE TABLE IF NOT EXISTS agent_operations (
                    operation_id TEXT PRIMARY KEY,
                    type TEXT NOT NULL,
                    status TEXT NOT NULL,
                    description TEXT NOT NULL,
                    created_at TIMESTAMP NOT NULL,
                    started_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    estimated_completion TIMESTAMP,
                    progress_percentage INTEGER DEFAULT 0,
                    priority TEXT DEFAULT 'normal',
                    context JSONB DEFAULT '{}',
                    result JSONB,
                    spawned_processes JSONB DEFAULT '[]',
                    file_operations JSONB DEFAULT '[]',
                    network_calls JSONB DEFAULT '[]',
                    model_calls JSONB DEFAULT '[]',
                    error_messages JSONB DEFAULT '[]',
                    warning_messages JSONB DEFAULT '[]',
                    info_messages JSONB DEFAULT '[]'
                )
            ''')

            self.logger.info("Database tables initialized")
            print("Database tables created successfully")

        except Exception as e:
            self.logger.error(f"Failed to initialize database: {e}")
            print(f"Database initialization failed: {e}")
            raise

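    # Usage note (illustrative, not part of the framework): once the tables
    # above exist, recent operations can be inspected straight from the
    # connection, e.g. from a REPL or a separate debugging script:
    #
    #     rows = await agent.db_conn.fetch(
    #         "SELECT operation_id, status, progress_percentage "
    #         "FROM agent_operations ORDER BY created_at DESC LIMIT 10"
    #     )
    #
    # Column names match the CREATE TABLE statements in _init_database().
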
    async def _load_configuration(self):
        """Load models and MCP server configurations from database"""
        if not self.db_conn:
            await self._init_database()

        # asyncpg returns JSONB columns as text unless a codec is registered,
        # so decode the JSON fields explicitly before building the dataclasses.
        def load_json(value, default):
            if value is None:
                return default
            return json.loads(value) if isinstance(value, str) else value

        # Load models
        models_rows = await self.db_conn.fetch("SELECT * FROM agent_models WHERE enabled = TRUE")
        for row in models_rows:
            model = ModelConfig(
                model_id=row['model_id'],
                name=row['name'],
                type=row['type'],
                endpoint=row['endpoint'],
                api_key=row['api_key'],
                capabilities=load_json(row['capabilities'], []),
                cost_per_token=row['cost_per_token'],
                max_tokens=row['max_tokens'],
                temperature=row['temperature'],
                enabled=row['enabled'],
                prompt_template=row['prompt_template']
            )
            self.models[model.model_id] = model

        # Load MCP servers
        servers_rows = await self.db_conn.fetch("SELECT * FROM agent_mcp_servers WHERE enabled = TRUE")
        for row in servers_rows:
            server = MCPServerConfig(
                server_id=row['server_id'],
                name=row['name'],
                command=row['command'],
                args=load_json(row['args'], []),
                env_vars=load_json(row['env_vars'], {}),
                enabled=row['enabled'],
                auto_restart=row['auto_restart'],
                health_check_interval=row['health_check_interval']
            )
            self.mcp_servers[server.server_id] = server

        # Set default model (first available Ollama model)
        ollama_models = [m for m in self.models.values() if m.type == "ollama"]
        if ollama_models:
            self.default_model = ollama_models[0].model_id
            self.logger.info(f"Default model set to: {self.default_model}")

    async def add_model(self, model_config: ModelConfig) -> bool:
        """Add a new model configuration"""
        try:
            # asyncpg takes query parameters as positional arguments, not a tuple
            await self.db_conn.execute('''
                INSERT INTO agent_models
                (model_id, name, type, endpoint, api_key, capabilities,
                 cost_per_token, max_tokens, temperature, enabled, prompt_template)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ON CONFLICT (model_id) DO UPDATE SET
                    name = $2, type = $3, endpoint = $4, api_key = $5,
                    capabilities = $6, cost_per_token = $7, max_tokens = $8,
                    temperature = $9, enabled = $10, prompt_template = $11
            ''',
                model_config.model_id, model_config.name, model_config.type,
                model_config.endpoint, model_config.api_key,
                json.dumps(model_config.capabilities),
                model_config.cost_per_token, model_config.max_tokens,
                model_config.temperature, model_config.enabled,
                model_config.prompt_template
            )

            self.models[model_config.model_id] = model_config
            self.logger.info(f"Added model: {model_config.model_id}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to add model {model_config.model_id}: {e}")
            return False

    async def create_operation(self, operation_type: str, description: str,
                               priority: TaskPriority = TaskPriority.NORMAL,
                               context: Dict[str, Any] = None) -> str:
        """Create a new operation and add it to the queue"""
        operation_id = str(uuid.uuid4())
        operation = Operation(
            operation_id=operation_id,
            type=operation_type,
            status=OperationStatus.QUEUED,
            description=description,
            created_at=datetime.now(),
            priority=priority,
            context=context or {}
        )

        self.operations[operation_id] = operation
        await self.task_queue.put(operation)

        # Store in database (asyncpg takes the parameters positionally)
        await self.db_conn.execute('''
            INSERT INTO agent_operations
            (operation_id, type, status, description, created_at, priority, context)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
        ''',
            operation_id, operation_type, operation.status.value,
            description, operation.created_at,
            priority.value, json.dumps(context or {})
        )

        self.logger.info(f"Created operation: {operation_id} - {description}")
        return operation_id

    async def get_operation_status(self, operation_id: str) -> Optional[Operation]:
        """Get the current status of an operation"""
        return self.operations.get(operation_id)

    async def update_operation(self, operation_id: str, **updates):
        """Update operation status and information"""
        if operation_id not in self.operations:
            return False

        operation = self.operations[operation_id]

        for key, value in updates.items():
            if hasattr(operation, key):
                setattr(operation, key, value)

        operation.updated_at = datetime.now()

        # Convert dataclass lists to JSON for storage
        def serialize_list(items):
            if not items:
                return []
            return [item.__dict__ if hasattr(item, '__dict__') else item for item in items]

        # Update database; default=str keeps datetime fields JSON-serializable,
        # and asyncpg takes the parameters positionally
        await self.db_conn.execute('''
            UPDATE agent_operations
            SET status = $1, updated_at = $2, progress_percentage = $3,
                context = $4, result = $5, spawned_processes = $6,
                file_operations = $7, network_calls = $8, model_calls = $9,
                error_messages = $10, warning_messages = $11, info_messages = $12
            WHERE operation_id = $13
        ''',
            operation.status.value, operation.updated_at,
            operation.progress_percentage, json.dumps(operation.context, default=str),
            json.dumps(operation.result, default=str) if operation.result else None,
            json.dumps(serialize_list(operation.spawned_processes), default=str),
            json.dumps(serialize_list(operation.file_operations), default=str),
            json.dumps(serialize_list(operation.network_calls), default=str),
            json.dumps(serialize_list(operation.model_calls), default=str),
            json.dumps(operation.error_messages),
            json.dumps(operation.warning_messages),
            json.dumps(operation.info_messages),
            operation_id
        )

        return True

    async def _handle_test_task(self, operation: Operation):
        """Handle a simple test task"""
        operation.info_messages.append("Starting test task")
        operation.progress_percentage = 25
        await self.update_operation(operation.operation_id, progress_percentage=25)

        await asyncio.sleep(2)  # Simulate work

        operation.info_messages.append("Test task in progress")
        operation.progress_percentage = 75
        await self.update_operation(operation.operation_id, progress_percentage=75)

        await asyncio.sleep(1)  # Simulate more work

        operation.result = {"message": "Test task completed successfully", "timestamp": datetime.now().isoformat()}
        operation.info_messages.append("Test task completed")

    async def process_task_queue(self):
        """Main task processing loop"""
        while self.running:
            try:
                operation = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)

                # Update operation status
                operation.status = OperationStatus.RUNNING
                operation.started_at = datetime.now()
                await self.update_operation(operation.operation_id,
                                            status=OperationStatus.RUNNING,
                                            started_at=operation.started_at)

                # Execute operation directly for testing
                try:
                    if operation.type == "test_task":
                        await self._handle_test_task(operation)
                        operation.status = OperationStatus.COMPLETED
                        operation.progress_percentage = 100
                    else:
                        operation.error_messages.append(f"Unknown operation type: {operation.type}")
                        operation.status = OperationStatus.FAILED

                except Exception as e:
                    operation.status = OperationStatus.FAILED
                    operation.error_messages.append(f"Execution failed: {str(e)}")
                    self.logger.error(f"Operation {operation.operation_id} failed: {e}")

                finally:
                    await self.update_operation(operation.operation_id,
                                                status=operation.status,
                                                progress_percentage=operation.progress_percentage)

                self.logger.info(f"Completed operation: {operation.operation_id}")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                self.logger.error(f"Error in task queue processing: {e}")

    async def start(self):
        """Start the agent"""
        if not self.db_conn:
            await self._init_database()
            await self._load_configuration()

        self.running = True
        self.logger.info("Starting Local System Agent")

        # Start task processing
        await self.process_task_queue()

    async def stop(self):
        """Stop the agent gracefully"""
        self.running = False
        self.logger.info("Stopping Local System Agent")

        # Cancel active tasks
        for task in self.active_tasks.values():
            task.cancel()

        # Close database connection
        if self.db_conn:
            await self.db_conn.close()

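# Hedged sketch (not part of the original framework): the module imports
# asynccontextmanager but never uses it; one natural use is pairing start()
# and stop() automatically. The helper name and flow are assumptions for
# illustration only.
@asynccontextmanager
async def agent_session(db_url: Optional[str] = None):
    agent = LocalSystemAgent(db_url) if db_url else LocalSystemAgent()
    await agent._init_database()
    await agent._load_configuration()
    runner = asyncio.create_task(agent.start())  # process the queue in the background
    try:
        yield agent
    finally:
        await agent.stop()
        await runner  # let the processing loop exit cleanly

# Example usage (also illustrative):
#     async with agent_session() as agent:
#         op_id = await agent.create_operation("test_task", "demo run")
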
# Initialize default Ollama models
async def setup_default_models(agent: LocalSystemAgent):
    """Set up the default Ollama models"""
    default_models = [
        ModelConfig(
            model_id="llama3.2:3b",
            name="Llama 3.2 3B",
            type="ollama",
            endpoint="http://localhost:11434",
            capabilities=["general", "coding", "analysis"]
        ),
        ModelConfig(
            model_id="qwen2.5:7b",
            name="Qwen 2.5 7B",
            type="ollama",
            endpoint="http://localhost:11434",
            capabilities=["general", "coding", "reasoning"]
        ),
        ModelConfig(
            model_id="llama3.2:1b",
            name="Llama 3.2 1B (Fast)",
            type="ollama",
            endpoint="http://localhost:11434",
            capabilities=["general", "quick_tasks"]
        )
    ]

    for model in default_models:
        await agent.add_model(model)

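# Hedged sketch (illustrative, not part of the original framework): the module
# imports httpx and defines ModelCall, but no model call is made yet. This is
# one way a later phase might call an Ollama model described by a ModelConfig
# and record the call; the helper name and field mapping are assumptions.
async def call_ollama_model(config: ModelConfig, prompt: str) -> tuple:
    start = time.monotonic()
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(
            f"{config.endpoint}/api/generate",
            json={"model": config.model_id, "prompt": prompt, "stream": False},
        )
    elapsed_ms = (time.monotonic() - start) * 1000
    data = response.json() if response.status_code == 200 else {}
    tokens = data.get("eval_count", 0)
    record = ModelCall(
        model_id=config.model_id,
        tokens_used=tokens,
        cost_usd=tokens * config.cost_per_token,
        response_time_ms=elapsed_ms,
        success=response.status_code == 200,
        timestamp=datetime.now(),
    )
    return data.get("response", ""), record
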
# Example usage and testing
async def main():
    try:
        print("Initializing Local System Agent...")
        # Initialize agent
        agent = LocalSystemAgent()

        print("Initializing database connection...")
        # Initialize database and load config
        await agent._init_database()

        print("Loading configuration...")
        await agent._load_configuration()

        print("Setting up default models...")
        # Set up default models
        await setup_default_models(agent)

        print("Creating test operation...")
        # Create a test operation
        operation_id = await agent.create_operation(
            "test_task",
            "Testing agent operation tracking",
            TaskPriority.NORMAL,
            {"test_param": "test_value"}
        )

        print(f"Created operation: {operation_id}")

        # Start agent in background
        print("Starting agent task processing...")
        agent_task = asyncio.create_task(agent.start())

        # Monitor operation progress
        print("Monitoring operation progress...")
        for i in range(10):
            await asyncio.sleep(1)
            operation = await agent.get_operation_status(operation_id)
            if operation:
                print(f"Operation status: {operation.status.value}, Progress: {operation.progress_percentage}%")
                if operation.status in [OperationStatus.COMPLETED, OperationStatus.FAILED]:
                    print(f"Operation result: {operation.result}")
                    break

        # Stop agent and wait for the processing loop to exit cleanly
        print("Stopping agent...")
        await agent.stop()
        await agent_task
        print("Agent stopped successfully")

    except Exception as e:
        print(f"Error in main: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())