konduktor/examples/apps/fastapi_app.py
2025-12-03 12:10:28 +03:00

119 lines
3.0 KiB
Python

"""
Example FastAPI application for PyServe ASGI mounting.
This demonstrates how to create a FastAPI application that can be
mounted at a specific path in PyServe.
"""
from typing import Optional, Dict, Any
try:
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
except ImportError:
raise ImportError(
"FastAPI is not installed. Install with: pip install fastapi"
)
app = FastAPI(
title="Example FastAPI App",
description="This is an example FastAPI application mounted in PyServe",
version="1.0.0",
)
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
class Message(BaseModel):
message: str
items_db: Dict[int, Dict[str, Any]] = {
1: {"name": "Item 1", "description": "First item", "price": 10.5, "tax": 1.05},
2: {"name": "Item 2", "description": "Second item", "price": 20.0, "tax": 2.0},
}
@app.get("/")
async def root():
return {"message": "Welcome to FastAPI mounted in PyServe!"}
@app.get("/health")
async def health_check():
return {"status": "healthy", "app": "fastapi"}
@app.get("/items")
async def list_items():
return {"items": list(items_db.values()), "count": len(items_db)}
@app.get("/items/{item_id}")
async def get_item(item_id: int):
if item_id not in items_db:
raise HTTPException(status_code=404, detail="Item not found")
return items_db[item_id]
@app.post("/items", response_model=Message)
async def create_item(item: Item):
new_id = max(items_db.keys()) + 1 if items_db else 1
items_db[new_id] = item.model_dump()
return {"message": f"Item created with ID {new_id}"}
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
if item_id not in items_db:
raise HTTPException(status_code=404, detail="Item not found")
items_db[item_id] = item.model_dump()
return {"message": f"Item {item_id} updated"}
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
if item_id not in items_db:
raise HTTPException(status_code=404, detail="Item not found")
del items_db[item_id]
return {"message": f"Item {item_id} deleted"}
def create_app(debug: bool = False, **kwargs) -> FastAPI:
application = FastAPI(
title="Example FastAPI App (Factory)",
description="FastAPI application created via factory function",
version="2.0.0",
debug=debug,
)
@application.get("/")
async def factory_root():
return {
"message": "Welcome to FastAPI (factory) mounted in PyServe!",
"debug": debug,
"config": kwargs,
}
@application.get("/health")
async def factory_health():
return {"status": "healthy", "app": "fastapi-factory", "debug": debug}
@application.get("/echo/{message}")
async def echo(message: str):
return {"echo": message}
return application
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)