Source code for searx.botdetection.trusted_proxies
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Implementation of a middleware to determine the real IP of an HTTP request
(:py:obj:`flask.request.remote_addr`) behind a proxy chain."""
# pylint: disable=too-many-branches
from __future__ import annotations
import typing as t
from collections import abc
from ipaddress import IPv4Address, IPv6Address, ip_address, ip_network, IPv4Network, IPv6Network
from werkzeug.http import parse_list_header
from . import config
from ._helpers import log_error_only_once, logger
if t.TYPE_CHECKING:
from _typeshed.wsgi import StartResponse
from _typeshed.wsgi import WSGIApplication
from _typeshed.wsgi import WSGIEnvironment
[docs]
class ProxyFix:
    """A middleware like the ProxyFix_ class, where the ``x_for`` argument is
    replaced by a method that determines the number of trusted proxies via the
    ``botdetection.trusted_proxies`` setting.

    .. sidebar:: :py:obj:`flask.Request.remote_addr`

       SearXNG uses Werkzeug's ProxyFix_ (with its default ``x_for=1``).

    The remote IP (:py:obj:`flask.Request.remote_addr`) of the request is taken
    from (first match):

    - X-Forwarded-For_: If the header is set, the first untrusted IP that comes
      before the IPs that are still part of the ``botdetection.trusted_proxies``
      is used.

    - `X-Real-IP <https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516>`__:
      If X-Forwarded-For_ is not set, `X-Real-IP` is used
      (``botdetection.trusted_proxies`` is ignored).

    If neither header is set, the REMOTE_ADDR_ from the WSGI layer is used.
    If (for whatever reason) no IP can be determined, an error message is
    logged and ``100::`` is used instead (:rfc:`6666`).

    .. _ProxyFix:
       https://werkzeug.palletsprojects.com/middleware/proxy_fix/

    .. _X-Forwarded-For:
       https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For

    .. _REMOTE_ADDR:
       https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#making-some-keys-required

    """

    def __init__(self, wsgi_app: WSGIApplication) -> None:
        """Wrap *wsgi_app*; it is called with the adjusted WSGI environment."""
        self.wsgi_app = wsgi_app

    def trusted_proxies(self) -> list[IPv4Network | IPv6Network]:
        """Return the networks listed in the ``botdetection.trusted_proxies``
        setting of the global configuration (empty list if unset).

        Entries are parsed with ``strict=False``, so an address with host bits
        set (e.g. ``10.0.0.1/8``) is accepted and reduced to its network.
        """
        cfg = config.get_global_cfg()
        proxy_list: list[str] = cfg.get("botdetection.trusted_proxies", default=[])
        return [ip_network(net, strict=False) for net in proxy_list]

    @staticmethod
    def _parse_ip(value: str) -> IPv4Address | IPv6Address:
        """Parse *value* into an IP address object and unwrap IPv4-mapped IPv6
        addresses (``::ffff:a.b.c.d``) to their plain IPv4 form.

        :raises ValueError: if *value* is not a valid IP address.
        """
        addr = ip_address(value)
        if addr.version == 6 and addr.ipv4_mapped:
            return addr.ipv4_mapped
        return addr

    def trusted_remote_addr(
        self,
        x_forwarded_for: list[IPv4Address | IPv6Address],
        trusted_proxies: list[IPv4Network | IPv6Network],
    ) -> str:
        """Return the compressed form of the right-most address in
        *x_forwarded_for* that is not a member of any network in
        *trusted_proxies*.

        If every address is trusted, the first (left-most) address is returned.
        *x_forwarded_for* must not be empty, otherwise the fallback raises an
        :py:obj:`IndexError`.
        """
        # always rtl
        for addr in reversed(x_forwarded_for):
            matching_net = next(
                (net for net in trusted_proxies if addr.version == net.version and addr in net),
                None,
            )
            if matching_net is None:
                # client address
                return addr.compressed
            logger.debug("trust proxy %s (member of %s)", addr, matching_net)

        # fallback to first address
        return x_forwarded_for[0].compressed

    def __call__(self, environ: WSGIEnvironment, start_response: StartResponse) -> abc.Iterable[bytes]:
        """Determine the real client IP, store it in ``environ["REMOTE_ADDR"]``
        and pass the request on to the wrapped WSGI application.

        The validated original ``REMOTE_ADDR`` (or ``None``) is kept in
        ``environ["botdetection.trusted_proxies.orig"]``.  Invalid
        ``HTTP_X_REAL_IP`` / ``HTTP_X_FORWARDED_FOR`` values are removed from
        the environment.
        """
        # pylint: disable=too-many-statements

        trusted_proxies = self.trusted_proxies()

        # We do not rely on the REMOTE_ADDR from the WSGI environment / the
        # variable is first removed from the WSGI environment and explicitly set
        # in this function!  REMOTE_ADDR is not guaranteed to exist, therefore
        # pop() gets a default instead of raising a KeyError.
        orig_remote_addr: str | None = environ.pop("REMOTE_ADDR", None)

        # Validate the IPs involved in this game and delete all invalid ones
        # from the WSGI environment.

        if orig_remote_addr:
            try:
                orig_remote_addr = self._parse_ip(orig_remote_addr).compressed
            except ValueError as exc:
                logger.error("REMOTE_ADDR: %s / discard REMOTE_ADDR from WSGI environment", exc)
                orig_remote_addr = None

        x_real_ip: str | None = environ.get("HTTP_X_REAL_IP")
        if x_real_ip:
            try:
                x_real_ip = self._parse_ip(x_real_ip).compressed
            except ValueError as exc:
                logger.error("X-Real-IP: %s / discard HTTP_X_REAL_IP from WSGI environment", exc)
                environ.pop("HTTP_X_REAL_IP")
                x_real_ip = None

        x_forwarded_for: list[IPv4Address | IPv6Address] = []
        raw_x_forwarded_for = environ.get("HTTP_X_FORWARDED_FOR")
        if raw_x_forwarded_for:
            header_items = parse_list_header(str(raw_x_forwarded_for))
            try:
                x_forwarded_for = [self._parse_ip(item) for item in header_items]
            except ValueError as exc:
                # a single invalid entry invalidates the whole header
                logger.error("X-Forwarded-For: %s / discard HTTP_X_FORWARDED_FOR from WSGI environment", exc)
                environ.pop("HTTP_X_FORWARDED_FOR")
                x_forwarded_for = []

        # log questionable WSGI environments

        if not x_forwarded_for and not x_real_ip:
            log_error_only_once("X-Forwarded-For nor X-Real-IP header is set!")

        if x_forwarded_for and not trusted_proxies:
            log_error_only_once("missing botdetection.trusted_proxies config")
            # without trusted_proxies, this variable is useless for determining
            # the real IP
            x_forwarded_for = []

        # securing the WSGI environment variables that are adjusted

        environ.update({"botdetection.trusted_proxies.orig": {"REMOTE_ADDR": orig_remote_addr}})

        # determine *the real IP*

        if x_forwarded_for:
            environ["REMOTE_ADDR"] = self.trusted_remote_addr(x_forwarded_for, trusted_proxies)
        elif x_real_ip:
            environ["REMOTE_ADDR"] = x_real_ip
        elif orig_remote_addr:
            environ["REMOTE_ADDR"] = orig_remote_addr
        else:
            logger.error("No remote IP could be determined, use black-hole address: 100::")
            environ["REMOTE_ADDR"] = "100::"

        # last line of defence: never hand an unparsable REMOTE_ADDR downstream
        try:
            _ = ip_address(environ["REMOTE_ADDR"])
        except ValueError as exc:
            logger.error("REMOTE_ADDR: %s, use black-hole address: 100::", exc)
            environ["REMOTE_ADDR"] = "100::"

        logger.debug("final REMOTE_ADDR is: %s", environ["REMOTE_ADDR"])
        return self.wsgi_app(environ, start_response)