Compare commits

...

10 Commits
v0.9.1 ... main

Author SHA1 Message Date
Илья Глазунов
d03ade18c5 increase server backlog to improve connection handling
All checks were successful
Lint Code / lint (push) Successful in 44s
CI/CD Pipeline / lint (push) Successful in 0s
Run Tests / test (3.12) (push) Successful in 2m40s
Run Tests / test (3.13) (push) Successful in 2m32s
CI/CD Pipeline / test (push) Successful in 0s
CI/CD Pipeline / build-and-release (push) Has been skipped
CI/CD Pipeline / notify (push) Successful in 0s
2025-12-04 03:39:52 +03:00
Илья Глазунов
129785706c remove unnecessary blank lines in health and service command files
All checks were successful
Lint Code / lint (push) Successful in 47s
CI/CD Pipeline / lint (push) Successful in 0s
Run Tests / test (3.12) (push) Successful in 2m48s
Run Tests / test (3.13) (push) Successful in 2m40s
CI/CD Pipeline / test (push) Successful in 0s
CI/CD Pipeline / build-and-release (push) Has been skipped
CI/CD Pipeline / notify (push) Successful in 1s
2025-12-04 03:17:28 +03:00
Илья Глазунов
3b59994fc9 fixed pyservectl linter errors and formatting 2025-12-04 03:17:21 +03:00
Илья Глазунов
7662a7924a fixed flake8 lint errors 2025-12-04 03:06:58 +03:00
Илья Глазунов
cec6e927a7 tests for pyservectl 2025-12-04 03:00:56 +03:00
Илья Глазунов
80544d5b95 pyservectl init 2025-12-04 02:55:14 +03:00
Илья Глазунов
b4f63c6804 bump version to 0.9.10
Some checks failed
Run Tests / test (3.12) (push) Successful in 2m39s
Run Tests / test (3.13) (push) Successful in 2m31s
CI/CD Pipeline / lint (push) Successful in 0s
Build and Release / build (push) Successful in 36s
CI/CD Pipeline / test (push) Has been skipped
CI/CD Pipeline / build-and-release (push) Has been skipped
Build and Release / release (push) Successful in 6s
CI/CD Pipeline / notify (push) Successful in 1s
Lint Code / lint (push) Failing after 40s
2025-12-04 01:31:19 +03:00
Илья Глазунов
59d6ae2fd2 fix: correct return type in _load_wsgi_app function 2025-12-04 01:30:18 +03:00
Илья Глазунов
edaccb59bb lint fixes 2025-12-04 01:27:43 +03:00
Илья Глазунов
3454801be7 process_orchestration for asgi added 2025-12-04 01:25:13 +03:00
31 changed files with 7043 additions and 125 deletions

README.md

@ -1,145 +1,97 @@
# PyServe
PyServe is a modern, async HTTP server written in Python. Originally created for educational purposes, it has evolved into a powerful tool for rapid prototyping and serving web applications with unique features like AI-generated content.
Python application orchestrator and HTTP server. Runs multiple ASGI/WSGI applications through a single entry point with process isolation, health monitoring, and auto-restart.
<img src="https://raw.githubusercontent.com/ShiftyX1/PyServe/refs/heads/master/images/logo.png" alt="isolated" width="150"/>
<img src="https://raw.githubusercontent.com/ShiftyX1/PyServe/refs/heads/master/images/logo.png" alt="PyServe Logo" width="150"/>
[More on web page](https://pyserve.org/)
Website: [pyserve.org](https://pyserve.org) · Documentation: [docs.pyserve.org](https://docs.pyserve.org)
## Project Overview
## Overview
PyServe v0.6.0 introduces a completely refactored architecture with modern async/await syntax and new exciting features like **Vibe-Serving** - AI-powered dynamic content generation.
PyServe manages multiple Python web applications (FastAPI, Flask, Django, etc.) as isolated subprocesses behind a single gateway. Each app runs on its own port with independent lifecycle, health checks, and automatic restarts on failure.
### Key Features:
```
PyServe Gateway (:8000)
┌────────────────┼────────────────┐
▼ ▼ ▼
FastAPI Flask Starlette
:9001 :9002 :9003
/api/* /admin/* /ws/*
```
- **Async HTTP Server** - Built with Python's asyncio for high performance
- **Advanced Configuration System V2** - Powerful extensible configuration with full backward compatibility
- **Regex Routing & SPA Support** - nginx-style routing patterns with Single Page Application fallback
- **Static File Serving** - Efficient serving with correct MIME types
- **Template System** - Dynamic content generation
- **Vibe-Serving Mode** - AI-generated content using language models (OpenAI, Claude, etc.)
- **Reverse Proxy** - Forward requests to backend services with advanced routing
- **SSL/HTTPS Support** - Secure connections with certificate configuration
- **Modular Extensions** - Plugin-like architecture for security, caching, monitoring
- **Beautiful Logging** - Colored terminal output with file rotation
- **Error Handling** - Styled error pages and graceful fallbacks
- **CLI Interface** - Command-line interface for easy deployment and configuration
## Getting Started
### Prerequisites
- Python 3.12 or higher
- Poetry (recommended) or pip
### Installation
#### Via Poetry (recommended)
## Installation
```bash
git clone https://github.com/ShiftyX1/PyServe.git
cd PyServe
make init # Initialize project
make init
```
#### Or install the package
```bash
# local install
make install-package
# after installing the package you can use the pyserve command
pyserve --help
```
## Quick Start
```yaml
# config.yaml
server:
  host: 0.0.0.0
  port: 8000

extensions:
  - type: process_orchestration
    config:
      apps:
        - name: api
          path: /api
          app_path: myapp.api:app
        - name: admin
          path: /admin
          app_path: myapp.admin:app
```
### Running the Server
#### Using Makefile (recommended)
```bash
# start in development mode
make run
# start in production mode
make run-prod
# show all available commands
make help
pyserve -c config.yaml
```
#### Using CLI directly
Requests to `/api/*` are proxied to the api process, and requests to `/admin/*` to the admin process.
## Process Orchestration
The primary use case for PyServe is orchestrating Python web applications. Each application runs as a separate uvicorn process on a dynamically or manually allocated port (9000-9999 by default), and PyServe proxies requests to the appropriate process based on the URL path.
For each application you can configure the number of workers, environment variables, health check endpoint path, and auto-restart parameters. If a process crashes or stops responding to health checks, PyServe automatically restarts it with exponential backoff.
WSGI applications (Flask, Django) are supported through automatic wrapping — just specify `app_type: wsgi`.
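As an illustration (field names are taken from the example configuration added in this change; `myapp.admin:app` is a placeholder), a WSGI app entry with health-check and restart tuning might look like:
```yaml
extensions:
  - type: process_orchestration
    config:
      port_range: [9000, 9999]        # ports handed out to worker processes
      apps:
        - name: admin
          path: /admin
          app_path: myapp.admin:app   # placeholder Flask (WSGI) application
          app_type: wsgi              # wrapped to ASGI automatically
          workers: 1
          health_check_path: /health
          health_check_interval: 10.0
          health_check_retries: 3
          max_restart_count: 5        # give up after this many restarts
          restart_delay: 1.0          # base delay for the restart backoff
          env:
            APP_ENV: "production"
```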
## In-Process Mounting
For simpler setups where process isolation is not needed, applications can be mounted directly into the PyServe process via the `asgi_mount` extension. This is lighter and faster, but all applications share a single process.
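A minimal sketch of such a mount, mirroring the `asgi` template added in this change (`myapp.api:app` is a placeholder):
```yaml
extensions:
  - type: asgi_mount
    config:
      mounts:
        - path: /api
          app: myapp.api:app   # placeholder ASGI app, mounted in the gateway process
          # factory: true      # set if the target is an app factory function
```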
## Static Files & Routing
PyServe can serve static files with nginx-like routing: regex patterns, SPA fallback for frontend applications, custom caching headers. Routes are processed in priority order — exact match, then regex, then default.
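For illustration, a routing block covering all three priority levels, based on the templates included in this change (paths are placeholders):
```yaml
extensions:
  - type: routing
    config:
      regex_locations:
        "=/health":              # exact match, checked first
          return: "200 OK"
          content_type: "text/plain"
        "^/static/":             # regex match
          root: "./static"
          strip_prefix: "/static"
        "__default__":           # default fallback (SPA index.html)
          spa_fallback: true
          root: "./static"
          index_file: "index.html"
```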
## Reverse Proxy
Requests can also be proxied to external backends, which is useful for integrating with legacy services or with microservices written in other languages.
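A sketch of a proxy location, using the keys from the example orchestration config in this change (`https://api.example.com` is a placeholder upstream):
```yaml
extensions:
  - type: routing
    config:
      regex_locations:
        "^/external/.*":
          type: proxy
          upstream: "https://api.example.com"   # placeholder backend
          strip_prefix: "/external"
```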
## CLI
```bash
# after installing package
pyserve
# or with Poetry
poetry run pyserve
# or legacy (for backward compatibility)
python run.py
```
#### CLI options
```bash
# help
pyserve --help
# path to config
pyserve -c /path/to/config.yaml
# override host and port
pyserve -c config.yaml
pyserve --host 0.0.0.0 --port 9000
# debug mode
pyserve --debug
# show version
pyserve --version
```
## Development
### Makefile Commands
```bash
make help # Show help for commands
make install # Install dependencies
make dev-install # Install development dependencies
make build # Build the package
make test # Run tests
make test-cov # Tests with code coverage
make lint # Check code with linters
make format # Format code
make clean # Clean up temporary files
make version # Show version
make publish-test # Publish to Test PyPI
make publish # Publish to PyPI
```
### Project Structure
```
pyserveX/
├── pyserve/ # Main package
│ ├── __init__.py
│ ├── cli.py # CLI interface
│ ├── server.py # Main server module
│ ├── config.py # Configuration system
│ ├── routing.py # Routing
│ ├── extensions.py # Extensions
│ └── logging_utils.py
├── tests/ # Tests
├── static/ # Static files
├── templates/ # Templates
├── logs/ # Logs
├── Makefile # Automation tasks
├── pyproject.toml # Project configuration
├── config.yaml # Server configuration
└── run.py # Entry point (backward compatibility)
make test # run tests
make lint # linting
make format # formatting
```
## License
This project is distributed under the MIT license.
[MIT License](./LICENSE)


@ -8,7 +8,7 @@ http:
server:
host: 0.0.0.0
port: 8000
backlog: 5
backlog: 1000
default_root: false
ssl:


@ -0,0 +1,113 @@
# PyServe Process Orchestration Example
#
# This configuration demonstrates running multiple ASGI/WSGI applications
# as isolated processes with automatic health monitoring and restart.
server:
  host: "0.0.0.0"
  port: 8000
  backlog: 2048
  proxy_timeout: 60.0

logging:
  level: DEBUG
  console_output: true
  format:
    type: standard
    use_colors: true

extensions:
  # Process Orchestration - runs each app in its own process
  - type: process_orchestration
    config:
      # Port range for worker processes
      port_range: [9000, 9999]
      # Enable health monitoring
      health_check_enabled: true
      # Proxy timeout for requests
      proxy_timeout: 60.0
      apps:
        # FastAPI application
        - name: api
          path: /api
          app_path: examples.apps.fastapi_app:app
          module_path: "."
          workers: 2
          health_check_path: /health
          health_check_interval: 10.0
          health_check_timeout: 5.0
          health_check_retries: 3
          max_restart_count: 5
          restart_delay: 1.0
          shutdown_timeout: 30.0
          strip_path: true
          env:
            APP_ENV: "production"
            DEBUG: "false"
        # Flask application (WSGI wrapped to ASGI)
        - name: admin
          path: /admin
          app_path: examples.apps.flask_app:app
          app_type: wsgi
          module_path: "."
          workers: 1
          health_check_path: /health
          strip_path: true
        # Starlette application
        - name: web
          path: /web
          app_path: examples.apps.starlette_app:app
          module_path: "."
          workers: 2
          health_check_path: /health
          strip_path: true
        # Custom ASGI application
        - name: custom
          path: /custom
          app_path: examples.apps.custom_asgi:app
          module_path: "."
          workers: 1
          health_check_path: /health
          strip_path: true
  # Routing for static files and reverse proxy
  - type: routing
    config:
      regex_locations:
        # Static files
        "^/static/.*":
          type: static
          root: "./static"
          strip_prefix: "/static"
        # Documentation
        "^/docs/?.*":
          type: static
          root: "./docs"
          strip_prefix: "/docs"
        # External API proxy
        "^/external/.*":
          type: proxy
          upstream: "https://api.example.com"
          strip_prefix: "/external"
  # Security headers
  - type: security
    config:
      security_headers:
        X-Content-Type-Options: "nosniff"
        X-Frame-Options: "DENY"
        X-XSS-Protection: "1; mode=block"
        Strict-Transport-Security: "max-age=31536000; includeSubDomains"
  # Monitoring
  - type: monitoring
    config:
      enable_metrics: true

poetry.lock (generated)

@ -147,14 +147,14 @@ files = [
[[package]]
name = "click"
version = "8.2.1"
version = "8.3.1"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.10"
groups = ["main", "dev"]
files = [
{file = "click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b"},
{file = "click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202"},
{file = "click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6"},
{file = "click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a"},
]
[package.dependencies]
@ -602,6 +602,30 @@ MarkupSafe = ">=2.0"
[package.extras]
i18n = ["Babel (>=2.7)"]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
description = "Python port of markdown-it. Markdown parsing, done right!"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147"},
{file = "markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3"},
]
[package.dependencies]
mdurl = ">=0.1,<1.0"
[package.extras]
benchmarking = ["psutil", "pytest", "pytest-benchmark"]
compare = ["commonmark (>=0.9,<1.0)", "markdown (>=3.4,<4.0)", "markdown-it-pyrs", "mistletoe (>=1.0,<2.0)", "mistune (>=3.0,<4.0)", "panflute (>=2.3,<3.0)"]
linkify = ["linkify-it-py (>=1,<3)"]
plugins = ["mdit-py-plugins (>=0.5.0)"]
profiling = ["gprof2dot"]
rtd = ["ipykernel", "jupyter_sphinx", "mdit-py-plugins (>=0.5.0)", "myst-parser", "pyyaml", "sphinx", "sphinx-book-theme (>=1.0,<2.0)", "sphinx-copybutton", "sphinx-design"]
testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions", "requests"]
[[package]]
name = "markupsafe"
version = "3.0.3"
@ -714,6 +738,18 @@ files = [
{file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
]
[[package]]
name = "mdurl"
version = "0.1.2"
description = "Markdown URL utilities"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"},
{file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"},
]
[[package]]
name = "mypy"
version = "1.17.1"
@ -843,6 +879,39 @@ files = [
dev = ["pre-commit", "tox"]
testing = ["coverage", "pytest", "pytest-benchmark"]
[[package]]
name = "psutil"
version = "7.1.3"
description = "Cross-platform lib for process and system monitoring."
optional = false
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "psutil-7.1.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:0005da714eee687b4b8decd3d6cc7c6db36215c9e74e5ad2264b90c3df7d92dc"},
{file = "psutil-7.1.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:19644c85dcb987e35eeeaefdc3915d059dac7bd1167cdcdbf27e0ce2df0c08c0"},
{file = "psutil-7.1.3-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:95ef04cf2e5ba0ab9eaafc4a11eaae91b44f4ef5541acd2ee91d9108d00d59a7"},
{file = "psutil-7.1.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1068c303be3a72f8e18e412c5b2a8f6d31750fb152f9cb106b54090296c9d251"},
{file = "psutil-7.1.3-cp313-cp313t-win_amd64.whl", hash = "sha256:18349c5c24b06ac5612c0428ec2a0331c26443d259e2a0144a9b24b4395b58fa"},
{file = "psutil-7.1.3-cp313-cp313t-win_arm64.whl", hash = "sha256:c525ffa774fe4496282fb0b1187725793de3e7c6b29e41562733cae9ada151ee"},
{file = "psutil-7.1.3-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b403da1df4d6d43973dc004d19cee3b848e998ae3154cc8097d139b77156c353"},
{file = "psutil-7.1.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:ad81425efc5e75da3f39b3e636293360ad8d0b49bed7df824c79764fb4ba9b8b"},
{file = "psutil-7.1.3-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f33a3702e167783a9213db10ad29650ebf383946e91bc77f28a5eb083496bc9"},
{file = "psutil-7.1.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:fac9cd332c67f4422504297889da5ab7e05fd11e3c4392140f7370f4208ded1f"},
{file = "psutil-7.1.3-cp314-cp314t-win_amd64.whl", hash = "sha256:3792983e23b69843aea49c8f5b8f115572c5ab64c153bada5270086a2123c7e7"},
{file = "psutil-7.1.3-cp314-cp314t-win_arm64.whl", hash = "sha256:31d77fcedb7529f27bb3a0472bea9334349f9a04160e8e6e5020f22c59893264"},
{file = "psutil-7.1.3-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:2bdbcd0e58ca14996a42adf3621a6244f1bb2e2e528886959c72cf1e326677ab"},
{file = "psutil-7.1.3-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:bc31fa00f1fbc3c3802141eede66f3a2d51d89716a194bf2cd6fc68310a19880"},
{file = "psutil-7.1.3-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3bb428f9f05c1225a558f53e30ccbad9930b11c3fc206836242de1091d3e7dd3"},
{file = "psutil-7.1.3-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:56d974e02ca2c8eb4812c3f76c30e28836fffc311d55d979f1465c1feeb2b68b"},
{file = "psutil-7.1.3-cp37-abi3-win_amd64.whl", hash = "sha256:f39c2c19fe824b47484b96f9692932248a54c43799a84282cfe58d05a6449efd"},
{file = "psutil-7.1.3-cp37-abi3-win_arm64.whl", hash = "sha256:bd0d69cee829226a761e92f28140bec9a5ee9d5b4fb4b0cc589068dbfff559b1"},
{file = "psutil-7.1.3.tar.gz", hash = "sha256:6c86281738d77335af7aec228328e944b30930899ea760ecf33a4dba66be5e74"},
]
[package.extras]
dev = ["abi3audit", "black", "check-manifest", "colorama ; os_name == \"nt\"", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pyreadline ; os_name == \"nt\"", "pytest", "pytest-cov", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "requests", "rstcheck", "ruff", "setuptools", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "validate-pyproject[all]", "virtualenv", "vulture", "wheel", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]
test = ["pytest", "pytest-instafail", "pytest-subtests", "pytest-xdist", "pywin32 ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "setuptools", "wheel ; os_name == \"nt\" and platform_python_implementation != \"PyPy\"", "wmi ; os_name == \"nt\" and platform_python_implementation != \"PyPy\""]
[[package]]
name = "pycodestyle"
version = "2.14.0"
@ -1180,6 +1249,25 @@ files = [
{file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"},
]
[[package]]
name = "rich"
version = "14.2.0"
description = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal"
optional = false
python-versions = ">=3.8.0"
groups = ["main"]
files = [
{file = "rich-14.2.0-py3-none-any.whl", hash = "sha256:76bc51fe2e57d2b1be1f96c524b890b816e334ab4c1e45888799bfaab0021edd"},
{file = "rich-14.2.0.tar.gz", hash = "sha256:73ff50c7c0c1c77c8243079283f4edb376f0f6442433aecb8ce7e6d0b92d1fe4"},
]
[package.dependencies]
markdown-it-py = ">=2.2.0"
pygments = ">=2.13.0,<3.0.0"
[package.extras]
jupyter = ["ipywidgets (>=7.5.1,<9)"]
[[package]]
name = "setuptools"
version = "80.9.0"
@ -1261,6 +1349,18 @@ files = [
{file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"},
]
[[package]]
name = "types-psutil"
version = "7.1.3.20251202"
description = "Typing stubs for psutil"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_psutil-7.1.3.20251202-py3-none-any.whl", hash = "sha256:39bfc44780de7ab686c65169e36a7969db09e7f39d92de643b55789292953400"},
{file = "types_psutil-7.1.3.20251202.tar.gz", hash = "sha256:5cfecaced7c486fb3995bb290eab45043d697a261718aca01b9b340d1ab7968a"},
]
[[package]]
name = "types-pyyaml"
version = "6.0.12.20250822"
@ -1621,4 +1721,4 @@ wsgi = ["a2wsgi"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.12"
content-hash = "32ebf260f6792987cb4236fe29ad3329374e063504d507b5a0319684e24a30a8"
content-hash = "653d7b992e2bb133abde2e8b1c44265e948ed90487ab3f2670429510a8aa0683"


@ -1,7 +1,7 @@
[project]
name = "pyserve"
version = "0.9.1"
description = "Simple HTTP Web server written in Python"
version = "0.9.10"
description = "Python Application Orchestrator & HTTP Server - unified gateway for multiple Python web apps"
authors = [
{name = "Илья Глазунов",email = "i.glazunov@sapiens.solutions"}
]
@ -15,10 +15,14 @@ dependencies = [
"types-pyyaml (>=6.0.12.20250822,<7.0.0.0)",
"structlog (>=25.4.0,<26.0.0)",
"httpx (>=0.27.0,<0.28.0)",
"click (>=8.0)",
"rich (>=13.0)",
"psutil (>=5.9)",
]
[project.scripts]
pyserve = "pyserve.cli:main"
pyservectl = "pyserve.ctl:main"
[project.optional-dependencies]
dev = [
@ -97,4 +101,5 @@ flake8 = "^7.3.0"
pytest-asyncio = "^1.3.0"
cython = "^3.0.0"
setuptools = "^80.0.0"
types-psutil = "^7.1.3.20251202"


@ -2,7 +2,7 @@
PyServe - HTTP web server written in Python
"""
__version__ = "0.9.0"
__version__ = "0.10.0"
__author__ = "Ilya Glazunov"
from .asgi_mount import (
@ -15,13 +15,22 @@ from .asgi_mount import (
create_starlette_app,
)
from .config import Config
from .process_manager import (
ProcessConfig,
ProcessInfo,
ProcessManager,
ProcessState,
get_process_manager,
init_process_manager,
shutdown_process_manager,
)
from .server import PyServeServer
__all__ = [
"PyServeServer",
"Config",
"__version__",
# ASGI mounting
# ASGI mounting (in-process)
"ASGIAppLoader",
"ASGIMountManager",
"MountedApp",
@ -29,4 +38,12 @@ __all__ = [
"create_flask_app",
"create_django_app",
"create_starlette_app",
# Process orchestration (multi-process)
"ProcessManager",
"ProcessConfig",
"ProcessInfo",
"ProcessState",
"get_process_manager",
"init_process_manager",
"shutdown_process_manager",
]

pyserve/_wsgi_wrapper.py

@ -0,0 +1,73 @@
"""
WSGI Wrapper Module for Process Orchestration.
This module provides a wrapper that allows WSGI applications to be run
via uvicorn by wrapping them with a2wsgi.
The WSGI app path is passed via environment variables:
- PYSERVE_WSGI_APP: The app path (e.g., "myapp:app" or "myapp.main:create_app")
- PYSERVE_WSGI_FACTORY: "1" if the app path points to a factory function
"""
import importlib
import os
from typing import Any, Callable, Optional, Type
WSGIMiddlewareType = Optional[Type[Any]]
WSGI_ADAPTER: Optional[str] = None
WSGIMiddleware: WSGIMiddlewareType = None
try:
from a2wsgi import WSGIMiddleware as _A2WSGIMiddleware
WSGIMiddleware = _A2WSGIMiddleware
WSGI_ADAPTER = "a2wsgi"
except ImportError:
try:
from asgiref.wsgi import WsgiToAsgi as _AsgirefMiddleware
WSGIMiddleware = _AsgirefMiddleware
WSGI_ADAPTER = "asgiref"
except ImportError:
pass
def _load_wsgi_app() -> Callable[..., Any]:
app_path = os.environ.get("PYSERVE_WSGI_APP")
is_factory = os.environ.get("PYSERVE_WSGI_FACTORY", "0") == "1"
if not app_path:
raise RuntimeError("PYSERVE_WSGI_APP environment variable not set. " "This module should only be used by PyServe process orchestration.")
if ":" in app_path:
module_name, attr_name = app_path.rsplit(":", 1)
else:
module_name = app_path
attr_name = "app"
try:
module = importlib.import_module(module_name)
except ImportError as e:
raise RuntimeError(f"Failed to import WSGI module '{module_name}': {e}")
try:
app_or_factory = getattr(module, attr_name)
except AttributeError:
raise RuntimeError(f"Module '{module_name}' has no attribute '{attr_name}'")
if is_factory:
result: Callable[..., Any] = app_or_factory()
return result
loaded_app: Callable[..., Any] = app_or_factory
return loaded_app
def _create_asgi_app() -> Any:
if WSGIMiddleware is None:
raise RuntimeError("No WSGI adapter available. " "Install a2wsgi (recommended) or asgiref: pip install a2wsgi")
wsgi_app = _load_wsgi_app()
return WSGIMiddleware(wsgi_app)
app = _create_asgi_app()


@ -1,3 +1,10 @@
"""
PyServe CLI - Server entry point
Simple CLI for running the PyServe HTTP server.
For service management, use pyservectl.
"""
import argparse
import sys
from pathlib import Path
@ -9,12 +16,33 @@ def main() -> None:
parser = argparse.ArgumentParser(
description="PyServe - HTTP web server",
prog="pyserve",
epilog="For service management (start/stop/restart/logs), use: pyservectl",
)
parser.add_argument(
"-c",
"--config",
default="config.yaml",
help="Path to configuration file (default: config.yaml)",
)
parser.add_argument(
"--host",
help="Host to bind the server to",
)
parser.add_argument(
"--port",
type=int,
help="Port to bind the server to",
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode",
)
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s {__version__}",
)
parser.add_argument("-c", "--config", default="config.yaml", help="Path to configuration file (default: config.yaml)")
parser.add_argument("--host", help="Host to bind the server to")
parser.add_argument("--port", type=int, help="Port to bind the server to")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
args = parser.parse_args()

pyserve/ctl/__init__.py

@ -0,0 +1,26 @@
"""
PyServeCtl - Service management CLI
Docker-compose-like tool for managing PyServe services.
Usage:
pyservectl [OPTIONS] COMMAND [ARGS]...
Commands:
init Initialize a new project
config Configuration management
up Start all services
down Stop all services
start Start specific services
stop Stop specific services
restart Restart services
ps Show service status
logs View service logs
top Live monitoring dashboard
health Check service health
scale Scale services
"""
from .main import cli, main
__all__ = ["cli", "main"]

pyserve/ctl/_daemon.py

@ -0,0 +1,93 @@
"""
PyServe Daemon Process
Runs pyserve services in background mode.
"""
import argparse
import asyncio
import logging
import os
import signal
import sys
from pathlib import Path
from types import FrameType
from typing import Optional
def main() -> None:
parser = argparse.ArgumentParser(description="PyServe Daemon")
parser.add_argument("--config", required=True, help="Configuration file path")
parser.add_argument("--state-dir", required=True, help="State directory path")
parser.add_argument("--services", default=None, help="Comma-separated list of services")
parser.add_argument("--scale", action="append", default=[], help="Scale overrides (name=workers)")
parser.add_argument("--force-recreate", action="store_true", help="Force recreate services")
args = parser.parse_args()
config_path = Path(args.config)
state_dir = Path(args.state_dir)
services = args.services.split(",") if args.services else None
scale_map = {}
for scale in args.scale:
name, workers = scale.split("=")
scale_map[name] = int(workers)
from ..config import Config
config = Config.from_yaml(str(config_path))
from .state import StateManager
state_manager = StateManager(state_dir)
log_file = state_dir / "logs" / "daemon.log"
log_file.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_file),
],
)
logger = logging.getLogger("pyserve.daemon")
pid_file = state_dir / "pyserve.pid"
pid_file.write_text(str(os.getpid()))
logger.info(f"Starting daemon with PID {os.getpid()}")
from ._runner import ServiceRunner
runner = ServiceRunner(config, state_manager)
def signal_handler(signum: int, frame: Optional[FrameType]) -> None:
logger.info(f"Received signal {signum}, shutting down...")
runner.stop()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
asyncio.run(
runner.start(
services=services,
scale_map=scale_map,
force_recreate=args.force_recreate,
)
)
except Exception as e:
logger.error(f"Daemon error: {e}")
sys.exit(1)
finally:
if pid_file.exists():
pid_file.unlink()
logger.info("Daemon stopped")
if __name__ == "__main__":
main()

pyserve/ctl/_runner.py

@ -0,0 +1,389 @@
"""
PyServe Service Runner
Handles starting, stopping, and managing services.
Integrates with ProcessManager for actual process management.
"""
import asyncio
import os
import sys
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from ..config import Config
from ..process_manager import ProcessConfig, ProcessManager, ProcessState
from .state import StateManager
@dataclass
class ServiceDefinition:
name: str
path: str
app_path: str
app_type: str = "asgi"
module_path: Optional[str] = None
workers: int = 1
health_check_path: str = "/health"
health_check_interval: float = 10.0
health_check_timeout: float = 5.0
health_check_retries: int = 3
max_restart_count: int = 5
restart_delay: float = 1.0
shutdown_timeout: float = 30.0
strip_path: bool = True
env: Dict[str, str] = field(default_factory=dict)
class ServiceRunner:
def __init__(self, config: Config, state_manager: StateManager):
self.config = config
self.state_manager = state_manager
self._process_manager: Optional[ProcessManager] = None
self._services: Dict[str, ServiceDefinition] = {}
self._running = False
self._parse_services()
def _parse_services(self) -> None:
for ext in self.config.extensions:
if ext.type == "process_orchestration":
apps = ext.config.get("apps", [])
for app_config in apps:
service = ServiceDefinition(
name=app_config.get("name", "unnamed"),
path=app_config.get("path", "/"),
app_path=app_config.get("app_path", ""),
app_type=app_config.get("app_type", "asgi"),
module_path=app_config.get("module_path"),
workers=app_config.get("workers", 1),
health_check_path=app_config.get("health_check_path", "/health"),
health_check_interval=app_config.get("health_check_interval", 10.0),
health_check_timeout=app_config.get("health_check_timeout", 5.0),
health_check_retries=app_config.get("health_check_retries", 3),
max_restart_count=app_config.get("max_restart_count", 5),
restart_delay=app_config.get("restart_delay", 1.0),
shutdown_timeout=app_config.get("shutdown_timeout", 30.0),
strip_path=app_config.get("strip_path", True),
env=app_config.get("env", {}),
)
self._services[service.name] = service
def get_services(self) -> Dict[str, ServiceDefinition]:
return self._services.copy()
def get_service(self, name: str) -> Optional[ServiceDefinition]:
return self._services.get(name)
async def start(
self,
services: Optional[List[str]] = None,
scale_map: Optional[Dict[str, int]] = None,
force_recreate: bool = False,
wait_healthy: bool = False,
timeout: int = 60,
) -> None:
from .output import console, print_error, print_info, print_success
scale_map = scale_map or {}
target_services = services or list(self._services.keys())
if not target_services:
print_info("No services configured. Add services to your config.yaml")
return
for name in target_services:
if name not in self._services:
print_error(f"Service '{name}' not found in configuration")
return
port_range = (9000, 9999)
for ext in self.config.extensions:
if ext.type == "process_orchestration":
port_range = tuple(ext.config.get("port_range", [9000, 9999]))
break
self._process_manager = ProcessManager(
port_range=port_range,
health_check_enabled=True,
)
await self._process_manager.start()
self._running = True
for name in target_services:
service = self._services[name]
workers = scale_map.get(name, service.workers)
proc_config = ProcessConfig(
name=name,
app_path=service.app_path,
app_type=service.app_type,
workers=workers,
module_path=service.module_path,
health_check_enabled=True,
health_check_path=service.health_check_path,
health_check_interval=service.health_check_interval,
health_check_timeout=service.health_check_timeout,
health_check_retries=service.health_check_retries,
max_restart_count=service.max_restart_count,
restart_delay=service.restart_delay,
shutdown_timeout=service.shutdown_timeout,
env=service.env,
)
try:
await self._process_manager.register(proc_config)
success = await self._process_manager.start_process(name)
if success:
info = self._process_manager.get_process(name)
if info:
self.state_manager.update_service(
name,
state="running",
pid=info.pid,
port=info.port,
workers=workers,
started_at=time.time(),
)
print_success(f"Started service: {name}")
else:
self.state_manager.update_service(name, state="failed")
print_error(f"Failed to start service: {name}")
except Exception as e:
print_error(f"Error starting {name}: {e}")
self.state_manager.update_service(name, state="failed")
if wait_healthy:
print_info("Waiting for services to be healthy...")
await self._wait_healthy(target_services, timeout)
console.print("\n[bold]Services running. Press Ctrl+C to stop.[/bold]\n")
try:
while self._running:
await asyncio.sleep(1)
await self._sync_state()
except asyncio.CancelledError:
pass
finally:
await self.stop_all()
async def _sync_state(self) -> None:
if not self._process_manager:
return
for name, info in self._process_manager.get_all_processes().items():
state_str = info.state.value
health_status = "healthy" if info.health_check_failures == 0 else "unhealthy"
self.state_manager.update_service(
name,
state=state_str,
pid=info.pid,
port=info.port,
)
service_state = self.state_manager.get_service(name)
if service_state:
service_state.health.status = health_status
service_state.health.failures = info.health_check_failures
self.state_manager.save()
async def _wait_healthy(self, services: List[str], timeout: int) -> None:
from .output import print_info, print_warning
start_time = time.time()
while time.time() - start_time < timeout:
all_healthy = True
for name in services:
if not self._process_manager:
continue
info = self._process_manager.get_process(name)
if not info or info.state != ProcessState.RUNNING:
all_healthy = False
break
if all_healthy:
print_info("All services healthy")
return
await asyncio.sleep(1)
print_warning("Timeout waiting for services to become healthy")
async def stop_all(self, timeout: int = 30) -> None:
from .output import print_info
self._running = False
if self._process_manager:
print_info("Stopping all services...")
await self._process_manager.stop()
self._process_manager = None
for name in self._services:
self.state_manager.update_service(
name,
state="stopped",
pid=None,
)
def stop(self) -> None:
self._running = False
async def start_service(self, name: str, timeout: int = 60) -> bool:
from .output import print_error
service = self._services.get(name)
if not service:
print_error(f"Service '{name}' not found")
return False
if not self._process_manager:
self._process_manager = ProcessManager()
await self._process_manager.start()
proc_config = ProcessConfig(
name=name,
app_path=service.app_path,
app_type=service.app_type,
workers=service.workers,
module_path=service.module_path,
health_check_enabled=True,
health_check_path=service.health_check_path,
env=service.env,
)
try:
existing = self._process_manager.get_process(name)
if not existing:
await self._process_manager.register(proc_config)
success = await self._process_manager.start_process(name)
if success:
info = self._process_manager.get_process(name)
self.state_manager.update_service(
name,
state="running",
pid=info.pid if info else None,
port=info.port if info else 0,
started_at=time.time(),
)
return success
except Exception as e:
print_error(f"Error starting {name}: {e}")
return False
async def stop_service(self, name: str, timeout: int = 30, force: bool = False) -> bool:
if not self._process_manager:
self.state_manager.update_service(name, state="stopped", pid=None)
return True
try:
success = await self._process_manager.stop_process(name)
if success:
self.state_manager.update_service(
name,
state="stopped",
pid=None,
)
return success
except Exception as e:
from .output import print_error
print_error(f"Error stopping {name}: {e}")
return False
async def restart_service(self, name: str, timeout: int = 60) -> bool:
if not self._process_manager:
return False
try:
self.state_manager.update_service(name, state="restarting")
success = await self._process_manager.restart_process(name)
if success:
info = self._process_manager.get_process(name)
self.state_manager.update_service(
name,
state="running",
pid=info.pid if info else None,
port=info.port if info else 0,
started_at=time.time(),
)
return success
except Exception as e:
from .output import print_error
print_error(f"Error restarting {name}: {e}")
return False
async def scale_service(self, name: str, workers: int, timeout: int = 60, wait: bool = True) -> bool:
# For now, this requires restart with new worker count
# In future, could implement hot-reloading
service = self._services.get(name)
if not service:
return False
# Update service definition
service.workers = workers
# Restart with new configuration
return await self.restart_service(name, timeout)
def start_daemon(
self,
services: Optional[List[str]] = None,
scale_map: Optional[Dict[str, int]] = None,
force_recreate: bool = False,
) -> int:
import subprocess
cmd = [
sys.executable,
"-m",
"pyserve.cli._daemon",
"--config",
str(self.state_manager.state_dir.parent / "config.yaml"),
"--state-dir",
str(self.state_manager.state_dir),
]
if services:
cmd.extend(["--services", ",".join(services)])
if scale_map:
for name, workers in scale_map.items():
cmd.extend(["--scale", f"{name}={workers}"])
if force_recreate:
cmd.append("--force-recreate")
env = os.environ.copy()
process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
return process.pid


@ -0,0 +1,25 @@
from .config import config_cmd
from .down import down_cmd
from .health import health_cmd
from .init import init_cmd
from .logs import logs_cmd
from .scale import scale_cmd
from .service import restart_cmd, start_cmd, stop_cmd
from .status import ps_cmd
from .top import top_cmd
from .up import up_cmd
__all__ = [
"init_cmd",
"config_cmd",
"up_cmd",
"down_cmd",
"start_cmd",
"stop_cmd",
"restart_cmd",
"ps_cmd",
"logs_cmd",
"top_cmd",
"health_cmd",
"scale_cmd",
]


@ -0,0 +1,419 @@
"""
pyserve config - Configuration management commands
"""
import json
from pathlib import Path
from typing import Any, Optional
import click
import yaml
@click.group("config")
def config_cmd() -> None:
"""
Configuration management commands.
\b
Commands:
validate Validate configuration file
show Display current configuration
get Get a specific configuration value
set Set a configuration value
diff Compare two configuration files
"""
pass
@config_cmd.command("validate")
@click.option(
"-c",
"--config",
"config_file",
default=None,
help="Path to configuration file",
)
@click.option(
"--strict",
is_flag=True,
help="Enable strict validation (warn on unknown fields)",
)
@click.pass_obj
def validate_cmd(ctx: Any, config_file: Optional[str], strict: bool) -> None:
"""
Validate a configuration file.
Checks for syntax errors, missing required fields, and invalid values.
\b
Examples:
pyserve config validate
pyserve config validate -c production.yaml
pyserve config validate --strict
"""
from ..output import console, print_error, print_success, print_warning
config_path = Path(config_file or ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
console.print(f"Validating [cyan]{config_path}[/cyan]...")
try:
with open(config_path) as f:
data = yaml.safe_load(f)
if data is None:
print_error("Configuration file is empty")
raise click.Abort()
from ...config import Config
config = Config.from_yaml(str(config_path))
errors = []
warnings = []
if not (1 <= config.server.port <= 65535):
errors.append(f"Invalid server port: {config.server.port}")
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if config.logging.level.upper() not in valid_levels:
errors.append(f"Invalid logging level: {config.logging.level}")
if config.ssl.enabled:
if not Path(config.ssl.cert_file).exists():
warnings.append(f"SSL cert file not found: {config.ssl.cert_file}")
if not Path(config.ssl.key_file).exists():
warnings.append(f"SSL key file not found: {config.ssl.key_file}")
valid_extension_types = [
"routing",
"process_orchestration",
"asgi_mount",
]
for ext in config.extensions:
if ext.type not in valid_extension_types:
warnings.append(f"Unknown extension type: {ext.type}")
if strict:
known_top_level = {"http", "server", "ssl", "logging", "extensions"}
for key in data.keys():
if key not in known_top_level:
warnings.append(f"Unknown top-level field: {key}")
if errors:
for error in errors:
print_error(error)
raise click.Abort()
if warnings:
for warning in warnings:
print_warning(warning)
print_success("Configuration is valid!")
except yaml.YAMLError as e:
print_error(f"YAML syntax error: {e}")
raise click.Abort()
except Exception as e:
print_error(f"Validation error: {e}")
raise click.Abort()
@config_cmd.command("show")
@click.option(
"-c",
"--config",
"config_file",
default=None,
help="Path to configuration file",
)
@click.option(
"--format",
"output_format",
type=click.Choice(["yaml", "json", "table"]),
default="yaml",
help="Output format",
)
@click.option(
"--section",
"section",
default=None,
help="Show only a specific section (e.g., server, logging)",
)
@click.pass_obj
def show_cmd(ctx: Any, config_file: Optional[str], output_format: str, section: Optional[str]) -> None:
"""
Display current configuration.
\b
Examples:
pyserve config show
pyserve config show --format json
pyserve config show --section server
"""
from ..output import console, print_error
config_path = Path(config_file or ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
try:
with open(config_path) as f:
data = yaml.safe_load(f)
if section:
if section in data:
data = {section: data[section]}
else:
print_error(f"Section '{section}' not found in configuration")
raise click.Abort()
if output_format == "yaml":
from rich.syntax import Syntax
yaml_str = yaml.dump(data, default_flow_style=False, sort_keys=False)
syntax = Syntax(yaml_str, "yaml", theme="monokai", line_numbers=False)
console.print(syntax)
elif output_format == "json":
from rich.syntax import Syntax
json_str = json.dumps(data, indent=2)
syntax = Syntax(json_str, "json", theme="monokai", line_numbers=False)
console.print(syntax)
elif output_format == "table":
from rich.tree import Tree
def build_tree(data: Any, tree: Any) -> None:
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, (dict, list)):
branch = tree.add(f"[cyan]{key}[/cyan]")
build_tree(value, branch)
else:
tree.add(f"[cyan]{key}[/cyan]: [green]{value}[/green]")
elif isinstance(data, list):
for i, item in enumerate(data):
if isinstance(item, (dict, list)):
branch = tree.add(f"[dim][{i}][/dim]")
build_tree(item, branch)
else:
tree.add(f"[dim][{i}][/dim] [green]{item}[/green]")
tree = Tree(f"[bold]Configuration: {config_path}[/bold]")
build_tree(data, tree)
console.print(tree)
except Exception as e:
print_error(f"Error reading configuration: {e}")
raise click.Abort()
@config_cmd.command("get")
@click.argument("key")
@click.option(
"-c",
"--config",
"config_file",
default=None,
help="Path to configuration file",
)
@click.pass_obj
def get_cmd(ctx: Any, key: str, config_file: Optional[str]) -> None:
"""
Get a specific configuration value.
Use dot notation to access nested values.
\b
Examples:
pyserve config get server.port
pyserve config get logging.level
pyserve config get extensions.0.type
"""
from ..output import console, print_error
config_path = Path(config_file or ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
try:
with open(config_path) as f:
data = yaml.safe_load(f)
value = data
for part in key.split("."):
if isinstance(value, dict):
if part in value:
value = value[part]
else:
print_error(f"Key '{key}' not found")
raise click.Abort()
elif isinstance(value, list):
try:
index = int(part)
value = value[index]
except (ValueError, IndexError):
print_error(f"Invalid index '{part}' in key '{key}'")
raise click.Abort()
else:
print_error(f"Cannot access '{part}' in {type(value).__name__}")
raise click.Abort()
if isinstance(value, (dict, list)):
console.print(yaml.dump(value, default_flow_style=False))
else:
console.print(str(value))
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()
@config_cmd.command("set")
@click.argument("key")
@click.argument("value")
@click.option(
"-c",
"--config",
"config_file",
default=None,
help="Path to configuration file",
)
@click.pass_obj
def set_cmd(ctx: Any, key: str, value: str, config_file: Optional[str]) -> None:
"""
Set a configuration value.
Use dot notation to access nested values.
\b
Examples:
pyserve config set server.port 8080
pyserve config set logging.level DEBUG
"""
from ..output import print_error, print_success
config_path = Path(config_file or ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
try:
with open(config_path) as f:
data = yaml.safe_load(f)
parsed_value: Any
if value.lower() == "true":
parsed_value = True
elif value.lower() == "false":
parsed_value = False
elif value.isdigit():
parsed_value = int(value)
else:
try:
parsed_value = float(value)
except ValueError:
parsed_value = value
parts = key.split(".")
current = data
for part in parts[:-1]:
if isinstance(current, dict):
if part not in current:
current[part] = {}
current = current[part]
elif isinstance(current, list):
index = int(part)
current = current[index]
final_key = parts[-1]
if isinstance(current, dict):
current[final_key] = parsed_value
elif isinstance(current, list):
current[int(final_key)] = parsed_value
with open(config_path, "w") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
print_success(f"Set {key} = {parsed_value}")
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()
@config_cmd.command("diff")
@click.argument("file1", type=click.Path(exists=True))
@click.argument("file2", type=click.Path(exists=True))
def diff_cmd(file1: str, file2: str) -> None:
"""
Compare two configuration files.
\b
Examples:
pyserve config diff config.yaml production.yaml
"""
from ..output import console, print_error
try:
with open(file1) as f:
data1 = yaml.safe_load(f)
with open(file2) as f:
data2 = yaml.safe_load(f)
def compare_dicts(d1: Any, d2: Any, path: str = "") -> list[tuple[str, str, Any, Any]]:
differences: list[tuple[str, str, Any, Any]] = []
all_keys = set(d1.keys() if d1 else []) | set(d2.keys() if d2 else [])
for key in sorted(all_keys):
current_path = f"{path}.{key}" if path else key
v1 = d1.get(key) if d1 else None
v2 = d2.get(key) if d2 else None
if key not in (d1 or {}):
differences.append(("added", current_path, None, v2))
elif key not in (d2 or {}):
differences.append(("removed", current_path, v1, None))
elif isinstance(v1, dict) and isinstance(v2, dict):
differences.extend(compare_dicts(v1, v2, current_path))
elif v1 != v2:
differences.append(("changed", current_path, v1, v2))
return differences
differences = compare_dicts(data1, data2)
if not differences:
console.print("[green]Files are identical[/green]")
return
console.print(f"\n[bold]Differences between {file1} and {file2}:[/bold]\n")
for diff_type, path, v1, v2 in differences:
if diff_type == "added":
console.print(f" [green]+ {path}: {v2}[/green]")
elif diff_type == "removed":
console.print(f" [red]- {path}: {v1}[/red]")
elif diff_type == "changed":
console.print(f" [yellow]~ {path}:[/yellow]")
console.print(f" [red]- {v1}[/red]")
console.print(f" [green]+ {v2}[/green]")
console.print()
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()


@ -0,0 +1,123 @@
"""
pyserve down - Stop all services
"""
import signal
import time
from pathlib import Path
from typing import Any
import click
@click.command("down")
@click.option(
"--timeout",
"timeout",
default=30,
type=int,
help="Timeout in seconds for graceful shutdown",
)
@click.option(
"-v",
"--volumes",
is_flag=True,
help="Remove volumes/data",
)
@click.option(
"--remove-orphans",
is_flag=True,
help="Remove orphaned services",
)
@click.pass_obj
def down_cmd(
ctx: Any,
timeout: int,
volumes: bool,
remove_orphans: bool,
) -> None:
"""
Stop and remove all services.
\b
Examples:
pyserve down # Stop all services
pyserve down --timeout 60 # Extended shutdown timeout
pyserve down -v # Remove volumes too
"""
from ..output import console, print_error, print_info, print_success, print_warning
from ..state import StateManager
state_manager = StateManager(Path(".pyserve"), ctx.project)
if state_manager.is_daemon_running():
daemon_pid = state_manager.get_daemon_pid()
# is_daemon_running() returned True above, so a PID must be recorded; the assert
# narrows Optional[int] to int and removes the need for cast() below.
assert daemon_pid is not None
console.print(f"[bold]Stopping PyServe daemon (PID: {daemon_pid})...[/bold]")
try:
import os
os.kill(daemon_pid, signal.SIGTERM)
start_time = time.time()
while time.time() - start_time < timeout:
try:
os.kill(daemon_pid, 0)
time.sleep(0.5)
except ProcessLookupError:
break
else:
print_warning("Graceful shutdown timed out, forcing...")
try:
os.kill(daemon_pid, signal.SIGKILL)
except ProcessLookupError:
pass
state_manager.clear_daemon_pid()
print_success("PyServe daemon stopped")
except ProcessLookupError:
print_info("Daemon was not running")
state_manager.clear_daemon_pid()
except PermissionError:
print_error("Permission denied to stop daemon")
raise click.Abort()
else:
services = state_manager.get_all_services()
if not services:
print_info("No services are running")
return
console.print("[bold]Stopping services...[/bold]")
from ...config import Config
from .._runner import ServiceRunner
config_path = Path(ctx.config_file)
if config_path.exists():
config = Config.from_yaml(str(config_path))
else:
config = Config()
runner = ServiceRunner(config, state_manager)
import asyncio
try:
asyncio.run(runner.stop_all(timeout=timeout))
print_success("All services stopped")
except Exception as e:
print_error(f"Error stopping services: {e}")
if volumes:
console.print("Cleaning up state...")
state_manager.clear()
print_info("State cleared")
if remove_orphans:
# This would remove services that are in state but not in config
pass


@ -0,0 +1,161 @@
"""
pyserve health - Check health of services
"""
import asyncio
from pathlib import Path
from typing import Any
import click
@click.command("health")
@click.argument("services", nargs=-1)
@click.option(
"--timeout",
"timeout",
default=5,
type=int,
help="Health check timeout in seconds",
)
@click.option(
"--format",
"output_format",
type=click.Choice(["table", "json"]),
default="table",
help="Output format",
)
@click.pass_obj
def health_cmd(ctx: Any, services: tuple[str, ...], timeout: int, output_format: str) -> None:
"""
Check health of services.
Performs active health checks on running services.
\b
Examples:
pyserve health # Check all services
pyserve health api admin # Check specific services
pyserve health --format json # JSON output
"""
from ..output import console, print_error, print_info
from ..state import StateManager
state_manager = StateManager(Path(".pyserve"), ctx.project)
all_services = state_manager.get_all_services()
if services:
all_services = {k: v for k, v in all_services.items() if k in services}
if not all_services:
print_info("No services to check")
return
results = asyncio.run(_check_health(all_services, timeout))
if output_format == "json":
import json
console.print(json.dumps(results, indent=2))
return
from rich.table import Table
from ..output import format_health
table = Table(show_header=True, header_style="bold")
table.add_column("SERVICE", style="cyan")
table.add_column("HEALTH")
table.add_column("CHECKS", justify="right")
table.add_column("LAST CHECK", style="dim")
table.add_column("RESPONSE TIME", justify="right")
for name, result in results.items():
health_str = format_health(result["status"])
checks = f"{result['successes']}/{result['total']}"
last_check = result.get("last_check", "-")
response_time = f"{result['response_time_ms']:.0f}ms" if result.get("response_time_ms") else "-"
table.add_row(name, health_str, checks, last_check, response_time)
console.print()
console.print(table)
console.print()
healthy = sum(1 for r in results.values() if r["status"] == "healthy")
unhealthy = sum(1 for r in results.values() if r["status"] == "unhealthy")
if unhealthy:
print_error(f"{unhealthy} service(s) unhealthy")
raise SystemExit(1)
else:
from ..output import print_success
print_success(f"All {healthy} service(s) healthy")
async def _check_health(services: dict, timeout: int) -> dict:
import time
try:
import httpx
except ImportError:
return {name: {"status": "unknown", "error": "httpx not installed"} for name in services}
results = {}
for name, service in services.items():
if service.state != "running" or not service.port:
results[name] = {
"status": "unknown",
"successes": 0,
"total": 0,
"error": "Service not running",
}
continue
health_path = "/health"
url = f"http://127.0.0.1:{service.port}{health_path}"
start_time = time.time()
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(url)
response_time = (time.time() - start_time) * 1000
if resp.status_code < 500:
results[name] = {
"status": "healthy",
"successes": 1,
"total": 1,
"response_time_ms": response_time,
"last_check": "just now",
"status_code": resp.status_code,
}
else:
results[name] = {
"status": "unhealthy",
"successes": 0,
"total": 1,
"response_time_ms": response_time,
"last_check": "just now",
"status_code": resp.status_code,
}
except httpx.TimeoutException:
results[name] = {
"status": "unhealthy",
"successes": 0,
"total": 1,
"error": "timeout",
"last_check": "just now",
}
except Exception as e:
results[name] = {
"status": "unhealthy",
"successes": 0,
"total": 1,
"error": str(e),
"last_check": "just now",
}
return results


@ -0,0 +1,432 @@
"""
pyserve init - Initialize a new pyserve project
"""
from pathlib import Path
import click
TEMPLATES = {
"basic": {
"description": "Basic configuration with static files and routing",
"filename": "config.yaml",
},
"orchestration": {
"description": "Process orchestration with multiple ASGI/WSGI apps",
"filename": "config.yaml",
},
"asgi": {
"description": "ASGI mount configuration for in-process apps",
"filename": "config.yaml",
},
"full": {
"description": "Full configuration with all features",
"filename": "config.yaml",
},
}
BASIC_TEMPLATE = """\
# PyServe Configuration
# Generated by: pyserve init
http:
static_dir: ./static
templates_dir: ./templates
server:
host: 0.0.0.0
port: 8080
backlog: 100
proxy_timeout: 30.0
ssl:
enabled: false
cert_file: ./ssl/cert.pem
key_file: ./ssl/key.pem
logging:
level: INFO
console_output: true
format:
type: standard
use_colors: true
show_module: true
timestamp_format: "%Y-%m-%d %H:%M:%S"
console:
level: INFO
format:
type: standard
use_colors: true
files:
- path: ./logs/pyserve.log
level: INFO
format:
type: standard
use_colors: false
extensions:
- type: routing
config:
regex_locations:
# Health check endpoint
"=/health":
return: "200 OK"
content_type: "text/plain"
# Static files
"^/static/":
root: "./static"
strip_prefix: "/static"
# Default fallback
"__default__":
spa_fallback: true
root: "./static"
index_file: "index.html"
"""
ORCHESTRATION_TEMPLATE = """\
# PyServe Process Orchestration Configuration
# Generated by: pyserve init --template orchestration
#
# This configuration runs multiple ASGI/WSGI apps as isolated processes
# with automatic health monitoring and restart.
server:
host: 0.0.0.0
port: 8080
backlog: 2048
proxy_timeout: 60.0
logging:
level: INFO
console_output: true
format:
type: standard
use_colors: true
files:
- path: ./logs/pyserve.log
level: DEBUG
format:
type: standard
use_colors: false
extensions:
# Process Orchestration - runs each app in its own process
- type: process_orchestration
config:
port_range: [9000, 9999]
health_check_enabled: true
proxy_timeout: 60.0
apps:
# Example: FastAPI application
- name: api
path: /api
app_path: myapp.api:app
module_path: "."
workers: 2
health_check_path: /health
health_check_interval: 10.0
health_check_timeout: 5.0
health_check_retries: 3
max_restart_count: 5
restart_delay: 1.0
strip_path: true
env:
APP_ENV: "production"
# Example: Flask application (WSGI)
# - name: admin
# path: /admin
# app_path: myapp.admin:app
# app_type: wsgi
# module_path: "."
# workers: 1
# health_check_path: /health
# strip_path: true
# Static files routing
- type: routing
config:
regex_locations:
"=/health":
return: "200 OK"
content_type: "text/plain"
"^/static/":
root: "./static"
strip_prefix: "/static"
"""
ASGI_TEMPLATE = """\
# PyServe ASGI Mount Configuration
# Generated by: pyserve init --template asgi
#
# This configuration mounts ASGI apps directly inside the PyServe process.
# More efficient but apps share the same process.
server:
host: 0.0.0.0
port: 8080
backlog: 100
proxy_timeout: 30.0
logging:
level: INFO
console_output: true
format:
type: standard
use_colors: true
files:
- path: ./logs/pyserve.log
level: DEBUG
extensions:
- type: asgi_mount
config:
mounts:
# FastAPI app mounted at /api
- path: /api
app: myapp.api:app
# factory: false # Set to true if app is a factory function
# Starlette app mounted at /web
# - path: /web
# app: myapp.web:app
- type: routing
config:
regex_locations:
"=/health":
return: "200 OK"
content_type: "text/plain"
"^/static/":
root: "./static"
strip_prefix: "/static"
"__default__":
spa_fallback: true
root: "./static"
index_file: "index.html"
"""
FULL_TEMPLATE = """\
# PyServe Full Configuration
# Generated by: pyserve init --template full
#
# Comprehensive configuration showcasing all PyServe features.
http:
static_dir: ./static
templates_dir: ./templates
server:
host: 0.0.0.0
port: 8080
backlog: 2048
default_root: false
proxy_timeout: 60.0
redirect_instructions:
"/old-path": "/new-path"
ssl:
enabled: false
cert_file: ./ssl/cert.pem
key_file: ./ssl/key.pem
logging:
level: INFO
console_output: true
format:
type: standard
use_colors: true
show_module: true
timestamp_format: "%Y-%m-%d %H:%M:%S"
console:
level: DEBUG
format:
type: standard
use_colors: true
files:
# Main log file
- path: ./logs/pyserve.log
level: DEBUG
format:
type: standard
use_colors: false
# JSON logs for log aggregation
- path: ./logs/pyserve.json
level: INFO
format:
type: json
# Access logs
- path: ./logs/access.log
level: INFO
loggers: ["pyserve.access"]
max_bytes: 10485760 # 10MB
backup_count: 10
extensions:
# Process Orchestration for background services
- type: process_orchestration
config:
port_range: [9000, 9999]
health_check_enabled: true
proxy_timeout: 60.0
apps:
- name: api
path: /api
app_path: myapp.api:app
module_path: "."
workers: 2
health_check_path: /health
strip_path: true
env:
APP_ENV: "production"
# Advanced routing with regex
- type: routing
config:
regex_locations:
# API versioning
"~^/api/v(?P<version>\\\\d+)/":
proxy_pass: "http://localhost:9001"
headers:
- "API-Version: {version}"
- "X-Forwarded-For: $remote_addr"
# Static files with caching
"~*\\\\.(js|css|png|jpg|gif|ico|svg|woff2?)$":
root: "./static"
cache_control: "public, max-age=31536000"
headers:
- "Access-Control-Allow-Origin: *"
# Health check
"=/health":
return: "200 OK"
content_type: "text/plain"
# Static files
"^/static/":
root: "./static"
strip_prefix: "/static"
# SPA fallback
"__default__":
spa_fallback: true
root: "./static"
index_file: "index.html"
"""
def get_template_content(template: str) -> str:
templates = {
"basic": BASIC_TEMPLATE,
"orchestration": ORCHESTRATION_TEMPLATE,
"asgi": ASGI_TEMPLATE,
"full": FULL_TEMPLATE,
}
return templates.get(template, BASIC_TEMPLATE)
@click.command("init")
@click.option(
"-t",
"--template",
"template",
type=click.Choice(list(TEMPLATES.keys())),
default="basic",
help="Configuration template to use",
)
@click.option(
"-o",
"--output",
"output_file",
default="config.yaml",
help="Output file path (default: config.yaml)",
)
@click.option(
"-f",
"--force",
is_flag=True,
help="Overwrite existing configuration",
)
@click.option(
"--list-templates",
is_flag=True,
help="List available templates",
)
@click.pass_context
def init_cmd(
ctx: click.Context,
template: str,
output_file: str,
force: bool,
list_templates: bool,
) -> None:
"""
Initialize a new pyserve project.
Creates a configuration file with sensible defaults and directory structure.
\b
Examples:
pyserve init # Basic configuration
pyserve init -t orchestration # Process orchestration setup
pyserve init -t asgi # ASGI mount setup
pyserve init -t full # All features
pyserve init -o production.yaml # Custom output file
"""
from ..output import console, print_info, print_success, print_warning
if list_templates:
console.print("\n[bold]Available Templates:[/bold]\n")
for name, info in TEMPLATES.items():
console.print(f" [cyan]{name:15}[/cyan] - {info['description']}")
console.print()
return
output_path = Path(output_file)
if output_path.exists() and not force:
print_warning(f"Configuration file '{output_file}' already exists.")
if not click.confirm("Do you want to overwrite it?"):
raise click.Abort()
dirs_to_create = ["static", "templates", "logs"]
if template == "orchestration":
dirs_to_create.append("apps")
for dir_name in dirs_to_create:
dir_path = Path(dir_name)
if not dir_path.exists():
dir_path.mkdir(parents=True)
print_info(f"Created directory: {dir_name}/")
state_dir = Path(".pyserve")
if not state_dir.exists():
state_dir.mkdir()
print_info("Created directory: .pyserve/")
content = get_template_content(template)
output_path.write_text(content)
print_success(f"Created configuration file: {output_file}")
print_info(f"Template: {template}")
gitignore_path = Path(".pyserve/.gitignore")
if not gitignore_path.exists():
gitignore_path.write_text("*\n!.gitignore\n")
console.print()
console.print("[bold]Next steps:[/bold]")
console.print(f" 1. Edit [cyan]{output_file}[/cyan] to configure your services")
console.print(" 2. Run [cyan]pyserve config validate[/cyan] to check configuration")
console.print(" 3. Run [cyan]pyserve up[/cyan] to start services")
console.print()
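
The templates above point at a placeholder application (`myapp.api:app`) with a `/health` probe. For reference, a minimal FastAPI app that satisfies that contract could look like the sketch below; the module name and routes are illustrative, not something PyServe ships:

```python
# myapp/api.py - illustrative target for app_path: myapp.api:app
from fastapi import FastAPI

app = FastAPI()


@app.get("/health")
def health() -> dict:
    # Answers the probe configured via health_check_path
    return {"status": "ok"}


@app.get("/items/{item_id}")
def read_item(item_id: int) -> dict:
    return {"item_id": item_id}
```

With the orchestration template, PyServe starts such an app as a separate process on a port from `port_range` and proxies `/api/*` to it (stripping the `/api` prefix when `strip_path` is true).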

@@ -0,0 +1,280 @@
"""
pyserve logs - View service logs
"""
import asyncio
import time
from pathlib import Path
from typing import Any, Optional
import click
@click.command("logs")
@click.argument("services", nargs=-1)
@click.option(
"-f",
"--follow",
is_flag=True,
help="Follow log output",
)
@click.option(
"--tail",
"tail",
default=100,
type=int,
help="Number of lines to show from the end",
)
@click.option(
"--since",
"since",
default=None,
help="Show logs since timestamp (e.g., '10m', '1h', '2024-01-01')",
)
@click.option(
"--until",
"until_time",
default=None,
help="Show logs until timestamp",
)
@click.option(
"-t",
"--timestamps",
is_flag=True,
help="Show timestamps",
)
@click.option(
"--no-color",
is_flag=True,
help="Disable colored output",
)
@click.option(
"--filter",
"filter_pattern",
default=None,
help="Filter logs by pattern",
)
@click.pass_obj
def logs_cmd(
ctx: Any,
services: tuple[str, ...],
follow: bool,
tail: int,
since: Optional[str],
until_time: Optional[str],
timestamps: bool,
no_color: bool,
filter_pattern: Optional[str],
) -> None:
"""
View service logs.
If no services are specified, shows logs from all services.
\b
Examples:
pyserve logs # All logs
pyserve logs api # Logs from api service
pyserve logs api admin # Logs from multiple services
pyserve logs -f # Follow logs
pyserve logs --tail 50 # Last 50 lines
pyserve logs --since "10m" # Logs from last 10 minutes
"""
from ..output import print_info
from ..state import StateManager
state_manager = StateManager(Path(".pyserve"), ctx.project)
if services:
log_files = [(name, state_manager.get_service_log_file(name)) for name in services]
else:
all_services = state_manager.get_all_services()
if not all_services:
main_log = Path("logs/pyserve.log")
if main_log.exists():
log_files = [("pyserve", main_log)]
else:
print_info("No logs available. Start services with 'pyserve up'")
return
else:
log_files = [(name, state_manager.get_service_log_file(name)) for name in all_services]
existing_logs = [(name, path) for name, path in log_files if path.exists()]
if not existing_logs:
print_info("No log files found")
return
since_time = _parse_time(since) if since else None
until_timestamp = _parse_time(until_time) if until_time else None
colors = ["cyan", "green", "yellow", "blue", "magenta"]
service_colors = {name: colors[i % len(colors)] for i, (name, _) in enumerate(existing_logs)}
if follow:
asyncio.run(
_follow_logs(
existing_logs,
service_colors,
timestamps,
no_color,
filter_pattern,
)
)
else:
_read_logs(
existing_logs,
service_colors,
tail,
since_time,
until_timestamp,
timestamps,
no_color,
filter_pattern,
)
def _parse_time(time_str: str) -> Optional[float]:
import re
from datetime import datetime
# Relative time (e.g., "10m", "1h", "2d")
match = re.match(r"^(\d+)([smhd])$", time_str)
if match:
value = int(match.group(1))
unit = match.group(2)
units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
return time.time() - (value * units[unit])
# Relative phrase (e.g., "10m ago")
match = re.match(r"^(\d+)([smhd])\s+ago$", time_str)
if match:
value = int(match.group(1))
unit = match.group(2)
units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
return time.time() - (value * units[unit])
# ISO format
try:
dt = datetime.fromisoformat(time_str)
return dt.timestamp()
except ValueError:
pass
return None
def _read_logs(
log_files: list[tuple[str, Path]],
service_colors: dict[str, str],
tail: int,
since_time: Optional[float],
until_time: Optional[float],
timestamps: bool,
no_color: bool,
filter_pattern: Optional[str],
) -> None:
import re
from ..output import console
all_lines = []
for service_name, log_path in log_files:
try:
with open(log_path) as f:
lines = f.readlines()
# Take last N lines
lines = lines[-tail:] if tail else lines
for line in lines:
line = line.rstrip()
if not line:
continue
if filter_pattern and filter_pattern not in line:
continue
line_time = None
timestamp_match = re.match(r"^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})", line)
if timestamp_match:
try:
from datetime import datetime
line_time = datetime.fromisoformat(timestamp_match.group(1).replace(" ", "T")).timestamp()
except ValueError:
pass
if since_time and line_time and line_time < since_time:
continue
if until_time and line_time and line_time > until_time:
continue
all_lines.append((line_time or 0, service_name, line))
except Exception as e:
console.print(f"[red]Error reading {log_path}: {e}[/red]")
all_lines.sort(key=lambda x: x[0])
for _, service_name, line in all_lines:
if len(log_files) > 1:
# Multiple services - prefix with service name
if no_color:
console.print(f"{service_name} | {line}")
else:
color = service_colors.get(service_name, "white")
console.print(f"[{color}]{service_name}[/{color}] | {line}")
else:
console.print(line)
async def _follow_logs(
log_files: list[tuple[str, Path]],
service_colors: dict[str, str],
timestamps: bool,
no_color: bool,
filter_pattern: Optional[str],
) -> None:
from ..output import console
positions = {}
for service_name, log_path in log_files:
if log_path.exists():
positions[service_name] = log_path.stat().st_size
else:
positions[service_name] = 0
console.print("[dim]Following logs... Press Ctrl+C to stop[/dim]\n")
try:
while True:
for service_name, log_path in log_files:
if not log_path.exists():
continue
current_size = log_path.stat().st_size
if current_size > positions[service_name]:
with open(log_path) as f:
f.seek(positions[service_name])
new_content = f.read()
positions[service_name] = f.tell()
for line in new_content.splitlines():
if filter_pattern and filter_pattern not in line:
continue
if len(log_files) > 1:
if no_color:
console.print(f"{service_name} | {line}")
else:
color = service_colors.get(service_name, "white")
console.print(f"[{color}]{service_name}[/{color}] | {line}")
else:
console.print(line)
await asyncio.sleep(0.5)
except KeyboardInterrupt:
console.print("\n[dim]Stopped following logs[/dim]")
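
As a sanity sketch of the `--since`/`--until` parsing above, `_parse_time` accepts bare durations, `N<unit> ago` phrases, and ISO timestamps, returning `None` for anything else (assuming the function is imported from this module):

```python
# Expected behaviour of _parse_time; tolerances because time.time() advances
import time

now = time.time()
assert abs(_parse_time("10m") - (now - 600)) < 2      # 10 minutes back
assert abs(_parse_time("2h ago") - (now - 7200)) < 2  # "N<unit> ago" phrasing
assert _parse_time("2024-01-01") is not None          # ISO date via fromisoformat
assert _parse_time("yesterday") is None               # unsupported input
```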

@@ -0,0 +1,88 @@
"""
pyserve scale - Scale services
"""
import asyncio
from pathlib import Path
from typing import Any
import click
@click.command("scale")
@click.argument("scales", nargs=-1, required=True)
@click.option(
"--timeout",
"timeout",
default=60,
type=int,
help="Timeout in seconds for scaling operation",
)
@click.option(
"--no-wait",
is_flag=True,
help="Don't wait for services to be ready",
)
@click.pass_obj
def scale_cmd(ctx: Any, scales: tuple[str, ...], timeout: int, no_wait: bool) -> None:
"""
Scale services to specified number of workers.
Use SERVICE=NUM format to specify scaling.
\b
Examples:
pyserve scale api=4 # Scale api to 4 workers
pyserve scale api=4 admin=2 # Scale multiple services
"""
from ...config import Config
from .._runner import ServiceRunner
from ..output import console, print_error, print_info, print_success
from ..state import StateManager
scale_map = {}
for scale in scales:
try:
service, num = scale.split("=")
scale_map[service] = int(num)
except ValueError:
print_error(f"Invalid scale format: {scale}. Use SERVICE=NUM")
raise click.Abort()
config_path = Path(ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
config = Config.from_yaml(str(config_path))
state_manager = StateManager(Path(".pyserve"), ctx.project)
all_services = state_manager.get_all_services()
for service in scale_map:
if service not in all_services:
print_error(f"Service '{service}' not found")
raise click.Abort()
runner = ServiceRunner(config, state_manager)
console.print("[bold]Scaling services...[/bold]")
async def do_scale() -> None:
for service, workers in scale_map.items():
current = all_services[service].workers or 1
print_info(f"Scaling {service}: {current} → {workers} workers")
try:
success = await runner.scale_service(service, workers, timeout=timeout, wait=not no_wait)
if success:
print_success(f"Scaled {service} to {workers} workers")
else:
print_error(f"Failed to scale {service}")
except Exception as e:
print_error(f"Error scaling {service}: {e}")
try:
asyncio.run(do_scale())
except Exception as e:
print_error(f"Scaling failed: {e}")
raise click.Abort()

@@ -0,0 +1,190 @@
"""
pyserve start/stop/restart - Service management commands
"""
import asyncio
from pathlib import Path
from typing import Any, Dict
import click
@click.command("start")
@click.argument("services", nargs=-1, required=True)
@click.option(
"--timeout",
"timeout",
default=60,
type=int,
help="Timeout in seconds for service startup",
)
@click.pass_obj
def start_cmd(ctx: Any, services: tuple[str, ...], timeout: int) -> None:
"""
Start one or more services.
\b
Examples:
pyserve start api # Start api service
pyserve start api admin # Start multiple services
"""
from ...config import Config
from .._runner import ServiceRunner
from ..output import console, print_error, print_success
from ..state import StateManager
config_path = Path(ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
config = Config.from_yaml(str(config_path))
state_manager = StateManager(Path(".pyserve"), ctx.project)
runner = ServiceRunner(config, state_manager)
console.print(f"[bold]Starting services: {', '.join(services)}[/bold]")
async def do_start() -> Dict[str, bool]:
results: Dict[str, bool] = {}
for service in services:
try:
success = await runner.start_service(service, timeout=timeout)
results[service] = success
if success:
print_success(f"Started {service}")
else:
print_error(f"Failed to start {service}")
except Exception as e:
print_error(f"Error starting {service}: {e}")
results[service] = False
return results
try:
results = asyncio.run(do_start())
if not all(results.values()):
raise click.Abort()
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()
@click.command("stop")
@click.argument("services", nargs=-1, required=True)
@click.option(
"--timeout",
"timeout",
default=30,
type=int,
help="Timeout in seconds for graceful shutdown",
)
@click.option(
"-f",
"--force",
is_flag=True,
help="Force stop (SIGKILL)",
)
@click.pass_obj
def stop_cmd(ctx: Any, services: tuple[str, ...], timeout: int, force: bool) -> None:
"""
Stop one or more services.
\b
Examples:
pyserve stop api # Stop api service
pyserve stop api admin # Stop multiple services
pyserve stop api --force # Force stop
"""
from ...config import Config
from .._runner import ServiceRunner
from ..output import console, print_error, print_success
from ..state import StateManager
config_path = Path(ctx.config_file)
config = Config.from_yaml(str(config_path)) if config_path.exists() else Config()
state_manager = StateManager(Path(".pyserve"), ctx.project)
runner = ServiceRunner(config, state_manager)
console.print(f"[bold]Stopping services: {', '.join(services)}[/bold]")
async def do_stop() -> Dict[str, bool]:
results: Dict[str, bool] = {}
for service in services:
try:
success = await runner.stop_service(service, timeout=timeout, force=force)
results[service] = success
if success:
print_success(f"Stopped {service}")
else:
print_error(f"Failed to stop {service}")
except Exception as e:
print_error(f"Error stopping {service}: {e}")
results[service] = False
return results
try:
results = asyncio.run(do_stop())
if not all(results.values()):
raise click.Abort()
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()
@click.command("restart")
@click.argument("services", nargs=-1, required=True)
@click.option(
"--timeout",
"timeout",
default=60,
type=int,
help="Timeout in seconds for restart",
)
@click.pass_obj
def restart_cmd(ctx: Any, services: tuple[str, ...], timeout: int) -> None:
"""
Restart one or more services.
\b
Examples:
pyserve restart api # Restart api service
pyserve restart api admin # Restart multiple services
"""
from ...config import Config
from .._runner import ServiceRunner
from ..output import console, print_error, print_success
from ..state import StateManager
config_path = Path(ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
raise click.Abort()
config = Config.from_yaml(str(config_path))
state_manager = StateManager(Path(".pyserve"), ctx.project)
runner = ServiceRunner(config, state_manager)
console.print(f"[bold]Restarting services: {', '.join(services)}[/bold]")
async def do_restart() -> Dict[str, bool]:
results = {}
for service in services:
try:
success = await runner.restart_service(service, timeout=timeout)
results[service] = success
if success:
print_success(f"Restarted {service}")
else:
print_error(f"Failed to restart {service}")
except Exception as e:
print_error(f"Error restarting {service}: {e}")
results[service] = False
return results
try:
results = asyncio.run(do_restart())
if not all(results.values()):
raise click.Abort()
except Exception as e:
print_error(f"Error: {e}")
raise click.Abort()

@@ -0,0 +1,147 @@
"""
pyserve ps / status - Show service status
"""
import json
from pathlib import Path
from typing import Any, Optional
import click
@click.command("ps")
@click.argument("services", nargs=-1)
@click.option(
"-a",
"--all",
"show_all",
is_flag=True,
help="Show all services (including stopped)",
)
@click.option(
"-q",
"--quiet",
is_flag=True,
help="Only show service names",
)
@click.option(
"--format",
"output_format",
type=click.Choice(["table", "json", "yaml"]),
default="table",
help="Output format",
)
@click.option(
"--filter",
"filter_status",
default=None,
help="Filter by status (running, stopped, failed)",
)
@click.pass_obj
def ps_cmd(
ctx: Any,
services: tuple[str, ...],
show_all: bool,
quiet: bool,
output_format: str,
filter_status: Optional[str],
) -> None:
"""
Show status of services.
\b
Examples:
pyserve ps # Show running services
pyserve ps -a # Show all services
pyserve ps api admin # Show specific services
pyserve ps --format json # JSON output
pyserve ps --filter running # Filter by status
"""
from ..output import (
console,
create_services_table,
format_health,
format_status,
format_uptime,
print_info,
)
from ..state import StateManager
state_manager = StateManager(Path(".pyserve"), ctx.project)
all_services = state_manager.get_all_services()
# Check if daemon is running
daemon_running = state_manager.is_daemon_running()
# Filter services
if services:
all_services = {k: v for k, v in all_services.items() if k in services}
if filter_status:
all_services = {k: v for k, v in all_services.items() if v.state.lower() == filter_status.lower()}
if not show_all:
# By default, show only running/starting/failed services
all_services = {k: v for k, v in all_services.items() if v.state.lower() in ("running", "starting", "stopping", "failed", "restarting")}
if not all_services:
if daemon_running:
print_info("No services found. Daemon is running but no services are configured.")
else:
print_info("No services running. Use 'pyserve up' to start services.")
return
if quiet:
for name in all_services:
click.echo(name)
return
if output_format == "json":
data = {name: svc.to_dict() for name, svc in all_services.items()}
console.print(json.dumps(data, indent=2))
return
if output_format == "yaml":
import yaml
data = {name: svc.to_dict() for name, svc in all_services.items()}
console.print(yaml.dump(data, default_flow_style=False))
return
table = create_services_table()
for name, service in sorted(all_services.items()):
ports = f"{service.port}" if service.port else "-"
uptime = format_uptime(service.uptime) if service.state == "running" else "-"
health = format_health(service.health.status if service.state == "running" else "-")
pid = str(service.pid) if service.pid else "-"
workers = f"{service.workers}" if service.workers else "-"
table.add_row(
name,
format_status(service.state),
ports,
uptime,
health,
pid,
workers,
)
console.print()
console.print(table)
console.print()
total = len(all_services)
running = sum(1 for s in all_services.values() if s.state == "running")
failed = sum(1 for s in all_services.values() if s.state == "failed")
summary_parts = [f"[bold]{total}[/bold] service(s)"]
if running:
summary_parts.append(f"[green]{running} running[/green]")
if failed:
summary_parts.append(f"[red]{failed} failed[/red]")
if total - running - failed > 0:
summary_parts.append(f"[dim]{total - running - failed} stopped[/dim]")
console.print(" | ".join(summary_parts))
console.print()
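
With `--format json`, the command simply dumps each service's `to_dict()` (defined in the state module later in this diff). An illustrative payload for one running service, with invented values:

```python
# Shape of `pyserve ps --format json` for a single service (values invented)
example_output = {
    "api": {
        "name": "api",
        "state": "running",
        "pid": 48213,
        "port": 9001,
        "workers": 2,
        "started_at": 1733270392.1,
        "restart_count": 0,
        "health": {
            "status": "healthy",
            "last_check": 1733270452.7,
            "failures": 0,
            "response_time_ms": 3.4,
        },
        "config_hash": "a1b2c3d4e5f60718",
    }
}
```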

183
pyserve/ctl/commands/top.py Normal file
@@ -0,0 +1,183 @@
"""
pyserve top - Live monitoring dashboard
"""
import asyncio
import time
from pathlib import Path
from typing import Any, Optional
import click
@click.command("top")
@click.argument("services", nargs=-1)
@click.option(
"--refresh",
"refresh_interval",
default=2,
type=float,
help="Refresh interval in seconds",
)
@click.option(
"--no-color",
is_flag=True,
help="Disable colored output",
)
@click.pass_obj
def top_cmd(ctx: Any, services: tuple[str, ...], refresh_interval: float, no_color: bool) -> None:
"""
Live monitoring dashboard for services.
Shows real-time CPU usage, memory usage, uptime, and health for each service.
\b
Examples:
pyserve top # Monitor all services
pyserve top api admin # Monitor specific services
pyserve top --refresh 5 # Slower refresh rate
"""
from ..output import console, print_info
from ..state import StateManager
state_manager = StateManager(Path(".pyserve"), ctx.project)
if not state_manager.is_daemon_running():
print_info("No services running. Start with 'pyserve up -d'")
return
try:
asyncio.run(
_run_dashboard(
state_manager,
list(services) if services else None,
refresh_interval,
no_color,
)
)
except KeyboardInterrupt:
console.print("\n")
async def _run_dashboard(
state_manager: Any,
filter_services: Optional[list[str]],
refresh_interval: float,
no_color: bool,
) -> None:
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from ..output import console, format_bytes, format_uptime
try:
import psutil
except ImportError:
console.print("[yellow]psutil not installed. Install with: pip install psutil[/yellow]")
return
start_time = time.time()
def make_dashboard() -> Any:
all_services = state_manager.get_all_services()
if filter_services:
all_services = {k: v for k, v in all_services.items() if k in filter_services}
table = Table(
title=None,
show_header=True,
header_style="bold",
border_style="dim",
expand=True,
)
table.add_column("SERVICE", style="cyan", no_wrap=True)
table.add_column("STATUS", no_wrap=True)
table.add_column("CPU%", justify="right")
table.add_column("MEM", justify="right")
table.add_column("PID", style="dim")
table.add_column("UPTIME", style="dim")
table.add_column("HEALTH", no_wrap=True)
total_cpu = 0.0
total_mem = 0
running_count = 0
total_count = len(all_services)
for name, service in sorted(all_services.items()):
status_style = {
"running": "[green]● RUN[/green]",
"stopped": "[dim]○ STOP[/dim]",
"failed": "[red]✗ FAIL[/red]",
"starting": "[yellow]◐ START[/yellow]",
"stopping": "[yellow]◑ STOP[/yellow]",
}.get(service.state, service.state)
cpu_str = "-"
mem_str = "-"
if service.pid and service.state == "running":
try:
proc = psutil.Process(service.pid)
cpu = proc.cpu_percent(interval=0.1)
mem = proc.memory_info().rss
cpu_str = f"{cpu:.1f}%"
mem_str = format_bytes(mem)
total_cpu += cpu
total_mem += mem
running_count += 1
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
health_style = {
"healthy": "[green]✓[/green]",
"unhealthy": "[red]✗[/red]",
"degraded": "[yellow]⚠[/yellow]",
"unknown": "[dim]?[/dim]",
}.get(service.health.status, "[dim]-[/dim]")
uptime = format_uptime(service.uptime) if service.state == "running" else "-"
pid = str(service.pid) if service.pid else "-"
table.add_row(
name,
status_style,
cpu_str,
mem_str,
pid,
uptime,
health_style,
)
elapsed = format_uptime(time.time() - start_time)
summary = Text()
summary.append(f"Running: {running_count}/{total_count}", style="bold")
summary.append(" | ")
summary.append(f"CPU: {total_cpu:.1f}%", style="cyan")
summary.append(" | ")
summary.append(f"MEM: {format_bytes(total_mem)}", style="cyan")
summary.append(" | ")
summary.append(f"Session: {elapsed}", style="dim")
layout = Layout()
layout.split_column(
Layout(
Panel(
Text("PyServe Dashboard", style="bold cyan", justify="center"),
border_style="cyan",
),
size=3,
),
Layout(table),
Layout(Panel(summary, border_style="dim"), size=3),
)
return layout
with Live(make_dashboard(), refresh_per_second=1 / refresh_interval, console=console) as live:
while True:
await asyncio.sleep(refresh_interval)
live.update(make_dashboard())

175
pyserve/ctl/commands/up.py Normal file
@@ -0,0 +1,175 @@
"""
pyserve up - Start all services
"""
import asyncio
import signal
import sys
import time
from pathlib import Path
from typing import Any
import click
@click.command("up")
@click.argument("services", nargs=-1)
@click.option(
"-d",
"--detach",
is_flag=True,
help="Run in background (detached mode)",
)
@click.option(
"--build",
is_flag=True,
help="Build/reload applications before starting",
)
@click.option(
"--force-recreate",
is_flag=True,
help="Recreate services even if configuration hasn't changed",
)
@click.option(
"--scale",
"scales",
multiple=True,
help="Scale SERVICE to NUM workers (e.g., --scale api=4)",
)
@click.option(
"--timeout",
"timeout",
default=60,
type=int,
help="Timeout in seconds for service startup",
)
@click.option(
"--wait",
is_flag=True,
help="Wait for services to be healthy before returning",
)
@click.option(
"--remove-orphans",
is_flag=True,
help="Remove services not defined in configuration",
)
@click.pass_obj
def up_cmd(
ctx: Any,
services: tuple[str, ...],
detach: bool,
build: bool,
force_recreate: bool,
scales: tuple[str, ...],
timeout: int,
wait: bool,
remove_orphans: bool,
) -> None:
"""
Start services defined in configuration.
If no services are specified, all services will be started.
\b
Examples:
pyserve up # Start all services
pyserve up -d # Start in background
pyserve up api admin # Start specific services
pyserve up --scale api=4 # Scale api to 4 workers
pyserve up --wait # Wait for healthy status
"""
from .._runner import ServiceRunner
from ..output import console, print_error, print_info, print_success, print_warning
from ..state import StateManager
config_path = Path(ctx.config_file)
if not config_path.exists():
print_error(f"Configuration file not found: {config_path}")
print_info("Run 'pyserve init' to create a configuration file")
raise click.Abort()
scale_map = {}
for scale in scales:
try:
service, num = scale.split("=")
scale_map[service] = int(num)
except ValueError:
print_error(f"Invalid scale format: {scale}. Use SERVICE=NUM")
raise click.Abort()
try:
from ...config import Config
config = Config.from_yaml(str(config_path))
except Exception as e:
print_error(f"Failed to load configuration: {e}")
raise click.Abort()
state_manager = StateManager(Path(".pyserve"), ctx.project)
if state_manager.is_daemon_running():
daemon_pid = state_manager.get_daemon_pid()
print_warning(f"PyServe daemon is already running (PID: {daemon_pid})")
if not click.confirm("Do you want to restart it?"):
raise click.Abort()
try:
import os
from typing import cast
# FIXME: remove the cast by narrowing daemon_pid (Optional[int]) before use
os.kill(cast(int, daemon_pid), signal.SIGTERM)
time.sleep(2)
except ProcessLookupError:
pass
state_manager.clear_daemon_pid()
runner = ServiceRunner(config, state_manager)
service_list = list(services) if services else None
if detach:
console.print("[bold]Starting PyServe in background...[/bold]")
try:
pid = runner.start_daemon(
service_list,
scale_map=scale_map,
force_recreate=force_recreate,
)
state_manager.set_daemon_pid(pid)
print_success(f"PyServe started in background (PID: {pid})")
print_info("Use 'pyserve ps' to see service status")
print_info("Use 'pyserve logs -f' to follow logs")
print_info("Use 'pyserve down' to stop")
except Exception as e:
print_error(f"Failed to start daemon: {e}")
raise click.Abort()
else:
console.print("[bold]Starting PyServe...[/bold]")
def signal_handler(signum: int, frame: Any) -> None:
console.print("\n[yellow]Received shutdown signal...[/yellow]")
runner.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
asyncio.run(
runner.start(
service_list,
scale_map=scale_map,
force_recreate=force_recreate,
wait_healthy=wait,
timeout=timeout,
)
)
except KeyboardInterrupt:
console.print("\n[yellow]Shutting down...[/yellow]")
except Exception as e:
print_error(f"Failed to start services: {e}")
if ctx.debug:
raise
raise click.Abort()

168
pyserve/ctl/main.py Normal file
@@ -0,0 +1,168 @@
"""
PyServeCTL - Main entry point
Usage:
pyservectl [OPTIONS] COMMAND [ARGS]...
"""
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Optional
import click
from .. import __version__
from .commands import (
config_cmd,
down_cmd,
health_cmd,
init_cmd,
logs_cmd,
ps_cmd,
restart_cmd,
scale_cmd,
start_cmd,
stop_cmd,
top_cmd,
up_cmd,
)
if TYPE_CHECKING:
from ..config import Config
from .state import StateManager
DEFAULT_CONFIG = "config.yaml"
DEFAULT_STATE_DIR = ".pyserve"
class Context:
def __init__(self) -> None:
self.config_file: str = DEFAULT_CONFIG
self.state_dir: Path = Path(DEFAULT_STATE_DIR)
self.verbose: bool = False
self.debug: bool = False
self.project: Optional[str] = None
self._config: Optional["Config"] = None
self._state: Optional["StateManager"] = None
@property
def config(self) -> "Config":
if self._config is None:
from ..config import Config
if Path(self.config_file).exists():
self._config = Config.from_yaml(self.config_file)
else:
self._config = Config()
return self._config
@property
def state(self) -> "StateManager":
if self._state is None:
from .state import StateManager
self._state = StateManager(self.state_dir, self.project)
return self._state
pass_context = click.make_pass_decorator(Context, ensure=True)
@click.group(invoke_without_command=True)
@click.option(
"-c",
"--config",
"config_file",
default=DEFAULT_CONFIG,
envvar="PYSERVE_CONFIG",
help=f"Path to configuration file (default: {DEFAULT_CONFIG})",
type=click.Path(),
)
@click.option(
"-p",
"--project",
"project",
default=None,
envvar="PYSERVE_PROJECT",
help="Project name for isolation",
)
@click.option(
"-v",
"--verbose",
is_flag=True,
help="Enable verbose output",
)
@click.option(
"--debug",
is_flag=True,
help="Enable debug mode",
)
@click.version_option(version=__version__, prog_name="pyservectl")
@click.pass_context
def cli(ctx: click.Context, config_file: str, project: Optional[str], verbose: bool, debug: bool) -> None:
"""
PyServeCTL - Service management CLI for PyServe.
Docker-compose-like tool for managing PyServe services.
\b
Quick Start:
pyservectl init # Initialize a new project
pyservectl up # Start all services
pyservectl ps # Show service status
pyservectl logs -f # Follow logs
pyservectl down # Stop all services
\b
Examples:
pyservectl up -d # Start in background
pyservectl up -c prod.yaml # Use custom config
pyservectl logs api -f --tail 100 # Follow api logs
pyservectl restart api admin # Restart specific services
pyservectl scale api=4 # Scale api to 4 workers
"""
ctx.ensure_object(Context)
ctx.obj.config_file = config_file
ctx.obj.verbose = verbose
ctx.obj.debug = debug
ctx.obj.project = project
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
cli.add_command(init_cmd, name="init")
cli.add_command(config_cmd, name="config")
cli.add_command(up_cmd, name="up")
cli.add_command(down_cmd, name="down")
cli.add_command(start_cmd, name="start")
cli.add_command(stop_cmd, name="stop")
cli.add_command(restart_cmd, name="restart")
cli.add_command(ps_cmd, name="ps")
cli.add_command(logs_cmd, name="logs")
cli.add_command(top_cmd, name="top")
cli.add_command(health_cmd, name="health")
cli.add_command(scale_cmd, name="scale")
# Alias 'status' -> 'ps'
cli.add_command(ps_cmd, name="status")
def main() -> None:
try:
cli(standalone_mode=False)
except click.ClickException as e:
e.show()
sys.exit(e.exit_code)
except KeyboardInterrupt:
click.echo("\nInterrupted by user")
sys.exit(130)
except Exception as e:
if "--debug" in sys.argv:
raise
click.secho(f"Error: {e}", fg="red", err=True)
sys.exit(1)
if __name__ == "__main__":
main()
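
Since the entry point is a plain click group, it can be smoke-tested in-process with click's test runner; a minimal sketch, assuming no PyServe-specific fixtures:

```python
# Exercise the CLI entry point without spawning a subprocess
from click.testing import CliRunner

runner = CliRunner()

# --help and --version succeed without touching any project state
assert runner.invoke(cli, ["--help"]).exit_code == 0
assert runner.invoke(cli, ["--version"]).exit_code == 0
```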

@@ -0,0 +1,110 @@
"""
PyServe CLI Output utilities
Rich-based formatters and helpers for CLI output.
"""
from rich.console import Console
from rich.table import Table
from rich.theme import Theme
pyserve_theme = Theme(
{
"info": "cyan",
"warning": "yellow",
"error": "red bold",
"success": "green",
"service.running": "green",
"service.stopped": "dim",
"service.failed": "red",
"service.starting": "yellow",
}
)
console = Console(theme=pyserve_theme)
def print_error(message: str) -> None:
console.print(f"[error] {message}[/error]")
def print_warning(message: str) -> None:
console.print(f"[warning] {message}[/warning]")
def print_success(message: str) -> None:
console.print(f"[success] {message}[/success]")
def print_info(message: str) -> None:
console.print(f"[info] {message}[/info]")
def create_services_table() -> Table:
table = Table(
title=None,
show_header=True,
header_style="bold",
border_style="dim",
)
table.add_column("NAME", style="cyan", no_wrap=True)
table.add_column("STATUS", no_wrap=True)
table.add_column("PORTS", style="dim")
table.add_column("UPTIME", style="dim")
table.add_column("HEALTH", no_wrap=True)
table.add_column("PID", style="dim")
table.add_column("WORKERS", style="dim")
return table
def format_status(status: str) -> str:
status_styles = {
"running": "[service.running]● running[/service.running]",
"stopped": "[service.stopped]○ stopped[/service.stopped]",
"failed": "[service.failed]✗ failed[/service.failed]",
"starting": "[service.starting]◐ starting[/service.starting]",
"stopping": "[service.starting]◑ stopping[/service.starting]",
"restarting": "[service.starting]↻ restarting[/service.starting]",
"pending": "[service.stopped]○ pending[/service.stopped]",
}
return status_styles.get(status.lower(), status)
def format_health(health: str) -> str:
health_styles = {
"healthy": "[green] healthy[/green]",
"unhealthy": "[red] unhealthy[/red]",
"degraded": "[yellow] degraded[/yellow]",
"unknown": "[dim] unknown[/dim]",
"-": "[dim]-[/dim]",
}
return health_styles.get(health.lower(), health)
def format_uptime(seconds: float) -> str:
if seconds <= 0:
return "-"
if seconds < 60:
return f"{int(seconds)}s"
elif seconds < 3600:
minutes = int(seconds / 60)
secs = int(seconds % 60)
return f"{minutes}m {secs}s"
elif seconds < 86400:
hours = int(seconds / 3600)
minutes = int((seconds % 3600) / 60)
return f"{hours}h {minutes}m"
else:
days = int(seconds / 86400)
hours = int((seconds % 86400) / 3600)
return f"{days}d {hours}h"
def format_bytes(num_bytes: int) -> str:
value = float(num_bytes)
for unit in ["B", "KB", "MB", "GB", "TB"]:
if abs(value) < 1024.0:
return f"{value:.1f}{unit}"
value /= 1024.0
return f"{value:.1f}PB"
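
A few concrete values for the formatters above, worked from the thresholds in the code (assuming the helpers are imported from this module):

```python
# Expected outputs of the formatting helpers
assert format_uptime(45) == "45s"
assert format_uptime(125) == "2m 5s"
assert format_uptime(7260) == "2h 1m"
assert format_uptime(180000) == "2d 2h"

assert format_bytes(512) == "512.0B"
assert format_bytes(2048) == "2.0KB"
assert format_bytes(5 * 1024**3) == "5.0GB"
```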

@@ -0,0 +1,232 @@
"""
PyServe CLI State Management
Manages the state of running services.
"""
import json
import os
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
@dataclass
class ServiceHealth:
status: str = "unknown" # healthy, unhealthy, degraded, unknown
last_check: Optional[float] = None
failures: int = 0
response_time_ms: Optional[float] = None
@dataclass
class ServiceState:
name: str
state: str = "stopped" # pending, starting, running, stopping, stopped, failed, restarting
pid: Optional[int] = None
port: int = 0
workers: int = 0
started_at: Optional[float] = None
restart_count: int = 0
health: ServiceHealth = field(default_factory=ServiceHealth)
config_hash: str = ""
@property
def uptime(self) -> float:
if self.started_at is None:
return 0.0
return time.time() - self.started_at
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"state": self.state,
"pid": self.pid,
"port": self.port,
"workers": self.workers,
"started_at": self.started_at,
"restart_count": self.restart_count,
"health": asdict(self.health),
"config_hash": self.config_hash,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ServiceState":
health_data = data.pop("health", {})
health = ServiceHealth(**health_data) if health_data else ServiceHealth()
return cls(**data, health=health)
@dataclass
class ProjectState:
version: str = "1.0"
project: str = ""
config_file: str = ""
config_hash: str = ""
started_at: Optional[float] = None
daemon_pid: Optional[int] = None
services: Dict[str, ServiceState] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"version": self.version,
"project": self.project,
"config_file": self.config_file,
"config_hash": self.config_hash,
"started_at": self.started_at,
"daemon_pid": self.daemon_pid,
"services": {name: svc.to_dict() for name, svc in self.services.items()},
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ProjectState":
services_data = data.pop("services", {})
services = {name: ServiceState.from_dict(svc) for name, svc in services_data.items()}
return cls(**data, services=services)
class StateManager:
STATE_FILE = "state.json"
PID_FILE = "pyserve.pid"
SOCKET_FILE = "pyserve.sock"
LOGS_DIR = "logs"
def __init__(self, state_dir: Path, project: Optional[str] = None):
self.state_dir = Path(state_dir)
self.project = project or self._detect_project()
self._state: Optional[ProjectState] = None
def _detect_project(self) -> str:
return Path.cwd().name
@property
def state_file(self) -> Path:
return self.state_dir / self.STATE_FILE
@property
def pid_file(self) -> Path:
return self.state_dir / self.PID_FILE
@property
def socket_file(self) -> Path:
return self.state_dir / self.SOCKET_FILE
@property
def logs_dir(self) -> Path:
return self.state_dir / self.LOGS_DIR
def ensure_dirs(self) -> None:
self.state_dir.mkdir(parents=True, exist_ok=True)
self.logs_dir.mkdir(parents=True, exist_ok=True)
def load(self) -> ProjectState:
if self._state is not None:
return self._state
if self.state_file.exists():
try:
with open(self.state_file) as f:
data = json.load(f)
self._state = ProjectState.from_dict(data)
except (json.JSONDecodeError, KeyError):
self._state = ProjectState(project=self.project)
else:
self._state = ProjectState(project=self.project)
return self._state
def save(self) -> None:
if self._state is None:
return
self.ensure_dirs()
with open(self.state_file, "w") as f:
json.dump(self._state.to_dict(), f, indent=2)
def get_state(self) -> ProjectState:
return self.load()
def update_service(self, name: str, **kwargs: Any) -> ServiceState:
state = self.load()
if name not in state.services:
state.services[name] = ServiceState(name=name)
service = state.services[name]
for key, value in kwargs.items():
if hasattr(service, key):
setattr(service, key, value)
self.save()
return service
def remove_service(self, name: str) -> None:
state = self.load()
if name in state.services:
del state.services[name]
self.save()
def get_service(self, name: str) -> Optional[ServiceState]:
state = self.load()
return state.services.get(name)
def get_all_services(self) -> Dict[str, ServiceState]:
state = self.load()
return state.services.copy()
def clear(self) -> None:
self._state = ProjectState(project=self.project)
self.save()
def is_daemon_running(self) -> bool:
if not self.pid_file.exists():
return False
try:
pid = int(self.pid_file.read_text().strip())
# Check if process exists
os.kill(pid, 0)
return True
except (ValueError, ProcessLookupError, PermissionError):
return False
def get_daemon_pid(self) -> Optional[int]:
if not self.is_daemon_running():
return None
try:
return int(self.pid_file.read_text().strip())
except ValueError:
return None
def set_daemon_pid(self, pid: int) -> None:
self.ensure_dirs()
self.pid_file.write_text(str(pid))
state = self.load()
state.daemon_pid = pid
self.save()
def clear_daemon_pid(self) -> None:
if self.pid_file.exists():
self.pid_file.unlink()
state = self.load()
state.daemon_pid = None
self.save()
def get_service_log_file(self, service_name: str) -> Path:
self.ensure_dirs()
return self.logs_dir / f"{service_name}.log"
def compute_config_hash(self, config_file: str) -> str:
import hashlib
path = Path(config_file)
if not path.exists():
return ""
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
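
A short usage sketch for the state layer; paths and values are illustrative, and every call maps to a method defined above:

```python
# Record a service, read it back, and fingerprint the config file
from pathlib import Path

sm = StateManager(Path(".pyserve"), project="demo")
sm.update_service("api", state="running", pid=12345, port=9001, workers=2)

svc = sm.get_service("api")
assert svc is not None and svc.state == "running" and svc.port == 9001

# First 16 hex chars of the config's sha256, or "" if the file is missing
print(sm.compute_config_hash("config.yaml"))
```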

@@ -1,3 +1,4 @@
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
@@ -216,6 +217,15 @@ class ExtensionManager:
"monitoring": MonitoringExtension,
"asgi": ASGIExtension,
}
self._register_process_orchestration()
def _register_process_orchestration(self) -> None:
try:
from .process_extension import ProcessOrchestrationExtension
self.extension_registry["process_orchestration"] = ProcessOrchestrationExtension
except ImportError:
pass # Optional dependency
def register_extension_type(self, name: str, extension_class: Type[Extension]) -> None:
self.extension_registry[name] = extension_class
@@ -234,6 +244,32 @@ class ExtensionManager:
except Exception as e:
logger.error(f"Error loading extension {extension_type}: {e}")
async def load_extension_async(self, extension_type: str, config: Dict[str, Any]) -> None:
"""Load extension with async setup support (for ProcessOrchestration)."""
if extension_type not in self.extension_registry:
logger.error(f"Unknown extension type: {extension_type}")
return
try:
extension_class = self.extension_registry[extension_type]
extension = extension_class(config)
setup_method = getattr(extension, "setup", None)
if setup_method is not None and asyncio.iscoroutinefunction(setup_method):
await setup_method(config)
else:
extension.initialize()
start_method = getattr(extension, "start", None)
if start_method is not None and asyncio.iscoroutinefunction(start_method):
await start_method()
# Insert at the beginning so process_orchestration is checked first
self.extensions.insert(0, extension)
logger.info(f"Loaded extension (async): {extension_type}")
except Exception as e:
logger.error(f"Error loading extension {extension_type}: {e}")
async def process_request(self, request: Request) -> Optional[Response]:
for extension in self.extensions:
if not extension.enabled:

@@ -0,0 +1,365 @@
"""Process Orchestration Extension
Extension that manages ASGI/WSGI applications as isolated processes
and routes requests to them via reverse proxy.
"""
import asyncio
import logging
import time
import uuid
from typing import Any, Dict, Optional
import httpx
from starlette.requests import Request
from starlette.responses import Response
from .extensions import Extension
from .logging_utils import get_logger
from .process_manager import ProcessConfig, ProcessManager
logger = get_logger(__name__)
class ProcessOrchestrationExtension(Extension):
"""
Extension that orchestrates ASGI/WSGI applications as separate processes.
Unlike ASGIExtension which runs apps in-process, this extension:
- Runs each app in its own isolated process
- Provides health monitoring and auto-restart
- Routes requests via HTTP reverse proxy
- Supports multiple workers per app
Configuration example:
```yaml
extensions:
- type: process_orchestration
config:
port_range: [9000, 9999]
health_check_enabled: true
apps:
- name: api
path: /api
app_path: myapp.api:app
workers: 4
health_check_path: /health
- name: admin
path: /admin
app_path: myapp.admin:create_app
factory: true
workers: 2
```
"""
name = "process_orchestration"
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
self._manager: Optional[ProcessManager] = None
self._mounts: Dict[str, MountConfig] = {} # path -> config
self._http_client: Optional[httpx.AsyncClient] = None
self._started = False
self._proxy_timeout: float = config.get("proxy_timeout", 60.0)
self._pending_config = config # Store for async setup
logging_config = config.get("logging", {})
self._log_proxy_requests: bool = logging_config.get("proxy_logs", True)
self._log_health_checks: bool = logging_config.get("health_check_logs", False)
httpx_level = logging_config.get("httpx_level", "warning").upper()
logging.getLogger("httpx").setLevel(getattr(logging, httpx_level, logging.WARNING))
logging.getLogger("httpcore").setLevel(getattr(logging, httpx_level, logging.WARNING))
async def setup(self, config: Optional[Dict[str, Any]] = None) -> None:
if config is None:
config = self._pending_config
port_range = tuple(config.get("port_range", [9000, 9999]))
health_check_enabled = config.get("health_check_enabled", True)
self._proxy_timeout = config.get("proxy_timeout", 60.0)
self._manager = ProcessManager(
port_range=port_range,
health_check_enabled=health_check_enabled,
)
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(self._proxy_timeout),
follow_redirects=False,
limits=httpx.Limits(
max_keepalive_connections=100,
max_connections=200,
),
)
apps_config = config.get("apps", [])
for app_config in apps_config:
await self._register_app(app_config)
logger.info(
"Process orchestration extension initialized",
app_count=len(self._mounts),
)
async def _register_app(self, app_config: Dict[str, Any]) -> None:
if not self._manager:
return
name = app_config.get("name")
path = app_config.get("path", "").rstrip("/")
app_path = app_config.get("app_path")
if not name or not app_path:
logger.error("App config missing 'name' or 'app_path'")
return
process_config = ProcessConfig(
name=name,
app_path=app_path,
app_type=app_config.get("app_type", "asgi"),
workers=app_config.get("workers", 1),
module_path=app_config.get("module_path"),
factory=app_config.get("factory", False),
factory_args=app_config.get("factory_args"),
env=app_config.get("env", {}),
health_check_enabled=app_config.get("health_check_enabled", True),
health_check_path=app_config.get("health_check_path", "/health"),
health_check_interval=app_config.get("health_check_interval", 10.0),
health_check_timeout=app_config.get("health_check_timeout", 5.0),
health_check_retries=app_config.get("health_check_retries", 3),
max_memory_mb=app_config.get("max_memory_mb"),
max_restart_count=app_config.get("max_restart_count", 5),
restart_delay=app_config.get("restart_delay", 1.0),
shutdown_timeout=app_config.get("shutdown_timeout", 30.0),
)
await self._manager.register(process_config)
self._mounts[path] = MountConfig(
path=path,
process_name=name,
strip_path=app_config.get("strip_path", True),
)
logger.info(f"Registered app '{name}' at path '{path}'")
async def start(self) -> None:
if self._started or not self._manager:
return
await self._manager.start()
results = await self._manager.start_all()
self._started = True
success = sum(1 for v in results.values() if v)
failed = len(results) - success
logger.info(
"Process orchestration started",
success=success,
failed=failed,
)
async def stop(self) -> None:
if not self._started:
return
if self._http_client:
await self._http_client.aclose()
self._http_client = None
if self._manager:
await self._manager.stop()
self._started = False
logger.info("Process orchestration stopped")
def cleanup(self) -> None:
try:
loop = asyncio.get_running_loop()
loop.create_task(self.stop())
except RuntimeError:
asyncio.run(self.stop())
async def process_request(self, request: Request) -> Optional[Response]:
if not self._started or not self._manager:
logger.debug(
"Process orchestration not ready",
started=self._started,
has_manager=self._manager is not None,
)
return None
mount = self._get_mount(request.url.path)
if not mount:
logger.debug(
"No mount found for path",
path=request.url.path,
available_mounts=list(self._mounts.keys()),
)
return None
upstream_url = self._manager.get_upstream_url(mount.process_name)
if not upstream_url:
logger.warning(
f"Process '{mount.process_name}' not running",
path=request.url.path,
)
return Response("Service Unavailable", status_code=503)
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
start_time = time.perf_counter()
response = await self._proxy_request(request, upstream_url, mount, request_id)
latency_ms = (time.perf_counter() - start_time) * 1000
if self._log_proxy_requests:
logger.info(
"Proxy request completed",
request_id=request_id,
method=request.method,
path=request.url.path,
process=mount.process_name,
upstream=upstream_url,
status=response.status_code,
latency_ms=round(latency_ms, 2),
)
return response
def _get_mount(self, path: str) -> Optional["MountConfig"]:
for mount_path in sorted(self._mounts.keys(), key=len, reverse=True):
if mount_path == "":
return self._mounts[mount_path]
if path == mount_path or path.startswith(f"{mount_path}/"):
return self._mounts[mount_path]
return None
async def _proxy_request(
self,
request: Request,
upstream_url: str,
mount: "MountConfig",
request_id: str = "",
) -> Response:
path = request.url.path
if mount.strip_path and mount.path:
path = path[len(mount.path) :] or "/"
target_url = f"{upstream_url}{path}"
if request.url.query:
target_url += f"?{request.url.query}"
headers = dict(request.headers)
headers.pop("host", None)
headers["X-Forwarded-For"] = request.client.host if request.client else "unknown"
headers["X-Forwarded-Proto"] = request.url.scheme
headers["X-Forwarded-Host"] = request.headers.get("host", "")
if request_id:
headers["X-Request-ID"] = request_id
try:
if not self._http_client:
return Response("Service Unavailable", status_code=503)
body = await request.body()
response = await self._http_client.request(
method=request.method,
url=target_url,
headers=headers,
content=body,
)
response_headers = dict(response.headers)
for header in ["transfer-encoding", "connection", "keep-alive"]:
response_headers.pop(header, None)
return Response(
content=response.content,
status_code=response.status_code,
headers=response_headers,
)
except httpx.TimeoutException:
logger.error(f"Proxy timeout to {upstream_url}")
return Response("Gateway Timeout", status_code=504)
except httpx.ConnectError as e:
logger.error(f"Proxy connection error to {upstream_url}: {e}")
return Response("Bad Gateway", status_code=502)
except Exception as e:
logger.error(f"Proxy error to {upstream_url}: {e}")
return Response("Internal Server Error", status_code=500)
async def process_response(
self,
request: Request,
response: Response,
) -> Response:
return response
def get_metrics(self) -> Dict[str, Any]:
metrics = {
"process_orchestration": {
"enabled": self._started,
"mounts": len(self._mounts),
}
}
if self._manager:
metrics["process_orchestration"].update(self._manager.get_metrics())
return metrics
async def get_process_status(self, name: str) -> Optional[Dict[str, Any]]:
if not self._manager:
return None
info = self._manager.get_process(name)
return info.to_dict() if info else None
async def get_all_status(self) -> Dict[str, Any]:
if not self._manager:
return {}
return {name: info.to_dict() for name, info in self._manager.get_all_processes().items()}
async def restart_process(self, name: str) -> bool:
if not self._manager:
return False
return await self._manager.restart_process(name)
async def scale_process(self, name: str, workers: int) -> bool:
if not self._manager:
return False
info = self._manager.get_process(name)
if not info:
return False
info.config.workers = workers
return await self._manager.restart_process(name)
class MountConfig:
def __init__(
self,
path: str,
process_name: str,
strip_path: bool = True,
):
self.path = path
self.process_name = process_name
self.strip_path = strip_path
async def setup_process_orchestration(config: Dict[str, Any]) -> ProcessOrchestrationExtension:
ext = ProcessOrchestrationExtension(config)
await ext.setup(config)
await ext.start()
return ext
async def shutdown_process_orchestration(ext: ProcessOrchestrationExtension) -> None:
await ext.stop()
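
Mount resolution in `_get_mount` is longest-prefix-first, with the empty path (an app mounted at `/`) acting as a catch-all. The standalone sketch below mirrors that logic on hypothetical mounts:

```python
# Longest-prefix mount matching, mirroring _get_mount (mounts are hypothetical)
mounts = {"/api": "api", "/api/v2": "api_v2", "": "root"}


def resolve(path: str) -> str | None:
    for mount_path in sorted(mounts, key=len, reverse=True):
        if mount_path == "" or path == mount_path or path.startswith(f"{mount_path}/"):
            return mounts[mount_path]
    return None


assert resolve("/api/v2/users") == "api_v2"  # most specific mount wins
assert resolve("/api/users") == "api"
assert resolve("/static/app.js") == "root"   # "" catches everything else
```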

553
pyserve/process_manager.py Normal file
@@ -0,0 +1,553 @@
"""Process Manager Module
Orchestrates ASGI/WSGI applications as separate processes
"""
import asyncio
import logging
import os
import signal
import socket
import subprocess
import sys
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from .logging_utils import get_logger
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = get_logger(__name__)
class ProcessState(Enum):
PENDING = "pending"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
FAILED = "failed"
RESTARTING = "restarting"
@dataclass
class ProcessConfig:
name: str
app_path: str
app_type: str = "asgi" # asgi, wsgi
host: str = "127.0.0.1"
port: int = 0 # 0 = auto-assign
workers: int = 1
module_path: Optional[str] = None
factory: bool = False
factory_args: Optional[Dict[str, Any]] = None
env: Dict[str, str] = field(default_factory=dict)
health_check_enabled: bool = True
health_check_path: str = "/health"
health_check_interval: float = 10.0
health_check_timeout: float = 5.0
health_check_retries: int = 3
max_memory_mb: Optional[int] = None
max_restart_count: int = 5
restart_delay: float = 1.0 # seconds
shutdown_timeout: float = 30.0 # seconds
@dataclass
class ProcessInfo:
config: ProcessConfig
state: ProcessState = ProcessState.PENDING
pid: Optional[int] = None
port: int = 0
start_time: Optional[float] = None
restart_count: int = 0
last_health_check: Optional[float] = None
health_check_failures: int = 0
process: Optional[subprocess.Popen] = None
@property
def uptime(self) -> float:
if self.start_time is None:
return 0.0
return time.time() - self.start_time
@property
def is_running(self) -> bool:
return self.state == ProcessState.RUNNING and self.process is not None
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.config.name,
"state": self.state.value,
"pid": self.pid,
"port": self.port,
"uptime": round(self.uptime, 2),
"restart_count": self.restart_count,
"health_check_failures": self.health_check_failures,
"workers": self.config.workers,
}
class PortAllocator:
def __init__(self, start_port: int = 9000, end_port: int = 9999):
self.start_port = start_port
self.end_port = end_port
self._allocated: set[int] = set()
self._lock = asyncio.Lock()
async def allocate(self) -> int:
async with self._lock:
for port in range(self.start_port, self.end_port + 1):
if port in self._allocated:
continue
if self._is_port_available(port):
self._allocated.add(port)
return port
raise RuntimeError(f"No available ports in range {self.start_port}-{self.end_port}")
async def release(self, port: int) -> None:
async with self._lock:
self._allocated.discard(port)
def _is_port_available(self, port: int) -> bool:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", port))
return True
except OSError:
return False
class ProcessManager:
def __init__(
self,
port_range: tuple[int, int] = (9000, 9999),
health_check_enabled: bool = True,
):
self._processes: Dict[str, ProcessInfo] = {}
self._port_allocator = PortAllocator(*port_range)
self._health_check_enabled = health_check_enabled
self._health_check_task: Optional[asyncio.Task] = None
self._shutdown_event = asyncio.Event()
self._started = False
self._lock = asyncio.Lock()
async def start(self) -> None:
if self._started:
return
self._started = True
self._shutdown_event.clear()
if self._health_check_enabled:
self._health_check_task = asyncio.create_task(self._health_check_loop(), name="process_manager_health_check")
logger.info("Process manager started")
async def stop(self) -> None:
if not self._started:
return
logger.info("Stopping process manager...")
self._shutdown_event.set()
if self._health_check_task:
self._health_check_task.cancel()
try:
await self._health_check_task
except asyncio.CancelledError:
pass
await self.stop_all()
self._started = False
logger.info("Process manager stopped")
async def register(self, config: ProcessConfig) -> ProcessInfo:
async with self._lock:
if config.name in self._processes:
raise ValueError(f"Process '{config.name}' already registered")
info = ProcessInfo(config=config)
self._processes[config.name] = info
logger.info(f"Registered process '{config.name}'", app_path=config.app_path)
return info
async def unregister(self, name: str) -> None:
async with self._lock:
if name not in self._processes:
return
info = self._processes[name]
if info.is_running:
await self._stop_process(info)
if info.port:
await self._port_allocator.release(info.port)
del self._processes[name]
logger.info(f"Unregistered process '{name}'")
async def start_process(self, name: str) -> bool:
info = self._processes.get(name)
if not info:
logger.error(f"Process '{name}' not found")
return False
if info.is_running:
logger.warning(f"Process '{name}' is already running")
return True
return await self._start_process(info)
async def stop_process(self, name: str) -> bool:
info = self._processes.get(name)
if not info:
logger.error(f"Process '{name}' not found")
return False
return await self._stop_process(info)
async def restart_process(self, name: str) -> bool:
info = self._processes.get(name)
if not info:
logger.error(f"Process '{name}' not found")
return False
info.state = ProcessState.RESTARTING
if info.is_running:
await self._stop_process(info)
await asyncio.sleep(info.config.restart_delay)
return await self._start_process(info)
async def start_all(self) -> Dict[str, bool]:
results = {}
for name in self._processes:
results[name] = await self.start_process(name)
return results
async def stop_all(self) -> None:
tasks = []
for info in self._processes.values():
if info.is_running:
tasks.append(self._stop_process(info))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
def get_process(self, name: str) -> Optional[ProcessInfo]:
return self._processes.get(name)
def get_all_processes(self) -> Dict[str, ProcessInfo]:
return self._processes.copy()
def get_process_by_port(self, port: int) -> Optional[ProcessInfo]:
for info in self._processes.values():
if info.port == port:
return info
return None
def get_upstream_url(self, name: str) -> Optional[str]:
info = self._processes.get(name)
if not info or not info.is_running:
return None
return f"http://{info.config.host}:{info.port}"
async def _start_process(self, info: ProcessInfo) -> bool:
config = info.config
try:
info.state = ProcessState.STARTING
if info.port == 0:
info.port = await self._port_allocator.allocate()
cmd = self._build_command(config, info.port)
env = os.environ.copy()
env.update(config.env)
if config.module_path:
python_path = env.get("PYTHONPATH", "")
module_dir = str(Path(config.module_path).resolve())
env["PYTHONPATH"] = f"{module_dir}:{python_path}" if python_path else module_dir
# For WSGI apps, pass configuration via environment variables
if config.app_type == "wsgi":
env["PYSERVE_WSGI_APP"] = config.app_path
env["PYSERVE_WSGI_FACTORY"] = "1" if config.factory else "0"
logger.info(
f"Starting process '{config.name}'",
command=" ".join(cmd),
port=info.port,
)
info.process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if hasattr(os, "setsid") else None,
)
info.pid = info.process.pid
info.start_time = time.time()
if not await self._wait_for_ready(info):
raise RuntimeError(f"Process '{config.name}' failed to start")
info.state = ProcessState.RUNNING
logger.info(
f"Process '{config.name}' started successfully",
pid=info.pid,
port=info.port,
)
return True
except Exception as e:
logger.error(f"Failed to start process '{config.name}': {e}")
info.state = ProcessState.FAILED
if info.port:
await self._port_allocator.release(info.port)
info.port = 0
return False
async def _stop_process(self, info: ProcessInfo) -> bool:
if not info.process:
info.state = ProcessState.STOPPED
return True
config = info.config
info.state = ProcessState.STOPPING
try:
if hasattr(os, "killpg"):
try:
os.killpg(os.getpgid(info.process.pid), signal.SIGTERM)
except ProcessLookupError:
pass
else:
info.process.terminate()
try:
await asyncio.wait_for(asyncio.get_event_loop().run_in_executor(None, info.process.wait), timeout=config.shutdown_timeout)
except asyncio.TimeoutError:
logger.warning(f"Process '{config.name}' did not stop gracefully, forcing kill")
if hasattr(os, "killpg"):
try:
os.killpg(os.getpgid(info.process.pid), signal.SIGKILL)
except ProcessLookupError:
pass
else:
info.process.kill()
info.process.wait()
if info.port:
await self._port_allocator.release(info.port)
info.state = ProcessState.STOPPED
info.process = None
info.pid = None
logger.info(f"Process '{config.name}' stopped")
return True
except Exception as e:
logger.error(f"Error stopping process '{config.name}': {e}")
info.state = ProcessState.FAILED
return False
async def _wait_for_ready(self, info: ProcessInfo, timeout: float = 30.0) -> bool:
import httpx
start_time = time.time()
url = f"http://{info.config.host}:{info.port}{info.config.health_check_path}"
while time.time() - start_time < timeout:
if info.process and info.process.poll() is not None:
stdout, stderr = info.process.communicate()
logger.error(
f"Process '{info.config.name}' exited during startup",
returncode=info.process.returncode,
stderr=stderr.decode() if stderr else "",
)
return False
try:
async with httpx.AsyncClient(timeout=2.0) as client:
resp = await client.get(url)
if resp.status_code < 500:
return True
except Exception:
pass
await asyncio.sleep(0.5)
return False
async def _health_check_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
for info in list(self._processes.values()):
if not info.is_running or not info.config.health_check_enabled:
continue
await self._check_process_health(info)
                intervals = [
                    p.config.health_check_interval
                    for p in self._processes.values()
                    if p.config.health_check_enabled
                ]
                try:
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=min(intervals) if intervals else 10.0,
                    )
                    break
                except asyncio.TimeoutError:
                    pass
except Exception as e:
logger.error(f"Error in health check loop: {e}")
await asyncio.sleep(5)
async def _check_process_health(self, info: ProcessInfo) -> bool:
import httpx
config = info.config
url = f"http://{config.host}:{info.port}{config.health_check_path}"
try:
async with httpx.AsyncClient(timeout=config.health_check_timeout) as client:
resp = await client.get(url)
if resp.status_code < 500:
info.health_check_failures = 0
info.last_health_check = time.time()
return True
else:
raise Exception(f"Health check returned status {resp.status_code}")
except Exception as e:
info.health_check_failures += 1
logger.warning(
f"Health check failed for '{config.name}'",
failures=info.health_check_failures,
error=str(e),
)
if info.health_check_failures >= config.health_check_retries:
logger.error(f"Process '{config.name}' is unhealthy, restarting...")
await self._handle_unhealthy_process(info)
return False
async def _handle_unhealthy_process(self, info: ProcessInfo) -> None:
config = info.config
if info.restart_count >= config.max_restart_count:
logger.error(f"Process '{config.name}' exceeded max restart count, marking as failed")
info.state = ProcessState.FAILED
return
info.restart_count += 1
info.health_check_failures = 0
delay = config.restart_delay * (2 ** (info.restart_count - 1))
delay = min(delay, 60.0)
logger.info(
f"Restarting process '{config.name}'",
restart_count=info.restart_count,
delay=delay,
)
await self._stop_process(info)
await asyncio.sleep(delay)
await self._start_process(info)
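The restart delay computed above doubles on each unhealthy restart and is capped at 60 seconds. A quick worked example, assuming a `restart_delay` of 5 seconds (the actual default is defined on `ProcessConfig`, not shown here):

```python
# delay = restart_delay * 2 ** (restart_count - 1), capped at 60 seconds.
restart_delay = 5.0  # assumed example value
schedule = [min(restart_delay * 2 ** (n - 1), 60.0) for n in range(1, 7)]
print(schedule)  # [5.0, 10.0, 20.0, 40.0, 60.0, 60.0]
```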
def _build_command(self, config: ProcessConfig, port: int) -> List[str]:
if config.app_type == "wsgi":
wrapper_app = self._create_wsgi_wrapper_path(config)
app_path = wrapper_app
else:
app_path = config.app_path
cmd = [
sys.executable,
"-m",
"uvicorn",
app_path,
"--host",
config.host,
"--port",
str(port),
"--workers",
str(config.workers),
"--log-level",
"warning",
"--no-access-log",
]
if config.factory and config.app_type != "wsgi":
cmd.append("--factory")
return cmd
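For an ASGI app this reduces to a plain uvicorn invocation. A hypothetical result of `_build_command()` for an app path `shop.main:app` on port 9000 with one worker (the app path and host are invented for illustration):

```python
import sys

# Hypothetical _build_command() output for an ASGI app.
cmd = [
    sys.executable, "-m", "uvicorn", "shop.main:app",
    "--host", "127.0.0.1", "--port", "9000", "--workers", "1",
    "--log-level", "warning", "--no-access-log",
]
```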
    def _create_wsgi_wrapper_path(self, config: ProcessConfig) -> str:
        """
        Uvicorn cannot serve a WSGI app directly, so WSGI targets are routed
        through a small ASGI wrapper module that imports the WSGI app and wraps
        it with a2wsgi. The wrapper (pyserve._wsgi_wrapper:app) reads the target
        app path and factory flag from the PYSERVE_WSGI_APP and
        PYSERVE_WSGI_FACTORY environment variables set in _start_process().
        """
        return "pyserve._wsgi_wrapper:app"
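A minimal sketch of what such a wrapper module could look like. This is not the shipped `pyserve._wsgi_wrapper` source, only an assumption-laden illustration of the mechanism: it relies on `a2wsgi.WSGIMiddleware` and on the `PYSERVE_WSGI_APP` / `PYSERVE_WSGI_FACTORY` variables that `_start_process()` exports.

```python
# Hypothetical sketch of pyserve/_wsgi_wrapper.py; the real module may differ.
import importlib
import os

from a2wsgi import WSGIMiddleware  # adapts a WSGI callable to ASGI


def _load_wsgi_app():
    # "package.module:attribute" path exported by ProcessManager._start_process().
    target = os.environ["PYSERVE_WSGI_APP"]
    module_name, _, attr = target.partition(":")
    obj = getattr(importlib.import_module(module_name), attr or "application")
    if os.environ.get("PYSERVE_WSGI_FACTORY") == "1":
        obj = obj()  # the target is an app factory; call it to get the WSGI app
    return obj


# uvicorn imports "pyserve._wsgi_wrapper:app" and serves this ASGI callable.
app = WSGIMiddleware(_load_wsgi_app())
```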
def get_metrics(self) -> Dict[str, Any]:
return {
"managed_processes": len(self._processes),
"running_processes": sum(1 for p in self._processes.values() if p.is_running),
"processes": {name: info.to_dict() for name, info in self._processes.items()},
}
_process_manager: Optional[ProcessManager] = None
def get_process_manager() -> ProcessManager:
global _process_manager
if _process_manager is None:
_process_manager = ProcessManager()
return _process_manager
async def init_process_manager(
port_range: tuple[int, int] = (9000, 9999),
health_check_enabled: bool = True,
) -> ProcessManager:
global _process_manager
_process_manager = ProcessManager(
port_range=port_range,
health_check_enabled=health_check_enabled,
)
await _process_manager.start()
return _process_manager
async def shutdown_process_manager() -> None:
global _process_manager
if _process_manager:
await _process_manager.stop()
_process_manager = None
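Putting the module-level API together, a hedged end-to-end sketch (the `ProcessConfig` field names come from their use above, the app path is hypothetical, and the upstream host/port are examples):

```python
import asyncio


async def main() -> None:
    # Start the global manager; the health-check loop runs in the background.
    manager = await init_process_manager(port_range=(9000, 9099))

    # Register a hypothetical ASGI app and launch it; a free port is allocated on start.
    await manager.register(ProcessConfig(name="api", app_path="shop.main:app"))
    await manager.start_process("api")

    # The gateway proxies requests to the upstream URL, e.g. "http://127.0.0.1:9000".
    print(manager.get_upstream_url("api"))

    await shutdown_process_manager()


asyncio.run(main())
```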


@@ -119,6 +119,7 @@ class PyServeServer:
self.config = config
self.extension_manager = ExtensionManager()
self.app: Optional[Starlette] = None
self._async_extensions_loaded = False
self._setup_logging()
self._load_extensions()
self._create_app()
@@ -133,16 +134,39 @@ class PyServeServer:
if ext_config.type == "routing":
config.setdefault("default_proxy_timeout", self.config.server.proxy_timeout)
if ext_config.type == "process_orchestration":
continue
self.extension_manager.load_extension(ext_config.type, config)
async def _load_async_extensions(self) -> None:
if self._async_extensions_loaded:
return
for ext_config in self.config.extensions:
if ext_config.type == "process_orchestration":
config = ext_config.config.copy()
await self.extension_manager.load_extension_async(ext_config.type, config)
self._async_extensions_loaded = True
def _create_app(self) -> None:
from contextlib import asynccontextmanager
from typing import AsyncIterator
@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
await self._load_async_extensions()
logger.info("Async extensions loaded")
yield
routes = [
Route("/health", self._health_check, methods=["GET"]),
Route("/metrics", self._metrics, methods=["GET"]),
Route("/{path:path}", self._catch_all, methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]),
]
self.app = Starlette(routes=routes)
self.app = Starlette(routes=routes, lifespan=lifespan)
self.app.add_middleware(PyServeMiddleware, extension_manager=self.extension_manager)
async def _health_check(self, request: Request) -> Response:

File diff suppressed because it is too large.

tests/test_pyservectl.py (new file, 1348 lines): diff suppressed because it is too large.