226 lines
5.9 KiB
Cython
226 lines
5.9 KiB
Cython
# cython: language_level=3
|
|
# cython: boundscheck=False
|
|
# cython: wraparound=False
|
|
# cython: cdivision=True
|
|
"""
|
|
Fast path matching module for PyServe.
|
|
|
|
This Cython module provides optimized path matching operations
|
|
for ASGI mount routing, significantly reducing overhead on hot paths.
|
|
"""
|
|
|
|
from cpython.object cimport PyObject
|
|
|
|
|
|
cdef class FastMountedPath:
|
|
cdef:
|
|
str _path
|
|
str _path_with_slash
|
|
Py_ssize_t _path_len
|
|
bint _is_root
|
|
public str name
|
|
public bint strip_path
|
|
|
|
def __cinit__(self):
|
|
self._path = ""
|
|
self._path_with_slash = "/"
|
|
self._path_len = 0
|
|
self._is_root = 1
|
|
self.name = ""
|
|
self.strip_path = 1
|
|
|
|
def __init__(self, str path, str name="", bint strip_path=True):
|
|
cdef Py_ssize_t path_len
|
|
|
|
path_len = len(path)
|
|
if path_len > 1 and path[path_len - 1] == '/':
|
|
path = path[:path_len - 1]
|
|
|
|
self._path = path
|
|
self._path_len = len(path)
|
|
self._is_root = 1 if (path == "" or path == "/") else 0
|
|
self._path_with_slash = path + "/" if self._is_root == 0 else "/"
|
|
self.name = name if name else path
|
|
self.strip_path = 1 if strip_path else 0
|
|
|
|
@property
|
|
def path(self) -> str:
|
|
return self._path
|
|
|
|
cpdef bint matches(self, str request_path):
|
|
cdef Py_ssize_t req_len
|
|
|
|
if self._is_root:
|
|
return 1
|
|
|
|
req_len = len(request_path)
|
|
|
|
if req_len < self._path_len:
|
|
return 0
|
|
|
|
if req_len == self._path_len:
|
|
return 1 if request_path == self._path else 0
|
|
|
|
if request_path[self._path_len] == '/':
|
|
return 1 if request_path[:self._path_len] == self._path else 0
|
|
|
|
return 0
|
|
|
|
cpdef str get_modified_path(self, str original_path):
|
|
cdef str new_path
|
|
|
|
if not self.strip_path:
|
|
return original_path
|
|
|
|
if self._is_root:
|
|
return original_path
|
|
|
|
new_path = original_path[self._path_len:]
|
|
|
|
if not new_path:
|
|
return "/"
|
|
|
|
return new_path
|
|
|
|
def __repr__(self):
|
|
return f"FastMountedPath(path={self._path!r}, name={self.name!r})"
|
|
|
|
|
|
def _get_path_len_neg(mount):
|
|
return -len(mount.path)
|
|
|
|
|
|
cdef class FastMountManager:
|
|
cdef:
|
|
list _mounts
|
|
int _mount_count
|
|
|
|
def __cinit__(self):
|
|
self._mounts = []
|
|
self._mount_count = 0
|
|
|
|
def __init__(self):
|
|
self._mounts = []
|
|
self._mount_count = 0
|
|
|
|
cpdef void add_mount(self, FastMountedPath mount):
|
|
self._mounts.append(mount)
|
|
self._mounts = sorted(self._mounts, key=_get_path_len_neg, reverse=False)
|
|
self._mount_count = len(self._mounts)
|
|
|
|
cpdef FastMountedPath get_mount(self, str request_path):
|
|
cdef:
|
|
int i
|
|
FastMountedPath mount
|
|
|
|
for i in range(self._mount_count):
|
|
mount = <FastMountedPath>self._mounts[i]
|
|
if mount.matches(request_path):
|
|
return mount
|
|
|
|
return None
|
|
|
|
cpdef bint remove_mount(self, str path):
|
|
cdef:
|
|
int i
|
|
Py_ssize_t path_len
|
|
FastMountedPath mount
|
|
|
|
path_len = len(path)
|
|
if path_len > 1 and path[path_len - 1] == '/':
|
|
path = path[:path_len - 1]
|
|
|
|
for i in range(self._mount_count):
|
|
mount = <FastMountedPath>self._mounts[i]
|
|
if mount._path == path:
|
|
del self._mounts[i]
|
|
self._mount_count -= 1
|
|
return 1
|
|
|
|
return 0
|
|
|
|
@property
|
|
def mounts(self) -> list:
|
|
return list(self._mounts)
|
|
|
|
@property
|
|
def mount_count(self) -> int:
|
|
return self._mount_count
|
|
|
|
cpdef list list_mounts(self):
|
|
cdef:
|
|
list result = []
|
|
FastMountedPath mount
|
|
|
|
for mount in self._mounts:
|
|
result.append({
|
|
"path": mount._path,
|
|
"name": mount.name,
|
|
"strip_path": mount.strip_path,
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
cpdef bint path_matches_prefix(str request_path, str mount_path):
|
|
cdef:
|
|
Py_ssize_t mount_len = len(mount_path)
|
|
Py_ssize_t req_len = len(request_path)
|
|
|
|
if mount_len == 0 or mount_path == "/":
|
|
return 1
|
|
|
|
if req_len < mount_len:
|
|
return 0
|
|
|
|
if req_len == mount_len:
|
|
return 1 if request_path == mount_path else 0
|
|
|
|
if request_path[mount_len] == '/':
|
|
return 1 if request_path[:mount_len] == mount_path else 0
|
|
|
|
return 0
|
|
|
|
|
|
cpdef str strip_path_prefix(str original_path, str mount_path):
|
|
cdef:
|
|
Py_ssize_t mount_len = len(mount_path)
|
|
str result
|
|
|
|
if mount_len == 0 or mount_path == "/":
|
|
return original_path
|
|
|
|
result = original_path[mount_len:]
|
|
|
|
if not result:
|
|
return "/"
|
|
|
|
return result
|
|
|
|
|
|
cpdef tuple match_and_modify_path(str request_path, str mount_path, bint strip_path=True):
|
|
cdef:
|
|
Py_ssize_t mount_len = len(mount_path)
|
|
Py_ssize_t req_len = len(request_path)
|
|
bint is_root = 1 if (mount_len == 0 or mount_path == "/") else 0
|
|
str modified
|
|
|
|
if is_root:
|
|
return (True, request_path if strip_path else request_path)
|
|
|
|
if req_len < mount_len:
|
|
return (False, None)
|
|
|
|
if req_len == mount_len:
|
|
if request_path == mount_path:
|
|
return (True, "/" if strip_path else request_path)
|
|
return (False, None)
|
|
|
|
if request_path[mount_len] == '/' and request_path[:mount_len] == mount_path:
|
|
if strip_path:
|
|
modified = request_path[mount_len:]
|
|
return (True, modified if modified else "/")
|
|
return (True, request_path)
|
|
|
|
return (False, None)
|