""" Integration tests for ASGI mount functionality. These tests start PyServe with mounted ASGI applications and verify that requests are correctly routed to the mounted apps. """ import asyncio import pytest import httpx import socket from typing import Dict, Any import uvicorn from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse, PlainTextResponse, Response from starlette.routing import Route from pyserve.config import Config, ServerConfig, HttpConfig, LoggingConfig, ExtensionConfig from pyserve.server import PyServeServer from pyserve.asgi_mount import ( ASGIAppLoader, MountedApp, ASGIMountManager, ) def get_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('', 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] # ============== Test ASGI Applications ============== def create_api_v1_app() -> Starlette: """Create a test API v1 application.""" async def root(request: Request) -> JSONResponse: return JSONResponse({ "app": "api-v1", "message": "Welcome to API v1", "path": request.url.path, "root_path": request.scope.get("root_path", ""), }) async def health(request: Request) -> JSONResponse: return JSONResponse({"status": "healthy", "app": "api-v1"}) async def users_list(request: Request) -> JSONResponse: return JSONResponse({ "users": [ {"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, ], "app": "api-v1", }) async def user_detail(request: Request) -> JSONResponse: user_id = request.path_params.get("user_id") return JSONResponse({ "user": {"id": user_id, "name": f"User {user_id}"}, "app": "api-v1", }) async def create_user(request: Request) -> JSONResponse: body = await request.json() return JSONResponse({ "created": body, "app": "api-v1", }, status_code=201) async def echo(request: Request) -> Response: body = await request.body() return Response( content=body, media_type=request.headers.get("content-type", "text/plain"), ) routes = [ Route("/", root, methods=["GET"]), Route("/health", health, methods=["GET"]), Route("/users", users_list, methods=["GET"]), Route("/users", create_user, methods=["POST"]), Route("/users/{user_id:int}", user_detail, methods=["GET"]), Route("/echo", echo, methods=["POST"]), ] return Starlette(routes=routes) def create_api_v2_app() -> Starlette: """Create a test API v2 application with different responses.""" async def root(request: Request) -> JSONResponse: return JSONResponse({ "app": "api-v2", "message": "Welcome to API v2 - Enhanced!", "version": "2.0.0", "path": request.url.path, }) async def health(request: Request) -> JSONResponse: return JSONResponse({ "status": "healthy", "app": "api-v2", "version": "2.0.0", }) async def users_list(request: Request) -> JSONResponse: return JSONResponse({ "data": { "users": [ {"id": 1, "name": "Alice", "email": "alice@test.com"}, {"id": 2, "name": "Bob", "email": "bob@test.com"}, ], }, "meta": {"total": 2, "page": 1}, "app": "api-v2", }) routes = [ Route("/", root, methods=["GET"]), Route("/health", health, methods=["GET"]), Route("/users", users_list, methods=["GET"]), ] return Starlette(routes=routes) def create_admin_app() -> Starlette: """Create a test admin application.""" async def dashboard(request: Request) -> JSONResponse: return JSONResponse({ "app": "admin", "page": "dashboard", "path": request.url.path, }) async def settings(request: Request) -> JSONResponse: return JSONResponse({ "app": "admin", "page": "settings", "config": {"debug": True, "theme": "dark"}, }) async def stats(request: Request) -> JSONResponse: return JSONResponse({ "app": "admin", "stats": { "requests": 1000, "errors": 5, "uptime": "24h", }, }) routes = [ Route("/", dashboard, methods=["GET"]), Route("/settings", settings, methods=["GET"]), Route("/stats", stats, methods=["GET"]), ] return Starlette(routes=routes) def create_websocket_test_app() -> Starlette: """Create a test app that also has websocket endpoint info.""" async def root(request: Request) -> JSONResponse: return JSONResponse({ "app": "ws-app", "message": "WebSocket test app", "ws_endpoint": "/ws", }) async def info(request: Request) -> JSONResponse: return JSONResponse({ "app": "ws-app", "supports": ["http", "websocket"], }) routes = [ Route("/", root, methods=["GET"]), Route("/info", info, methods=["GET"]), ] return Starlette(routes=routes) # ============== PyServe Test Server ============== class PyServeTestServer: """Test server wrapper for PyServe with ASGI mounts.""" def __init__(self, config: Config): self.config = config self.server = PyServeServer(config) self._server_task = None async def start(self) -> None: assert self.server.app is not None, "Server app not initialized" config = uvicorn.Config( app=self.server.app, host=self.config.server.host, port=self.config.server.port, log_level="critical", access_log=False, ) server = uvicorn.Server(config) self._server_task = asyncio.create_task(server.serve()) # Wait for server to be ready for _ in range(50): try: async with httpx.AsyncClient() as client: await client.get(f"http://127.0.0.1:{self.config.server.port}/health") return except httpx.ConnectError: await asyncio.sleep(0.1) raise RuntimeError(f"PyServe server failed to start on port {self.config.server.port}") async def stop(self) -> None: if self._server_task: self._server_task.cancel() try: await self._server_task except asyncio.CancelledError: pass # ============== Fixtures ============== @pytest.fixture def pyserve_port() -> int: """Get a free port for PyServe.""" return get_free_port() @pytest.fixture def api_v1_app() -> Starlette: """Create API v1 test app.""" return create_api_v1_app() @pytest.fixture def api_v2_app() -> Starlette: """Create API v2 test app.""" return create_api_v2_app() @pytest.fixture def admin_app() -> Starlette: """Create admin test app.""" return create_admin_app() # ============== Unit Tests ============== class TestMountedApp: """Unit tests for MountedApp class.""" def test_matches_exact_path(self, api_v1_app): """Test exact path matching.""" mounted = MountedApp("/api", api_v1_app) assert mounted.matches("/api") is True assert mounted.matches("/api/users") is True assert mounted.matches("/api/users/123") is True assert mounted.matches("/other") is False assert mounted.matches("/apiv2") is False def test_matches_empty_path(self, api_v1_app): """Test root mount matching.""" mounted = MountedApp("", api_v1_app) assert mounted.matches("/") is True assert mounted.matches("/anything") is True assert mounted.matches("/nested/path") is True def test_get_modified_path_with_strip(self, api_v1_app): """Test path modification with strip_path=True.""" mounted = MountedApp("/api/v1", api_v1_app, strip_path=True) assert mounted.get_modified_path("/api/v1") == "/" assert mounted.get_modified_path("/api/v1/users") == "/users" assert mounted.get_modified_path("/api/v1/users/123") == "/users/123" def test_get_modified_path_without_strip(self, api_v1_app): """Test path modification with strip_path=False.""" mounted = MountedApp("/api/v1", api_v1_app, strip_path=False) assert mounted.get_modified_path("/api/v1/users") == "/api/v1/users" class TestASGIMountManager: """Unit tests for ASGIMountManager class.""" def test_mount_direct_app(self, api_v1_app): """Test mounting a direct ASGI app.""" manager = ASGIMountManager() result = manager.mount( path="/api", app=api_v1_app, name="api-v1" ) assert result is True assert len(manager.mounts) == 1 assert manager.mounts[0].name == "api-v1" assert manager.mounts[0].path == "/api" def test_mount_requires_app_or_path(self): """Test that mount requires either app or app_path.""" manager = ASGIMountManager() result = manager.mount(path="/test") assert result is False assert len(manager.mounts) == 0 def test_mount_ordering_by_path_length(self, api_v1_app, api_v2_app, admin_app): """Test that mounts are ordered by path length (longest first).""" manager = ASGIMountManager() manager.mount(path="/api", app=api_v1_app, name="short") manager.mount(path="/api/v1", app=api_v2_app, name="medium") manager.mount(path="/api/v1/admin", app=admin_app, name="long") # Verify ordering assert manager.mounts[0].name == "long" assert manager.mounts[1].name == "medium" assert manager.mounts[2].name == "short" # Should match the longest prefix first mount = manager.get_mount("/api/v1/admin/dashboard") assert mount is not None assert mount.name == "long" mount = manager.get_mount("/api/v1/users") assert mount is not None assert mount.name == "medium" mount = manager.get_mount("/api/other") assert mount is not None assert mount.name == "short" def test_unmount(self, api_v1_app): """Test unmounting an application.""" manager = ASGIMountManager() manager.mount(path="/api", app=api_v1_app) assert len(manager.mounts) == 1 result = manager.unmount("/api") assert result is True assert len(manager.mounts) == 0 def test_list_mounts(self, api_v1_app, api_v2_app): """Test listing all mounts.""" manager = ASGIMountManager() manager.mount(path="/api/v1", app=api_v1_app, name="api-v1") manager.mount(path="/api/v2", app=api_v2_app, name="api-v2") mounts_info = manager.list_mounts() assert len(mounts_info) == 2 names = {m["name"] for m in mounts_info} assert "api-v1" in names assert "api-v2" in names class TestASGIAppLoader: """Unit tests for ASGIAppLoader class.""" def test_load_app_invalid_module(self): """Test loading app with invalid module path.""" loader = ASGIAppLoader() app = loader.load_app("nonexistent.module:app") assert app is None def test_load_app_invalid_attribute(self): """Test loading app with invalid attribute.""" loader = ASGIAppLoader() app = loader.load_app("starlette.applications:nonexistent") assert app is None def test_get_app_cached(self, api_v1_app): """Test getting a cached app.""" loader = ASGIAppLoader() loader._apps["test:app"] = api_v1_app app = loader.get_app("test:app") assert app is api_v1_app app = loader.get_app("nonexistent:app") assert app is None # ============== Integration Tests ============== class TestASGIMountIntegration: """Integration tests for ASGIMountManager request handling.""" @pytest.mark.asyncio async def test_handle_request_to_mounted_app(self, api_v1_app): """Test handling a request through mounted app.""" manager = ASGIMountManager() manager.mount(path="/api", app=api_v1_app) scope = { "type": "http", "asgi": {"version": "3.0"}, "http_version": "1.1", "method": "GET", "path": "/api/health", "query_string": b"", "headers": [], "server": ("127.0.0.1", 8000), } received_messages = [] async def receive(): return {"type": "http.request", "body": b""} async def send(message): received_messages.append(message) result = await manager.handle_request(scope, receive, send) assert result is True assert len(received_messages) == 2 # response.start + response.body assert received_messages[0]["type"] == "http.response.start" assert received_messages[0]["status"] == 200 @pytest.mark.asyncio async def test_handle_request_no_match(self): """Test handling request with no matching mount.""" manager = ASGIMountManager() scope = { "type": "http", "path": "/unmatched", } async def receive(): return {} async def send(message): pass result = await manager.handle_request(scope, receive, send) assert result is False @pytest.mark.asyncio async def test_handle_non_http_request(self, api_v1_app): """Test that non-HTTP requests are not handled.""" manager = ASGIMountManager() manager.mount(path="/api", app=api_v1_app) scope = { "type": "websocket", "path": "/api/ws", } async def receive(): return {} async def send(message): pass result = await manager.handle_request(scope, receive, send) assert result is False @pytest.mark.asyncio async def test_path_stripping(self, api_v1_app): """Test that mount path is correctly stripped from request.""" manager = ASGIMountManager() manager.mount(path="/api/v1", app=api_v1_app, strip_path=True) scope = { "type": "http", "asgi": {"version": "3.0"}, "http_version": "1.1", "method": "GET", "path": "/api/v1/users", "query_string": b"", "headers": [], "server": ("127.0.0.1", 8000), } received_messages = [] async def receive(): return {"type": "http.request", "body": b""} async def send(message): received_messages.append(message) result = await manager.handle_request(scope, receive, send) assert result is True assert received_messages[0]["status"] == 200 # ============== Full Server Integration Tests ============== class TestPyServeWithASGIMounts: """Full integration tests with PyServe server and ASGI mounts.""" @pytest.mark.asyncio async def test_basic_asgi_mount(self, pyserve_port, api_v1_app): """Test basic ASGI app mounting through PyServe.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) # Create server and manually add ASGI extension server = PyServeServer(config) # Add ASGI mount directly via extension manager from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/api", app=api_v1_app, name="api-v1") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Test root endpoint of mounted app response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/") assert response.status_code == 200 data = response.json() assert data["app"] == "api-v1" assert data["message"] == "Welcome to API v1" # Test health endpoint response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/health") assert response.status_code == 200 data = response.json() assert data["status"] == "healthy" assert data["app"] == "api-v1" # Test users list response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/users") assert response.status_code == 200 data = response.json() assert "users" in data assert len(data["users"]) == 2 # Test user detail response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/users/1") assert response.status_code == 200 data = response.json() assert data["user"]["id"] == 1 finally: await test_server.stop() @pytest.mark.asyncio async def test_multiple_asgi_mounts(self, pyserve_port, api_v1_app, api_v2_app, admin_app): """Test multiple ASGI apps mounted at different paths.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/api/v1", app=api_v1_app, name="api-v1") asgi_ext.mount_manager.mount(path="/api/v2", app=api_v2_app, name="api-v2") asgi_ext.mount_manager.mount(path="/admin", app=admin_app, name="admin") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Test API v1 response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/v1/") assert response.status_code == 200 assert response.json()["app"] == "api-v1" # Test API v2 response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/v2/") assert response.status_code == 200 data = response.json() assert data["app"] == "api-v2" assert data["version"] == "2.0.0" # Test API v2 users (different response format) response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/v2/users") assert response.status_code == 200 data = response.json() assert "data" in data assert "meta" in data # Test Admin response = await client.get(f"http://127.0.0.1:{pyserve_port}/admin/") assert response.status_code == 200 assert response.json()["app"] == "admin" assert response.json()["page"] == "dashboard" # Test Admin settings response = await client.get(f"http://127.0.0.1:{pyserve_port}/admin/settings") assert response.status_code == 200 data = response.json() assert data["config"]["theme"] == "dark" finally: await test_server.stop() @pytest.mark.asyncio async def test_asgi_mount_post_request(self, pyserve_port, api_v1_app): """Test POST requests to mounted ASGI app.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/api", app=api_v1_app, name="api") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Test POST to create user response = await client.post( f"http://127.0.0.1:{pyserve_port}/api/users", json={"name": "Charlie", "email": "charlie@test.com"} ) assert response.status_code == 201 data = response.json() assert data["created"]["name"] == "Charlie" # Test echo endpoint response = await client.post( f"http://127.0.0.1:{pyserve_port}/api/echo", content=b"Hello, World!", headers={"Content-Type": "text/plain"} ) assert response.status_code == 200 assert response.content == b"Hello, World!" finally: await test_server.stop() @pytest.mark.asyncio async def test_asgi_mount_with_routing_extension(self, pyserve_port, api_v1_app): """Test ASGI mounts working alongside routing extension.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[ ExtensionConfig( type="routing", config={ "regex_locations": { "=/health": {"return": "200 PyServe OK"}, "=/status": {"return": "200 Server Running"}, } } ) ], ) server = PyServeServer(config) # Add ASGI extension BEFORE routing extension from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/api", app=api_v1_app, name="api") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Test ASGI mounted app response = await client.get(f"http://127.0.0.1:{pyserve_port}/api/users") assert response.status_code == 200 assert response.json()["app"] == "api-v1" # Test routing extension endpoints response = await client.get(f"http://127.0.0.1:{pyserve_port}/status") assert response.status_code == 200 assert "Server Running" in response.text finally: await test_server.stop() @pytest.mark.asyncio async def test_asgi_mount_path_not_stripped(self, pyserve_port): """Test ASGI mount with strip_path=False.""" # Create an app that expects full path async def handler(request: Request) -> JSONResponse: return JSONResponse({ "full_path": request.url.path, "received": True, }) app = Starlette(routes=[ Route("/mounted/data", handler, methods=["GET"]), ]) config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount( path="/mounted", app=app, name="full-path-app", strip_path=False ) server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: response = await client.get(f"http://127.0.0.1:{pyserve_port}/mounted/data") assert response.status_code == 200 data = response.json() assert data["full_path"] == "/mounted/data" finally: await test_server.stop() @pytest.mark.asyncio async def test_asgi_mount_metrics(self, pyserve_port, api_v1_app): """Test that ASGI extension reports metrics correctly.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/api/v1", app=api_v1_app, name="api-v1") asgi_ext.mount_manager.mount(path="/api/v2", app=create_api_v2_app(), name="api-v2") server.extension_manager.extensions.insert(0, asgi_ext) # Check metrics metrics = asgi_ext.get_metrics() assert metrics["asgi_mount_count"] == 2 assert len(metrics["asgi_mounts"]) == 2 mount_names = {m["name"] for m in metrics["asgi_mounts"]} assert "api-v1" in mount_names assert "api-v2" in mount_names @pytest.mark.asyncio async def test_asgi_mount_error_handling(self, pyserve_port): """Test error handling when mounted app raises exception.""" async def failing_handler(request: Request) -> JSONResponse: raise ValueError("Intentional error for testing") async def working_handler(request: Request) -> JSONResponse: return JSONResponse({"status": "ok"}) app = Starlette(routes=[ Route("/fail", failing_handler, methods=["GET"]), Route("/ok", working_handler, methods=["GET"]), ]) config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/test", app=app, name="test-app") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Working endpoint should work response = await client.get(f"http://127.0.0.1:{pyserve_port}/test/ok") assert response.status_code == 200 # Failing endpoint should return 500 response = await client.get(f"http://127.0.0.1:{pyserve_port}/test/fail") assert response.status_code == 500 finally: await test_server.stop() @pytest.mark.asyncio async def test_concurrent_requests_to_mounted_apps(self, pyserve_port, api_v1_app, api_v2_app): """Test concurrent requests to different mounted apps.""" config = Config( server=ServerConfig(host="127.0.0.1", port=pyserve_port), http=HttpConfig(static_dir="./static", templates_dir="./templates"), logging=LoggingConfig(level="ERROR", console_output=False), extensions=[], ) server = PyServeServer(config) from pyserve.extensions import ASGIExtension asgi_ext = ASGIExtension({"mounts": []}) asgi_ext.mount_manager.mount(path="/v1", app=api_v1_app, name="v1") asgi_ext.mount_manager.mount(path="/v2", app=api_v2_app, name="v2") server.extension_manager.extensions.insert(0, asgi_ext) test_server = PyServeTestServer.__new__(PyServeTestServer) test_server.config = config test_server.server = server test_server._server_task = None try: await test_server.start() async with httpx.AsyncClient() as client: # Send concurrent requests tasks = [ client.get(f"http://127.0.0.1:{pyserve_port}/v1/health"), client.get(f"http://127.0.0.1:{pyserve_port}/v2/health"), client.get(f"http://127.0.0.1:{pyserve_port}/v1/users"), client.get(f"http://127.0.0.1:{pyserve_port}/v2/users"), client.get(f"http://127.0.0.1:{pyserve_port}/v1/"), client.get(f"http://127.0.0.1:{pyserve_port}/v2/"), ] responses = await asyncio.gather(*tasks) # All requests should succeed for response in responses: assert response.status_code == 200 # Verify correct app responded assert responses[0].json()["app"] == "api-v1" assert responses[1].json()["app"] == "api-v2" finally: await test_server.stop()