diff --git a/README.md b/README.md
index 0418141..01d459a 100644
--- a/README.md
+++ b/README.md
@@ -1,145 +1,97 @@
# PyServe
-PyServe is a modern, async HTTP server written in Python. Originally created for educational purposes, it has evolved into a powerful tool for rapid prototyping and serving web applications with unique features like AI-generated content.
+Python application orchestrator and HTTP server. Runs multiple ASGI/WSGI applications through a single entry point with process isolation, health monitoring, and auto-restart.
-
+
-[More on web page](https://pyserve.org/)
+Website: [pyserve.org](https://pyserve.org) · Documentation: [docs.pyserve.org](https://docs.pyserve.org)
-## Project Overview
+## Overview
-PyServe v0.6.0 introduces a completely refactored architecture with modern async/await syntax and new exciting features like **Vibe-Serving** - AI-powered dynamic content generation.
+PyServe manages multiple Python web applications (FastAPI, Flask, Django, etc.) as isolated subprocesses behind a single gateway. Each app runs on its own port with independent lifecycle, health checks, and automatic restarts on failure.
-### Key Features:
+```
+ PyServe Gateway (:8000)
+ │
+ ┌────────────────┼────────────────┐
+ ▼ ▼ ▼
+ FastAPI Flask Starlette
+ :9001 :9002 :9003
+ /api/* /admin/* /ws/*
+```
-- **Async HTTP Server** - Built with Python's asyncio for high performance
-- **Advanced Configuration System V2** - Powerful extensible configuration with full backward compatibility
-- **Regex Routing & SPA Support** - nginx-style routing patterns with Single Page Application fallback
-- **Static File Serving** - Efficient serving with correct MIME types
-- **Template System** - Dynamic content generation
-- **Vibe-Serving Mode** - AI-generated content using language models (OpenAI, Claude, etc.)
-- **Reverse Proxy** - Forward requests to backend services with advanced routing
-- **SSL/HTTPS Support** - Secure connections with certificate configuration
-- **Modular Extensions** - Plugin-like architecture for security, caching, monitoring
-- **Beautiful Logging** - Colored terminal output with file rotation
-- **Error Handling** - Styled error pages and graceful fallbacks
-- **CLI Interface** - Command-line interface for easy deployment and configuration
-
-## Getting Started
-
-### Prerequisites
-
-- Python 3.12 or higher
-- Poetry (recommended) or pip
-
-### Installation
-
-#### Via Poetry (рекомендуется)
+## Installation
```bash
git clone https://github.com/ShiftyX1/PyServe.git
cd PyServe
-make init # Initialize project
+make init
```
-#### Или установка пакета
+## Quick Start
-```bash
-# local install
-make install-package
+```yaml
+# config.yaml
+server:
+ host: 0.0.0.0
+ port: 8000
-# after installing project you can use command pyserve
-pyserve --help
+extensions:
+ - type: process_orchestration
+ config:
+ apps:
+ - name: api
+ path: /api
+ app_path: myapp.api:app
+
+ - name: admin
+ path: /admin
+ app_path: myapp.admin:app
```
-### Running the Server
-
-#### Using Makefile (recommended)
-
```bash
-# start in development mode
-make run
-
-# start in production mode
-make run-prod
-
-# show all available commands
-make help
+pyserve -c config.yaml
```
-#### Using CLI directly
+Requests to `/api/*` are proxied to the api process, `/admin/*` to admin.
+
+## Process Orchestration
+
+The main case of using PyServe is orchestration of python web applications. Each application runs as a separate uvicorn process on a dynamically or manually allocated port (9000-9999 by default). PyServe proxies requests to the appropriate process based on URL path.
+
+For each application you can configure the number of workers, environment variables, health check endpoint path, and auto-restart parameters. If a process crashes or stops responding to health checks, PyServe automatically restarts it with exponential backoff.
+
+WSGI applications (Flask, Django) are supported through automatic wrapping — just specify `app_type: wsgi`.
+
+## In-Process Mounting
+
+For simpler cases when process isolation is not needed, applications can be mounted directly into the PyServe process via the `asgi` extension. This is lighter and faster, but all applications share one process.
+
+## Static Files & Routing
+
+PyServe can serve static files with nginx-like routing: regex patterns, SPA fallback for frontend applications, custom caching headers. Routes are processed in priority order — exact match, then regex, then default.
+
+## Reverse Proxy
+
+Requests can be proxied to external backends. Useful for integration with legacy services or microservices in other languages.
+
+## CLI
```bash
-# after installing package
-pyserve
-
-# or with Poetry
-poetry run pyserve
-
-# or legacy (for backward compatibility)
-python run.py
-```
-
-#### CLI options
-
-```bash
-# help
-pyserve --help
-
-# path to config
-pyserve -c /path/to/config.yaml
-
-# rewrite host and port
+pyserve -c config.yaml
pyserve --host 0.0.0.0 --port 9000
-
-# debug mode
pyserve --debug
-
-# show version
pyserve --version
```
## Development
-### Makefile Commands
-
```bash
-make help # Show help for commands
-make install # Install dependencies
-make dev-install # Install development dependencies
-make build # Build the package
-make test # Run tests
-make test-cov # Tests with code coverage
-make lint # Check code with linters
-make format # Format code
-make clean # Clean up temporary files
-make version # Show version
-make publish-test # Publish to Test PyPI
-make publish # Publish to PyPI
-```
-
-### Project Structure
-
-```
-pyserveX/
-├── pyserve/ # Main package
-│ ├── __init__.py
-│ ├── cli.py # CLI interface
-│ ├── server.py # Main server module
-│ ├── config.py # Configuration system
-│ ├── routing.py # Routing
-│ ├── extensions.py # Extensions
-│ └── logging_utils.py
-├── tests/ # Tests
-├── static/ # Static files
-├── templates/ # Templates
-├── logs/ # Logs
-├── Makefile # Automation tasks
-├── pyproject.toml # Project configuration
-├── config.yaml # Server configuration
-└── run.py # Entry point (backward compatibility)
+make test # run tests
+make lint # linting
+make format # formatting
```
## License
-This project is distributed under the MIT license.
\ No newline at end of file
+[MIT License](./LICENSE)
diff --git a/examples/config.example.orchestration.yaml b/examples/config.example.orchestration.yaml
new file mode 100644
index 0000000..be194cf
--- /dev/null
+++ b/examples/config.example.orchestration.yaml
@@ -0,0 +1,113 @@
+# PyServe Process Orchestration Example
+#
+# This configuration demonstrates running multiple ASGI/WSGI applications
+# as isolated processes with automatic health monitoring and restart.
+
+server:
+ host: "0.0.0.0"
+ port: 8000
+ backlog: 2048
+ proxy_timeout: 60.0
+
+logging:
+ level: DEBUG
+ console_output: true
+ format:
+ type: standard
+ use_colors: true
+
+extensions:
+ # Process Orchestration - runs each app in its own process
+ - type: process_orchestration
+ config:
+ # Port range for worker processes
+ port_range: [9000, 9999]
+
+ # Enable health monitoring
+ health_check_enabled: true
+
+ # Proxy timeout for requests
+ proxy_timeout: 60.0
+
+ apps:
+ # FastAPI application
+ - name: api
+ path: /api
+ app_path: examples.apps.fastapi_app:app
+ module_path: "."
+ workers: 2
+ health_check_path: /health
+ health_check_interval: 10.0
+ health_check_timeout: 5.0
+ health_check_retries: 3
+ max_restart_count: 5
+ restart_delay: 1.0
+ shutdown_timeout: 30.0
+ strip_path: true
+ env:
+ APP_ENV: "production"
+ DEBUG: "false"
+
+ # Flask application (WSGI wrapped to ASGI)
+ - name: admin
+ path: /admin
+ app_path: examples.apps.flask_app:app
+ app_type: wsgi
+ module_path: "."
+ workers: 1
+ health_check_path: /health
+ strip_path: true
+
+ # Starlette application
+ - name: web
+ path: /web
+ app_path: examples.apps.starlette_app:app
+ module_path: "."
+ workers: 2
+ health_check_path: /health
+ strip_path: true
+
+ # Custom ASGI application
+ - name: custom
+ path: /custom
+ app_path: examples.apps.custom_asgi:app
+ module_path: "."
+ workers: 1
+ health_check_path: /health
+ strip_path: true
+
+ # Routing for static files and reverse proxy
+ - type: routing
+ config:
+ regex_locations:
+ # Static files
+ "^/static/.*":
+ type: static
+ root: "./static"
+ strip_prefix: "/static"
+
+ # Documentation
+ "^/docs/?.*":
+ type: static
+ root: "./docs"
+ strip_prefix: "/docs"
+
+ # External API proxy
+ "^/external/.*":
+ type: proxy
+ upstream: "https://api.example.com"
+ strip_prefix: "/external"
+
+ # Security headers
+ - type: security
+ config:
+ security_headers:
+ X-Content-Type-Options: "nosniff"
+ X-Frame-Options: "DENY"
+ X-XSS-Protection: "1; mode=block"
+ Strict-Transport-Security: "max-age=31536000; includeSubDomains"
+
+ # Monitoring
+ - type: monitoring
+ config:
+ enable_metrics: true
diff --git a/pyproject.toml b/pyproject.toml
index b64299a..419f76c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,7 @@
[project]
name = "pyserve"
-version = "0.9.1"
-description = "Simple HTTP Web server written in Python"
+version = "0.9.9"
+description = "Python Application Orchestrator & HTTP Server - unified gateway for multiple Python web apps"
authors = [
{name = "Илья Глазунов",email = "i.glazunov@sapiens.solutions"}
]
diff --git a/pyserve/__init__.py b/pyserve/__init__.py
index 5373739..0f0f7ac 100644
--- a/pyserve/__init__.py
+++ b/pyserve/__init__.py
@@ -2,7 +2,7 @@
PyServe - HTTP web server written on Python
"""
-__version__ = "0.9.0"
+__version__ = "0.9.10"
__author__ = "Ilya Glazunov"
from .asgi_mount import (
@@ -15,13 +15,22 @@ from .asgi_mount import (
create_starlette_app,
)
from .config import Config
+from .process_manager import (
+ ProcessConfig,
+ ProcessInfo,
+ ProcessManager,
+ ProcessState,
+ get_process_manager,
+ init_process_manager,
+ shutdown_process_manager,
+)
from .server import PyServeServer
__all__ = [
"PyServeServer",
"Config",
"__version__",
- # ASGI mounting
+ # ASGI mounting (in-process)
"ASGIAppLoader",
"ASGIMountManager",
"MountedApp",
@@ -29,4 +38,12 @@ __all__ = [
"create_flask_app",
"create_django_app",
"create_starlette_app",
+ # Process orchestration (multi-process)
+ "ProcessManager",
+ "ProcessConfig",
+ "ProcessInfo",
+ "ProcessState",
+ "get_process_manager",
+ "init_process_manager",
+ "shutdown_process_manager",
]
diff --git a/pyserve/_wsgi_wrapper.py b/pyserve/_wsgi_wrapper.py
new file mode 100644
index 0000000..e5d0168
--- /dev/null
+++ b/pyserve/_wsgi_wrapper.py
@@ -0,0 +1,66 @@
+"""
+WSGI Wrapper Module for Process Orchestration.
+
+This module provides a wrapper that allows WSGI applications to be run
+via uvicorn by wrapping them with a2wsgi.
+
+The WSGI app path is passed via environment variables:
+- PYSERVE_WSGI_APP: The app path (e.g., "myapp:app" or "myapp.main:create_app")
+- PYSERVE_WSGI_FACTORY: "1" if the app path points to a factory function
+"""
+
+import importlib
+import os
+from typing import Any, Callable
+
+try:
+ from a2wsgi import WSGIMiddleware
+
+ WSGI_ADAPTER = "a2wsgi"
+except ImportError:
+ try:
+ from asgiref.wsgi import WsgiToAsgi as WSGIMiddleware # type: ignore
+
+ WSGI_ADAPTER = "asgiref"
+ except ImportError:
+ WSGIMiddleware = None # type: ignore
+ WSGI_ADAPTER = None
+
+
+def _load_wsgi_app() -> Callable:
+ app_path = os.environ.get("PYSERVE_WSGI_APP")
+ is_factory = os.environ.get("PYSERVE_WSGI_FACTORY", "0") == "1"
+
+ if not app_path:
+ raise RuntimeError("PYSERVE_WSGI_APP environment variable not set. " "This module should only be used by PyServe process orchestration.")
+
+ if ":" in app_path:
+ module_name, attr_name = app_path.rsplit(":", 1)
+ else:
+ module_name = app_path
+ attr_name = "app"
+
+ try:
+ module = importlib.import_module(module_name)
+ except ImportError as e:
+ raise RuntimeError(f"Failed to import WSGI module '{module_name}': {e}")
+
+ try:
+ app_or_factory = getattr(module, attr_name)
+ except AttributeError:
+ raise RuntimeError(f"Module '{module_name}' has no attribute '{attr_name}'")
+
+ if is_factory:
+ return app_or_factory()
+ return app_or_factory
+
+
+def _create_asgi_app() -> Any:
+ if WSGIMiddleware is None:
+ raise RuntimeError("No WSGI adapter available. " "Install a2wsgi (recommended) or asgiref: pip install a2wsgi")
+
+ wsgi_app = _load_wsgi_app()
+ return WSGIMiddleware(wsgi_app)
+
+
+app = _create_asgi_app()
diff --git a/pyserve/extensions.py b/pyserve/extensions.py
index 07c8aa9..cba5bdb 100644
--- a/pyserve/extensions.py
+++ b/pyserve/extensions.py
@@ -1,3 +1,4 @@
+import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
@@ -216,6 +217,15 @@ class ExtensionManager:
"monitoring": MonitoringExtension,
"asgi": ASGIExtension,
}
+ self._register_process_orchestration()
+
+ def _register_process_orchestration(self) -> None:
+ try:
+ from .process_extension import ProcessOrchestrationExtension
+
+ self.extension_registry["process_orchestration"] = ProcessOrchestrationExtension # type: ignore
+ except ImportError:
+ pass # Optional dependency
def register_extension_type(self, name: str, extension_class: Type[Extension]) -> None:
self.extension_registry[name] = extension_class
@@ -234,6 +244,32 @@ class ExtensionManager:
except Exception as e:
logger.error(f"Error loading extension {extension_type}: {e}")
+ async def load_extension_async(self, extension_type: str, config: Dict[str, Any]) -> None:
+ """Load extension with async setup support (for ProcessOrchestration)."""
+ if extension_type not in self.extension_registry:
+ logger.error(f"Unknown extension type: {extension_type}")
+ return
+
+ try:
+ extension_class = self.extension_registry[extension_type]
+ extension = extension_class(config)
+
+ setup_method = getattr(extension, "setup", None)
+ if setup_method is not None and asyncio.iscoroutinefunction(setup_method):
+ await setup_method(config)
+ else:
+ extension.initialize()
+
+ start_method = getattr(extension, "start", None)
+ if start_method is not None and asyncio.iscoroutinefunction(start_method):
+ await start_method()
+
+ # Insert at the beginning so process_orchestration is checked first
+ self.extensions.insert(0, extension)
+ logger.info(f"Loaded extension (async): {extension_type}")
+ except Exception as e:
+ logger.error(f"Error loading extension {extension_type}: {e}")
+
async def process_request(self, request: Request) -> Optional[Response]:
for extension in self.extensions:
if not extension.enabled:
diff --git a/pyserve/process_extension.py b/pyserve/process_extension.py
new file mode 100644
index 0000000..3cf9bda
--- /dev/null
+++ b/pyserve/process_extension.py
@@ -0,0 +1,365 @@
+"""Process Orchestration Extension
+
+Extension that manages ASGI/WSGI applications as isolated processes
+and routes requests to them via reverse proxy.
+"""
+
+import asyncio
+import logging
+import time
+import uuid
+from typing import Any, Dict, Optional
+
+import httpx
+from starlette.requests import Request
+from starlette.responses import Response
+
+from .extensions import Extension
+from .logging_utils import get_logger
+from .process_manager import ProcessConfig, ProcessManager
+
+logger = get_logger(__name__)
+
+
+class ProcessOrchestrationExtension(Extension):
+ """
+ Extension that orchestrates ASGI/WSGI applications as separate processes.
+
+ Unlike ASGIExtension which runs apps in-process, this extension:
+ - Runs each app in its own isolated process
+ - Provides health monitoring and auto-restart
+ - Routes requests via HTTP reverse proxy
+ - Supports multiple workers per app
+
+ Configuration example:
+ ```yaml
+ extensions:
+ - type: process_orchestration
+ config:
+ port_range: [9000, 9999]
+ health_check_enabled: true
+ apps:
+ - name: api
+ path: /api
+ app_path: myapp.api:app
+ workers: 4
+ health_check_path: /health
+
+ - name: admin
+ path: /admin
+ app_path: myapp.admin:create_app
+ factory: true
+ workers: 2
+ ```
+ """
+
+ name = "process_orchestration"
+
+ def __init__(self, config: Dict[str, Any]) -> None:
+ super().__init__(config)
+ self._manager: Optional[ProcessManager] = None
+ self._mounts: Dict[str, MountConfig] = {} # path -> config
+ self._http_client: Optional[httpx.AsyncClient] = None
+ self._started = False
+ self._proxy_timeout: float = config.get("proxy_timeout", 60.0)
+ self._pending_config = config # Store for async setup
+
+ logging_config = config.get("logging", {})
+ self._log_proxy_requests: bool = logging_config.get("proxy_logs", True)
+ self._log_health_checks: bool = logging_config.get("health_check_logs", False)
+
+ httpx_level = logging_config.get("httpx_level", "warning").upper()
+ logging.getLogger("httpx").setLevel(getattr(logging, httpx_level, logging.WARNING))
+ logging.getLogger("httpcore").setLevel(getattr(logging, httpx_level, logging.WARNING))
+
+ async def setup(self, config: Optional[Dict[str, Any]] = None) -> None:
+ if config is None:
+ config = self._pending_config
+
+ port_range = tuple(config.get("port_range", [9000, 9999]))
+ health_check_enabled = config.get("health_check_enabled", True)
+ self._proxy_timeout = config.get("proxy_timeout", 60.0)
+
+ self._manager = ProcessManager(
+ port_range=port_range,
+ health_check_enabled=health_check_enabled,
+ )
+
+ self._http_client = httpx.AsyncClient(
+ timeout=httpx.Timeout(self._proxy_timeout),
+ follow_redirects=False,
+ limits=httpx.Limits(
+ max_keepalive_connections=100,
+ max_connections=200,
+ ),
+ )
+
+ apps_config = config.get("apps", [])
+ for app_config in apps_config:
+ await self._register_app(app_config)
+
+ logger.info(
+ "Process orchestration extension initialized",
+ app_count=len(self._mounts),
+ )
+
+ async def _register_app(self, app_config: Dict[str, Any]) -> None:
+ if not self._manager:
+ return
+
+ name = app_config.get("name")
+ path = app_config.get("path", "").rstrip("/")
+ app_path = app_config.get("app_path")
+
+ if not name or not app_path:
+ logger.error("App config missing 'name' or 'app_path'")
+ return
+
+ process_config = ProcessConfig(
+ name=name,
+ app_path=app_path,
+ app_type=app_config.get("app_type", "asgi"),
+ workers=app_config.get("workers", 1),
+ module_path=app_config.get("module_path"),
+ factory=app_config.get("factory", False),
+ factory_args=app_config.get("factory_args"),
+ env=app_config.get("env", {}),
+ health_check_enabled=app_config.get("health_check_enabled", True),
+ health_check_path=app_config.get("health_check_path", "/health"),
+ health_check_interval=app_config.get("health_check_interval", 10.0),
+ health_check_timeout=app_config.get("health_check_timeout", 5.0),
+ health_check_retries=app_config.get("health_check_retries", 3),
+ max_memory_mb=app_config.get("max_memory_mb"),
+ max_restart_count=app_config.get("max_restart_count", 5),
+ restart_delay=app_config.get("restart_delay", 1.0),
+ shutdown_timeout=app_config.get("shutdown_timeout", 30.0),
+ )
+
+ await self._manager.register(process_config)
+
+ self._mounts[path] = MountConfig(
+ path=path,
+ process_name=name,
+ strip_path=app_config.get("strip_path", True),
+ )
+
+ logger.info(f"Registered app '{name}' at path '{path}'")
+
+ async def start(self) -> None:
+ if self._started or not self._manager:
+ return
+
+ await self._manager.start()
+ results = await self._manager.start_all()
+
+ self._started = True
+
+ success = sum(1 for v in results.values() if v)
+ failed = len(results) - success
+
+ logger.info(
+ "Process orchestration started",
+ success=success,
+ failed=failed,
+ )
+
+ async def stop(self) -> None:
+ if not self._started:
+ return
+
+ if self._http_client:
+ await self._http_client.aclose()
+ self._http_client = None
+
+ if self._manager:
+ await self._manager.stop()
+
+ self._started = False
+ logger.info("Process orchestration stopped")
+
+ def cleanup(self) -> None:
+ try:
+ loop = asyncio.get_running_loop()
+ loop.create_task(self.stop())
+ except RuntimeError:
+ asyncio.run(self.stop())
+
+ async def process_request(self, request: Request) -> Optional[Response]:
+ if not self._started or not self._manager:
+ logger.debug(
+ "Process orchestration not ready",
+ started=self._started,
+ has_manager=self._manager is not None,
+ )
+ return None
+
+ mount = self._get_mount(request.url.path)
+ if not mount:
+ logger.debug(
+ "No mount found for path",
+ path=request.url.path,
+ available_mounts=list(self._mounts.keys()),
+ )
+ return None
+
+ upstream_url = self._manager.get_upstream_url(mount.process_name)
+ if not upstream_url:
+ logger.warning(
+ f"Process '{mount.process_name}' not running",
+ path=request.url.path,
+ )
+ return Response("Service Unavailable", status_code=503)
+
+ request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
+
+ start_time = time.perf_counter()
+ response = await self._proxy_request(request, upstream_url, mount, request_id)
+ latency_ms = (time.perf_counter() - start_time) * 1000
+
+ if self._log_proxy_requests:
+ logger.info(
+ "Proxy request completed",
+ request_id=request_id,
+ method=request.method,
+ path=request.url.path,
+ process=mount.process_name,
+ upstream=upstream_url,
+ status=response.status_code,
+ latency_ms=round(latency_ms, 2),
+ )
+
+ return response
+
+ def _get_mount(self, path: str) -> Optional["MountConfig"]:
+ for mount_path in sorted(self._mounts.keys(), key=len, reverse=True):
+ if mount_path == "":
+ return self._mounts[mount_path]
+ if path == mount_path or path.startswith(f"{mount_path}/"):
+ return self._mounts[mount_path]
+ return None
+
+ async def _proxy_request(
+ self,
+ request: Request,
+ upstream_url: str,
+ mount: "MountConfig",
+ request_id: str = "",
+ ) -> Response:
+ path = request.url.path
+ if mount.strip_path and mount.path:
+ path = path[len(mount.path) :] or "/"
+
+ target_url = f"{upstream_url}{path}"
+ if request.url.query:
+ target_url += f"?{request.url.query}"
+
+ headers = dict(request.headers)
+ headers.pop("host", None)
+ headers["X-Forwarded-For"] = request.client.host if request.client else "unknown"
+ headers["X-Forwarded-Proto"] = request.url.scheme
+ headers["X-Forwarded-Host"] = request.headers.get("host", "")
+ if request_id:
+ headers["X-Request-ID"] = request_id
+
+ try:
+ if not self._http_client:
+ return Response("Service Unavailable", status_code=503)
+
+ body = await request.body()
+
+ response = await self._http_client.request(
+ method=request.method,
+ url=target_url,
+ headers=headers,
+ content=body,
+ )
+
+ response_headers = dict(response.headers)
+ for header in ["transfer-encoding", "connection", "keep-alive"]:
+ response_headers.pop(header, None)
+
+ return Response(
+ content=response.content,
+ status_code=response.status_code,
+ headers=response_headers,
+ )
+
+ except httpx.TimeoutException:
+ logger.error(f"Proxy timeout to {upstream_url}")
+ return Response("Gateway Timeout", status_code=504)
+ except httpx.ConnectError as e:
+ logger.error(f"Proxy connection error to {upstream_url}: {e}")
+ return Response("Bad Gateway", status_code=502)
+ except Exception as e:
+ logger.error(f"Proxy error to {upstream_url}: {e}")
+ return Response("Internal Server Error", status_code=500)
+
+ async def process_response(
+ self,
+ request: Request,
+ response: Response,
+ ) -> Response:
+ return response
+
+ def get_metrics(self) -> Dict[str, Any]:
+ metrics = {
+ "process_orchestration": {
+ "enabled": self._started,
+ "mounts": len(self._mounts),
+ }
+ }
+
+ if self._manager:
+ metrics["process_orchestration"].update(self._manager.get_metrics())
+
+ return metrics
+
+ async def get_process_status(self, name: str) -> Optional[Dict[str, Any]]:
+ if not self._manager:
+ return None
+ info = self._manager.get_process(name)
+ return info.to_dict() if info else None
+
+ async def get_all_status(self) -> Dict[str, Any]:
+ if not self._manager:
+ return {}
+ return {name: info.to_dict() for name, info in self._manager.get_all_processes().items()}
+
+ async def restart_process(self, name: str) -> bool:
+ if not self._manager:
+ return False
+ return await self._manager.restart_process(name)
+
+ async def scale_process(self, name: str, workers: int) -> bool:
+ if not self._manager:
+ return False
+
+ info = self._manager.get_process(name)
+ if not info:
+ return False
+
+ info.config.workers = workers
+ return await self._manager.restart_process(name)
+
+
+class MountConfig:
+ def __init__(
+ self,
+ path: str,
+ process_name: str,
+ strip_path: bool = True,
+ ):
+ self.path = path
+ self.process_name = process_name
+ self.strip_path = strip_path
+
+
+async def setup_process_orchestration(config: Dict[str, Any]) -> ProcessOrchestrationExtension:
+ ext = ProcessOrchestrationExtension(config)
+ await ext.setup(config)
+ await ext.start()
+ return ext
+
+
+async def shutdown_process_orchestration(ext: ProcessOrchestrationExtension) -> None:
+ await ext.stop()
diff --git a/pyserve/process_manager.py b/pyserve/process_manager.py
new file mode 100644
index 0000000..b1411b3
--- /dev/null
+++ b/pyserve/process_manager.py
@@ -0,0 +1,553 @@
+"""Process Manager Module
+
+Orchestrates ASGI/WSGI applications as separate processes
+"""
+
+import asyncio
+import logging
+import os
+import signal
+import socket
+import subprocess
+import sys
+import time
+from dataclasses import dataclass, field
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from .logging_utils import get_logger
+
+logging.getLogger("httpx").setLevel(logging.WARNING)
+logging.getLogger("httpcore").setLevel(logging.WARNING)
+
+logger = get_logger(__name__)
+
+
+class ProcessState(Enum):
+ PENDING = "pending"
+ STARTING = "starting"
+ RUNNING = "running"
+ STOPPING = "stopping"
+ STOPPED = "stopped"
+ FAILED = "failed"
+ RESTARTING = "restarting"
+
+
+@dataclass
+class ProcessConfig:
+ name: str
+ app_path: str
+ app_type: str = "asgi" # asgi, wsgi
+ host: str = "127.0.0.1"
+ port: int = 0 # 0 = auto-assign
+ workers: int = 1
+ module_path: Optional[str] = None
+ factory: bool = False
+ factory_args: Optional[Dict[str, Any]] = None
+ env: Dict[str, str] = field(default_factory=dict)
+
+ health_check_enabled: bool = True
+ health_check_path: str = "/health"
+ health_check_interval: float = 10.0
+ health_check_timeout: float = 5.0
+ health_check_retries: int = 3
+
+ max_memory_mb: Optional[int] = None
+ max_restart_count: int = 5
+ restart_delay: float = 1.0 # seconds
+
+ shutdown_timeout: float = 30.0 # seconds
+
+
+@dataclass
+class ProcessInfo:
+ config: ProcessConfig
+ state: ProcessState = ProcessState.PENDING
+ pid: Optional[int] = None
+ port: int = 0
+ start_time: Optional[float] = None
+ restart_count: int = 0
+ last_health_check: Optional[float] = None
+ health_check_failures: int = 0
+ process: Optional[subprocess.Popen] = None
+
+ @property
+ def uptime(self) -> float:
+ if self.start_time is None:
+ return 0.0
+ return time.time() - self.start_time
+
+ @property
+ def is_running(self) -> bool:
+ return self.state == ProcessState.RUNNING and self.process is not None
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "name": self.config.name,
+ "state": self.state.value,
+ "pid": self.pid,
+ "port": self.port,
+ "uptime": round(self.uptime, 2),
+ "restart_count": self.restart_count,
+ "health_check_failures": self.health_check_failures,
+ "workers": self.config.workers,
+ }
+
+
+class PortAllocator:
+ def __init__(self, start_port: int = 9000, end_port: int = 9999):
+ self.start_port = start_port
+ self.end_port = end_port
+ self._allocated: set[int] = set()
+ self._lock = asyncio.Lock()
+
+ async def allocate(self) -> int:
+ async with self._lock:
+ for port in range(self.start_port, self.end_port + 1):
+ if port in self._allocated:
+ continue
+ if self._is_port_available(port):
+ self._allocated.add(port)
+ return port
+ raise RuntimeError(f"No available ports in range {self.start_port}-{self.end_port}")
+
+ async def release(self, port: int) -> None:
+ async with self._lock:
+ self._allocated.discard(port)
+
+ def _is_port_available(self, port: int) -> bool:
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(("127.0.0.1", port))
+ return True
+ except OSError:
+ return False
+
+
+class ProcessManager:
+ def __init__(
+ self,
+ port_range: tuple[int, int] = (9000, 9999),
+ health_check_enabled: bool = True,
+ ):
+ self._processes: Dict[str, ProcessInfo] = {}
+ self._port_allocator = PortAllocator(*port_range)
+ self._health_check_enabled = health_check_enabled
+ self._health_check_task: Optional[asyncio.Task] = None
+ self._shutdown_event = asyncio.Event()
+ self._started = False
+ self._lock = asyncio.Lock()
+
+ async def start(self) -> None:
+ if self._started:
+ return
+
+ self._started = True
+ self._shutdown_event.clear()
+
+ if self._health_check_enabled:
+ self._health_check_task = asyncio.create_task(self._health_check_loop(), name="process_manager_health_check")
+
+ logger.info("Process manager started")
+
+ async def stop(self) -> None:
+ if not self._started:
+ return
+
+ logger.info("Stopping process manager...")
+ self._shutdown_event.set()
+
+ if self._health_check_task:
+ self._health_check_task.cancel()
+ try:
+ await self._health_check_task
+ except asyncio.CancelledError:
+ pass
+
+ await self.stop_all()
+
+ self._started = False
+ logger.info("Process manager stopped")
+
+ async def register(self, config: ProcessConfig) -> ProcessInfo:
+ async with self._lock:
+ if config.name in self._processes:
+ raise ValueError(f"Process '{config.name}' already registered")
+
+ info = ProcessInfo(config=config)
+ self._processes[config.name] = info
+
+ logger.info(f"Registered process '{config.name}'", app_path=config.app_path)
+ return info
+
+ async def unregister(self, name: str) -> None:
+ async with self._lock:
+ if name not in self._processes:
+ return
+
+ info = self._processes[name]
+ if info.is_running:
+ await self._stop_process(info)
+
+ if info.port:
+ await self._port_allocator.release(info.port)
+
+ del self._processes[name]
+ logger.info(f"Unregistered process '{name}'")
+
+ async def start_process(self, name: str) -> bool:
+ info = self._processes.get(name)
+ if not info:
+ logger.error(f"Process '{name}' not found")
+ return False
+
+ if info.is_running:
+ logger.warning(f"Process '{name}' is already running")
+ return True
+
+ return await self._start_process(info)
+
+ async def stop_process(self, name: str) -> bool:
+ info = self._processes.get(name)
+ if not info:
+ logger.error(f"Process '{name}' not found")
+ return False
+
+ return await self._stop_process(info)
+
+ async def restart_process(self, name: str) -> bool:
+ info = self._processes.get(name)
+ if not info:
+ logger.error(f"Process '{name}' not found")
+ return False
+
+ info.state = ProcessState.RESTARTING
+
+ if info.is_running:
+ await self._stop_process(info)
+
+ await asyncio.sleep(info.config.restart_delay)
+ return await self._start_process(info)
+
+ async def start_all(self) -> Dict[str, bool]:
+ results = {}
+ for name in self._processes:
+ results[name] = await self.start_process(name)
+ return results
+
+ async def stop_all(self) -> None:
+ tasks = []
+ for info in self._processes.values():
+ if info.is_running:
+ tasks.append(self._stop_process(info))
+
+ if tasks:
+ await asyncio.gather(*tasks, return_exceptions=True)
+
+ def get_process(self, name: str) -> Optional[ProcessInfo]:
+ return self._processes.get(name)
+
+ def get_all_processes(self) -> Dict[str, ProcessInfo]:
+ return self._processes.copy()
+
+ def get_process_by_port(self, port: int) -> Optional[ProcessInfo]:
+ for info in self._processes.values():
+ if info.port == port:
+ return info
+ return None
+
+ def get_upstream_url(self, name: str) -> Optional[str]:
+ info = self._processes.get(name)
+ if not info or not info.is_running:
+ return None
+ return f"http://{info.config.host}:{info.port}"
+
+ async def _start_process(self, info: ProcessInfo) -> bool:
+ config = info.config
+
+ try:
+ info.state = ProcessState.STARTING
+
+ if info.port == 0:
+ info.port = await self._port_allocator.allocate()
+
+ cmd = self._build_command(config, info.port)
+
+ env = os.environ.copy()
+ env.update(config.env)
+
+ if config.module_path:
+ python_path = env.get("PYTHONPATH", "")
+ module_dir = str(Path(config.module_path).resolve())
+ env["PYTHONPATH"] = f"{module_dir}:{python_path}" if python_path else module_dir
+
+ # For WSGI apps, pass configuration via environment variables
+ if config.app_type == "wsgi":
+ env["PYSERVE_WSGI_APP"] = config.app_path
+ env["PYSERVE_WSGI_FACTORY"] = "1" if config.factory else "0"
+
+ logger.info(
+ f"Starting process '{config.name}'",
+ command=" ".join(cmd),
+ port=info.port,
+ )
+
+ info.process = subprocess.Popen(
+ cmd,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ preexec_fn=os.setsid if hasattr(os, "setsid") else None,
+ )
+
+ info.pid = info.process.pid
+ info.start_time = time.time()
+
+ if not await self._wait_for_ready(info):
+ raise RuntimeError(f"Process '{config.name}' failed to start")
+
+ info.state = ProcessState.RUNNING
+ logger.info(
+ f"Process '{config.name}' started successfully",
+ pid=info.pid,
+ port=info.port,
+ )
+ return True
+
+ except Exception as e:
+ logger.error(f"Failed to start process '{config.name}': {e}")
+ info.state = ProcessState.FAILED
+ if info.port:
+ await self._port_allocator.release(info.port)
+ info.port = 0
+ return False
+
+ async def _stop_process(self, info: ProcessInfo) -> bool:
+ if not info.process:
+ info.state = ProcessState.STOPPED
+ return True
+
+ config = info.config
+ info.state = ProcessState.STOPPING
+
+ try:
+ if hasattr(os, "killpg"):
+ try:
+ os.killpg(os.getpgid(info.process.pid), signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ else:
+ info.process.terminate()
+
+ try:
+ await asyncio.wait_for(asyncio.get_event_loop().run_in_executor(None, info.process.wait), timeout=config.shutdown_timeout)
+ except asyncio.TimeoutError:
+ logger.warning(f"Process '{config.name}' did not stop gracefully, forcing kill")
+ if hasattr(os, "killpg"):
+ try:
+ os.killpg(os.getpgid(info.process.pid), signal.SIGKILL)
+ except ProcessLookupError:
+ pass
+ else:
+ info.process.kill()
+ info.process.wait()
+
+ if info.port:
+ await self._port_allocator.release(info.port)
+
+ info.state = ProcessState.STOPPED
+ info.process = None
+ info.pid = None
+
+ logger.info(f"Process '{config.name}' stopped")
+ return True
+
+ except Exception as e:
+ logger.error(f"Error stopping process '{config.name}': {e}")
+ info.state = ProcessState.FAILED
+ return False
+
+ async def _wait_for_ready(self, info: ProcessInfo, timeout: float = 30.0) -> bool:
+ import httpx
+
+ start_time = time.time()
+ url = f"http://{info.config.host}:{info.port}{info.config.health_check_path}"
+
+ while time.time() - start_time < timeout:
+ if info.process and info.process.poll() is not None:
+ stdout, stderr = info.process.communicate()
+ logger.error(
+ f"Process '{info.config.name}' exited during startup",
+ returncode=info.process.returncode,
+ stderr=stderr.decode() if stderr else "",
+ )
+ return False
+
+ try:
+ async with httpx.AsyncClient(timeout=2.0) as client:
+ resp = await client.get(url)
+ if resp.status_code < 500:
+ return True
+ except Exception:
+ pass
+
+ await asyncio.sleep(0.5)
+
+ return False
+
+ async def _health_check_loop(self) -> None:
+ while not self._shutdown_event.is_set():
+ try:
+ for info in list(self._processes.values()):
+ if not info.is_running or not info.config.health_check_enabled:
+ continue
+
+ await self._check_process_health(info)
+
+ try:
+ await asyncio.wait_for(
+ self._shutdown_event.wait(),
+ timeout=(
+ min(p.config.health_check_interval for p in self._processes.values() if p.config.health_check_enabled)
+ if self._processes
+ else 10.0
+ ),
+ )
+ break
+ except asyncio.TimeoutError:
+ pass
+
+ except Exception as e:
+ logger.error(f"Error in health check loop: {e}")
+ await asyncio.sleep(5)
+
+ async def _check_process_health(self, info: ProcessInfo) -> bool:
+ import httpx
+
+ config = info.config
+ url = f"http://{config.host}:{info.port}{config.health_check_path}"
+
+ try:
+ async with httpx.AsyncClient(timeout=config.health_check_timeout) as client:
+ resp = await client.get(url)
+ if resp.status_code < 500:
+ info.health_check_failures = 0
+ info.last_health_check = time.time()
+ return True
+ else:
+ raise Exception(f"Health check returned status {resp.status_code}")
+
+ except Exception as e:
+ info.health_check_failures += 1
+ logger.warning(
+ f"Health check failed for '{config.name}'",
+ failures=info.health_check_failures,
+ error=str(e),
+ )
+
+ if info.health_check_failures >= config.health_check_retries:
+ logger.error(f"Process '{config.name}' is unhealthy, restarting...")
+ await self._handle_unhealthy_process(info)
+
+ return False
+
+ async def _handle_unhealthy_process(self, info: ProcessInfo) -> None:
+ config = info.config
+
+ if info.restart_count >= config.max_restart_count:
+ logger.error(f"Process '{config.name}' exceeded max restart count, marking as failed")
+ info.state = ProcessState.FAILED
+ return
+
+ info.restart_count += 1
+ info.health_check_failures = 0
+
+ delay = config.restart_delay * (2 ** (info.restart_count - 1))
+ delay = min(delay, 60.0)
+
+ logger.info(
+ f"Restarting process '{config.name}'",
+ restart_count=info.restart_count,
+ delay=delay,
+ )
+
+ await self._stop_process(info)
+ await asyncio.sleep(delay)
+ await self._start_process(info)
+
+ def _build_command(self, config: ProcessConfig, port: int) -> List[str]:
+ if config.app_type == "wsgi":
+ wrapper_app = self._create_wsgi_wrapper_path(config)
+ app_path = wrapper_app
+ else:
+ app_path = config.app_path
+
+ cmd = [
+ sys.executable,
+ "-m",
+ "uvicorn",
+ app_path,
+ "--host",
+ config.host,
+ "--port",
+ str(port),
+ "--workers",
+ str(config.workers),
+ "--log-level",
+ "warning",
+ "--no-access-log",
+ ]
+
+ if config.factory and config.app_type != "wsgi":
+ cmd.append("--factory")
+
+ return cmd
+
+ def _create_wsgi_wrapper_path(self, config: ProcessConfig) -> str:
+ """
+ Since uvicorn can't directly run WSGI apps, we create a wrapper
+ that imports the WSGI app and wraps it with a2wsgi.
+ """
+ # For WSGI apps, we'll use a special wrapper module
+ # The wrapper is: pyserve._wsgi_wrapper:create_app
+ # It will be called with app_path as environment variable
+ return "pyserve._wsgi_wrapper:app"
+
+ def get_metrics(self) -> Dict[str, Any]:
+ return {
+ "managed_processes": len(self._processes),
+ "running_processes": sum(1 for p in self._processes.values() if p.is_running),
+ "processes": {name: info.to_dict() for name, info in self._processes.items()},
+ }
+
+
+_process_manager: Optional[ProcessManager] = None
+
+
+def get_process_manager() -> ProcessManager:
+ global _process_manager
+ if _process_manager is None:
+ _process_manager = ProcessManager()
+ return _process_manager
+
+
+async def init_process_manager(
+ port_range: tuple[int, int] = (9000, 9999),
+ health_check_enabled: bool = True,
+) -> ProcessManager:
+ global _process_manager
+ _process_manager = ProcessManager(
+ port_range=port_range,
+ health_check_enabled=health_check_enabled,
+ )
+ await _process_manager.start()
+ return _process_manager
+
+
+async def shutdown_process_manager() -> None:
+ global _process_manager
+ if _process_manager:
+ await _process_manager.stop()
+ _process_manager = None
diff --git a/pyserve/server.py b/pyserve/server.py
index 9805b1c..f1ae8e7 100644
--- a/pyserve/server.py
+++ b/pyserve/server.py
@@ -119,6 +119,7 @@ class PyServeServer:
self.config = config
self.extension_manager = ExtensionManager()
self.app: Optional[Starlette] = None
+ self._async_extensions_loaded = False
self._setup_logging()
self._load_extensions()
self._create_app()
@@ -133,16 +134,38 @@ class PyServeServer:
if ext_config.type == "routing":
config.setdefault("default_proxy_timeout", self.config.server.proxy_timeout)
+ if ext_config.type == "process_orchestration":
+ continue
+
self.extension_manager.load_extension(ext_config.type, config)
+ async def _load_async_extensions(self) -> None:
+ if self._async_extensions_loaded:
+ return
+
+ for ext_config in self.config.extensions:
+ if ext_config.type == "process_orchestration":
+ config = ext_config.config.copy()
+ await self.extension_manager.load_extension_async(ext_config.type, config)
+
+ self._async_extensions_loaded = True
+
def _create_app(self) -> None:
+ from contextlib import asynccontextmanager
+
+ @asynccontextmanager
+ async def lifespan(app: Starlette):
+ await self._load_async_extensions()
+ logger.info("Async extensions loaded")
+ yield
+
routes = [
Route("/health", self._health_check, methods=["GET"]),
Route("/metrics", self._metrics, methods=["GET"]),
Route("/{path:path}", self._catch_all, methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]),
]
- self.app = Starlette(routes=routes)
+ self.app = Starlette(routes=routes, lifespan=lifespan)
self.app.add_middleware(PyServeMiddleware, extension_manager=self.extension_manager)
async def _health_check(self, request: Request) -> Response:
diff --git a/tests/test_process_orchestration.py b/tests/test_process_orchestration.py
new file mode 100644
index 0000000..8e34df7
--- /dev/null
+++ b/tests/test_process_orchestration.py
@@ -0,0 +1,1063 @@
+"""
+Integration tests for Process Orchestration functionality.
+
+These tests start PyServe with process orchestration extension and verify
+that requests are correctly routed to isolated worker processes.
+"""
+
+import asyncio
+import os
+import sys
+import tempfile
+import pytest
+import httpx
+import socket
+from pathlib import Path
+from typing import Dict, Any
+
+import uvicorn
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import JSONResponse, PlainTextResponse
+from starlette.routing import Route
+
+from pyserve.config import Config, ServerConfig, HttpConfig, LoggingConfig, ExtensionConfig
+from pyserve.server import PyServeServer
+from pyserve.process_manager import (
+ ProcessConfig,
+ ProcessInfo,
+ ProcessManager,
+ ProcessState,
+ PortAllocator,
+)
+
+
+def get_free_port() -> int:
+ """Get an available port."""
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.bind(('', 0))
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ return s.getsockname()[1]
+
+
+# ============== Test Applications (saved as temp files) ==============
+
+FASTAPI_APP_CODE = '''
+from fastapi import FastAPI
+import os
+
+app = FastAPI()
+
+@app.get("/")
+def root():
+ return {"app": "fastapi-worker", "pid": os.getpid()}
+
+@app.get("/health")
+def health():
+ return {"status": "healthy", "pid": os.getpid()}
+
+@app.get("/users")
+def users():
+ return {"users": [{"id": 1, "name": "Alice"}], "pid": os.getpid()}
+
+@app.post("/users")
+def create_user(data: dict):
+ return {"created": data, "pid": os.getpid()}
+'''
+
+STARLETTE_APP_CODE = '''
+from starlette.applications import Starlette
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+import os
+
+async def root(request):
+ return JSONResponse({"app": "starlette-worker", "pid": os.getpid()})
+
+async def health(request):
+ return JSONResponse({"status": "healthy", "pid": os.getpid()})
+
+async def info(request):
+ return JSONResponse({"info": "test", "pid": os.getpid()})
+
+app = Starlette(routes=[
+ Route("/", root),
+ Route("/health", health),
+ Route("/info", info),
+])
+'''
+
+SIMPLE_ASGI_APP_CODE = '''
+import os
+
+async def app(scope, receive, send):
+ if scope["type"] == "http":
+ path = scope.get("path", "/")
+
+ if path == "/health":
+ body = b'{"status": "healthy", "pid": ' + str(os.getpid()).encode() + b'}'
+ else:
+ body = b'{"app": "simple-asgi", "path": "' + path.encode() + b'", "pid": ' + str(os.getpid()).encode() + b'}'
+
+ await send({
+ "type": "http.response.start",
+ "status": 200,
+ "headers": [[b"content-type", b"application/json"]],
+ })
+ await send({
+ "type": "http.response.body",
+ "body": body,
+ })
+'''
+
+
+# ============== Unit Tests for ProcessConfig ==============
+
+class TestProcessConfig:
+ def test_default_values(self):
+ config = ProcessConfig(name="test", app_path="myapp:app")
+
+ assert config.name == "test"
+ assert config.app_path == "myapp:app"
+ assert config.app_type == "asgi"
+ assert config.host == "127.0.0.1"
+ assert config.port == 0
+ assert config.workers == 1
+ assert config.health_check_enabled is True
+ assert config.health_check_path == "/health"
+ assert config.max_restart_count == 5
+
+ def test_custom_values(self):
+ config = ProcessConfig(
+ name="api",
+ app_path="myapp.api:create_app",
+ workers=4,
+ factory=True,
+ health_check_path="/api/health",
+ max_memory_mb=512,
+ )
+
+ assert config.workers == 4
+ assert config.factory is True
+ assert config.health_check_path == "/api/health"
+ assert config.max_memory_mb == 512
+
+
+class TestProcessInfo:
+ def test_initial_state(self):
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ info = ProcessInfo(config=config)
+
+ assert info.state == ProcessState.PENDING
+ assert info.pid is None
+ assert info.port == 0
+ assert info.restart_count == 0
+ assert info.is_running is False
+
+ def test_uptime(self):
+ import time
+
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ info = ProcessInfo(config=config)
+
+ assert info.uptime == 0.0
+
+ info.start_time = time.time() - 10
+ assert 9 < info.uptime < 11
+
+ def test_to_dict(self):
+ config = ProcessConfig(name="test", app_path="myapp:app", workers=2)
+ info = ProcessInfo(config=config, state=ProcessState.RUNNING, pid=12345, port=9001)
+
+ result = info.to_dict()
+
+ assert result["name"] == "test"
+ assert result["state"] == "running"
+ assert result["pid"] == 12345
+ assert result["port"] == 9001
+ assert result["workers"] == 2
+
+
+class TestPortAllocator:
+ @pytest.mark.asyncio
+ async def test_allocate_port(self):
+ allocator = PortAllocator(start_port=19000, end_port=19010)
+
+ port = await allocator.allocate()
+
+ assert 19000 <= port <= 19010
+ assert port in allocator._allocated
+
+ @pytest.mark.asyncio
+ async def test_release_port(self):
+ allocator = PortAllocator(start_port=19000, end_port=19010)
+
+ port = await allocator.allocate()
+ assert port in allocator._allocated
+
+ await allocator.release(port)
+ assert port not in allocator._allocated
+
+ @pytest.mark.asyncio
+ async def test_no_available_ports(self):
+ allocator = PortAllocator(start_port=19000, end_port=19001)
+
+ # Allocate all ports
+ port1 = await allocator.allocate()
+ port2 = await allocator.allocate()
+
+ # Should raise error
+ with pytest.raises(RuntimeError, match="No available ports"):
+ await allocator.allocate()
+
+ # Release one and try again
+ await allocator.release(port1)
+ port3 = await allocator.allocate()
+ assert port3 == port1
+
+
+class TestProcessManager:
+ """Tests for ProcessManager."""
+
+ @pytest.mark.asyncio
+ async def test_register_process(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ info = await manager.register(config)
+
+ assert info.config.name == "test"
+ assert "test" in manager._processes
+
+ @pytest.mark.asyncio
+ async def test_register_duplicate(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ await manager.register(config)
+
+ with pytest.raises(ValueError, match="already registered"):
+ await manager.register(config)
+
+ @pytest.mark.asyncio
+ async def test_unregister_process(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ await manager.register(config)
+
+ await manager.unregister("test")
+
+ assert "test" not in manager._processes
+
+ @pytest.mark.asyncio
+ async def test_get_process(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ config = ProcessConfig(name="test", app_path="myapp:app")
+ await manager.register(config)
+
+ info = manager.get_process("test")
+ assert info is not None
+ assert info.config.name == "test"
+
+ info = manager.get_process("nonexistent")
+ assert info is None
+
+ @pytest.mark.asyncio
+ async def test_build_command(self):
+ manager = ProcessManager()
+
+ config = ProcessConfig(
+ name="test",
+ app_path="myapp:app",
+ workers=4,
+ )
+
+ cmd = manager._build_command(config, 9001)
+
+ # Command format: [python, -m, uvicorn, app_path, ...]
+ assert "-m" in cmd
+ assert "uvicorn" in cmd
+ assert "myapp:app" in cmd
+ assert "--port" in cmd
+ assert "9001" in cmd
+ assert "--workers" in cmd
+ assert "4" in cmd
+
+ @pytest.mark.asyncio
+ async def test_build_command_with_factory(self):
+ manager = ProcessManager()
+
+ config = ProcessConfig(
+ name="test",
+ app_path="myapp:create_app",
+ factory=True,
+ )
+
+ cmd = manager._build_command(config, 9001)
+
+ assert "--factory" in cmd
+
+ @pytest.mark.asyncio
+ async def test_get_metrics(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ config1 = ProcessConfig(name="app1", app_path="myapp1:app")
+ config2 = ProcessConfig(name="app2", app_path="myapp2:app")
+
+ await manager.register(config1)
+ await manager.register(config2)
+
+ metrics = manager.get_metrics()
+
+ assert metrics["managed_processes"] == 2
+ assert metrics["running_processes"] == 0
+ assert "app1" in metrics["processes"]
+ assert "app2" in metrics["processes"]
+
+ @pytest.mark.asyncio
+ async def test_start_stop_lifecycle(self):
+ manager = ProcessManager(port_range=(19000, 19999))
+
+ await manager.start()
+ assert manager._started is True
+
+ await manager.stop()
+ assert manager._started is False
+
+
+class TestProcessState:
+ """Tests for ProcessState enum."""
+
+ def test_all_states(self):
+ assert ProcessState.PENDING.value == "pending"
+ assert ProcessState.STARTING.value == "starting"
+ assert ProcessState.RUNNING.value == "running"
+ assert ProcessState.STOPPING.value == "stopping"
+ assert ProcessState.STOPPED.value == "stopped"
+ assert ProcessState.FAILED.value == "failed"
+ assert ProcessState.RESTARTING.value == "restarting"
+
+
+# ============== Integration Tests ==============
+
+class TestProcessManagerIntegration:
+ """Integration tests for ProcessManager with real processes."""
+
+ @pytest.fixture
+ def temp_app_dir(self):
+ """Create temp directory with test apps."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Write starlette app (no extra dependencies needed)
+ app_file = Path(tmpdir) / "test_app.py"
+ app_file.write_text(STARLETTE_APP_CODE)
+
+ # Write simple ASGI app
+ simple_file = Path(tmpdir) / "simple_app.py"
+ simple_file.write_text(SIMPLE_ASGI_APP_CODE)
+
+ yield tmpdir
+
+ @pytest.mark.asyncio
+ async def test_start_real_process(self, temp_app_dir):
+ """Test starting a real uvicorn process."""
+ manager = ProcessManager(port_range=(19100, 19199), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="test_starlette",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ health_check_enabled=True,
+ )
+
+ await manager.register(config)
+ success = await manager.start_process("test_starlette")
+
+ assert success is True
+
+ info = manager.get_process("test_starlette")
+ assert info is not None
+ assert info.state == ProcessState.RUNNING
+ assert info.port > 0
+ assert info.pid is not None
+
+ # Make request to the running process
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"http://127.0.0.1:{info.port}/")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["app"] == "starlette-worker"
+ assert data["pid"] == info.pid
+
+ # Health check
+ resp = await client.get(f"http://127.0.0.1:{info.port}/health")
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "healthy"
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_start_multiple_processes(self, temp_app_dir):
+ """Test starting multiple processes on different ports."""
+ manager = ProcessManager(port_range=(19200, 19299), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ # Register two apps
+ config1 = ProcessConfig(
+ name="app1",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+ config2 = ProcessConfig(
+ name="app2",
+ app_path="simple_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config1)
+ await manager.register(config2)
+
+ results = await manager.start_all()
+
+ assert results["app1"] is True
+ assert results["app2"] is True
+
+ info1 = manager.get_process("app1")
+ info2 = manager.get_process("app2")
+
+ assert info1.port != info2.port
+ assert info1.pid != info2.pid
+
+ # Both should respond
+ async with httpx.AsyncClient() as client:
+ resp1 = await client.get(f"http://127.0.0.1:{info1.port}/")
+ resp2 = await client.get(f"http://127.0.0.1:{info2.port}/")
+
+ assert resp1.status_code == 200
+ assert resp2.status_code == 200
+
+ assert resp1.json()["app"] == "starlette-worker"
+ assert resp2.json()["app"] == "simple-asgi"
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_stop_process(self, temp_app_dir):
+ """Test stopping a running process."""
+ manager = ProcessManager(port_range=(19300, 19399), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="test_app",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+ await manager.start_process("test_app")
+
+ info = manager.get_process("test_app")
+ port = info.port
+
+ # Process should be running
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"http://127.0.0.1:{port}/health")
+ assert resp.status_code == 200
+
+ # Stop the process
+ await manager.stop_process("test_app")
+
+ assert info.state == ProcessState.STOPPED
+ assert info.process is None
+
+ # Process should no longer respond
+ async with httpx.AsyncClient() as client:
+ with pytest.raises(httpx.ConnectError):
+ await client.get(f"http://127.0.0.1:{port}/health", timeout=1.0)
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_restart_process(self, temp_app_dir):
+ """Test restarting a process gets a new PID."""
+ manager = ProcessManager(port_range=(19400, 19499), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="test_app",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ restart_delay=0.1,
+ )
+
+ await manager.register(config)
+ await manager.start_process("test_app")
+
+ info = manager.get_process("test_app")
+ old_pid = info.pid
+
+ # Restart
+ await manager.restart_process("test_app")
+
+ assert info.state == ProcessState.RUNNING
+ assert info.pid != old_pid # New PID
+ assert info.restart_count == 0 # Manual restart doesn't increment
+
+ # Should still respond
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"http://127.0.0.1:{info.port}/health")
+ assert resp.status_code == 200
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_get_upstream_url(self, temp_app_dir):
+ """Test getting upstream URL for proxy routing."""
+ manager = ProcessManager(port_range=(19500, 19599), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="api",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+
+ # Not running yet
+ assert manager.get_upstream_url("api") is None
+
+ await manager.start_process("api")
+
+ info = manager.get_process("api")
+ url = manager.get_upstream_url("api")
+
+ assert url == f"http://127.0.0.1:{info.port}"
+
+ # Verify the URL works
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"{url}/health")
+ assert resp.status_code == 200
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_process_isolation(self, temp_app_dir):
+ """Test that processes are truly isolated (different PIDs)."""
+ manager = ProcessManager(port_range=(19600, 19699), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ # Start same app twice with different names
+ config1 = ProcessConfig(
+ name="worker1",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+ config2 = ProcessConfig(
+ name="worker2",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config1)
+ await manager.register(config2)
+ await manager.start_all()
+
+ info1 = manager.get_process("worker1")
+ info2 = manager.get_process("worker2")
+
+ # Get PIDs from the apps themselves
+ async with httpx.AsyncClient() as client:
+ resp1 = await client.get(f"http://127.0.0.1:{info1.port}/")
+ resp2 = await client.get(f"http://127.0.0.1:{info2.port}/")
+
+ pid1 = resp1.json()["pid"]
+ pid2 = resp2.json()["pid"]
+
+ # PIDs should be different - true process isolation
+ assert pid1 != pid2
+ assert pid1 == info1.pid
+ assert pid2 == info2.pid
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_metrics_with_running_processes(self, temp_app_dir):
+ """Test metrics reflect actual process state."""
+ manager = ProcessManager(port_range=(19700, 19799), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="monitored_app",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+
+ # Before start
+ metrics = manager.get_metrics()
+ assert metrics["managed_processes"] == 1
+ assert metrics["running_processes"] == 0
+ assert metrics["processes"]["monitored_app"]["state"] == "pending"
+
+ # After start
+ await manager.start_process("monitored_app")
+
+ metrics = manager.get_metrics()
+ assert metrics["running_processes"] == 1
+ assert metrics["processes"]["monitored_app"]["state"] == "running"
+ assert metrics["processes"]["monitored_app"]["pid"] is not None
+ assert metrics["processes"]["monitored_app"]["port"] > 0
+
+ # After stop
+ await manager.stop_process("monitored_app")
+
+ metrics = manager.get_metrics()
+ assert metrics["running_processes"] == 0
+ assert metrics["processes"]["monitored_app"]["state"] == "stopped"
+
+ finally:
+ await manager.stop()
+
+
+# ============== Error Handling Tests ==============
+
+class TestErrorHandling:
+ """Tests for error scenarios and edge cases."""
+
+ @pytest.fixture
+ def temp_app_dir(self):
+ """Create temp directory with test apps."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Write starlette app (no extra dependencies needed)
+ app_file = Path(tmpdir) / "test_app.py"
+ app_file.write_text(STARLETTE_APP_CODE)
+
+ yield tmpdir
+
+ @pytest.mark.asyncio
+ async def test_process_crash_detection(self, temp_app_dir):
+ """Process dies unexpectedly - state should reflect failure."""
+ manager = ProcessManager(port_range=(19800, 19899), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="crash_test",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+ await manager.start_process("crash_test")
+
+ info = manager.get_process("crash_test")
+ assert info.state == ProcessState.RUNNING
+ old_pid = info.pid
+
+ # Kill the process externally
+ import os
+ import signal
+ os.kill(old_pid, signal.SIGKILL)
+
+ # Wait a bit for process to die
+ await asyncio.sleep(0.5)
+
+ # Process should have exited - check via poll
+ assert info.process.poll() is not None
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_startup_timeout_with_bad_app(self):
+ """App that fails to start should timeout gracefully."""
+ manager = ProcessManager(port_range=(19900, 19999), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ # Create a temp dir with a broken app
+ with tempfile.TemporaryDirectory() as tmpdir:
+ broken_app = Path(tmpdir) / "broken_app.py"
+ broken_app.write_text('''
+# App that crashes on import
+raise RuntimeError("Intentional crash")
+''')
+
+ config = ProcessConfig(
+ name="broken_app",
+ app_path="broken_app:app",
+ module_path=tmpdir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+
+ # Should fail to start
+ success = await manager.start_process("broken_app")
+
+ assert success is False
+ info = manager.get_process("broken_app")
+ assert info.state == ProcessState.FAILED
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_malformed_app_path(self):
+ """Invalid app_path should fail gracefully."""
+ manager = ProcessManager(port_range=(20000, 20099), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="bad_path",
+ app_path="nonexistent_module:app",
+ workers=1,
+ )
+
+ await manager.register(config)
+
+ # Should fail to start
+ success = await manager.start_process("bad_path")
+
+ assert success is False
+ info = manager.get_process("bad_path")
+ assert info.state == ProcessState.FAILED
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_process_not_found(self):
+ """Operations on non-existent process should return False."""
+ manager = ProcessManager(port_range=(20100, 20199))
+ await manager.start()
+
+ try:
+ # Start non-existent process
+ result = await manager.start_process("nonexistent")
+ assert result is False
+
+ # Stop non-existent process
+ result = await manager.stop_process("nonexistent")
+ assert result is False
+
+ # Restart non-existent process
+ result = await manager.restart_process("nonexistent")
+ assert result is False
+
+ # Get non-existent process
+ info = manager.get_process("nonexistent")
+ assert info is None
+
+ # Get upstream URL for non-existent process
+ url = manager.get_upstream_url("nonexistent")
+ assert url is None
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_double_start(self, temp_app_dir):
+ """Starting already running process should succeed (idempotent)."""
+ manager = ProcessManager(port_range=(20200, 20299), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="double_start",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+
+ # First start
+ success1 = await manager.start_process("double_start")
+ assert success1 is True
+
+ info = manager.get_process("double_start")
+ pid1 = info.pid
+
+ # Second start should be idempotent
+ success2 = await manager.start_process("double_start")
+ assert success2 is True
+
+ # Same process, same PID
+ assert info.pid == pid1
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_double_stop(self, temp_app_dir):
+ """Stopping already stopped process should succeed (idempotent)."""
+ manager = ProcessManager(port_range=(20300, 20399), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="double_stop",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+ await manager.start_process("double_stop")
+
+ # First stop
+ success1 = await manager.stop_process("double_stop")
+ assert success1 is True
+
+ info = manager.get_process("double_stop")
+ assert info.state == ProcessState.STOPPED
+
+ # Second stop should be idempotent
+ success2 = await manager.stop_process("double_stop")
+ assert success2 is True
+
+ finally:
+ await manager.stop()
+
+
+# ============== Health Check Edge Case Tests ==============
+
+class TestHealthCheckEdgeCases:
+ """Tests for health check scenarios."""
+
+ @pytest.fixture
+ def temp_app_dir(self):
+ """Create temp directory with test apps."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Write starlette app (no extra dependencies needed)
+ app_file = Path(tmpdir) / "test_app.py"
+ app_file.write_text(STARLETTE_APP_CODE)
+
+ yield tmpdir
+
+ @pytest.mark.asyncio
+ async def test_health_check_missing_endpoint(self, temp_app_dir):
+ """App without /health endpoint - process starts but health checks fail."""
+ # Create an app without /health endpoint
+ with tempfile.TemporaryDirectory() as tmpdir:
+ no_health_app = Path(tmpdir) / "no_health_app.py"
+ no_health_app.write_text('''
+from starlette.applications import Starlette
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+
+async def root(request):
+ return JSONResponse({"app": "no-health"})
+
+app = Starlette(routes=[
+ Route("/", root),
+])
+''')
+
+ manager = ProcessManager(port_range=(20400, 20499), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="no_health",
+ app_path="no_health_app:app",
+ module_path=tmpdir,
+ workers=1,
+ health_check_path="/health", # This endpoint doesn't exist
+ health_check_timeout=2.0,
+ )
+
+ await manager.register(config)
+
+ # Should fail because health check endpoint doesn't exist
+ # _wait_for_ready expects < 500 status code
+ success = await manager.start_process("no_health")
+
+ # Process may start but health check will get 404
+ # Our implementation accepts < 500 so 404 is OK
+ # (This tests current behavior - may want to change later)
+ info = manager.get_process("no_health")
+ assert info is not None
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_health_check_returns_500(self, temp_app_dir):
+ """App health endpoint returns 500 - should fail health check."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ unhealthy_app = Path(tmpdir) / "unhealthy_app.py"
+ unhealthy_app.write_text('''
+from starlette.applications import Starlette
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+
+async def root(request):
+ return JSONResponse({"app": "unhealthy"})
+
+async def health(request):
+ return JSONResponse({"status": "unhealthy"}, status_code=500)
+
+app = Starlette(routes=[
+ Route("/", root),
+ Route("/health", health),
+])
+''')
+
+ manager = ProcessManager(port_range=(20500, 20599), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="unhealthy",
+ app_path="unhealthy_app:app",
+ module_path=tmpdir,
+ workers=1,
+ health_check_path="/health",
+ health_check_timeout=2.0,
+ )
+
+ await manager.register(config)
+
+ # Should fail to start because health returns 500
+ success = await manager.start_process("unhealthy")
+
+ assert success is False
+ info = manager.get_process("unhealthy")
+ assert info.state == ProcessState.FAILED
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_process_info_uptime_calculation(self, temp_app_dir):
+ """Uptime should be calculated correctly."""
+ manager = ProcessManager(port_range=(20600, 20699), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="uptime_test",
+ app_path="test_app:app",
+ module_path=temp_app_dir,
+ workers=1,
+ health_check_path="/health",
+ )
+
+ await manager.register(config)
+ await manager.start_process("uptime_test")
+
+ info = manager.get_process("uptime_test")
+
+ # Wait a bit
+ await asyncio.sleep(1.0)
+
+ # Uptime should be > 1 second
+ assert info.uptime >= 1.0
+
+ # to_dict should include uptime
+ data = info.to_dict()
+ assert data["uptime"] >= 1.0
+
+ finally:
+ await manager.stop()
+
+ @pytest.mark.asyncio
+ async def test_env_variables_passed_to_process(self, temp_app_dir):
+ """Environment variables should be passed to subprocess."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ env_app = Path(tmpdir) / "env_app.py"
+ env_app.write_text('''
+import os
+from starlette.applications import Starlette
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+
+async def root(request):
+ return JSONResponse({
+ "MY_VAR": os.environ.get("MY_VAR", "not set"),
+ "ANOTHER_VAR": os.environ.get("ANOTHER_VAR", "not set"),
+ })
+
+async def health(request):
+ return JSONResponse({"status": "healthy"})
+
+app = Starlette(routes=[
+ Route("/", root),
+ Route("/health", health),
+])
+''')
+
+ manager = ProcessManager(port_range=(20700, 20799), health_check_enabled=False)
+ await manager.start()
+
+ try:
+ config = ProcessConfig(
+ name="env_test",
+ app_path="env_app:app",
+ module_path=tmpdir,
+ workers=1,
+ health_check_path="/health",
+ env={
+ "MY_VAR": "hello",
+ "ANOTHER_VAR": "world",
+ },
+ )
+
+ await manager.register(config)
+ await manager.start_process("env_test")
+
+ info = manager.get_process("env_test")
+
+ # Check that env vars were passed
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"http://127.0.0.1:{info.port}/")
+ data = resp.json()
+
+ assert data["MY_VAR"] == "hello"
+ assert data["ANOTHER_VAR"] == "world"
+
+ finally:
+ await manager.stop()