From 3454801be7d3440c4315372a6a88617fa2b87ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=98=D0=BB=D1=8C=D1=8F=20=D0=93=D0=BB=D0=B0=D0=B7=D1=83?= =?UTF-8?q?=D0=BD=D0=BE=D0=B2?= Date: Thu, 4 Dec 2025 01:25:13 +0300 Subject: [PATCH] process_orchestration for asgi added --- README.md | 172 ++-- examples/config.example.orchestration.yaml | 113 +++ pyproject.toml | 4 +- pyserve/__init__.py | 21 +- pyserve/_wsgi_wrapper.py | 66 ++ pyserve/extensions.py | 36 + pyserve/process_extension.py | 365 +++++++ pyserve/process_manager.py | 553 ++++++++++ pyserve/server.py | 25 +- tests/test_process_orchestration.py | 1063 ++++++++++++++++++++ 10 files changed, 2303 insertions(+), 115 deletions(-) create mode 100644 examples/config.example.orchestration.yaml create mode 100644 pyserve/_wsgi_wrapper.py create mode 100644 pyserve/process_extension.py create mode 100644 pyserve/process_manager.py create mode 100644 tests/test_process_orchestration.py diff --git a/README.md b/README.md index 0418141..01d459a 100644 --- a/README.md +++ b/README.md @@ -1,145 +1,97 @@ # PyServe -PyServe is a modern, async HTTP server written in Python. Originally created for educational purposes, it has evolved into a powerful tool for rapid prototyping and serving web applications with unique features like AI-generated content. +Python application orchestrator and HTTP server. Runs multiple ASGI/WSGI applications through a single entry point with process isolation, health monitoring, and auto-restart. -isolated +PyServe Logo -[More on web page](https://pyserve.org/) +Website: [pyserve.org](https://pyserve.org) · Documentation: [docs.pyserve.org](https://docs.pyserve.org) -## Project Overview +## Overview -PyServe v0.6.0 introduces a completely refactored architecture with modern async/await syntax and new exciting features like **Vibe-Serving** - AI-powered dynamic content generation. +PyServe manages multiple Python web applications (FastAPI, Flask, Django, etc.) 
as isolated subprocesses behind a single gateway. Each app runs on its own port with independent lifecycle, health checks, and automatic restarts on failure. -### Key Features: +``` + PyServe Gateway (:8000) + │ + ┌────────────────┼────────────────┐ + ▼ ▼ ▼ + FastAPI Flask Starlette + :9001 :9002 :9003 + /api/* /admin/* /ws/* +``` -- **Async HTTP Server** - Built with Python's asyncio for high performance -- **Advanced Configuration System V2** - Powerful extensible configuration with full backward compatibility -- **Regex Routing & SPA Support** - nginx-style routing patterns with Single Page Application fallback -- **Static File Serving** - Efficient serving with correct MIME types -- **Template System** - Dynamic content generation -- **Vibe-Serving Mode** - AI-generated content using language models (OpenAI, Claude, etc.) -- **Reverse Proxy** - Forward requests to backend services with advanced routing -- **SSL/HTTPS Support** - Secure connections with certificate configuration -- **Modular Extensions** - Plugin-like architecture for security, caching, monitoring -- **Beautiful Logging** - Colored terminal output with file rotation -- **Error Handling** - Styled error pages and graceful fallbacks -- **CLI Interface** - Command-line interface for easy deployment and configuration - -## Getting Started - -### Prerequisites - -- Python 3.12 or higher -- Poetry (recommended) or pip - -### Installation - -#### Via Poetry (рекомендуется) +## Installation ```bash git clone https://github.com/ShiftyX1/PyServe.git cd PyServe -make init # Initialize project +make init ``` -#### Или установка пакета +## Quick Start -```bash -# local install -make install-package +```yaml +# config.yaml +server: + host: 0.0.0.0 + port: 8000 -# after installing project you can use command pyserve -pyserve --help +extensions: + - type: process_orchestration + config: + apps: + - name: api + path: /api + app_path: myapp.api:app + + - name: admin + path: /admin + app_path: myapp.admin:app ``` 
-### Running the Server - -#### Using Makefile (recommended) - ```bash -# start in development mode -make run - -# start in production mode -make run-prod - -# show all available commands -make help +pyserve -c config.yaml ``` -#### Using CLI directly +Requests to `/api/*` are proxied to the api process, `/admin/*` to admin. + +## Process Orchestration + +The main case of using PyServe is orchestration of python web applications. Each application runs as a separate uvicorn process on a dynamically or manually allocated port (9000-9999 by default). PyServe proxies requests to the appropriate process based on URL path. + +For each application you can configure the number of workers, environment variables, health check endpoint path, and auto-restart parameters. If a process crashes or stops responding to health checks, PyServe automatically restarts it with exponential backoff. + +WSGI applications (Flask, Django) are supported through automatic wrapping — just specify `app_type: wsgi`. + +## In-Process Mounting + +For simpler cases when process isolation is not needed, applications can be mounted directly into the PyServe process via the `asgi` extension. This is lighter and faster, but all applications share one process. + +## Static Files & Routing + +PyServe can serve static files with nginx-like routing: regex patterns, SPA fallback for frontend applications, custom caching headers. Routes are processed in priority order — exact match, then regex, then default. + +## Reverse Proxy + +Requests can be proxied to external backends. Useful for integration with legacy services or microservices in other languages. 
+ +## CLI ```bash -# after installing package -pyserve - -# or with Poetry -poetry run pyserve - -# or legacy (for backward compatibility) -python run.py -``` - -#### CLI options - -```bash -# help -pyserve --help - -# path to config -pyserve -c /path/to/config.yaml - -# rewrite host and port +pyserve -c config.yaml pyserve --host 0.0.0.0 --port 9000 - -# debug mode pyserve --debug - -# show version pyserve --version ``` ## Development -### Makefile Commands - ```bash -make help # Show help for commands -make install # Install dependencies -make dev-install # Install development dependencies -make build # Build the package -make test # Run tests -make test-cov # Tests with code coverage -make lint # Check code with linters -make format # Format code -make clean # Clean up temporary files -make version # Show version -make publish-test # Publish to Test PyPI -make publish # Publish to PyPI -``` - -### Project Structure - -``` -pyserveX/ -├── pyserve/ # Main package -│ ├── __init__.py -│ ├── cli.py # CLI interface -│ ├── server.py # Main server module -│ ├── config.py # Configuration system -│ ├── routing.py # Routing -│ ├── extensions.py # Extensions -│ └── logging_utils.py -├── tests/ # Tests -├── static/ # Static files -├── templates/ # Templates -├── logs/ # Logs -├── Makefile # Automation tasks -├── pyproject.toml # Project configuration -├── config.yaml # Server configuration -└── run.py # Entry point (backward compatibility) +make test # run tests +make lint # linting +make format # formatting ``` ## License -This project is distributed under the MIT license. 
\ No newline at end of file +[MIT License](./LICENSE) diff --git a/examples/config.example.orchestration.yaml b/examples/config.example.orchestration.yaml new file mode 100644 index 0000000..be194cf --- /dev/null +++ b/examples/config.example.orchestration.yaml @@ -0,0 +1,113 @@ +# PyServe Process Orchestration Example +# +# This configuration demonstrates running multiple ASGI/WSGI applications +# as isolated processes with automatic health monitoring and restart. + +server: + host: "0.0.0.0" + port: 8000 + backlog: 2048 + proxy_timeout: 60.0 + +logging: + level: DEBUG + console_output: true + format: + type: standard + use_colors: true + +extensions: + # Process Orchestration - runs each app in its own process + - type: process_orchestration + config: + # Port range for worker processes + port_range: [9000, 9999] + + # Enable health monitoring + health_check_enabled: true + + # Proxy timeout for requests + proxy_timeout: 60.0 + + apps: + # FastAPI application + - name: api + path: /api + app_path: examples.apps.fastapi_app:app + module_path: "." + workers: 2 + health_check_path: /health + health_check_interval: 10.0 + health_check_timeout: 5.0 + health_check_retries: 3 + max_restart_count: 5 + restart_delay: 1.0 + shutdown_timeout: 30.0 + strip_path: true + env: + APP_ENV: "production" + DEBUG: "false" + + # Flask application (WSGI wrapped to ASGI) + - name: admin + path: /admin + app_path: examples.apps.flask_app:app + app_type: wsgi + module_path: "." + workers: 1 + health_check_path: /health + strip_path: true + + # Starlette application + - name: web + path: /web + app_path: examples.apps.starlette_app:app + module_path: "." + workers: 2 + health_check_path: /health + strip_path: true + + # Custom ASGI application + - name: custom + path: /custom + app_path: examples.apps.custom_asgi:app + module_path: "." 
+ workers: 1 + health_check_path: /health + strip_path: true + + # Routing for static files and reverse proxy + - type: routing + config: + regex_locations: + # Static files + "^/static/.*": + type: static + root: "./static" + strip_prefix: "/static" + + # Documentation + "^/docs/?.*": + type: static + root: "./docs" + strip_prefix: "/docs" + + # External API proxy + "^/external/.*": + type: proxy + upstream: "https://api.example.com" + strip_prefix: "/external" + + # Security headers + - type: security + config: + security_headers: + X-Content-Type-Options: "nosniff" + X-Frame-Options: "DENY" + X-XSS-Protection: "1; mode=block" + Strict-Transport-Security: "max-age=31536000; includeSubDomains" + + # Monitoring + - type: monitoring + config: + enable_metrics: true diff --git a/pyproject.toml b/pyproject.toml index b64299a..419f76c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "pyserve" -version = "0.9.1" -description = "Simple HTTP Web server written in Python" +version = "0.9.9" +description = "Python Application Orchestrator & HTTP Server - unified gateway for multiple Python web apps" authors = [ {name = "Илья Глазунов",email = "i.glazunov@sapiens.solutions"} ] diff --git a/pyserve/__init__.py b/pyserve/__init__.py index 5373739..0f0f7ac 100644 --- a/pyserve/__init__.py +++ b/pyserve/__init__.py @@ -2,7 +2,7 @@ PyServe - HTTP web server written on Python """ -__version__ = "0.9.0" +__version__ = "0.9.10" __author__ = "Ilya Glazunov" from .asgi_mount import ( @@ -15,13 +15,22 @@ from .asgi_mount import ( create_starlette_app, ) from .config import Config +from .process_manager import ( + ProcessConfig, + ProcessInfo, + ProcessManager, + ProcessState, + get_process_manager, + init_process_manager, + shutdown_process_manager, +) from .server import PyServeServer __all__ = [ "PyServeServer", "Config", "__version__", - # ASGI mounting + # ASGI mounting (in-process) "ASGIAppLoader", "ASGIMountManager", "MountedApp", @@ -29,4 +38,12 
@@ __all__ = [ "create_flask_app", "create_django_app", "create_starlette_app", + # Process orchestration (multi-process) + "ProcessManager", + "ProcessConfig", + "ProcessInfo", + "ProcessState", + "get_process_manager", + "init_process_manager", + "shutdown_process_manager", ] diff --git a/pyserve/_wsgi_wrapper.py b/pyserve/_wsgi_wrapper.py new file mode 100644 index 0000000..e5d0168 --- /dev/null +++ b/pyserve/_wsgi_wrapper.py @@ -0,0 +1,66 @@ +""" +WSGI Wrapper Module for Process Orchestration. + +This module provides a wrapper that allows WSGI applications to be run +via uvicorn by wrapping them with a2wsgi. + +The WSGI app path is passed via environment variables: +- PYSERVE_WSGI_APP: The app path (e.g., "myapp:app" or "myapp.main:create_app") +- PYSERVE_WSGI_FACTORY: "1" if the app path points to a factory function +""" + +import importlib +import os +from typing import Any, Callable + +try: + from a2wsgi import WSGIMiddleware + + WSGI_ADAPTER = "a2wsgi" +except ImportError: + try: + from asgiref.wsgi import WsgiToAsgi as WSGIMiddleware # type: ignore + + WSGI_ADAPTER = "asgiref" + except ImportError: + WSGIMiddleware = None # type: ignore + WSGI_ADAPTER = None + + +def _load_wsgi_app() -> Callable: + app_path = os.environ.get("PYSERVE_WSGI_APP") + is_factory = os.environ.get("PYSERVE_WSGI_FACTORY", "0") == "1" + + if not app_path: + raise RuntimeError("PYSERVE_WSGI_APP environment variable not set. 
" "This module should only be used by PyServe process orchestration.") + + if ":" in app_path: + module_name, attr_name = app_path.rsplit(":", 1) + else: + module_name = app_path + attr_name = "app" + + try: + module = importlib.import_module(module_name) + except ImportError as e: + raise RuntimeError(f"Failed to import WSGI module '{module_name}': {e}") + + try: + app_or_factory = getattr(module, attr_name) + except AttributeError: + raise RuntimeError(f"Module '{module_name}' has no attribute '{attr_name}'") + + if is_factory: + return app_or_factory() + return app_or_factory + + +def _create_asgi_app() -> Any: + if WSGIMiddleware is None: + raise RuntimeError("No WSGI adapter available. " "Install a2wsgi (recommended) or asgiref: pip install a2wsgi") + + wsgi_app = _load_wsgi_app() + return WSGIMiddleware(wsgi_app) + + +app = _create_asgi_app() diff --git a/pyserve/extensions.py b/pyserve/extensions.py index 07c8aa9..cba5bdb 100644 --- a/pyserve/extensions.py +++ b/pyserve/extensions.py @@ -1,3 +1,4 @@ +import asyncio from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Type @@ -216,6 +217,15 @@ class ExtensionManager: "monitoring": MonitoringExtension, "asgi": ASGIExtension, } + self._register_process_orchestration() + + def _register_process_orchestration(self) -> None: + try: + from .process_extension import ProcessOrchestrationExtension + + self.extension_registry["process_orchestration"] = ProcessOrchestrationExtension # type: ignore + except ImportError: + pass # Optional dependency def register_extension_type(self, name: str, extension_class: Type[Extension]) -> None: self.extension_registry[name] = extension_class @@ -234,6 +244,32 @@ class ExtensionManager: except Exception as e: logger.error(f"Error loading extension {extension_type}: {e}") + async def load_extension_async(self, extension_type: str, config: Dict[str, Any]) -> None: + """Load extension with async setup support (for ProcessOrchestration).""" + if 
extension_type not in self.extension_registry: + logger.error(f"Unknown extension type: {extension_type}") + return + + try: + extension_class = self.extension_registry[extension_type] + extension = extension_class(config) + + setup_method = getattr(extension, "setup", None) + if setup_method is not None and asyncio.iscoroutinefunction(setup_method): + await setup_method(config) + else: + extension.initialize() + + start_method = getattr(extension, "start", None) + if start_method is not None and asyncio.iscoroutinefunction(start_method): + await start_method() + + # Insert at the beginning so process_orchestration is checked first + self.extensions.insert(0, extension) + logger.info(f"Loaded extension (async): {extension_type}") + except Exception as e: + logger.error(f"Error loading extension {extension_type}: {e}") + async def process_request(self, request: Request) -> Optional[Response]: for extension in self.extensions: if not extension.enabled: diff --git a/pyserve/process_extension.py b/pyserve/process_extension.py new file mode 100644 index 0000000..3cf9bda --- /dev/null +++ b/pyserve/process_extension.py @@ -0,0 +1,365 @@ +"""Process Orchestration Extension + +Extension that manages ASGI/WSGI applications as isolated processes +and routes requests to them via reverse proxy. +""" + +import asyncio +import logging +import time +import uuid +from typing import Any, Dict, Optional + +import httpx +from starlette.requests import Request +from starlette.responses import Response + +from .extensions import Extension +from .logging_utils import get_logger +from .process_manager import ProcessConfig, ProcessManager + +logger = get_logger(__name__) + + +class ProcessOrchestrationExtension(Extension): + """ + Extension that orchestrates ASGI/WSGI applications as separate processes. 
+ + Unlike ASGIExtension which runs apps in-process, this extension: + - Runs each app in its own isolated process + - Provides health monitoring and auto-restart + - Routes requests via HTTP reverse proxy + - Supports multiple workers per app + + Configuration example: + ```yaml + extensions: + - type: process_orchestration + config: + port_range: [9000, 9999] + health_check_enabled: true + apps: + - name: api + path: /api + app_path: myapp.api:app + workers: 4 + health_check_path: /health + + - name: admin + path: /admin + app_path: myapp.admin:create_app + factory: true + workers: 2 + ``` + """ + + name = "process_orchestration" + + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config) + self._manager: Optional[ProcessManager] = None + self._mounts: Dict[str, MountConfig] = {} # path -> config + self._http_client: Optional[httpx.AsyncClient] = None + self._started = False + self._proxy_timeout: float = config.get("proxy_timeout", 60.0) + self._pending_config = config # Store for async setup + + logging_config = config.get("logging", {}) + self._log_proxy_requests: bool = logging_config.get("proxy_logs", True) + self._log_health_checks: bool = logging_config.get("health_check_logs", False) + + httpx_level = logging_config.get("httpx_level", "warning").upper() + logging.getLogger("httpx").setLevel(getattr(logging, httpx_level, logging.WARNING)) + logging.getLogger("httpcore").setLevel(getattr(logging, httpx_level, logging.WARNING)) + + async def setup(self, config: Optional[Dict[str, Any]] = None) -> None: + if config is None: + config = self._pending_config + + port_range = tuple(config.get("port_range", [9000, 9999])) + health_check_enabled = config.get("health_check_enabled", True) + self._proxy_timeout = config.get("proxy_timeout", 60.0) + + self._manager = ProcessManager( + port_range=port_range, + health_check_enabled=health_check_enabled, + ) + + self._http_client = httpx.AsyncClient( + timeout=httpx.Timeout(self._proxy_timeout), + 
follow_redirects=False, + limits=httpx.Limits( + max_keepalive_connections=100, + max_connections=200, + ), + ) + + apps_config = config.get("apps", []) + for app_config in apps_config: + await self._register_app(app_config) + + logger.info( + "Process orchestration extension initialized", + app_count=len(self._mounts), + ) + + async def _register_app(self, app_config: Dict[str, Any]) -> None: + if not self._manager: + return + + name = app_config.get("name") + path = app_config.get("path", "").rstrip("/") + app_path = app_config.get("app_path") + + if not name or not app_path: + logger.error("App config missing 'name' or 'app_path'") + return + + process_config = ProcessConfig( + name=name, + app_path=app_path, + app_type=app_config.get("app_type", "asgi"), + workers=app_config.get("workers", 1), + module_path=app_config.get("module_path"), + factory=app_config.get("factory", False), + factory_args=app_config.get("factory_args"), + env=app_config.get("env", {}), + health_check_enabled=app_config.get("health_check_enabled", True), + health_check_path=app_config.get("health_check_path", "/health"), + health_check_interval=app_config.get("health_check_interval", 10.0), + health_check_timeout=app_config.get("health_check_timeout", 5.0), + health_check_retries=app_config.get("health_check_retries", 3), + max_memory_mb=app_config.get("max_memory_mb"), + max_restart_count=app_config.get("max_restart_count", 5), + restart_delay=app_config.get("restart_delay", 1.0), + shutdown_timeout=app_config.get("shutdown_timeout", 30.0), + ) + + await self._manager.register(process_config) + + self._mounts[path] = MountConfig( + path=path, + process_name=name, + strip_path=app_config.get("strip_path", True), + ) + + logger.info(f"Registered app '{name}' at path '{path}'") + + async def start(self) -> None: + if self._started or not self._manager: + return + + await self._manager.start() + results = await self._manager.start_all() + + self._started = True + + success = sum(1 for v in 
results.values() if v) + failed = len(results) - success + + logger.info( + "Process orchestration started", + success=success, + failed=failed, + ) + + async def stop(self) -> None: + if not self._started: + return + + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + if self._manager: + await self._manager.stop() + + self._started = False + logger.info("Process orchestration stopped") + + def cleanup(self) -> None: + try: + loop = asyncio.get_running_loop() + loop.create_task(self.stop()) + except RuntimeError: + asyncio.run(self.stop()) + + async def process_request(self, request: Request) -> Optional[Response]: + if not self._started or not self._manager: + logger.debug( + "Process orchestration not ready", + started=self._started, + has_manager=self._manager is not None, + ) + return None + + mount = self._get_mount(request.url.path) + if not mount: + logger.debug( + "No mount found for path", + path=request.url.path, + available_mounts=list(self._mounts.keys()), + ) + return None + + upstream_url = self._manager.get_upstream_url(mount.process_name) + if not upstream_url: + logger.warning( + f"Process '{mount.process_name}' not running", + path=request.url.path, + ) + return Response("Service Unavailable", status_code=503) + + request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8]) + + start_time = time.perf_counter() + response = await self._proxy_request(request, upstream_url, mount, request_id) + latency_ms = (time.perf_counter() - start_time) * 1000 + + if self._log_proxy_requests: + logger.info( + "Proxy request completed", + request_id=request_id, + method=request.method, + path=request.url.path, + process=mount.process_name, + upstream=upstream_url, + status=response.status_code, + latency_ms=round(latency_ms, 2), + ) + + return response + + def _get_mount(self, path: str) -> Optional["MountConfig"]: + for mount_path in sorted(self._mounts.keys(), key=len, reverse=True): + if mount_path == "": + 
return self._mounts[mount_path] + if path == mount_path or path.startswith(f"{mount_path}/"): + return self._mounts[mount_path] + return None + + async def _proxy_request( + self, + request: Request, + upstream_url: str, + mount: "MountConfig", + request_id: str = "", + ) -> Response: + path = request.url.path + if mount.strip_path and mount.path: + path = path[len(mount.path) :] or "/" + + target_url = f"{upstream_url}{path}" + if request.url.query: + target_url += f"?{request.url.query}" + + headers = dict(request.headers) + headers.pop("host", None) + headers["X-Forwarded-For"] = request.client.host if request.client else "unknown" + headers["X-Forwarded-Proto"] = request.url.scheme + headers["X-Forwarded-Host"] = request.headers.get("host", "") + if request_id: + headers["X-Request-ID"] = request_id + + try: + if not self._http_client: + return Response("Service Unavailable", status_code=503) + + body = await request.body() + + response = await self._http_client.request( + method=request.method, + url=target_url, + headers=headers, + content=body, + ) + + response_headers = dict(response.headers) + for header in ["transfer-encoding", "connection", "keep-alive"]: + response_headers.pop(header, None) + + return Response( + content=response.content, + status_code=response.status_code, + headers=response_headers, + ) + + except httpx.TimeoutException: + logger.error(f"Proxy timeout to {upstream_url}") + return Response("Gateway Timeout", status_code=504) + except httpx.ConnectError as e: + logger.error(f"Proxy connection error to {upstream_url}: {e}") + return Response("Bad Gateway", status_code=502) + except Exception as e: + logger.error(f"Proxy error to {upstream_url}: {e}") + return Response("Internal Server Error", status_code=500) + + async def process_response( + self, + request: Request, + response: Response, + ) -> Response: + return response + + def get_metrics(self) -> Dict[str, Any]: + metrics = { + "process_orchestration": { + "enabled": self._started, 
+ "mounts": len(self._mounts), + } + } + + if self._manager: + metrics["process_orchestration"].update(self._manager.get_metrics()) + + return metrics + + async def get_process_status(self, name: str) -> Optional[Dict[str, Any]]: + if not self._manager: + return None + info = self._manager.get_process(name) + return info.to_dict() if info else None + + async def get_all_status(self) -> Dict[str, Any]: + if not self._manager: + return {} + return {name: info.to_dict() for name, info in self._manager.get_all_processes().items()} + + async def restart_process(self, name: str) -> bool: + if not self._manager: + return False + return await self._manager.restart_process(name) + + async def scale_process(self, name: str, workers: int) -> bool: + if not self._manager: + return False + + info = self._manager.get_process(name) + if not info: + return False + + info.config.workers = workers + return await self._manager.restart_process(name) + + +class MountConfig: + def __init__( + self, + path: str, + process_name: str, + strip_path: bool = True, + ): + self.path = path + self.process_name = process_name + self.strip_path = strip_path + + +async def setup_process_orchestration(config: Dict[str, Any]) -> ProcessOrchestrationExtension: + ext = ProcessOrchestrationExtension(config) + await ext.setup(config) + await ext.start() + return ext + + +async def shutdown_process_orchestration(ext: ProcessOrchestrationExtension) -> None: + await ext.stop() diff --git a/pyserve/process_manager.py b/pyserve/process_manager.py new file mode 100644 index 0000000..b1411b3 --- /dev/null +++ b/pyserve/process_manager.py @@ -0,0 +1,553 @@ +"""Process Manager Module + +Orchestrates ASGI/WSGI applications as separate processes +""" + +import asyncio +import logging +import os +import signal +import socket +import subprocess +import sys +import time +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional + +from 
.logging_utils import get_logger + +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + +logger = get_logger(__name__) + + +class ProcessState(Enum): + PENDING = "pending" + STARTING = "starting" + RUNNING = "running" + STOPPING = "stopping" + STOPPED = "stopped" + FAILED = "failed" + RESTARTING = "restarting" + + +@dataclass +class ProcessConfig: + name: str + app_path: str + app_type: str = "asgi" # asgi, wsgi + host: str = "127.0.0.1" + port: int = 0 # 0 = auto-assign + workers: int = 1 + module_path: Optional[str] = None + factory: bool = False + factory_args: Optional[Dict[str, Any]] = None + env: Dict[str, str] = field(default_factory=dict) + + health_check_enabled: bool = True + health_check_path: str = "/health" + health_check_interval: float = 10.0 + health_check_timeout: float = 5.0 + health_check_retries: int = 3 + + max_memory_mb: Optional[int] = None + max_restart_count: int = 5 + restart_delay: float = 1.0 # seconds + + shutdown_timeout: float = 30.0 # seconds + + +@dataclass +class ProcessInfo: + config: ProcessConfig + state: ProcessState = ProcessState.PENDING + pid: Optional[int] = None + port: int = 0 + start_time: Optional[float] = None + restart_count: int = 0 + last_health_check: Optional[float] = None + health_check_failures: int = 0 + process: Optional[subprocess.Popen] = None + + @property + def uptime(self) -> float: + if self.start_time is None: + return 0.0 + return time.time() - self.start_time + + @property + def is_running(self) -> bool: + return self.state == ProcessState.RUNNING and self.process is not None + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.config.name, + "state": self.state.value, + "pid": self.pid, + "port": self.port, + "uptime": round(self.uptime, 2), + "restart_count": self.restart_count, + "health_check_failures": self.health_check_failures, + "workers": self.config.workers, + } + + +class PortAllocator: + def __init__(self, 
start_port: int = 9000, end_port: int = 9999): + self.start_port = start_port + self.end_port = end_port + self._allocated: set[int] = set() + self._lock = asyncio.Lock() + + async def allocate(self) -> int: + async with self._lock: + for port in range(self.start_port, self.end_port + 1): + if port in self._allocated: + continue + if self._is_port_available(port): + self._allocated.add(port) + return port + raise RuntimeError(f"No available ports in range {self.start_port}-{self.end_port}") + + async def release(self, port: int) -> None: + async with self._lock: + self._allocated.discard(port) + + def _is_port_available(self, port: int) -> bool: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(("127.0.0.1", port)) + return True + except OSError: + return False + + +class ProcessManager: + def __init__( + self, + port_range: tuple[int, int] = (9000, 9999), + health_check_enabled: bool = True, + ): + self._processes: Dict[str, ProcessInfo] = {} + self._port_allocator = PortAllocator(*port_range) + self._health_check_enabled = health_check_enabled + self._health_check_task: Optional[asyncio.Task] = None + self._shutdown_event = asyncio.Event() + self._started = False + self._lock = asyncio.Lock() + + async def start(self) -> None: + if self._started: + return + + self._started = True + self._shutdown_event.clear() + + if self._health_check_enabled: + self._health_check_task = asyncio.create_task(self._health_check_loop(), name="process_manager_health_check") + + logger.info("Process manager started") + + async def stop(self) -> None: + if not self._started: + return + + logger.info("Stopping process manager...") + self._shutdown_event.set() + + if self._health_check_task: + self._health_check_task.cancel() + try: + await self._health_check_task + except asyncio.CancelledError: + pass + + await self.stop_all() + + self._started = False + logger.info("Process manager stopped") + + 
async def register(self, config: ProcessConfig) -> ProcessInfo: + async with self._lock: + if config.name in self._processes: + raise ValueError(f"Process '{config.name}' already registered") + + info = ProcessInfo(config=config) + self._processes[config.name] = info + + logger.info(f"Registered process '{config.name}'", app_path=config.app_path) + return info + + async def unregister(self, name: str) -> None: + async with self._lock: + if name not in self._processes: + return + + info = self._processes[name] + if info.is_running: + await self._stop_process(info) + + if info.port: + await self._port_allocator.release(info.port) + + del self._processes[name] + logger.info(f"Unregistered process '{name}'") + + async def start_process(self, name: str) -> bool: + info = self._processes.get(name) + if not info: + logger.error(f"Process '{name}' not found") + return False + + if info.is_running: + logger.warning(f"Process '{name}' is already running") + return True + + return await self._start_process(info) + + async def stop_process(self, name: str) -> bool: + info = self._processes.get(name) + if not info: + logger.error(f"Process '{name}' not found") + return False + + return await self._stop_process(info) + + async def restart_process(self, name: str) -> bool: + info = self._processes.get(name) + if not info: + logger.error(f"Process '{name}' not found") + return False + + info.state = ProcessState.RESTARTING + + if info.is_running: + await self._stop_process(info) + + await asyncio.sleep(info.config.restart_delay) + return await self._start_process(info) + + async def start_all(self) -> Dict[str, bool]: + results = {} + for name in self._processes: + results[name] = await self.start_process(name) + return results + + async def stop_all(self) -> None: + tasks = [] + for info in self._processes.values(): + if info.is_running: + tasks.append(self._stop_process(info)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + def get_process(self, name: 
str) -> Optional[ProcessInfo]: + return self._processes.get(name) + + def get_all_processes(self) -> Dict[str, ProcessInfo]: + return self._processes.copy() + + def get_process_by_port(self, port: int) -> Optional[ProcessInfo]: + for info in self._processes.values(): + if info.port == port: + return info + return None + + def get_upstream_url(self, name: str) -> Optional[str]: + info = self._processes.get(name) + if not info or not info.is_running: + return None + return f"http://{info.config.host}:{info.port}" + + async def _start_process(self, info: ProcessInfo) -> bool: + config = info.config + + try: + info.state = ProcessState.STARTING + + if info.port == 0: + info.port = await self._port_allocator.allocate() + + cmd = self._build_command(config, info.port) + + env = os.environ.copy() + env.update(config.env) + + if config.module_path: + python_path = env.get("PYTHONPATH", "") + module_dir = str(Path(config.module_path).resolve()) + env["PYTHONPATH"] = f"{module_dir}:{python_path}" if python_path else module_dir + + # For WSGI apps, pass configuration via environment variables + if config.app_type == "wsgi": + env["PYSERVE_WSGI_APP"] = config.app_path + env["PYSERVE_WSGI_FACTORY"] = "1" if config.factory else "0" + + logger.info( + f"Starting process '{config.name}'", + command=" ".join(cmd), + port=info.port, + ) + + info.process = subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid if hasattr(os, "setsid") else None, + ) + + info.pid = info.process.pid + info.start_time = time.time() + + if not await self._wait_for_ready(info): + raise RuntimeError(f"Process '{config.name}' failed to start") + + info.state = ProcessState.RUNNING + logger.info( + f"Process '{config.name}' started successfully", + pid=info.pid, + port=info.port, + ) + return True + + except Exception as e: + logger.error(f"Failed to start process '{config.name}': {e}") + info.state = ProcessState.FAILED + if info.port: + await 
self._port_allocator.release(info.port) + info.port = 0 + return False + + async def _stop_process(self, info: ProcessInfo) -> bool: + if not info.process: + info.state = ProcessState.STOPPED + return True + + config = info.config + info.state = ProcessState.STOPPING + + try: + if hasattr(os, "killpg"): + try: + os.killpg(os.getpgid(info.process.pid), signal.SIGTERM) + except ProcessLookupError: + pass + else: + info.process.terminate() + + try: + await asyncio.wait_for(asyncio.get_event_loop().run_in_executor(None, info.process.wait), timeout=config.shutdown_timeout) + except asyncio.TimeoutError: + logger.warning(f"Process '{config.name}' did not stop gracefully, forcing kill") + if hasattr(os, "killpg"): + try: + os.killpg(os.getpgid(info.process.pid), signal.SIGKILL) + except ProcessLookupError: + pass + else: + info.process.kill() + info.process.wait() + + if info.port: + await self._port_allocator.release(info.port) + + info.state = ProcessState.STOPPED + info.process = None + info.pid = None + + logger.info(f"Process '{config.name}' stopped") + return True + + except Exception as e: + logger.error(f"Error stopping process '{config.name}': {e}") + info.state = ProcessState.FAILED + return False + + async def _wait_for_ready(self, info: ProcessInfo, timeout: float = 30.0) -> bool: + import httpx + + start_time = time.time() + url = f"http://{info.config.host}:{info.port}{info.config.health_check_path}" + + while time.time() - start_time < timeout: + if info.process and info.process.poll() is not None: + stdout, stderr = info.process.communicate() + logger.error( + f"Process '{info.config.name}' exited during startup", + returncode=info.process.returncode, + stderr=stderr.decode() if stderr else "", + ) + return False + + try: + async with httpx.AsyncClient(timeout=2.0) as client: + resp = await client.get(url) + if resp.status_code < 500: + return True + except Exception: + pass + + await asyncio.sleep(0.5) + + return False + + async def 
_health_check_loop(self) -> None: + while not self._shutdown_event.is_set(): + try: + for info in list(self._processes.values()): + if not info.is_running or not info.config.health_check_enabled: + continue + + await self._check_process_health(info) + + try: + await asyncio.wait_for( + self._shutdown_event.wait(), + timeout=( + min(p.config.health_check_interval for p in self._processes.values() if p.config.health_check_enabled) + if self._processes + else 10.0 + ), + ) + break + except asyncio.TimeoutError: + pass + + except Exception as e: + logger.error(f"Error in health check loop: {e}") + await asyncio.sleep(5) + + async def _check_process_health(self, info: ProcessInfo) -> bool: + import httpx + + config = info.config + url = f"http://{config.host}:{info.port}{config.health_check_path}" + + try: + async with httpx.AsyncClient(timeout=config.health_check_timeout) as client: + resp = await client.get(url) + if resp.status_code < 500: + info.health_check_failures = 0 + info.last_health_check = time.time() + return True + else: + raise Exception(f"Health check returned status {resp.status_code}") + + except Exception as e: + info.health_check_failures += 1 + logger.warning( + f"Health check failed for '{config.name}'", + failures=info.health_check_failures, + error=str(e), + ) + + if info.health_check_failures >= config.health_check_retries: + logger.error(f"Process '{config.name}' is unhealthy, restarting...") + await self._handle_unhealthy_process(info) + + return False + + async def _handle_unhealthy_process(self, info: ProcessInfo) -> None: + config = info.config + + if info.restart_count >= config.max_restart_count: + logger.error(f"Process '{config.name}' exceeded max restart count, marking as failed") + info.state = ProcessState.FAILED + return + + info.restart_count += 1 + info.health_check_failures = 0 + + delay = config.restart_delay * (2 ** (info.restart_count - 1)) + delay = min(delay, 60.0) + + logger.info( + f"Restarting process '{config.name}'", + 
restart_count=info.restart_count, + delay=delay, + ) + + await self._stop_process(info) + await asyncio.sleep(delay) + await self._start_process(info) + + def _build_command(self, config: ProcessConfig, port: int) -> List[str]: + if config.app_type == "wsgi": + wrapper_app = self._create_wsgi_wrapper_path(config) + app_path = wrapper_app + else: + app_path = config.app_path + + cmd = [ + sys.executable, + "-m", + "uvicorn", + app_path, + "--host", + config.host, + "--port", + str(port), + "--workers", + str(config.workers), + "--log-level", + "warning", + "--no-access-log", + ] + + if config.factory and config.app_type != "wsgi": + cmd.append("--factory") + + return cmd + + def _create_wsgi_wrapper_path(self, config: ProcessConfig) -> str: + """ + Since uvicorn can't directly run WSGI apps, we create a wrapper + that imports the WSGI app and wraps it with a2wsgi. + """ + # For WSGI apps, we'll use a special wrapper module + # The wrapper is: pyserve._wsgi_wrapper:create_app + # It will be called with app_path as environment variable + return "pyserve._wsgi_wrapper:app" + + def get_metrics(self) -> Dict[str, Any]: + return { + "managed_processes": len(self._processes), + "running_processes": sum(1 for p in self._processes.values() if p.is_running), + "processes": {name: info.to_dict() for name, info in self._processes.items()}, + } + + +_process_manager: Optional[ProcessManager] = None + + +def get_process_manager() -> ProcessManager: + global _process_manager + if _process_manager is None: + _process_manager = ProcessManager() + return _process_manager + + +async def init_process_manager( + port_range: tuple[int, int] = (9000, 9999), + health_check_enabled: bool = True, +) -> ProcessManager: + global _process_manager + _process_manager = ProcessManager( + port_range=port_range, + health_check_enabled=health_check_enabled, + ) + await _process_manager.start() + return _process_manager + + +async def shutdown_process_manager() -> None: + global _process_manager + if 
_process_manager: + await _process_manager.stop() + _process_manager = None diff --git a/pyserve/server.py b/pyserve/server.py index 9805b1c..f1ae8e7 100644 --- a/pyserve/server.py +++ b/pyserve/server.py @@ -119,6 +119,7 @@ class PyServeServer: self.config = config self.extension_manager = ExtensionManager() self.app: Optional[Starlette] = None + self._async_extensions_loaded = False self._setup_logging() self._load_extensions() self._create_app() @@ -133,16 +134,38 @@ class PyServeServer: if ext_config.type == "routing": config.setdefault("default_proxy_timeout", self.config.server.proxy_timeout) + if ext_config.type == "process_orchestration": + continue + self.extension_manager.load_extension(ext_config.type, config) + async def _load_async_extensions(self) -> None: + if self._async_extensions_loaded: + return + + for ext_config in self.config.extensions: + if ext_config.type == "process_orchestration": + config = ext_config.config.copy() + await self.extension_manager.load_extension_async(ext_config.type, config) + + self._async_extensions_loaded = True + def _create_app(self) -> None: + from contextlib import asynccontextmanager + + @asynccontextmanager + async def lifespan(app: Starlette): + await self._load_async_extensions() + logger.info("Async extensions loaded") + yield + routes = [ Route("/health", self._health_check, methods=["GET"]), Route("/metrics", self._metrics, methods=["GET"]), Route("/{path:path}", self._catch_all, methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]), ] - self.app = Starlette(routes=routes) + self.app = Starlette(routes=routes, lifespan=lifespan) self.app.add_middleware(PyServeMiddleware, extension_manager=self.extension_manager) async def _health_check(self, request: Request) -> Response: diff --git a/tests/test_process_orchestration.py b/tests/test_process_orchestration.py new file mode 100644 index 0000000..8e34df7 --- /dev/null +++ b/tests/test_process_orchestration.py @@ -0,0 +1,1063 @@ +""" +Integration tests 
for Process Orchestration functionality. + +These tests start PyServe with process orchestration extension and verify +that requests are correctly routed to isolated worker processes. +""" + +import asyncio +import os +import sys +import tempfile +import pytest +import httpx +import socket +from pathlib import Path +from typing import Dict, Any + +import uvicorn +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import JSONResponse, PlainTextResponse +from starlette.routing import Route + +from pyserve.config import Config, ServerConfig, HttpConfig, LoggingConfig, ExtensionConfig +from pyserve.server import PyServeServer +from pyserve.process_manager import ( + ProcessConfig, + ProcessInfo, + ProcessManager, + ProcessState, + PortAllocator, +) + + +def get_free_port() -> int: + """Get an available port.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +# ============== Test Applications (saved as temp files) ============== + +FASTAPI_APP_CODE = ''' +from fastapi import FastAPI +import os + +app = FastAPI() + +@app.get("/") +def root(): + return {"app": "fastapi-worker", "pid": os.getpid()} + +@app.get("/health") +def health(): + return {"status": "healthy", "pid": os.getpid()} + +@app.get("/users") +def users(): + return {"users": [{"id": 1, "name": "Alice"}], "pid": os.getpid()} + +@app.post("/users") +def create_user(data: dict): + return {"created": data, "pid": os.getpid()} +''' + +STARLETTE_APP_CODE = ''' +from starlette.applications import Starlette +from starlette.responses import JSONResponse +from starlette.routing import Route +import os + +async def root(request): + return JSONResponse({"app": "starlette-worker", "pid": os.getpid()}) + +async def health(request): + return JSONResponse({"status": "healthy", "pid": os.getpid()}) + +async def info(request): + return 
JSONResponse({"info": "test", "pid": os.getpid()}) + +app = Starlette(routes=[ + Route("/", root), + Route("/health", health), + Route("/info", info), +]) +''' + +SIMPLE_ASGI_APP_CODE = ''' +import os + +async def app(scope, receive, send): + if scope["type"] == "http": + path = scope.get("path", "/") + + if path == "/health": + body = b'{"status": "healthy", "pid": ' + str(os.getpid()).encode() + b'}' + else: + body = b'{"app": "simple-asgi", "path": "' + path.encode() + b'", "pid": ' + str(os.getpid()).encode() + b'}' + + await send({ + "type": "http.response.start", + "status": 200, + "headers": [[b"content-type", b"application/json"]], + }) + await send({ + "type": "http.response.body", + "body": body, + }) +''' + + +# ============== Unit Tests for ProcessConfig ============== + +class TestProcessConfig: + def test_default_values(self): + config = ProcessConfig(name="test", app_path="myapp:app") + + assert config.name == "test" + assert config.app_path == "myapp:app" + assert config.app_type == "asgi" + assert config.host == "127.0.0.1" + assert config.port == 0 + assert config.workers == 1 + assert config.health_check_enabled is True + assert config.health_check_path == "/health" + assert config.max_restart_count == 5 + + def test_custom_values(self): + config = ProcessConfig( + name="api", + app_path="myapp.api:create_app", + workers=4, + factory=True, + health_check_path="/api/health", + max_memory_mb=512, + ) + + assert config.workers == 4 + assert config.factory is True + assert config.health_check_path == "/api/health" + assert config.max_memory_mb == 512 + + +class TestProcessInfo: + def test_initial_state(self): + config = ProcessConfig(name="test", app_path="myapp:app") + info = ProcessInfo(config=config) + + assert info.state == ProcessState.PENDING + assert info.pid is None + assert info.port == 0 + assert info.restart_count == 0 + assert info.is_running is False + + def test_uptime(self): + import time + + config = ProcessConfig(name="test", 
app_path="myapp:app") + info = ProcessInfo(config=config) + + assert info.uptime == 0.0 + + info.start_time = time.time() - 10 + assert 9 < info.uptime < 11 + + def test_to_dict(self): + config = ProcessConfig(name="test", app_path="myapp:app", workers=2) + info = ProcessInfo(config=config, state=ProcessState.RUNNING, pid=12345, port=9001) + + result = info.to_dict() + + assert result["name"] == "test" + assert result["state"] == "running" + assert result["pid"] == 12345 + assert result["port"] == 9001 + assert result["workers"] == 2 + + +class TestPortAllocator: + @pytest.mark.asyncio + async def test_allocate_port(self): + allocator = PortAllocator(start_port=19000, end_port=19010) + + port = await allocator.allocate() + + assert 19000 <= port <= 19010 + assert port in allocator._allocated + + @pytest.mark.asyncio + async def test_release_port(self): + allocator = PortAllocator(start_port=19000, end_port=19010) + + port = await allocator.allocate() + assert port in allocator._allocated + + await allocator.release(port) + assert port not in allocator._allocated + + @pytest.mark.asyncio + async def test_no_available_ports(self): + allocator = PortAllocator(start_port=19000, end_port=19001) + + # Allocate all ports + port1 = await allocator.allocate() + port2 = await allocator.allocate() + + # Should raise error + with pytest.raises(RuntimeError, match="No available ports"): + await allocator.allocate() + + # Release one and try again + await allocator.release(port1) + port3 = await allocator.allocate() + assert port3 == port1 + + +class TestProcessManager: + """Tests for ProcessManager.""" + + @pytest.mark.asyncio + async def test_register_process(self): + manager = ProcessManager(port_range=(19000, 19999)) + + config = ProcessConfig(name="test", app_path="myapp:app") + info = await manager.register(config) + + assert info.config.name == "test" + assert "test" in manager._processes + + @pytest.mark.asyncio + async def test_register_duplicate(self): + manager = 
ProcessManager(port_range=(19000, 19999)) + + config = ProcessConfig(name="test", app_path="myapp:app") + await manager.register(config) + + with pytest.raises(ValueError, match="already registered"): + await manager.register(config) + + @pytest.mark.asyncio + async def test_unregister_process(self): + manager = ProcessManager(port_range=(19000, 19999)) + + config = ProcessConfig(name="test", app_path="myapp:app") + await manager.register(config) + + await manager.unregister("test") + + assert "test" not in manager._processes + + @pytest.mark.asyncio + async def test_get_process(self): + manager = ProcessManager(port_range=(19000, 19999)) + + config = ProcessConfig(name="test", app_path="myapp:app") + await manager.register(config) + + info = manager.get_process("test") + assert info is not None + assert info.config.name == "test" + + info = manager.get_process("nonexistent") + assert info is None + + @pytest.mark.asyncio + async def test_build_command(self): + manager = ProcessManager() + + config = ProcessConfig( + name="test", + app_path="myapp:app", + workers=4, + ) + + cmd = manager._build_command(config, 9001) + + # Command format: [python, -m, uvicorn, app_path, ...] 
+ assert "-m" in cmd + assert "uvicorn" in cmd + assert "myapp:app" in cmd + assert "--port" in cmd + assert "9001" in cmd + assert "--workers" in cmd + assert "4" in cmd + + @pytest.mark.asyncio + async def test_build_command_with_factory(self): + manager = ProcessManager() + + config = ProcessConfig( + name="test", + app_path="myapp:create_app", + factory=True, + ) + + cmd = manager._build_command(config, 9001) + + assert "--factory" in cmd + + @pytest.mark.asyncio + async def test_get_metrics(self): + manager = ProcessManager(port_range=(19000, 19999)) + + config1 = ProcessConfig(name="app1", app_path="myapp1:app") + config2 = ProcessConfig(name="app2", app_path="myapp2:app") + + await manager.register(config1) + await manager.register(config2) + + metrics = manager.get_metrics() + + assert metrics["managed_processes"] == 2 + assert metrics["running_processes"] == 0 + assert "app1" in metrics["processes"] + assert "app2" in metrics["processes"] + + @pytest.mark.asyncio + async def test_start_stop_lifecycle(self): + manager = ProcessManager(port_range=(19000, 19999)) + + await manager.start() + assert manager._started is True + + await manager.stop() + assert manager._started is False + + +class TestProcessState: + """Tests for ProcessState enum.""" + + def test_all_states(self): + assert ProcessState.PENDING.value == "pending" + assert ProcessState.STARTING.value == "starting" + assert ProcessState.RUNNING.value == "running" + assert ProcessState.STOPPING.value == "stopping" + assert ProcessState.STOPPED.value == "stopped" + assert ProcessState.FAILED.value == "failed" + assert ProcessState.RESTARTING.value == "restarting" + + +# ============== Integration Tests ============== + +class TestProcessManagerIntegration: + """Integration tests for ProcessManager with real processes.""" + + @pytest.fixture + def temp_app_dir(self): + """Create temp directory with test apps.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Write starlette app (no extra 
dependencies needed) + app_file = Path(tmpdir) / "test_app.py" + app_file.write_text(STARLETTE_APP_CODE) + + # Write simple ASGI app + simple_file = Path(tmpdir) / "simple_app.py" + simple_file.write_text(SIMPLE_ASGI_APP_CODE) + + yield tmpdir + + @pytest.mark.asyncio + async def test_start_real_process(self, temp_app_dir): + """Test starting a real uvicorn process.""" + manager = ProcessManager(port_range=(19100, 19199), health_check_enabled=False) + await manager.start() + + try: + config = ProcessConfig( + name="test_starlette", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + health_check_enabled=True, + ) + + await manager.register(config) + success = await manager.start_process("test_starlette") + + assert success is True + + info = manager.get_process("test_starlette") + assert info is not None + assert info.state == ProcessState.RUNNING + assert info.port > 0 + assert info.pid is not None + + # Make request to the running process + async with httpx.AsyncClient() as client: + resp = await client.get(f"http://127.0.0.1:{info.port}/") + assert resp.status_code == 200 + data = resp.json() + assert data["app"] == "starlette-worker" + assert data["pid"] == info.pid + + # Health check + resp = await client.get(f"http://127.0.0.1:{info.port}/health") + assert resp.status_code == 200 + assert resp.json()["status"] == "healthy" + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_start_multiple_processes(self, temp_app_dir): + """Test starting multiple processes on different ports.""" + manager = ProcessManager(port_range=(19200, 19299), health_check_enabled=False) + await manager.start() + + try: + # Register two apps + config1 = ProcessConfig( + name="app1", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + config2 = ProcessConfig( + name="app2", + app_path="simple_app:app", + module_path=temp_app_dir, + workers=1, + 
health_check_path="/health", + ) + + await manager.register(config1) + await manager.register(config2) + + results = await manager.start_all() + + assert results["app1"] is True + assert results["app2"] is True + + info1 = manager.get_process("app1") + info2 = manager.get_process("app2") + + assert info1.port != info2.port + assert info1.pid != info2.pid + + # Both should respond + async with httpx.AsyncClient() as client: + resp1 = await client.get(f"http://127.0.0.1:{info1.port}/") + resp2 = await client.get(f"http://127.0.0.1:{info2.port}/") + + assert resp1.status_code == 200 + assert resp2.status_code == 200 + + assert resp1.json()["app"] == "starlette-worker" + assert resp2.json()["app"] == "simple-asgi" + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_stop_process(self, temp_app_dir): + """Test stopping a running process.""" + manager = ProcessManager(port_range=(19300, 19399), health_check_enabled=False) + await manager.start() + + try: + config = ProcessConfig( + name="test_app", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + + await manager.register(config) + await manager.start_process("test_app") + + info = manager.get_process("test_app") + port = info.port + + # Process should be running + async with httpx.AsyncClient() as client: + resp = await client.get(f"http://127.0.0.1:{port}/health") + assert resp.status_code == 200 + + # Stop the process + await manager.stop_process("test_app") + + assert info.state == ProcessState.STOPPED + assert info.process is None + + # Process should no longer respond + async with httpx.AsyncClient() as client: + with pytest.raises(httpx.ConnectError): + await client.get(f"http://127.0.0.1:{port}/health", timeout=1.0) + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_restart_process(self, temp_app_dir): + """Test restarting a process gets a new PID.""" + manager = ProcessManager(port_range=(19400, 19499), 
health_check_enabled=False) + await manager.start() + + try: + config = ProcessConfig( + name="test_app", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + restart_delay=0.1, + ) + + await manager.register(config) + await manager.start_process("test_app") + + info = manager.get_process("test_app") + old_pid = info.pid + + # Restart + await manager.restart_process("test_app") + + assert info.state == ProcessState.RUNNING + assert info.pid != old_pid # New PID + assert info.restart_count == 0 # Manual restart doesn't increment + + # Should still respond + async with httpx.AsyncClient() as client: + resp = await client.get(f"http://127.0.0.1:{info.port}/health") + assert resp.status_code == 200 + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_get_upstream_url(self, temp_app_dir): + """Test getting upstream URL for proxy routing.""" + manager = ProcessManager(port_range=(19500, 19599), health_check_enabled=False) + await manager.start() + + try: + config = ProcessConfig( + name="api", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + + await manager.register(config) + + # Not running yet + assert manager.get_upstream_url("api") is None + + await manager.start_process("api") + + info = manager.get_process("api") + url = manager.get_upstream_url("api") + + assert url == f"http://127.0.0.1:{info.port}" + + # Verify the URL works + async with httpx.AsyncClient() as client: + resp = await client.get(f"{url}/health") + assert resp.status_code == 200 + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_process_isolation(self, temp_app_dir): + """Test that processes are truly isolated (different PIDs).""" + manager = ProcessManager(port_range=(19600, 19699), health_check_enabled=False) + await manager.start() + + try: + # Start same app twice with different names + config1 = ProcessConfig( + name="worker1", + 
app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + config2 = ProcessConfig( + name="worker2", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + + await manager.register(config1) + await manager.register(config2) + await manager.start_all() + + info1 = manager.get_process("worker1") + info2 = manager.get_process("worker2") + + # Get PIDs from the apps themselves + async with httpx.AsyncClient() as client: + resp1 = await client.get(f"http://127.0.0.1:{info1.port}/") + resp2 = await client.get(f"http://127.0.0.1:{info2.port}/") + + pid1 = resp1.json()["pid"] + pid2 = resp2.json()["pid"] + + # PIDs should be different - true process isolation + assert pid1 != pid2 + assert pid1 == info1.pid + assert pid2 == info2.pid + + finally: + await manager.stop() + + @pytest.mark.asyncio + async def test_metrics_with_running_processes(self, temp_app_dir): + """Test metrics reflect actual process state.""" + manager = ProcessManager(port_range=(19700, 19799), health_check_enabled=False) + await manager.start() + + try: + config = ProcessConfig( + name="monitored_app", + app_path="test_app:app", + module_path=temp_app_dir, + workers=1, + health_check_path="/health", + ) + + await manager.register(config) + + # Before start + metrics = manager.get_metrics() + assert metrics["managed_processes"] == 1 + assert metrics["running_processes"] == 0 + assert metrics["processes"]["monitored_app"]["state"] == "pending" + + # After start + await manager.start_process("monitored_app") + + metrics = manager.get_metrics() + assert metrics["running_processes"] == 1 + assert metrics["processes"]["monitored_app"]["state"] == "running" + assert metrics["processes"]["monitored_app"]["pid"] is not None + assert metrics["processes"]["monitored_app"]["port"] > 0 + + # After stop + await manager.stop_process("monitored_app") + + metrics = manager.get_metrics() + assert metrics["running_processes"] 
== 0
        assert metrics["processes"]["monitored_app"]["state"] == "stopped"

    finally:
        await manager.stop()


# ============== Error Handling Tests ==============

class TestErrorHandling:
    """Tests for error scenarios and edge cases."""

    @pytest.fixture
    def temp_app_dir(self):
        """Create temp directory with test apps."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write starlette app (no extra dependencies needed)
            app_file = Path(tmpdir) / "test_app.py"
            app_file.write_text(STARLETTE_APP_CODE)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_process_crash_detection(self, temp_app_dir):
        """Process dies unexpectedly - state should reflect failure."""
        # Health checking is disabled so the manager will NOT auto-restart;
        # we only verify the subprocess handle observes the death via poll().
        manager = ProcessManager(port_range=(19800, 19899), health_check_enabled=False)
        await manager.start()

        try:
            config = ProcessConfig(
                name="crash_test",
                app_path="test_app:app",
                module_path=temp_app_dir,
                workers=1,
                health_check_path="/health",
            )

            await manager.register(config)
            await manager.start_process("crash_test")

            info = manager.get_process("crash_test")
            assert info.state == ProcessState.RUNNING
            old_pid = info.pid

            # Kill the process externally (SIGKILL: no chance for cleanup)
            import os
            import signal
            os.kill(old_pid, signal.SIGKILL)

            # Wait a bit for process to die
            await asyncio.sleep(0.5)

            # Process should have exited - check via poll
            # (poll() returns the exit code once the child has terminated)
            assert info.process.poll() is not None

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_startup_timeout_with_bad_app(self):
        """App that fails to start should timeout gracefully."""
        manager = ProcessManager(port_range=(19900, 19999), health_check_enabled=False)
        await manager.start()

        try:
            # Create a temp dir with a broken app
            with tempfile.TemporaryDirectory() as tmpdir:
                broken_app = Path(tmpdir) / "broken_app.py"
                broken_app.write_text('''
# App that crashes on import
raise RuntimeError("Intentional crash")
''')

                config = ProcessConfig(
                    name="broken_app",
                    app_path="broken_app:app",
                    module_path=tmpdir,
                    workers=1,
                    health_check_path="/health",
                )

                await manager.register(config)

                # Should fail to start: the child dies on import, so the
                # startup readiness probe can never succeed.
                success = await manager.start_process("broken_app")

                assert success is False
                info = manager.get_process("broken_app")
                assert info.state == ProcessState.FAILED

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_malformed_app_path(self):
        """Invalid app_path should fail gracefully."""
        manager = ProcessManager(port_range=(20000, 20099), health_check_enabled=False)
        await manager.start()

        try:
            # No module_path given and the module does not exist anywhere
            # on sys.path, so the worker cannot import the app.
            config = ProcessConfig(
                name="bad_path",
                app_path="nonexistent_module:app",
                workers=1,
            )

            await manager.register(config)

            # Should fail to start
            success = await manager.start_process("bad_path")

            assert success is False
            info = manager.get_process("bad_path")
            assert info.state == ProcessState.FAILED

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_process_not_found(self):
        """Operations on non-existent process should return False."""
        # Every lifecycle operation on an unregistered name must degrade
        # gracefully (False / None) rather than raise.
        manager = ProcessManager(port_range=(20100, 20199))
        await manager.start()

        try:
            # Start non-existent process
            result = await manager.start_process("nonexistent")
            assert result is False

            # Stop non-existent process
            result = await manager.stop_process("nonexistent")
            assert result is False

            # Restart non-existent process
            result = await manager.restart_process("nonexistent")
            assert result is False

            # Get non-existent process
            info = manager.get_process("nonexistent")
            assert info is None

            # Get upstream URL for non-existent process
            url = manager.get_upstream_url("nonexistent")
            assert url is None

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_double_start(self, temp_app_dir):
        """Starting already running process should succeed (idempotent)."""
        manager = ProcessManager(port_range=(20200, 20299), health_check_enabled=False)
        await manager.start()

        try:
            config = ProcessConfig(
                name="double_start",
                app_path="test_app:app",
                module_path=temp_app_dir,
                workers=1,
                health_check_path="/health",
            )

            await manager.register(config)

            # First start
            success1 = await manager.start_process("double_start")
            assert success1 is True

            info = manager.get_process("double_start")
            pid1 = info.pid

            # Second start should be idempotent
            success2 = await manager.start_process("double_start")
            assert success2 is True

            # Same process, same PID (i.e. no respawn happened)
            assert info.pid == pid1

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_double_stop(self, temp_app_dir):
        """Stopping already stopped process should succeed (idempotent)."""
        manager = ProcessManager(port_range=(20300, 20399), health_check_enabled=False)
        await manager.start()

        try:
            config = ProcessConfig(
                name="double_stop",
                app_path="test_app:app",
                module_path=temp_app_dir,
                workers=1,
                health_check_path="/health",
            )

            await manager.register(config)
            await manager.start_process("double_stop")

            # First stop
            success1 = await manager.stop_process("double_stop")
            assert success1 is True

            info = manager.get_process("double_stop")
            assert info.state == ProcessState.STOPPED

            # Second stop should be idempotent
            success2 = await manager.stop_process("double_stop")
            assert success2 is True

        finally:
            await manager.stop()


# ============== Health Check Edge Case Tests ==============

class TestHealthCheckEdgeCases:
    """Tests for health check scenarios."""

    @pytest.fixture
    def temp_app_dir(self):
        """Create temp directory with test apps."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write starlette app (no extra dependencies needed)
            app_file = Path(tmpdir) / "test_app.py"
            app_file.write_text(STARLETTE_APP_CODE)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_health_check_missing_endpoint(self, temp_app_dir):
        """App without /health endpoint - process starts but health checks fail."""
        # Create an app without /health endpoint
        with tempfile.TemporaryDirectory() as tmpdir:
            no_health_app = Path(tmpdir) / "no_health_app.py"
            no_health_app.write_text('''
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

async def root(request):
    return JSONResponse({"app": "no-health"})

app = Starlette(routes=[
    Route("/", root),
])
''')

            manager = ProcessManager(port_range=(20400, 20499), health_check_enabled=False)
            await manager.start()

            try:
                config = ProcessConfig(
                    name="no_health",
                    app_path="no_health_app:app",
                    module_path=tmpdir,
                    workers=1,
                    health_check_path="/health",  # This endpoint doesn't exist
                    health_check_timeout=2.0,
                )

                await manager.register(config)

                # NOTE: the startup readiness probe (_wait_for_ready) accepts
                # any status code < 500, so the 404 from the missing endpoint
                # still counts as "ready" and startup does NOT fail here.
                success = await manager.start_process("no_health")

                # Process may start but health check will get 404
                # Our implementation accepts < 500 so 404 is OK
                # (This tests current behavior - may want to change later)
                info = manager.get_process("no_health")
                assert info is not None

            finally:
                await manager.stop()

    @pytest.mark.asyncio
    async def test_health_check_returns_500(self, temp_app_dir):
        """App health endpoint returns 500 - should fail health check."""
        with tempfile.TemporaryDirectory() as tmpdir:
            unhealthy_app = Path(tmpdir) / "unhealthy_app.py"
            unhealthy_app.write_text('''
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

async def root(request):
    return JSONResponse({"app": "unhealthy"})

async def health(request):
    return JSONResponse({"status": "unhealthy"}, status_code=500)

app = Starlette(routes=[
    Route("/", root),
    Route("/health", health),
])
''')

            manager = ProcessManager(port_range=(20500, 20599), health_check_enabled=False)
            await manager.start()

            try:
                config = ProcessConfig(
                    name="unhealthy",
                    app_path="unhealthy_app:app",
                    module_path=tmpdir,
                    workers=1,
                    health_check_path="/health",
                    health_check_timeout=2.0,
                )

                await manager.register(config)

                # Should fail to start because health returns 500
                # (>= 500 is rejected by the readiness probe, unlike the 404
                # case above)
                success = await manager.start_process("unhealthy")

                assert success is False
                info = manager.get_process("unhealthy")
                assert info.state == ProcessState.FAILED

            finally:
                await manager.stop()

    @pytest.mark.asyncio
    async def test_process_info_uptime_calculation(self, temp_app_dir):
        """Uptime should be calculated correctly."""
        manager = ProcessManager(port_range=(20600, 20699), health_check_enabled=False)
        await manager.start()

        try:
            config = ProcessConfig(
                name="uptime_test",
                app_path="test_app:app",
                module_path=temp_app_dir,
                workers=1,
                health_check_path="/health",
            )

            await manager.register(config)
            await manager.start_process("uptime_test")

            info = manager.get_process("uptime_test")

            # Wait a bit
            await asyncio.sleep(1.0)

            # Uptime should be > 1 second
            assert info.uptime >= 1.0

            # to_dict should include uptime
            data = info.to_dict()
            assert data["uptime"] >= 1.0

        finally:
            await manager.stop()

    @pytest.mark.asyncio
    async def test_env_variables_passed_to_process(self, temp_app_dir):
        """Environment variables should be passed to subprocess."""
        # The app echoes back the env vars it sees, so we can verify end-to-end
        # that ProcessConfig.env reaches the spawned worker's environment.
        with tempfile.TemporaryDirectory() as tmpdir:
            env_app = Path(tmpdir) / "env_app.py"
            env_app.write_text('''
import os
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

async def root(request):
    return JSONResponse({
        "MY_VAR": os.environ.get("MY_VAR", "not set"),
        "ANOTHER_VAR": os.environ.get("ANOTHER_VAR", "not set"),
    })

async def health(request):
    return JSONResponse({"status": "healthy"})

app = Starlette(routes=[
    Route("/", root),
    Route("/health", health),
])
''')

            manager = ProcessManager(port_range=(20700, 20799), health_check_enabled=False)
            await manager.start()

            try:
                config = ProcessConfig(
                    name="env_test",
                    app_path="env_app:app",
                    module_path=tmpdir,
                    workers=1,
                    health_check_path="/health",
                    env={
                        "MY_VAR": "hello",
                        "ANOTHER_VAR": "world",
                    },
                )

                await manager.register(config)
                await manager.start_process("env_test")

                info = manager.get_process("env_test")

                # Check that env vars were passed
                async with httpx.AsyncClient() as client:
                    resp = await client.get(f"http://127.0.0.1:{info.port}/")
                    data = resp.json()

                    assert data["MY_VAR"] == "hello"
                    assert data["ANOTHER_VAR"] == "world"

            finally:
                await manager.stop()