pyserveX/pyserve/_routing.pyx
Илья Глазунов eeeccd57da
Some checks failed
Lint Code / lint (push) Failing after 44s
CI/CD Pipeline / lint (push) Successful in 0s
Run Tests / test (3.12) (push) Successful in 3m48s
Run Tests / test (3.13) (push) Successful in 3m7s
CI/CD Pipeline / test (push) Successful in 1s
CI/CD Pipeline / build-and-release (push) Has been skipped
CI/CD Pipeline / notify (push) Successful in 1s
Cython routing added
2026-01-31 02:44:50 +03:00

487 lines
14 KiB
Cython

# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True
from libc.stdint cimport uint32_t
from libc.stddef cimport size_t
from cpython.bytes cimport PyBytes_AsString, PyBytes_GET_SIZE
cimport cython
from ._routing_pcre2 cimport *
from typing import Optional
# Pointer-type aliases so that a typed NULL can be passed for the optional
# PCRE2 context arguments (written as e.g. `<compile_ctx_ptr>NULL` below).
ctypedef pcre2_compile_context* compile_ctx_ptr
ctypedef pcre2_match_context* match_ctx_ptr
ctypedef pcre2_general_context* general_ctx_ptr
# Number of PCRE2_UCHAR elements in the stack buffer that receives the text
# returned by pcre2_get_error_message().
DEF ERROR_BUFFER_SIZE = 256
# Maximum capture groups we support.
# NOTE(review): not referenced in the visible part of this file - confirm
# whether it is still needed.
DEF MAX_CAPTURE_GROUPS = 32
cdef class PCRE2Pattern:
    """
    A compiled PCRE2 regular expression with optional JIT acceleration.

    Instances are produced by the C-level factory ``_create``.  An instance
    constructed directly from Python holds no compiled code: ``search``
    returns False and the group helpers return empty results.

    One match-data block is allocated per pattern and reused by every match
    call, so the capture offsets of one call are overwritten by the next.
    """
    cdef:
        pcre2_code* _code
        pcre2_match_data* _match_data
        bint _jit_available
        str _pattern_str
        uint32_t _capture_count
        dict _name_to_index  # Named capture groups: name -> group number
        list _index_to_name  # Group number -> name (None for unnamed groups)

    def __cinit__(self):
        self._code = NULL
        self._match_data = NULL
        self._jit_available = <bint>False
        self._capture_count = 0
        self._name_to_index = {}
        self._index_to_name = []

    def __dealloc__(self):
        # Release the match data before the code it was created from.
        if self._match_data is not NULL:
            pcre2_match_data_free(self._match_data)
            self._match_data = NULL
        if self._code is not NULL:
            pcre2_code_free(self._code)
            self._code = NULL

    @staticmethod
    cdef PCRE2Pattern _create(str pattern, bint case_insensitive=<bint>False, bint use_jit=<bint>True):
        """
        Compile ``pattern`` and return a ready-to-use PCRE2Pattern.

        The pattern is compiled with PCRE2_UTF | PCRE2_UCP, plus
        PCRE2_CASELESS when ``case_insensitive`` is set.  When ``use_jit`` is
        set, JIT compilation is attempted; if it fails the pattern still
        works through the interpreter.

        Raises:
            ValueError: the pattern does not compile.
            MemoryError: the match-data block cannot be allocated.
        """
        cdef:
            # __new__ runs __cinit__, so the name maps start out empty.
            PCRE2Pattern compiled = PCRE2Pattern.__new__(PCRE2Pattern)
            bytes pattern_bytes
            const char* pattern_ptr
            Py_ssize_t pattern_len
            uint32_t options = 0
            int errorcode = 0
            PCRE2_SIZE erroroffset = 0
            int jit_result
            uint32_t capture_count = 0
            str error_msg

        compiled._pattern_str = pattern

        pattern_bytes = pattern.encode('utf-8')
        pattern_ptr = PyBytes_AsString(pattern_bytes)
        pattern_len = PyBytes_GET_SIZE(pattern_bytes)

        options = PCRE2_UTF | PCRE2_UCP
        if case_insensitive:
            options |= PCRE2_CASELESS

        compiled._code = pcre2_compile(
            <PCRE2_SPTR>pattern_ptr,
            <PCRE2_SIZE>pattern_len,
            options,
            &errorcode,
            &erroroffset,
            <compile_ctx_ptr>NULL
        )
        if compiled._code is NULL:
            error_msg = PCRE2Pattern._get_error_message(errorcode)
            raise ValueError(f"PCRE2 compile error at offset {erroroffset}: {error_msg}")

        if use_jit:
            # A non-zero result only means "no JIT"; matching then goes
            # through pcre2_match().
            jit_result = pcre2_jit_compile(compiled._code, PCRE2_JIT_COMPLETE)
            compiled._jit_available = <bint>(jit_result == 0)

        pcre2_pattern_info(compiled._code, PCRE2_INFO_CAPTURECOUNT, <void*>&capture_count)
        compiled._capture_count = capture_count

        compiled._match_data = pcre2_match_data_create_from_pattern(compiled._code, <general_ctx_ptr>NULL)
        if compiled._match_data is NULL:
            pcre2_code_free(compiled._code)
            compiled._code = NULL
            raise MemoryError("Failed to create match data")

        compiled._extract_named_groups()
        return compiled

    cdef void _extract_named_groups(self):
        """
        Fill ``_name_to_index`` / ``_index_to_name`` from the pattern's name table.

        Each table entry is ``nameentrysize`` code units long: the first two
        hold the group number (most significant byte first) and the name
        starts at offset 2.
        """
        cdef:
            uint32_t namecount = 0
            uint32_t nameentrysize = 0
            PCRE2_SPTR nametable
            uint32_t i
            int group_num
            bytes name_bytes
            str name

        pcre2_pattern_info(self._code, PCRE2_INFO_NAMECOUNT, <void*>&namecount)
        if namecount == 0:
            return  # void return

        pcre2_pattern_info(self._code, PCRE2_INFO_NAMEENTRYSIZE, <void*>&nameentrysize)
        pcre2_pattern_info(self._code, PCRE2_INFO_NAMETABLE, <void*>&nametable)

        self._index_to_name = [None] * (self._capture_count + 1)
        for i in range(namecount):
            group_num = (<int>nametable[0] << 8) | <int>nametable[1]
            name_bytes = <bytes>(nametable + 2)
            name = name_bytes.decode('utf-8')
            self._name_to_index[name] = group_num
            # Bounds are checked by hand because boundscheck is disabled
            # for this module.
            if <uint32_t>group_num <= self._capture_count:
                self._index_to_name[<Py_ssize_t>group_num] = name
            nametable += nameentrysize

    @staticmethod
    cdef str _get_error_message(int errorcode):
        """Return PCRE2's text for ``errorcode``, or a generic fallback."""
        cdef:
            PCRE2_UCHAR buffer[ERROR_BUFFER_SIZE]
            int result

        result = pcre2_get_error_message(errorcode, buffer, ERROR_BUFFER_SIZE)
        if result < 0:
            return f"Unknown error {errorcode}"
        return (<bytes>buffer).decode('utf-8')

    cdef bint _run_match(self, bytes subject_bytes):
        """
        Match the compiled pattern against UTF-8 encoded ``subject_bytes``.

        Uses pcre2_jit_match() when JIT compilation succeeded, otherwise
        pcre2_match().  Returns True when the call reports a match; any
        negative PCRE2 result is reported as False.  Capture offsets are left
        in ``self._match_data``.  The caller must have checked that
        ``self._code`` is not NULL.
        """
        cdef:
            const char* subject_ptr
            Py_ssize_t subject_len
            int result

        subject_ptr = PyBytes_AsString(subject_bytes)
        subject_len = PyBytes_GET_SIZE(subject_bytes)

        if self._jit_available:
            result = pcre2_jit_match(
                self._code,
                <PCRE2_SPTR>subject_ptr,
                <PCRE2_SIZE>subject_len,
                0,  # start offset
                0,  # options
                self._match_data,
                <match_ctx_ptr>NULL
            )
        else:
            result = pcre2_match(
                self._code,
                <PCRE2_SPTR>subject_ptr,
                <PCRE2_SIZE>subject_len,
                0,  # start offset
                0,  # options
                self._match_data,
                <match_ctx_ptr>NULL
            )
        return <bint>(result >= 0)

    cdef dict _named_groups(self, bytes subject_bytes):
        """
        Build ``{name: text or None}`` from the most recent successful match.

        ``subject_bytes`` must be the buffer that was just matched; the
        offsets stored in the match data are byte offsets into it.  Groups
        that did not participate in the match map to None.
        """
        cdef:
            PCRE2_SIZE* ovector
            dict groups = {}
            str name
            int index
            PCRE2_SIZE start, end

        if not self._name_to_index:
            return groups

        ovector = pcre2_get_ovector_pointer(self._match_data)
        for name, index in self._name_to_index.items():
            start = ovector[<Py_ssize_t>(2 * index)]
            end = ovector[<Py_ssize_t>(2 * index + 1)]
            if start != PCRE2_UNSET and end != PCRE2_UNSET:
                groups[name] = subject_bytes[start:end].decode('utf-8')
            else:
                groups[name] = None
        return groups

    cpdef bint search(self, str subject):
        """
        Search for pattern anywhere in subject.
        Returns True if found, False otherwise.
        """
        cdef bytes subject_bytes

        if self._code is NULL:
            return <bint>False

        subject_bytes = subject.encode('utf-8')
        return self._run_match(subject_bytes)

    cpdef dict groupdict(self, str subject):
        """
        Match pattern and return dict of named groups.
        Returns empty dict if no match or no named groups.
        """
        cdef bytes subject_bytes

        if self._code is NULL or not self._name_to_index:
            return {}

        subject_bytes = subject.encode('utf-8')
        if not self._run_match(subject_bytes):
            return {}
        return self._named_groups(subject_bytes)

    cpdef tuple search_with_groups(self, str subject):
        """
        Search for pattern in subject and collect named groups in one pass.

        Returns ``(matched, groups)``: ``(False, {})`` when there is no match,
        otherwise ``(True, groups)`` where ``groups`` maps each named group to
        its matched text (None for groups that did not participate; empty
        when the pattern has no named groups).
        """
        cdef bytes subject_bytes

        if self._code is NULL:
            return (False, {})

        subject_bytes = subject.encode('utf-8')
        if not self._run_match(subject_bytes):
            return (False, {})
        return (True, self._named_groups(subject_bytes))

    @property
    def pattern(self) -> str:
        """The pattern source text passed to ``_create``."""
        return self._pattern_str

    @property
    def jit_compiled(self) -> bool:
        """True when JIT compilation of the pattern succeeded."""
        return <bint>self._jit_available

    @property
    def capture_count(self) -> int:
        """Capture group count reported by PCRE2_INFO_CAPTURECOUNT."""
        return self._capture_count
cdef class FastRouteMatch:
    """
    Result of a successful route lookup.

    Attributes:
        config: the configuration dict registered for the matched route.
        params: parameters attached to the match; an empty dict when the
            caller supplies none.
    """
    cdef:
        public dict config
        public dict params

    def __cinit__(self):
        # Give both attributes a dict value before __init__ runs.
        self.config = {}
        self.params = {}

    def __init__(self, dict config, params=None):
        if params is None:
            params = {}
        self.config = config
        self.params = params
cdef class FastRouter:
    """
    High-performance router with PCRE2 JIT-compiled patterns.

    Matching order (nginx-like):
    1. Exact routes (prefix "=") - O(1) dict lookup
    2. Regex routes (prefix "~" or "~*") - PCRE2 JIT matching,
       tried in the order they were added
    3. Default route (fallback)
    """
    cdef:
        dict _exact_routes    # exact path -> config
        list _regex_routes    # [(PCRE2Pattern, config), ...] in insertion order
        dict _default_route   # config of the "__default__" route
        bint _has_default     # whether a default route was registered
        int _regex_count      # len(_regex_routes), kept in sync by add_route

    def __cinit__(self):
        self._exact_routes = {}
        self._regex_routes = []
        self._default_route = {}
        self._has_default = <bint>False
        self._regex_count = 0

    def __init__(self):
        self._exact_routes = {}
        self._regex_routes = []
        self._default_route = {}
        self._has_default = <bint>False
        self._regex_count = 0

    def add_route(self, str pattern, dict config):
        """
        Register ``config`` under ``pattern``.

        Pattern forms:
            "=<path>"      exact match on <path>
            "__default__"  fallback route
            "~<regex>"     case-sensitive regex
            "~*<regex>"    case-insensitive regex

        Patterns in any other form are ignored.  A regex that fails to
        compile is skipped as well, but the failure is logged as a warning
        rather than being dropped silently.
        """
        cdef:
            str regex_pattern
            bint case_insensitive
            PCRE2Pattern compiled_pattern

        if pattern.startswith("="):
            self._exact_routes[pattern[1:]] = config
        elif pattern == "__default__":
            self._default_route = config
            self._has_default = <bint>True
        elif pattern.startswith("~"):
            case_insensitive = <bint>pattern.startswith("~*")
            regex_pattern = pattern[2:] if case_insensitive else pattern[1:]
            try:
                compiled_pattern = PCRE2Pattern._create(regex_pattern, case_insensitive)
            except (ValueError, MemoryError) as exc:
                # Still best-effort: one bad route must not break router
                # setup, but the operator needs to learn the route is missing.
                import logging
                logging.getLogger(__name__).warning(
                    "Skipping invalid route pattern %r: %s", pattern, exc
                )
                return
            self._regex_routes.append((compiled_pattern, config))
            self._regex_count = len(self._regex_routes)

    cpdef object match(self, str path):
        """
        Resolve ``path`` to a FastRouteMatch, or None when nothing applies.

        Exact routes win over regex routes; regex routes are tried in
        registration order and the first hit supplies the named groups as
        ``params``; the default route, if any, is the last resort.
        """
        cdef:
            dict config
            dict params
            PCRE2Pattern pattern
            tuple route_entry
            bint matched

        if path in self._exact_routes:
            config = self._exact_routes[path]
            return FastRouteMatch(config, {})

        # Iterate the list itself rather than indexing by a cached count:
        # bounds checking is disabled for this module.
        for route_entry in self._regex_routes:
            pattern = <PCRE2Pattern>route_entry[0]
            config = <dict>route_entry[1]
            matched, params = pattern.search_with_groups(path)
            if matched:
                return FastRouteMatch(config, params)

        if self._has_default:
            return FastRouteMatch(self._default_route, {})
        return None

    @property
    def exact_routes(self) -> dict:
        """The internal exact-path -> config mapping (not a copy)."""
        return self._exact_routes

    @property
    def routes(self) -> dict:
        """Return regex routes as dict (pattern_str -> config)."""
        cdef:
            dict result = {}
            PCRE2Pattern pattern

        for pattern, config in self._regex_routes:
            result[pattern.pattern] = config
        return result

    @property
    def default_route(self) -> Optional[dict]:
        """Config of the default route, or None when none was registered."""
        return self._default_route if self._has_default else None

    cpdef list list_routes(self):
        """
        Describe every registered route as a list of dicts.

        Entries appear in matching order: exact routes, regex routes, then
        the default route.  Each entry carries "type", "pattern" and
        "config"; regex entries also carry "jit_compiled".
        """
        cdef:
            list result = []
            str path_str
            dict config
            PCRE2Pattern pattern

        for path_str, config in self._exact_routes.items():
            result.append({
                "type": "exact",
                "pattern": f"={path_str}",
                "config": config,
            })

        for pattern, config in self._regex_routes:
            result.append({
                "type": "regex",
                "pattern": pattern.pattern,
                "jit_compiled": pattern.jit_compiled,
                "config": config,
            })

        if self._has_default:
            result.append({
                "type": "default",
                "pattern": "__default__",
                "config": self._default_route,
            })
        return result
def compile_pattern(str pattern, bint case_insensitive=<bint>False) -> PCRE2Pattern:
    """
    Build a PCRE2Pattern from a regular expression string.

    Python-callable wrapper around the C-level factory
    ``PCRE2Pattern._create``; JIT compilation is left at the factory default.

    Args:
        pattern: Regular expression source text
        case_insensitive: Compile the pattern for caseless matching

    Returns:
        The compiled PCRE2Pattern object
    """
    cdef PCRE2Pattern compiled = PCRE2Pattern._create(pattern, case_insensitive)
    return compiled
def fast_match(router: FastRouter, str path):
    """
    Look up ``path`` in ``router``; shorthand for ``router.match(path)``.

    Args:
        router: FastRouter instance to query
        path: URL path to resolve

    Returns:
        Whatever ``router.match`` returns: a FastRouteMatch, or None
    """
    found = router.match(path)
    return found