mirror of
https://github.com/searxng/searxng.git
synced 2025-07-16 17:59:30 +02:00
Replaces `x_for` functionality with `trusted_proxies`. This allows defining which IP / ranges to trust extracting the client IP address from X-Forwarded-For and X-Real-IP headers. We don't know if the proxy chain will give us the proper client address, so we rely on reading the headers of the proxy before SearXNG (if there is one, in that case it must be added to trusted_proxies) hoping it has done the proper checks. In case a proxy in the chain does not check the client address correctly, integrity is compromised and this should be fixed by whoever manages the proxy, not us. I had to move the get_cnf func to another file (config.py) to prevent cyclic imports since we need to read the list inside _helpers.py Closes https://github.com/searxng/searxng/issues/4907 Closes https://github.com/searxng/searxng/issues/3632 Closes https://github.com/searxng/searxng/issues/3191 Closes https://github.com/searxng/searxng/issues/1237 Related https://github.com/searxng/searxng-docker/issues/386 Related https://github.com/inetol-infrastructure/searxng-container/issues/81
153 lines
4.5 KiB
Python
153 lines
4.5 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""
|
|
Method ``link_token``
|
|
---------------------
|
|
|
|
The ``link_token`` method evaluates a request as :py:obj:`suspicious
|
|
<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
|
|
client. By adding a random component (the token) in the URL, a bot can not send
|
|
a ping by request a static URL.
|
|
|
|
.. note::
|
|
|
|
This method requires a valkey DB and needs a HTTP X-Forwarded-For_ header.
|
|
|
|
To get in use of this method a flask URL route needs to be added:
|
|
|
|
.. code:: python
|
|
|
|
@app.route('/client<token>.css', methods=['GET', 'POST'])
|
|
def client_token(token=None):
|
|
link_token.ping(request, token)
|
|
return Response('', mimetype='text/css')
|
|
|
|
And in the HTML template from flask a stylesheet link is needed (the value of
|
|
``link_token`` comes from :py:obj:`get_token`):
|
|
|
|
.. code:: html
|
|
|
|
<link rel="stylesheet"
|
|
href="{{ url_for('client_token', token=link_token) }}"
|
|
type="text/css" >
|
|
|
|
.. _X-Forwarded-For:
|
|
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
|
|
|
|
"""
|
|
from __future__ import annotations
|
|
from ipaddress import (
|
|
IPv4Network,
|
|
IPv6Network,
|
|
)
|
|
|
|
import random
|
|
import string
|
|
|
|
from searx import logger
|
|
from searx import valkeydb
|
|
from searx.extended_types import SXNG_Request
|
|
from searx.valkeylib import secret_hash
|
|
|
|
from ._helpers import (
|
|
get_network,
|
|
get_real_ip,
|
|
)
|
|
|
|
TOKEN_LIVE_TIME = 600
|
|
"""Lifetime (sec) of limiter's CSS token."""
|
|
|
|
PING_LIVE_TIME = 3600
|
|
"""Lifetime (sec) of the ping-key from a client (request)"""
|
|
|
|
PING_KEY = 'SearXNG_limiter.ping'
|
|
"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""
|
|
|
|
TOKEN_KEY = 'SearXNG_limiter.token'
|
|
"""Key for which the current token is stored in the DB"""
|
|
|
|
logger = logger.getChild('botdetection.link_token')
|
|
|
|
|
|
def is_suspicious(network: IPv4Network | IPv6Network, request: SXNG_Request, renew: bool = False):
|
|
"""Checks whether a valid ping is exists for this (client) network, if not
|
|
this request is rated as *suspicious*. If a valid ping exists and argument
|
|
``renew`` is ``True`` the expire time of this ping is reset to
|
|
:py:obj:`PING_LIVE_TIME`.
|
|
|
|
"""
|
|
valkey_client = valkeydb.client()
|
|
if not valkey_client:
|
|
return False
|
|
|
|
ping_key = get_ping_key(network, request)
|
|
if not valkey_client.get(ping_key):
|
|
logger.info("missing ping (IP: %s) / request: %s", network.compressed, ping_key)
|
|
return True
|
|
|
|
if renew:
|
|
valkey_client.set(ping_key, 1, ex=PING_LIVE_TIME)
|
|
|
|
logger.debug("found ping for (client) network %s -> %s", network.compressed, ping_key)
|
|
return False
|
|
|
|
|
|
def ping(request: SXNG_Request, token: str):
|
|
"""This function is called by a request to URL ``/client<token>.css``. If
|
|
``token`` is valid a :py:obj:`PING_KEY` for the client is stored in the DB.
|
|
The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`.
|
|
|
|
"""
|
|
from . import valkey_client # pylint: disable=import-outside-toplevel
|
|
|
|
if not valkey_client:
|
|
return
|
|
if not token_is_valid(token):
|
|
return
|
|
|
|
real_ip = get_real_ip(request)
|
|
network = get_network(real_ip)
|
|
|
|
ping_key = get_ping_key(network, request)
|
|
logger.debug("store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip, ping_key)
|
|
valkey_client.set(ping_key, 1, ex=PING_LIVE_TIME)
|
|
|
|
|
|
def get_ping_key(network: IPv4Network | IPv6Network, request: SXNG_Request) -> str:
|
|
"""Generates a hashed key that fits (more or less) to a *WEB-browser
|
|
session* in a network."""
|
|
return (
|
|
PING_KEY
|
|
+ "["
|
|
+ secret_hash(
|
|
network.compressed + request.headers.get('Accept-Language', '') + request.headers.get('User-Agent', '')
|
|
)
|
|
+ "]"
|
|
)
|
|
|
|
|
|
def token_is_valid(token) -> bool:
|
|
valid = token == get_token()
|
|
logger.debug("token is valid --> %s", valid)
|
|
return valid
|
|
|
|
|
|
def get_token() -> str:
|
|
"""Returns current token. If there is no currently active token a new token
|
|
is generated randomly and stored in the valkey DB.
|
|
|
|
- :py:obj:`TOKEN_LIVE_TIME`
|
|
- :py:obj:`TOKEN_KEY`
|
|
|
|
"""
|
|
valkey_client = valkeydb.client()
|
|
if not valkey_client:
|
|
# This function is also called when limiter is inactive / no valkey DB
|
|
# (see render function in webapp.py)
|
|
return '12345678'
|
|
token = valkey_client.get(TOKEN_KEY)
|
|
if token:
|
|
token = token.decode('UTF-8')
|
|
else:
|
|
token = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16))
|
|
valkey_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
|
|
return token
|