mirror of
https://github.com/searxng/searxng.git
synced 2025-09-04 01:08:33 +02:00
[mod] addition of various type hints / tbc
- pyright configuration [1]_ - stub files: types-lxml [2]_ - addition of various type hints - enable use of new type system features on older Python versions [3]_ - ``.tool-versions`` - set python to lowest version we support (3.10.18) [4]_: Older versions typically lack some typing features found in newer Python versions. Therefore, for local type checking (before commit), it is necessary to use the older Python interpreter. .. [1] https://docs.basedpyright.com/v1.20.0/configuration/config-files/ .. [2] https://pypi.org/project/types-lxml/ .. [3] https://typing-extensions.readthedocs.io/en/latest/# .. [4] https://mise.jdx.dev/configuration.html#tool-versions Signed-off-by: Markus Heiser <markus.heiser@darmarit.de> Format: reST
This commit is contained in:
parent
09500459fe
commit
57b9673efb
107 changed files with 1205 additions and 1251 deletions
|
@ -53,7 +53,7 @@ def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkz
|
|||
return flask.make_response(('Too Many Requests', 429))
|
||||
|
||||
|
||||
def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network:
|
||||
def get_network(real_ip: IPv4Address | IPv6Address, cfg: "config.Config") -> IPv4Network | IPv6Network:
|
||||
"""Returns the (client) network of whether the ``real_ip`` is part of.
|
||||
|
||||
The ``ipv4_prefix`` and ``ipv6_prefix`` define the number of leading bits in
|
||||
|
@ -71,7 +71,7 @@ def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4N
|
|||
|
||||
prefix: int = cfg["botdetection.ipv4_prefix"]
|
||||
if real_ip.version == 6:
|
||||
prefix: int = cfg["botdetection.ipv6_prefix"]
|
||||
prefix = cfg["botdetection.ipv6_prefix"]
|
||||
network = ip_network(f"{real_ip}/{prefix}", strict=False)
|
||||
# logger.debug("get_network(): %s", network.compressed)
|
||||
return network
|
||||
|
|
|
@ -19,26 +19,27 @@ __all__ = ['Config', 'UNSET', 'SchemaIssue', 'set_global_cfg', 'get_global_cfg']
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
CFG: Config | None = None
|
||||
CFG: "Config | None" = None
|
||||
"""Global config of the botdetection."""
|
||||
|
||||
|
||||
def set_global_cfg(cfg: Config):
|
||||
def set_global_cfg(cfg: "Config"):
|
||||
global CFG # pylint: disable=global-statement
|
||||
CFG = cfg
|
||||
|
||||
|
||||
def get_global_cfg() -> Config:
|
||||
def get_global_cfg() -> "Config":
|
||||
if CFG is None:
|
||||
raise ValueError("Botdetection's config is not yet initialized.")
|
||||
return CFG
|
||||
|
||||
|
||||
@typing.final
|
||||
class FALSE:
|
||||
"""Class of ``False`` singleton"""
|
||||
|
||||
# pylint: disable=multiple-statements
|
||||
def __init__(self, msg):
|
||||
def __init__(self, msg: str):
|
||||
self.msg = msg
|
||||
|
||||
def __bool__(self):
|
||||
|
@ -53,6 +54,7 @@ class FALSE:
|
|||
UNSET = FALSE('<UNSET>')
|
||||
|
||||
|
||||
@typing.final
|
||||
class SchemaIssue(ValueError):
|
||||
"""Exception to store and/or raise a message from a schema issue."""
|
||||
|
||||
|
@ -67,10 +69,10 @@ class SchemaIssue(ValueError):
|
|||
class Config:
|
||||
"""Base class used for configuration"""
|
||||
|
||||
UNSET = UNSET
|
||||
UNSET: object = UNSET
|
||||
|
||||
@classmethod
|
||||
def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict[str, str]) -> Config:
|
||||
def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict[str, str]) -> "Config":
|
||||
|
||||
# init schema
|
||||
|
||||
|
@ -102,9 +104,9 @@ class Config:
|
|||
These values are needed for validation, see :py:obj:`validate`.
|
||||
|
||||
"""
|
||||
self.cfg_schema = cfg_schema
|
||||
self.deprecated = deprecated
|
||||
self.cfg = copy.deepcopy(cfg_schema)
|
||||
self.cfg_schema: dict[str, typing.Any] = cfg_schema
|
||||
self.deprecated: dict[str, str] = deprecated
|
||||
self.cfg: dict[str, typing.Any] = copy.deepcopy(cfg_schema)
|
||||
|
||||
def __getitem__(self, key: str) -> typing.Any:
|
||||
return self.get(key)
|
||||
|
@ -115,7 +117,7 @@ class Config:
|
|||
|
||||
return validate(self.cfg_schema, cfg, self.deprecated)
|
||||
|
||||
def update(self, upd_cfg: dict):
|
||||
def update(self, upd_cfg: dict[str, typing.Any]):
|
||||
"""Update this configuration by ``upd_cfg``."""
|
||||
|
||||
dict_deepupdate(self.cfg, upd_cfg)
|
||||
|
@ -142,7 +144,7 @@ class Config:
|
|||
val = val % self
|
||||
return val
|
||||
|
||||
def set(self, name: str, val):
|
||||
def set(self, name: str, val: typing.Any):
|
||||
"""Set the value to which ``name`` points in the configuration.
|
||||
|
||||
If there is no such ``name`` in the config, a :py:obj:`KeyError` is
|
||||
|
@ -151,17 +153,17 @@ class Config:
|
|||
parent = self._get_parent_dict(name)
|
||||
parent[name.split('.')[-1]] = val
|
||||
|
||||
def _get_parent_dict(self, name):
|
||||
def _get_parent_dict(self, name: str) -> dict[str, typing.Any]:
|
||||
parent_name = '.'.join(name.split('.')[:-1])
|
||||
if parent_name:
|
||||
parent = value(parent_name, self.cfg)
|
||||
parent: dict[str, typing.Any] = value(parent_name, self.cfg)
|
||||
else:
|
||||
parent = self.cfg
|
||||
if (parent is UNSET) or (not isinstance(parent, dict)):
|
||||
raise KeyError(parent_name)
|
||||
return parent
|
||||
|
||||
def path(self, name: str, default=UNSET):
|
||||
def path(self, name: str, default: typing.Any = UNSET):
|
||||
"""Get a :py:class:`pathlib.Path` object from a config string."""
|
||||
|
||||
val = self.get(name, default)
|
||||
|
@ -171,7 +173,7 @@ class Config:
|
|||
return default
|
||||
return pathlib.Path(str(val))
|
||||
|
||||
def pyobj(self, name, default=UNSET):
|
||||
def pyobj(self, name: str, default: typing.Any = UNSET):
|
||||
"""Get python object referred by full qualiffied name (FQN) in the config
|
||||
string."""
|
||||
|
||||
|
@ -185,7 +187,7 @@ class Config:
|
|||
return getattr(m, name)
|
||||
|
||||
|
||||
def toml_load(file_name):
|
||||
def toml_load(file_name: str | pathlib.Path):
|
||||
try:
|
||||
with open(file_name, "rb") as f:
|
||||
return tomllib.load(f)
|
||||
|
@ -198,7 +200,7 @@ def toml_load(file_name):
|
|||
# working with dictionaries
|
||||
|
||||
|
||||
def value(name: str, data_dict: dict):
|
||||
def value(name: str, data_dict: dict[str, typing.Any]):
|
||||
"""Returns the value to which ``name`` points in the ``dat_dict``.
|
||||
|
||||
.. code: python
|
||||
|
@ -228,7 +230,7 @@ def value(name: str, data_dict: dict):
|
|||
|
||||
def validate(
|
||||
schema_dict: dict[str, typing.Any], data_dict: dict[str, typing.Any], deprecated: dict[str, str]
|
||||
) -> tuple[bool, list[str]]:
|
||||
) -> tuple[bool, list[SchemaIssue]]:
|
||||
"""Deep validation of dictionary in ``data_dict`` against dictionary in
|
||||
``schema_dict``. Argument deprecated is a dictionary that maps deprecated
|
||||
configuration names to a messages::
|
||||
|
@ -254,9 +256,9 @@ def validate(
|
|||
:py:obj:`SchemaIssue` is raised.
|
||||
|
||||
"""
|
||||
names = []
|
||||
is_valid = True
|
||||
issue_list = []
|
||||
names: list[str] = []
|
||||
is_valid: bool = True
|
||||
issue_list: list[SchemaIssue] = []
|
||||
|
||||
if not isinstance(schema_dict, dict):
|
||||
raise SchemaIssue('invalid', "schema_dict is not a dict type")
|
||||
|
@ -268,15 +270,16 @@ def validate(
|
|||
|
||||
|
||||
def _validate(
|
||||
names: typing.List,
|
||||
issue_list: typing.List,
|
||||
schema_dict: typing.Dict,
|
||||
data_dict: typing.Dict,
|
||||
deprecated: typing.Dict[str, str],
|
||||
) -> typing.Tuple[bool, typing.List]:
|
||||
names: list[str],
|
||||
issue_list: list[SchemaIssue],
|
||||
schema_dict: dict[str, typing.Any],
|
||||
data_dict: dict[str, typing.Any],
|
||||
deprecated: dict[str, str],
|
||||
) -> tuple[bool, list[SchemaIssue]]:
|
||||
|
||||
is_valid = True
|
||||
|
||||
data_value: dict[str, typing.Any]
|
||||
for key, data_value in data_dict.items():
|
||||
|
||||
names.append(key)
|
||||
|
@ -311,7 +314,7 @@ def _validate(
|
|||
return is_valid, issue_list
|
||||
|
||||
|
||||
def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
|
||||
def dict_deepupdate(base_dict: dict[str, typing.Any], upd_dict: dict[str, typing.Any], names: list[str] | None = None):
|
||||
"""Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
|
||||
|
||||
For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
|
||||
|
@ -350,7 +353,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
|
|||
raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
|
||||
dict_deepupdate(
|
||||
base_dict[upd_key],
|
||||
upd_val,
|
||||
upd_val, # pyright: ignore[reportUnknownArgumentType]
|
||||
names
|
||||
+ [
|
||||
upd_key,
|
||||
|
@ -359,7 +362,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
|
|||
|
||||
else:
|
||||
# if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
|
||||
base_dict[upd_key] = copy.deepcopy(upd_val)
|
||||
base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
|
||||
|
||||
elif isinstance(upd_val, list):
|
||||
|
||||
|
@ -373,7 +376,7 @@ def dict_deepupdate(base_dict: dict, upd_dict: dict, names=None):
|
|||
else:
|
||||
# if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
|
||||
# list in upd_val.
|
||||
base_dict[upd_key] = copy.deepcopy(upd_val)
|
||||
base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
|
||||
|
||||
elif isinstance(upd_val, set):
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ if t.TYPE_CHECKING:
|
|||
from _typeshed.wsgi import WSGIEnvironment
|
||||
|
||||
|
||||
@t.final
|
||||
class ProxyFix:
|
||||
"""A middleware like the ProxyFix_ class, where the ``x_for`` argument is
|
||||
replaced by a method that determines the number of trusted proxies via the
|
||||
|
@ -54,7 +55,7 @@ class ProxyFix:
|
|||
|
||||
"""
|
||||
|
||||
def __init__(self, wsgi_app: WSGIApplication) -> None:
|
||||
def __init__(self, wsgi_app: "WSGIApplication") -> None:
|
||||
self.wsgi_app = wsgi_app
|
||||
|
||||
def trusted_proxies(self) -> list[IPv4Network | IPv6Network]:
|
||||
|
@ -84,7 +85,7 @@ class ProxyFix:
|
|||
# fallback to first address
|
||||
return x_forwarded_for[0].compressed
|
||||
|
||||
def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> abc.Iterable[bytes]:
|
||||
def __call__(self, environ: "WSGIEnvironment", start_response: "StartResponse") -> abc.Iterable[bytes]:
|
||||
# pylint: disable=too-many-statements
|
||||
|
||||
trusted_proxies = self.trusted_proxies()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue