konduktor/pyserve/routing.py
Илья Глазунов 83cb7d68b0 initial commit
2025-09-01 23:49:50 +03:00

184 lines
6.9 KiB
Python

import re
import mimetypes
from pathlib import Path
from typing import Dict, Any, Optional, Pattern
from starlette.requests import Request
from starlette.responses import Response, FileResponse, PlainTextResponse
from .logging_utils import get_logger
logger = get_logger(__name__)
class RouteMatch:
def __init__(self, config: Dict[str, Any], params: Optional[Dict[str, str]] = None):
self.config = config
self.params = params or {}
class Router:
def __init__(self, static_dir: str = "./static"):
self.static_dir = Path(static_dir)
self.routes: Dict[Pattern, Dict[str, Any]] = {}
self.exact_routes: Dict[str, Dict[str, Any]] = {}
self.default_route: Optional[Dict[str, Any]] = None
def add_route(self, pattern: str, config: Dict[str, Any]) -> None:
if pattern.startswith("="):
exact_path = pattern[1:]
self.exact_routes[exact_path] = config
logger.debug(f"Добавлен exact маршрут: {exact_path}")
return
if pattern == "__default__":
self.default_route = config
logger.debug("Добавлен default маршрут")
return
if pattern.startswith("~"):
case_insensitive = pattern.startswith("~*")
regex_pattern = pattern[2:] if case_insensitive else pattern[1:]
flags = re.IGNORECASE if case_insensitive else 0
try:
compiled_pattern = re.compile(regex_pattern, flags)
self.routes[compiled_pattern] = config
logger.debug(f"Добавлен regex маршрут: {pattern}")
except re.error as e:
logger.error(f"Ошибка компиляции regex {pattern}: {e}")
def match(self, path: str) -> Optional[RouteMatch]:
if path in self.exact_routes:
return RouteMatch(self.exact_routes[path])
for pattern, config in self.routes.items():
match = pattern.search(path)
if match:
params = match.groupdict()
return RouteMatch(config, params)
if self.default_route:
return RouteMatch(self.default_route)
return None
class RequestHandler:
def __init__(self, router: Router, static_dir: str = "./static"):
self.router = router
self.static_dir = Path(static_dir)
async def handle(self, request: Request) -> Response:
path = request.url.path
logger.info(f"{request.method} {path}")
route_match = self.router.match(path)
if not route_match:
return PlainTextResponse("404 Not Found", status_code=404)
try:
return await self._process_route(request, route_match)
except Exception as e:
logger.error(f"Ошибка обработки запроса {path}: {e}")
return PlainTextResponse("500 Internal Server Error", status_code=500)
async def _process_route(self, request: Request, route_match: RouteMatch) -> Response:
config = route_match.config
path = request.url.path
if "return" in config:
status_text = config["return"]
if " " in status_text:
status_code, text = status_text.split(" ", 1)
status_code = int(status_code)
else:
status_code = int(status_text)
text = ""
content_type = config.get("content_type", "text/plain")
return PlainTextResponse(text, status_code=status_code,
media_type=content_type)
if "proxy_pass" in config:
return await self._handle_proxy(request, config, route_match.params)
if "root" in config:
return await self._handle_static(request, config)
if config.get("spa_fallback"):
return await self._handle_spa_fallback(request, config)
return PlainTextResponse("404 Not Found", status_code=404)
async def _handle_static(self, request: Request, config: Dict[str, Any]) -> Response:
root = Path(config["root"])
path = request.url.path.lstrip("/")
if not path or path == "/":
index_file = config.get("index_file", "index.html")
file_path = root / index_file
else:
file_path = root / path
try:
file_path = file_path.resolve()
root = root.resolve()
if not str(file_path).startswith(str(root)):
return PlainTextResponse("403 Forbidden", status_code=403)
except OSError:
return PlainTextResponse("404 Not Found", status_code=404)
if not file_path.exists() or not file_path.is_file():
return PlainTextResponse("404 Not Found", status_code=404)
content_type, _ = mimetypes.guess_type(str(file_path))
response = FileResponse(str(file_path), media_type=content_type)
if "headers" in config:
for header in config["headers"]:
if ":" in header:
name, value = header.split(":", 1)
response.headers[name.strip()] = value.strip()
if "cache_control" in config:
response.headers["Cache-Control"] = config["cache_control"]
return response
async def _handle_spa_fallback(self, request: Request, config: Dict[str, Any]) -> Response:
path = request.url.path
exclude_patterns = config.get("exclude_patterns", [])
for pattern in exclude_patterns:
if path.startswith(pattern):
return PlainTextResponse("404 Not Found", status_code=404)
root = Path(config.get("root", self.static_dir))
index_file = config.get("index_file", "index.html")
file_path = root / index_file
if file_path.exists() and file_path.is_file():
return FileResponse(str(file_path), media_type="text/html")
return PlainTextResponse("404 Not Found", status_code=404)
async def _handle_proxy(self, request: Request, config: Dict[str, Any],
params: Dict[str, str]) -> Response:
# TODO: Реализовать полноценное проксирование
proxy_url = config["proxy_pass"]
for key, value in params.items():
proxy_url = proxy_url.replace(f"{{{key}}}", value)
logger.info(f"Проксирование запроса на: {proxy_url}")
return PlainTextResponse(f"Proxy to: {proxy_url}", status_code=200)
def create_router_from_config(regex_locations: Dict[str, Dict[str, Any]]) -> Router:
router = Router()
for pattern, config in regex_locations.items():
router.add_route(pattern, config)
return router