pyserveX/pyserve/ctl/_runner.py
Илья Глазунов 7662a7924a fixed flake8 lint errors
2025-12-04 03:06:58 +03:00

390 lines
13 KiB
Python

"""
PyServe Service Runner
Handles starting, stopping, and managing services.
Integrates with ProcessManager for actual process management.
"""
import asyncio
import os
import sys
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from ..config import Config
from ..process_manager import ProcessConfig, ProcessManager, ProcessState
from .state import StateManager
@dataclass
class ServiceDefinition:
    """Settings for one service, as read from an ``apps`` config entry.

    ``name``, ``path`` and ``app_path`` are required by the constructor; every
    other field has a default. Field order is part of the positional
    constructor signature and must not change.
    """

    # --- identity and entry point ---------------------------------------
    name: str  # key under which the runner stores and looks up the service
    path: str  # presumably the URL prefix the service is mounted at - confirm
    app_path: str  # forwarded unchanged to ProcessConfig.app_path
    app_type: str = "asgi"
    module_path: Optional[str] = None

    # --- sizing ----------------------------------------------------------
    workers: int = 1

    # --- health checking -------------------------------------------------
    health_check_path: str = "/health"
    health_check_interval: float = 10.0  # presumably seconds - confirm
    health_check_timeout: float = 5.0  # presumably seconds - confirm
    health_check_retries: int = 3

    # --- restart / shutdown policy ---------------------------------------
    max_restart_count: int = 5
    restart_delay: float = 1.0  # presumably seconds - confirm
    shutdown_timeout: float = 30.0  # presumably seconds - confirm

    # --- routing and environment -----------------------------------------
    strip_path: bool = True  # not read in this module; consumer not shown
    # default_factory gives each instance its own dict.
    env: Dict[str, str] = field(default_factory=dict)
class ServiceRunner:
    """Start, stop and supervise the services declared in the configuration.

    Service definitions are read from the ``process_orchestration``
    extensions of ``config``. Process handling is delegated to
    ``ProcessManager``; what the runner observes is recorded through
    ``state_manager``.
    """

    # Fallback when no process_orchestration extension provides "port_range".
    _DEFAULT_PORT_RANGE = (9000, 9999)

    def __init__(self, config: Config, state_manager: StateManager):
        """Store the collaborators and parse the configured services.

        Args:
            config: Loaded configuration; its ``extensions`` are scanned for
                ``process_orchestration`` entries.
            state_manager: Store that receives service state updates.
        """
        self.config = config
        self.state_manager = state_manager
        # Created by start() / start_service(), cleared by stop_all().
        self._process_manager: Optional[ProcessManager] = None
        self._services: Dict[str, ServiceDefinition] = {}
        # Loop flag for start(); cleared by stop() and stop_all().
        self._running = False
        self._parse_services()

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def _parse_services(self) -> None:
        """Fill ``self._services`` from every ``process_orchestration`` extension.

        Entries are keyed by service name, so a later entry with the same name
        (including the ``"unnamed"`` fallback) replaces an earlier one.
        """
        for ext in self.config.extensions:
            if ext.type != "process_orchestration":
                continue
            # "apps" may be present but set to None; treat that like "no apps"
            # instead of failing on iteration.
            for app_config in ext.config.get("apps") or []:
                service = ServiceDefinition(
                    name=app_config.get("name", "unnamed"),
                    path=app_config.get("path", "/"),
                    app_path=app_config.get("app_path", ""),
                    app_type=app_config.get("app_type", "asgi"),
                    module_path=app_config.get("module_path"),
                    workers=app_config.get("workers", 1),
                    health_check_path=app_config.get("health_check_path", "/health"),
                    health_check_interval=app_config.get("health_check_interval", 10.0),
                    health_check_timeout=app_config.get("health_check_timeout", 5.0),
                    health_check_retries=app_config.get("health_check_retries", 3),
                    max_restart_count=app_config.get("max_restart_count", 5),
                    restart_delay=app_config.get("restart_delay", 1.0),
                    shutdown_timeout=app_config.get("shutdown_timeout", 30.0),
                    strip_path=app_config.get("strip_path", True),
                    # Copy so the definition never aliases the config's dict;
                    # a None value becomes an empty environment.
                    env=dict(app_config.get("env") or {}),
                )
                self._services[service.name] = service

    def _configured_port_range(self) -> tuple:
        """Return the ``port_range`` of the first ``process_orchestration`` extension.

        Falls back to ``_DEFAULT_PORT_RANGE`` when there is no such extension
        or it does not set ``port_range``.
        """
        for ext in self.config.extensions:
            if ext.type == "process_orchestration":
                return tuple(ext.config.get("port_range", self._DEFAULT_PORT_RANGE))
        return self._DEFAULT_PORT_RANGE

    def _create_process_manager(self) -> ProcessManager:
        """Build a ``ProcessManager`` the same way for every entry point."""
        return ProcessManager(
            port_range=self._configured_port_range(),
            health_check_enabled=True,
        )

    def _build_process_config(self, service: ServiceDefinition, workers: int) -> ProcessConfig:
        """Translate a service definition into a ``ProcessConfig``.

        Args:
            service: Definition to translate.
            workers: Worker count to use; passed separately because start()
                may override the configured value through ``scale_map``.
        """
        return ProcessConfig(
            name=service.name,
            app_path=service.app_path,
            app_type=service.app_type,
            workers=workers,
            module_path=service.module_path,
            health_check_enabled=True,
            health_check_path=service.health_check_path,
            health_check_interval=service.health_check_interval,
            health_check_timeout=service.health_check_timeout,
            health_check_retries=service.health_check_retries,
            max_restart_count=service.max_restart_count,
            restart_delay=service.restart_delay,
            shutdown_timeout=service.shutdown_timeout,
            env=service.env,
        )

    def _mark_failed(self, name: str) -> None:
        """Record ``name`` as failed without letting a state-store error escape.

        Used from error paths whose contract is "return False": a second
        failure while recording the state is reported, not raised.
        """
        try:
            self.state_manager.update_service(name, state="failed")
        except Exception as e:
            from .output import print_error

            print_error(f"Could not record failed state for {name}: {e}")

    # ------------------------------------------------------------------
    # Read access
    # ------------------------------------------------------------------

    def get_services(self) -> Dict[str, ServiceDefinition]:
        """Return a shallow copy of the name -> definition mapping.

        The mapping is new, the ``ServiceDefinition`` objects are shared.
        """
        return self._services.copy()

    def get_service(self, name: str) -> Optional[ServiceDefinition]:
        """Return the definition registered under ``name``, or None."""
        return self._services.get(name)

    # ------------------------------------------------------------------
    # Foreground run
    # ------------------------------------------------------------------

    async def start(
        self,
        services: Optional[List[str]] = None,
        scale_map: Optional[Dict[str, int]] = None,
        force_recreate: bool = False,
        wait_healthy: bool = False,
        timeout: int = 60,
    ) -> None:
        """Start the requested services and supervise them until stopped.

        The coroutine only returns after ``stop()`` is called or the task is
        cancelled; ``stop_all()`` always runs on the way out.

        Args:
            services: Names to start; None or empty means every configured
                service. If any name is unknown nothing is started.
            scale_map: Optional per-service worker count overriding the
                configured ``workers``.
            force_recreate: Accepted for interface compatibility; not used by
                this method.
            wait_healthy: When true, wait for the services before entering the
                supervision loop (see ``_wait_healthy``).
            timeout: Upper bound, in seconds, for the ``wait_healthy`` phase.
        """
        from .output import console, print_error, print_info, print_success

        scale_map = scale_map or {}
        target_services = services or list(self._services)
        if not target_services:
            print_info("No services configured. Add services to your config.yaml")
            return

        for name in target_services:
            if name not in self._services:
                print_error(f"Service '{name}' not found in configuration")
                return

        manager = self._create_process_manager()
        self._process_manager = manager
        await manager.start()
        self._running = True

        # From here on child processes may exist, so everything - startup,
        # the optional health wait and the supervision loop - is covered by
        # the finally clause that stops them again.
        try:
            for name in target_services:
                service = self._services[name]
                workers = scale_map.get(name, service.workers)
                proc_config = self._build_process_config(service, workers)
                try:
                    await manager.register(proc_config)
                    success = await manager.start_process(name)
                    if success:
                        info = manager.get_process(name)
                        if info:
                            self.state_manager.update_service(
                                name,
                                state="running",
                                pid=info.pid,
                                port=info.port,
                                workers=workers,
                                started_at=time.time(),
                            )
                        print_success(f"Started service: {name}")
                    else:
                        self.state_manager.update_service(name, state="failed")
                        print_error(f"Failed to start service: {name}")
                except Exception as e:
                    # One failing service must not prevent the others from starting.
                    print_error(f"Error starting {name}: {e}")
                    self.state_manager.update_service(name, state="failed")

            if wait_healthy:
                print_info("Waiting for services to be healthy...")
                await self._wait_healthy(target_services, timeout)

            console.print("\n[bold]Services running. Press Ctrl+C to stop.[/bold]\n")

            while self._running:
                await asyncio.sleep(1)
                await self._sync_state()
        except asyncio.CancelledError:
            # Cancellation is treated as a normal shutdown request.
            pass
        finally:
            await self.stop_all()

    async def _sync_state(self) -> None:
        """Copy the process manager's view of every process into the state store.

        A process counts as "healthy" when its ``health_check_failures`` is 0.
        The store is saved once after all processes have been copied.
        """
        if not self._process_manager:
            return

        for name, info in self._process_manager.get_all_processes().items():
            failures = info.health_check_failures
            self.state_manager.update_service(
                name,
                state=info.state.value,
                pid=info.pid,
                port=info.port,
            )
            service_state = self.state_manager.get_service(name)
            if service_state:
                service_state.health.status = "healthy" if failures == 0 else "unhealthy"
                service_state.health.failures = failures

        # The health fields above are set directly on the state object, so
        # persist them explicitly.
        self.state_manager.save()

    async def _wait_healthy(self, services: List[str], timeout: int) -> None:
        """Poll once per second until all ``services`` are running or ``timeout`` expires.

        "Healthy" here means the process state is ``ProcessState.RUNNING``;
        health-check failure counts are not consulted.
        """
        from .output import print_info, print_warning

        # monotonic() is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            manager = self._process_manager
            if manager is None:
                # The manager was torn down meanwhile; nothing left to wait for.
                return

            processes = (manager.get_process(name) for name in services)
            if all(info and info.state == ProcessState.RUNNING for info in processes):
                print_info("All services healthy")
                return
            await asyncio.sleep(1)

        print_warning("Timeout waiting for services to become healthy")

    # ------------------------------------------------------------------
    # Stopping
    # ------------------------------------------------------------------

    async def stop_all(self, timeout: int = 30) -> None:
        """Stop the process manager and mark every configured service stopped.

        Args:
            timeout: Accepted for interface compatibility; not used here.
        """
        from .output import print_info

        self._running = False

        if self._process_manager:
            print_info("Stopping all services...")
            await self._process_manager.stop()
            self._process_manager = None

        for name in self._services:
            self.state_manager.update_service(name, state="stopped", pid=None)

    def stop(self) -> None:
        """Ask the supervision loop in ``start()`` to exit on its next iteration."""
        self._running = False

    # ------------------------------------------------------------------
    # Single-service operations
    # ------------------------------------------------------------------

    async def start_service(self, name: str, timeout: int = 60) -> bool:
        """Start one configured service, creating the process manager if needed.

        Uses the same port range and per-service settings as ``start()``.

        Args:
            name: Configured service name.
            timeout: Accepted for interface compatibility; not used here.

        Returns:
            True when the process manager reports the service as started.
            False when the name is unknown or starting failed; in the failure
            case the service state is recorded as "failed".
        """
        from .output import print_error

        service = self._services.get(name)
        if not service:
            print_error(f"Service '{name}' not found")
            return False

        if not self._process_manager:
            self._process_manager = self._create_process_manager()
            await self._process_manager.start()
        manager = self._process_manager

        proc_config = self._build_process_config(service, service.workers)
        try:
            # Register only once; an already-known process is simply started.
            if not manager.get_process(name):
                await manager.register(proc_config)
            success = await manager.start_process(name)
            if success:
                info = manager.get_process(name)
                self.state_manager.update_service(
                    name,
                    state="running",
                    pid=info.pid if info else None,
                    port=info.port if info else 0,
                    started_at=time.time(),
                )
            else:
                self.state_manager.update_service(name, state="failed")
            return success
        except Exception as e:
            print_error(f"Error starting {name}: {e}")
            self._mark_failed(name)
            return False

    async def stop_service(self, name: str, timeout: int = 30, force: bool = False) -> bool:
        """Stop one service.

        Without a process manager nothing can be running, so the service is
        recorded as stopped and True is returned.

        Args:
            name: Service name.
            timeout: Accepted for interface compatibility; not used here.
            force: Accepted for interface compatibility; not used here.

        Returns:
            The result of ``ProcessManager.stop_process``, or False on error.
        """
        if not self._process_manager:
            self.state_manager.update_service(name, state="stopped", pid=None)
            return True

        try:
            success = await self._process_manager.stop_process(name)
            if success:
                self.state_manager.update_service(name, state="stopped", pid=None)
            return success
        except Exception as e:
            from .output import print_error

            print_error(f"Error stopping {name}: {e}")
            return False

    async def restart_service(self, name: str, timeout: int = 60) -> bool:
        """Restart one service through the process manager.

        The state is set to "restarting" first and ends as "running" on
        success or "failed" otherwise, so it never stays at "restarting".

        Args:
            name: Service name.
            timeout: Accepted for interface compatibility; not used here.

        Returns:
            True on success; False when there is no process manager, the
            restart is refused, or an error occurs.
        """
        manager = self._process_manager
        if not manager:
            return False

        try:
            self.state_manager.update_service(name, state="restarting")
            success = await manager.restart_process(name)
            if success:
                info = manager.get_process(name)
                self.state_manager.update_service(
                    name,
                    state="running",
                    pid=info.pid if info else None,
                    port=info.port if info else 0,
                    started_at=time.time(),
                )
            else:
                self.state_manager.update_service(name, state="failed")
            return success
        except Exception as e:
            from .output import print_error

            print_error(f"Error restarting {name}: {e}")
            self._mark_failed(name)
            return False

    async def scale_service(self, name: str, workers: int, timeout: int = 60, wait: bool = True) -> bool:
        """Change a service's worker count by restarting it.

        Hot scaling is not implemented; the definition is updated and the
        service restarted. If the restart fails the previous worker count is
        restored on the definition.

        Args:
            name: Configured service name.
            workers: New worker count; must be at least 1.
            timeout: Forwarded to ``restart_service``.
            wait: Accepted for interface compatibility; not used here.

        Returns:
            True when the restart succeeded; False for an unknown service, an
            invalid worker count, or a failed restart.
        """
        service = self._services.get(name)
        if not service or workers < 1:
            return False

        previous_workers = service.workers
        service.workers = workers

        # NOTE(review): restart_service() calls ProcessManager.restart_process(),
        # which is not shown here. If it reuses the ProcessConfig registered
        # earlier, the new worker count will not reach the process - confirm.
        success = await self.restart_service(name, timeout)
        if success:
            self.state_manager.update_service(name, workers=workers)
        else:
            service.workers = previous_workers
        return success

    # ------------------------------------------------------------------
    # Background run
    # ------------------------------------------------------------------

    def start_daemon(
        self,
        services: Optional[List[str]] = None,
        scale_map: Optional[Dict[str, int]] = None,
        force_recreate: bool = False,
    ) -> int:
        """Spawn a detached daemon process and return its PID.

        The daemon is launched with the current interpreter in a new session,
        with stdin, stdout and stderr connected to the null device so it does
        not hold on to the caller's terminal.

        Args:
            services: Passed as a comma-separated ``--services`` value.
            scale_map: Each item is passed as ``--scale name=workers``.
            force_recreate: Adds ``--force-recreate`` when true.

        Returns:
            PID of the spawned process.
        """
        import subprocess

        # NOTE(review): the module is addressed as "pyserve.cli._daemon" while
        # this file appears to live under pyserve/ctl - confirm the path.
        # NOTE(review): the config file is assumed to be "config.yaml" next to
        # the state directory - confirm this holds for custom config paths.
        cmd = [
            sys.executable,
            "-m",
            "pyserve.cli._daemon",
            "--config",
            str(self.state_manager.state_dir.parent / "config.yaml"),
            "--state-dir",
            str(self.state_manager.state_dir),
        ]

        if services:
            cmd.extend(["--services", ",".join(services)])

        for name, workers in (scale_map or {}).items():
            cmd.extend(["--scale", f"{name}={workers}"])

        if force_recreate:
            cmd.append("--force-recreate")

        process = subprocess.Popen(
            cmd,
            env=os.environ.copy(),
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
        return process.pid