diff --git a/searx/botdetection/_helpers.py b/searx/botdetection/_helpers.py index 7b57ae694..d237d62dd 100644 --- a/searx/botdetection/_helpers.py +++ b/searx/botdetection/_helpers.py @@ -10,6 +10,7 @@ from ipaddress import ( ip_network, ip_address, ) + import flask import werkzeug @@ -17,6 +18,7 @@ from searx import logger from searx.extended_types import SXNG_Request from . import config +from .ip_lists import trusted_proxies # pylint: disable=cyclic-import logger = logger.getChild('botdetection') @@ -51,9 +53,11 @@ def too_many_requests(network: IPv4Network | IPv6Network, log_msg: str) -> werkz return flask.make_response(('Too Many Requests', 429)) -def get_network(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> IPv4Network | IPv6Network: +def get_network(real_ip: IPv4Address | IPv6Address) -> IPv4Network | IPv6Network: """Returns the (client) network of whether the real_ip is part of.""" + cfg = config.get_cfg() + if real_ip.version == 6: prefix = cfg['real_ip.ipv6_prefix'] else: @@ -72,66 +76,67 @@ def _log_error_only_once(err_msg): _logged_errors.append(err_msg) -def get_real_ip(request: SXNG_Request) -> str: - """Returns real IP of the request. Since not all proxies set all the HTTP - headers and incoming headers can be faked it may happen that the IP cannot - be determined correctly. - - .. sidebar:: :py:obj:`flask.Request.remote_addr` - - SearXNG uses Werkzeug's ProxyFix_ (with it default ``x_for=1``). +def get_real_ip(request: SXNG_Request) -> IPv4Address | IPv6Address: + """Returns real IP of the request. This function tries to get the remote IP in the order listed below, - additional some tests are done and if inconsistencies or errors are + additional tests are done and if inconsistencies or errors are detected, they are logged. The remote IP of the request is taken from (first match): - - X-Forwarded-For_ header - - `X-real-IP header `__ + - X-Forwarded-For_ header (if from a trusted proxy) + - X-Real-IP_ header (if from a trusted proxy) - :py:obj:`flask.Request.remote_addr` - .. _ProxyFix: - https://werkzeug.palletsprojects.com/middleware/proxy_fix/ - .. _X-Forwarded-For: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For - + .. _X-Real-IP: + https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516 """ - forwarded_for = request.headers.get("X-Forwarded-For") - real_ip = request.headers.get('X-Real-IP') - remote_addr = request.remote_addr - # logger.debug( - # "X-Forwarded-For: %s || X-Real-IP: %s || request.remote_addr: %s", forwarded_for, real_ip, remote_addr - # ) + cfg = config.get_cfg() + remote_addr = ip_address(request.remote_addr or "0.0.0.0") + request_ip = remote_addr - if not forwarded_for: - _log_error_only_once("X-Forwarded-For header is not set!") - else: - from . import cfg # pylint: disable=import-outside-toplevel, cyclic-import + if trusted_proxies(remote_addr, cfg): + forwarded_for = request.headers.get("X-Forwarded-For") + real_ip = request.headers.get("X-Real-IP") - forwarded_for = [x.strip() for x in forwarded_for.split(',')] - x_for: int = cfg['real_ip.x_for'] # type: ignore - forwarded_for = forwarded_for[-min(len(forwarded_for), x_for)] - - if not real_ip: - _log_error_only_once("X-Real-IP header is not set!") - - if forwarded_for and real_ip and forwarded_for != real_ip: - logger.warning("IP from X-Real-IP (%s) is not equal to IP from X-Forwarded-For (%s)", real_ip, forwarded_for) - - if forwarded_for and remote_addr and forwarded_for != remote_addr: - logger.warning( - "IP from WSGI environment (%s) is not equal to IP from X-Forwarded-For (%s)", remote_addr, forwarded_for + logger.debug( + "X-Forwarded-For: %s || X-Real-IP: %s || request.remote_addr: %s", + forwarded_for, + real_ip, + remote_addr.compressed, ) - if real_ip and remote_addr and real_ip != remote_addr: - logger.warning("IP from WSGI environment (%s) is not equal to IP from X-Real-IP (%s)", remote_addr, real_ip) + if not forwarded_for: + _log_error_only_once("X-Forwarded-For header is not set!") + else: + try: + forwarded_for = ip_address(forwarded_for.split(",")[0].strip()).compressed + except ValueError: + forwarded_for = None + + if not real_ip: + _log_error_only_once("X-Real-IP header is not set!") + else: + try: + real_ip = ip_address(real_ip).compressed + except ValueError: + real_ip = None + + if forwarded_for and real_ip and forwarded_for != real_ip: + logger.warning( + "IP from X-Real-IP (%s) is not equal to IP from X-Forwarded-For (%s)", + real_ip, + forwarded_for, + ) + + request_ip = ip_address(forwarded_for or real_ip or remote_addr) - request_ip = ip_address(forwarded_for or real_ip or remote_addr or '0.0.0.0') if request_ip.version == 6 and request_ip.ipv4_mapped: request_ip = request_ip.ipv4_mapped - # logger.debug("get_real_ip() -> %s", request_ip) - return str(request_ip) + logger.debug("get_real_ip() -> %s", request_ip.compressed) + return request_ip diff --git a/searx/botdetection/config.py b/searx/botdetection/config.py index 5b73afe1c..a1f646e4b 100644 --- a/searx/botdetection/config.py +++ b/searx/botdetection/config.py @@ -10,9 +10,10 @@ from __future__ import annotations from typing import Any import copy -import typing +import importlib import logging import pathlib +import typing from ..compat import tomllib @@ -20,6 +21,15 @@ __all__ = ['Config', 'UNSET', 'SchemaIssue'] log = logging.getLogger(__name__) +CFG: Config = None # type: ignore + +LIMITER_CFG_SCHEMA = pathlib.Path(importlib.import_module("searx").__file__).parent / "limiter.toml" +"""Base configuration (schema) of the botdetection.""" + +CFG_DEPRECATED = { + # "dummy.old.foo": "config 'dummy.old.foo' exists only for tests. Don't use it in your real project config." +} + class FALSE: """Class of ``False`` singleton""" @@ -182,6 +192,17 @@ def toml_load(file_name): raise +def get_cfg() -> Config: + global CFG # pylint: disable=global-statement + + if CFG is None: + from searx import settings_loader # pylint: disable=import-outside-toplevel + + cfg_file = (settings_loader.get_user_cfg_folder() or pathlib.Path("/etc/searxng")) / "limiter.toml" + CFG = Config.from_toml(LIMITER_CFG_SCHEMA, cfg_file, CFG_DEPRECATED) + return CFG + + # working with dictionaries @@ -261,7 +282,6 @@ def _validate( data_dict: typing.Dict, deprecated: typing.Dict[str, str], ) -> typing.Tuple[bool, typing.List]: - is_valid = True for key, data_value in data_dict.items(): diff --git a/searx/botdetection/ip_lists.py b/searx/botdetection/ip_lists.py index 2ad1c62d0..3eab5d267 100644 --- a/searx/botdetection/ip_lists.py +++ b/searx/botdetection/ip_lists.py @@ -4,21 +4,29 @@ Method ``ip_lists`` ------------------- -The ``ip_lists`` method implements IP :py:obj:`block- ` and -:py:obj:`pass-lists `. +The ``ip_lists`` method implements IP +:py:obj:`trusted_proxies `, :py:obj:`block-list ` +and :py:obj:`pass-list `. .. code:: toml [botdetection.ip_lists] - pass_ip = [ - '167.235.158.251', # IPv4 of check.searx.space - '192.168.0.0/16', # IPv4 private network - 'fe80::/10' # IPv6 linklocal + trusted_proxies = [ + '127.0.0.0/8', # IPv4 localhost network + '::1', # IPv6 localhost + '192.168.0.0/16', # IPv4 private network ] + + pass_ip = [ + '167.235.158.251', # IPv4 of check.searx.space + '192.168.0.0/16', # IPv4 private network + 'fe80::/10', # IPv6 linklocal + ] + block_ip = [ - '93.184.216.34', # IPv4 of example.org - '257.1.1.1', # invalid IP --> will be ignored, logged in ERROR class + '93.184.216.34', # IPv4 of example.org + '257.1.1.1', # invalid IP --> will be ignored, logged in ERROR class ] """ @@ -45,6 +53,18 @@ SEARXNG_ORG = [ """Passlist of IPs from the SearXNG organization, e.g. `check.searx.space`.""" +def trusted_proxies(remote_addr: IPv4Address | IPv6Address, cfg: config.Config) -> bool: + """Checks if the remote IP is in one of the members of the + ``botdetection.ip_lists.trusted_proxies`` list. + """ + + for net in cfg.get("botdetection.ip_lists.trusted_proxies", default=["127.0.0.0/8", "::1"]): + net = ip_network(net, strict=False) + if remote_addr.version == net.version and remote_addr in net: + return True + return False + + def pass_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bool, str]: """Checks if the IP on the subnet is in one of the members of the ``botdetection.ip_lists.pass_ip`` list. @@ -72,7 +92,6 @@ def block_ip(real_ip: IPv4Address | IPv6Address, cfg: config.Config) -> Tuple[bo def ip_is_subnet_of_member_in_list( real_ip: IPv4Address | IPv6Address, list_name: str, cfg: config.Config ) -> Tuple[bool, str]: - for net in cfg.get(list_name, default=[]): try: net = ip_network(net, strict=False) diff --git a/searx/botdetection/link_token.py b/searx/botdetection/link_token.py index 600796380..23a3d84e3 100644 --- a/searx/botdetection/link_token.py +++ b/searx/botdetection/link_token.py @@ -38,16 +38,15 @@ from __future__ import annotations from ipaddress import ( IPv4Network, IPv6Network, - ip_address, ) -import string import random +import string from searx import logger from searx import valkeydb -from searx.valkeylib import secret_hash from searx.extended_types import SXNG_Request +from searx.valkeylib import secret_hash from ._helpers import ( get_network, @@ -98,15 +97,15 @@ def ping(request: SXNG_Request, token: str): The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`. """ - from . import valkey_client, cfg # pylint: disable=import-outside-toplevel, cyclic-import + from . import valkey_client # pylint: disable=import-outside-toplevel if not valkey_client: return if not token_is_valid(token): return - real_ip = ip_address(get_real_ip(request)) - network = get_network(real_ip, cfg) + real_ip = get_real_ip(request) + network = get_network(real_ip) ping_key = get_ping_key(network, request) logger.debug("store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip, ping_key) diff --git a/searx/limiter.py b/searx/limiter.py index 99bc338d1..ce838274b 100644 --- a/searx/limiter.py +++ b/searx/limiter.py @@ -95,8 +95,6 @@ Implementation from __future__ import annotations import sys -from pathlib import Path -from ipaddress import ip_address import flask import werkzeug @@ -124,34 +122,15 @@ from searx.botdetection import ( # coherency, the logger is "limiter" logger = logger.getChild('limiter') -CFG: config.Config = None # type: ignore _INSTALLED = False -LIMITER_CFG_SCHEMA = Path(__file__).parent / "limiter.toml" -"""Base configuration (schema) of the botdetection.""" - -CFG_DEPRECATED = { - # "dummy.old.foo": "config 'dummy.old.foo' exists only for tests. Don't use it in your real project config." -} - - -def get_cfg() -> config.Config: - global CFG # pylint: disable=global-statement - - if CFG is None: - from . import settings_loader # pylint: disable=import-outside-toplevel - - cfg_file = (settings_loader.get_user_cfg_folder() or Path("/etc/searxng")) / "limiter.toml" - CFG = config.Config.from_toml(LIMITER_CFG_SCHEMA, cfg_file, CFG_DEPRECATED) - return CFG - def filter_request(request: SXNG_Request) -> werkzeug.Response | None: # pylint: disable=too-many-return-statements - cfg = get_cfg() - real_ip = ip_address(get_real_ip(request)) - network = get_network(real_ip, cfg) + cfg = config.get_cfg() + real_ip = get_real_ip(request) + network = get_network(real_ip) if request.path == '/healthz': return None @@ -228,7 +207,7 @@ def initialize(app: flask.Flask, settings): # even if the limiter is not activated, the botdetection must be activated # (e.g. the self_info plugin uses the botdetection to get client IP) - cfg = get_cfg() + cfg = config.get_cfg() valkey_client = valkeydb.client() botdetection.init(cfg, valkey_client) diff --git a/searx/limiter.toml b/searx/limiter.toml index b64a7bf28..0fa069917 100644 --- a/searx/limiter.toml +++ b/searx/limiter.toml @@ -1,9 +1,5 @@ [real_ip] -# Number of values to trust for X-Forwarded-For. - -x_for = 1 - # The prefix defines the number of leading bits in an address that are compared # to determine whether or not an address is part of a (client) network. @@ -21,6 +17,19 @@ link_token = false [botdetection.ip_lists] +# If the request IP is in trusted_proxies list, the client IP address is +# extracted from the X-Forwarded-For and X-Real-IP headers. This should be +# used if SearXNG is behind a reverse proxy or load balancer. + +trusted_proxies = [ + '127.0.0.0/8', + '::1', + # '192.168.0.0/16', + # '172.16.0.0/12', + # '10.0.0.0/8', + # 'fd00::/8', +] + # In the limiter, the ip_lists method has priority over all other methods -> if # an IP is in the pass_ip list, it has unrestricted access and it is also not # checked if e.g. the "user agent" suggests a bot (e.g. curl). @@ -37,4 +46,4 @@ pass_ip = [ # Activate passlist of (hardcoded) IPs from the SearXNG organization, # e.g. `check.searx.space`. -pass_searxng_org = true \ No newline at end of file +pass_searxng_org = true diff --git a/searx/plugins/self_info.py b/searx/plugins/self_info.py index ef035e683..9b2f5fd7e 100644 --- a/searx/plugins/self_info.py +++ b/searx/plugins/self_info.py @@ -49,7 +49,7 @@ class SXNGPlugin(Plugin): return results if self.ip_regex.search(search.search_query.query): - results.add(results.types.Answer(answer=gettext("Your IP is: ") + get_real_ip(request))) + results.add(results.types.Answer(answer=gettext("Your IP is: ") + get_real_ip(request).compressed)) if self.ua_regex.match(search.search_query.query): results.add(results.types.Answer(answer=gettext("Your user-agent is: ") + str(request.user_agent))) diff --git a/searx/plugins/tor_check.py b/searx/plugins/tor_check.py index 3338ff2ed..f6d8c0dd7 100644 --- a/searx/plugins/tor_check.py +++ b/searx/plugins/tor_check.py @@ -66,7 +66,7 @@ class SXNGPlugin(Plugin): results.add(results.types.Answer(answer=f"{msg} {url_exit_list}")) return results - real_ip = get_real_ip(request) + real_ip = get_real_ip(request).compressed if real_ip in node_list: msg = gettext("You are using Tor and it looks like you have the external IP address") diff --git a/searx/webapp.py b/searx/webapp.py index 15f79f151..b6eca06e8 100755 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -59,7 +59,7 @@ from searx import ( from searx import infopage from searx import limiter -from searx.botdetection import link_token +from searx.botdetection import link_token, config as botdetection_config from searx.data import ENGINE_DESCRIPTIONS from searx.result_types import Answer @@ -1266,7 +1266,7 @@ def config(): for _ in searx.plugins.STORAGE: _plugins.append({'name': _.id, 'enabled': _.active}) - _limiter_cfg = limiter.get_cfg() + _limiter_cfg = botdetection_config.get_cfg() return jsonify( { diff --git a/tests/unit/test_plugin_self_info.py b/tests/unit/test_plugin_self_info.py index 4a2e6c416..413f99bea 100644 --- a/tests/unit/test_plugin_self_info.py +++ b/tests/unit/test_plugin_self_info.py @@ -8,7 +8,7 @@ from flask_babel import gettext import searx.plugins import searx.preferences import searx.limiter -import searx.botdetection +import searx.botdetection.config from searx.extended_types import sxng_request from searx.result_types import Answer @@ -29,19 +29,34 @@ class PluginIPSelfInfo(SearxTestCase): self.pref = searx.preferences.Preferences(["simple"], ["general"], engines, self.storage) self.pref.parse_dict({"locale": "en"}) - cfg = searx.limiter.get_cfg() + cfg = searx.botdetection.config.get_cfg() searx.botdetection.init(cfg, None) def test_plugin_store_init(self): self.assertEqual(1, len(self.storage)) - def test_pageno_1_2(self): - + def test_v4_pageno_1_2(self): with self.app.test_request_context(): sxng_request.preferences = self.pref sxng_request.remote_addr = "127.0.0.1" - sxng_request.headers = {"X-Forwarded-For": "1.2.3.4, 127.0.0.1", "X-Real-IP": "127.0.0.1"} # type: ignore - answer = Answer(answer=gettext("Your IP is: ") + "127.0.0.1") + sxng_request.headers = {"X-Forwarded-For": "1.2.3.4, 127.0.0.1", "X-Real-IP": ""} # type: ignore + answer = Answer(answer=gettext("Your IP is: ") + "1.2.3.4") + + search = do_post_search("ip", self.storage, pageno=1) + self.assertIn(answer, search.result_container.answers) + + search = do_post_search("ip", self.storage, pageno=2) + self.assertEqual(list(search.result_container.answers), []) + + def test_v6_pageno_1_2(self): + with self.app.test_request_context(): + sxng_request.preferences = self.pref + sxng_request.remote_addr = "::1" + sxng_request.headers = { # type: ignore + "X-Forwarded-For": "fd0f:a306:f289:0000:0000:0000:ffff:baba, ::1, 127.0.0.1", + "X-Real-IP": "fd0f:a306:f289:0000:0000:0000:ffff:baba", + } + answer = Answer(answer=gettext("Your IP is: ") + "fd0f:a306:f289::ffff:baba") search = do_post_search("ip", self.storage, pageno=1) self.assertIn(answer, search.result_container.answers) @@ -56,7 +71,6 @@ class PluginIPSelfInfo(SearxTestCase): ] ) def test_user_agent_in_answer(self, query: str): - query = "user-agent" with self.app.test_request_context():