[httpx] replace searx.poolrequests by searx.network

settings.yml:

* outgoing.networks:
   * can contains network definition
   * propertiers: enable_http, verify, http2, max_connections, max_keepalive_connections,
     keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries
   * retries: 0 by default, number of times searx retries to send the HTTP request (using different IP & proxy each time)
   * local_addresses can be "192.168.0.1/24" (it supports IPv6)
   * support_ipv4 & support_ipv6: both True by default
     see https://github.com/searx/searx/pull/1034
* each engine can define a "network" section:
   * either a full network description
   * either reference an existing network

* all HTTP requests of engine use the same HTTP configuration (it was not the case before, see proxy configuration in master)
This commit is contained in:
Alexandre Flament 2021-04-05 10:43:33 +02:00
parent eaa694fb7d
commit d14994dc73
31 changed files with 1036 additions and 677 deletions

189
searx/network/__init__.py Normal file
View file

@ -0,0 +1,189 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import threading
import concurrent.futures
from time import time
import httpx
import h2.exceptions
from .network import get_network, initialize
from .client import LOOP
from .raise_for_httperror import raise_for_httperror
# queue.SimpleQueue: Support Python 3.6
try:
from queue import SimpleQueue
except ImportError:
from queue import Empty
from collections import deque
class SimpleQueue:
"""Minimal backport of queue.SimpleQueue"""
def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)
def put(self, item):
self._queue.append(item)
self._count.release()
def get(self):
if not self._count.acquire(True):
raise Empty
return self._queue.popleft()
THREADLOCAL = threading.local()
def reset_time_for_thread():
THREADLOCAL.total_time = 0
def get_time_for_thread():
return THREADLOCAL.total_time
def set_timeout_for_thread(timeout, start_time=None):
THREADLOCAL.timeout = timeout
THREADLOCAL.start_time = start_time
def set_context_network_name(network_name):
THREADLOCAL.network = get_network(network_name)
def get_context_network():
try:
return THREADLOCAL.network
except AttributeError:
return get_network()
def request(method, url, **kwargs):
"""same as requests/requests/api.py request(...)"""
time_before_request = time()
# timeout (httpx)
if 'timeout' in kwargs:
timeout = kwargs['timeout']
else:
timeout = getattr(THREADLOCAL, 'timeout', None)
if timeout is not None:
kwargs['timeout'] = timeout
# 2 minutes timeout for the requests without timeout
timeout = timeout or 120
# ajdust actual timeout
timeout += 0.2 # overhead
start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
if start_time:
timeout -= time() - start_time
# raise_for_error
check_for_httperror = True
if 'raise_for_httperror' in kwargs:
check_for_httperror = kwargs['raise_for_httperror']
del kwargs['raise_for_httperror']
# requests compatibility
if isinstance(url, bytes):
url = url.decode()
# network
network = get_context_network()
# do request
future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), LOOP)
try:
response = future.result(timeout)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
# requests compatibility
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error
# update total_time.
# See get_time_for_thread() and reset_time_for_thread()
if hasattr(THREADLOCAL, 'total_time'):
time_after_request = time()
THREADLOCAL.total_time += time_after_request - time_before_request
# raise an exception
if check_for_httperror:
raise_for_httperror(response)
return response
def get(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
def options(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
def head(url, **kwargs):
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
return request('post', url, data=data, **kwargs)
def put(url, data=None, **kwargs):
return request('put', url, data=data, **kwargs)
def patch(url, data=None, **kwargs):
return request('patch', url, data=data, **kwargs)
def delete(url, **kwargs):
return request('delete', url, **kwargs)
async def stream_chunk_to_queue(network, q, method, url, **kwargs):
try:
async with network.stream(method, url, **kwargs) as response:
q.put(response)
async for chunk in response.aiter_bytes(65536):
if len(chunk) > 0:
q.put(chunk)
except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
q.put(e)
finally:
q.put(None)
def stream(method, url, **kwargs):
"""Replace httpx.stream.
Usage:
stream = poolrequests.stream(...)
response = next(stream)
for chunk in stream:
...
httpx.Client.stream requires to write the httpx.HTTPTransport version of the
the httpx.AsyncHTTPTransport declared above.
"""
q = SimpleQueue()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(get_network(), q, method, url, **kwargs),
LOOP)
chunk_or_exception = q.get()
while chunk_or_exception is not None:
if isinstance(chunk_or_exception, Exception):
raise chunk_or_exception
yield chunk_or_exception
chunk_or_exception = q.get()
return future.result()

214
searx/network/client.py Normal file
View file

@ -0,0 +1,214 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import logging
import threading
import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors
from searx import logger
# Optional uvloop (support Python 3.6)
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
logger = logger.getChild('searx.http.client')
LOOP = None
TRANSPORT_KWARGS = {
'backend': 'asyncio',
'trust_env': False,
}
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
origin = httpcore._utils.url_to_origin(url)
logger.debug('Drop connections for %r', origin)
connections_to_close = connection_pool._connections_for_origin(origin)
for connection in connections_to_close:
await connection_pool._remove_from_pool(connection)
try:
await connection.aclose()
except httpcore.NetworkError as e:
logger.warning('Error closing an existing connection', exc_info=e)
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
"""Block HTTP request"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
class AsyncProxyTransportFixed(AsyncProxyTransport):
"""Fix httpx_socks.AsyncProxyTransport
Map python_socks exceptions to httpcore.ProxyError
Map socket.gaierror to httpcore.ConnectError
Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
* self._keepalive_sweep()
* self._response_closed(self, connection)
Note: AsyncProxyTransport inherit from AsyncConnectionPool
Note: the API is going to change on httpx 0.18.0
see https://github.com/encode/httpx/pull/1522
"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except (python_socks._errors.ProxyConnectionError,
python_socks._errors.ProxyTimeoutError,
python_socks._errors.ProxyError) as e:
raise httpcore.ProxyError(e)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.NetworkError(e)
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.NetworkError, httpcore.ProtocolError) as e:
# httpcore.WriteError on HTTP/2 connection leaves a new opened stream
# then each new request creates a new stream and raise the same WriteError
await close_connections_for_url(self, url)
raise e
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
"""Fix httpx.AsyncHTTPTransport"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.ConnectError(e)
except httpcore.CloseError as e:
# httpcore.CloseError: [Errno 104] Connection reset by peer
# raised by _keepalive_sweep()
# from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.CloseError: retry', exc_info=e)
# retry
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.ProtocolError, httpcore.NetworkError) as e:
await close_connections_for_url(self._pool, url)
raise e
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
global LOOP, TRANSPORT_KWARGS
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
# socks5h:// hostname is resolved on proxy side
rdns = False
socks5h = 'socks5h://'
if proxy_url.startswith(socks5h):
proxy_url = 'socks5://' + proxy_url[len(socks5h):]
rdns = True
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
username=proxy_username, password=proxy_password,
rdns=rdns,
loop=LOOP,
verify=verify,
http2=http2,
local_address=local_address,
max_connections=limit.max_connections,
max_keepalive_connections=limit.max_keepalive_connections,
keepalive_expiry=limit.keepalive_expiry,
retries=retries,
**TRANSPORT_KWARGS)
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
return AsyncHTTPTransportFixed(verify=verify,
http2=http2,
local_address=local_address,
proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
limits=limit,
retries=retries,
**TRANSPORT_KWARGS)
def iter_proxies(proxies):
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(proxies, str):
yield 'all://', proxies
elif isinstance(proxies, dict):
for pattern, proxy_url in proxies.items():
yield pattern, proxy_url
def new_client(enable_http, verify, enable_http2,
max_connections, max_keepalive_connections, keepalive_expiry,
proxies, local_address, retries, max_redirects):
limit = httpx.Limits(max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
keepalive_expiry=keepalive_expiry)
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in iter_proxies(proxies):
if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
continue
if proxy_url.startswith('socks4://') \
or proxy_url.startswith('socks5://') \
or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
retries)
else:
mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()
transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
def init():
# log
for logger_name in ('hpack.hpack', 'hpack.table'):
logging.getLogger(logger_name).setLevel(logging.WARNING)
# loop
def loop_thread():
global LOOP
LOOP = asyncio.new_event_loop()
LOOP.run_forever()
th = threading.Thread(
target=loop_thread,
name='asyncio_loop',
daemon=True,
)
th.start()
init()

302
searx/network/network.py Normal file
View file

@ -0,0 +1,302 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import atexit
import asyncio
import ipaddress
from itertools import cycle
import httpx
from .client import new_client, LOOP
DEFAULT_NAME = '__DEFAULT__'
NETWORKS = {}
# requests compatibility when reading proxy settings from settings.yml
PROXY_PATTERN_MAPPING = {
'http': 'http://',
'https': 'https://',
'socks4': 'socks4://',
'socks5': 'socks5://',
'socks5h': 'socks5h://',
'http:': 'http://',
'https:': 'https://',
'socks4:': 'socks4://',
'socks5:': 'socks5://',
'socks5h:': 'socks5h://',
}
ADDRESS_MAPPING = {
'ipv4': '0.0.0.0',
'ipv6': '::'
}
class Network:
__slots__ = ('enable_http', 'verify', 'enable_http2',
'max_connections', 'max_keepalive_connections', 'keepalive_expiry',
'local_addresses', 'proxies', 'max_redirects', 'retries', 'retry_on_http_error',
'_local_addresses_cycle', '_proxies_cycle', '_clients')
def __init__(self,
enable_http=True,
verify=True,
enable_http2=False,
max_connections=None,
max_keepalive_connections=None,
keepalive_expiry=None,
proxies=None,
local_addresses=None,
retries=0,
retry_on_http_error=None,
max_redirects=30):
self.enable_http = enable_http
self.verify = verify
self.enable_http2 = enable_http2
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
self.keepalive_expiry = keepalive_expiry
self.proxies = proxies
self.local_addresses = local_addresses
self.retries = retries
self.retry_on_http_error = retry_on_http_error
self.max_redirects = max_redirects
self._local_addresses_cycle = self.get_ipaddress_cycle()
self._proxies_cycle = self.get_proxy_cycles()
self._clients = {}
self.check_parameters()
def check_parameters(self):
for address in self.iter_ipaddresses():
if '/' in address:
ipaddress.ip_network(address, False)
else:
ipaddress.ip_address(address)
if self.proxies is not None and not isinstance(self.proxies, (str, dict)):
raise ValueError('proxies type has to be str, dict or None')
def iter_ipaddresses(self):
local_addresses = self.local_addresses
if not local_addresses:
return
elif isinstance(local_addresses, str):
local_addresses = [local_addresses]
for address in local_addresses:
yield address
def get_ipaddress_cycle(self):
while True:
count = 0
for address in self.iter_ipaddresses():
if '/' in address:
for a in ipaddress.ip_network(address, False).hosts():
yield str(a)
count += 1
else:
a = ipaddress.ip_address(address)
yield str(a)
count += 1
if count == 0:
yield None
def iter_proxies(self):
if not self.proxies:
return
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(self.proxies, str):
yield 'all://', [self.proxies]
else:
for pattern, proxy_url in self.proxies.items():
pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
if isinstance(proxy_url, str):
proxy_url = [proxy_url]
yield pattern, proxy_url
def get_proxy_cycles(self):
proxy_settings = {}
for pattern, proxy_urls in self.iter_proxies():
proxy_settings[pattern] = cycle(proxy_urls)
while True:
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
def get_client(self, verify=None, max_redirects=None):
verify = self.verify if verify is None else verify
max_redirects = self.max_redirects if max_redirects is None else max_redirects
local_address = next(self._local_addresses_cycle)
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
key = (verify, max_redirects, local_address, proxies)
if key not in self._clients or self._clients[key].is_closed:
self._clients[key] = new_client(self.enable_http,
verify,
self.enable_http2,
self.max_connections,
self.max_keepalive_connections,
self.keepalive_expiry,
dict(proxies),
local_address,
0,
max_redirects)
return self._clients[key]
async def aclose(self):
async def close_client(client):
try:
await client.aclose()
except httpx.HTTPError:
pass
await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
@staticmethod
def get_kwargs_clients(kwargs):
kwargs_clients = {}
if 'verify' in kwargs:
kwargs_clients['verify'] = kwargs.pop('verify')
if 'max_redirects' in kwargs:
kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
return kwargs_clients
def is_valid_respones(self, response):
if (self.retry_on_http_error is True and 400 <= response.status_code <= 599) \
or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error) \
or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error):
return False
return True
async def request(self, method, url, **kwargs):
retries = self.retries
while retries >= 0: # pragma: no cover
kwargs_clients = Network.get_kwargs_clients(kwargs)
client = self.get_client(**kwargs_clients)
try:
response = await client.request(method, url, **kwargs)
if self.is_valid_respones(response) or retries <= 0:
return response
except (httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
def stream(self, method, url, **kwargs):
retries = self.retries
while retries >= 0: # pragma: no cover
kwargs_clients = Network.get_kwargs_clients(kwargs)
client = self.get_client(**kwargs_clients)
try:
response = client.stream(method, url, **kwargs)
if self.is_valid_respones(response) or retries <= 0:
return response
except (httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
@classmethod
async def aclose_all(cls):
await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False)
def get_network(name=None):
global NETWORKS
return NETWORKS[name or DEFAULT_NAME]
def initialize(settings_engines=None, settings_outgoing=None):
from searx.engines import engines
from searx import settings
global NETWORKS
settings_engines = settings_engines or settings.get('engines')
settings_outgoing = settings_outgoing or settings.get('outgoing')
# default parameters for AsyncHTTPTransport
# see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # noqa
default_params = {
'enable_http': False,
'verify': True,
'enable_http2': settings_outgoing.get('enable_http2', True),
# Magic number kept from previous code
'max_connections': settings_outgoing.get('pool_connections', 100),
# Picked from constructor
'max_keepalive_connections': settings_outgoing.get('pool_maxsize', 10),
#
'keepalive_expiry': settings_outgoing.get('keepalive_expiry', 5.0),
'local_addresses': settings_outgoing.get('source_ips'),
'proxies': settings_outgoing.get('proxies'),
# default maximum redirect
# from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
'max_redirects': settings_outgoing.get('max_redirects', 30),
#
'retries': settings_outgoing.get('retries', 0),
'retry_on_http_error': None,
}
def new_network(params):
nonlocal default_params
result = {}
result.update(default_params)
result.update(params)
return Network(**result)
def iter_networks():
nonlocal settings_engines
for engine_spec in settings_engines:
engine_name = engine_spec['name']
engine = engines.get(engine_name)
if engine is None:
continue
network = getattr(engine, 'network', None)
yield engine_name, engine, network
if NETWORKS:
done()
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = new_network({})
NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'})
NETWORKS['ipv6'] = new_network({'local_addresses': '::'})
# define networks from outgoing.networks
for network_name, network in settings_outgoing.get('networks', {}).items():
NETWORKS[network_name] = new_network(network)
# define networks from engines.[i].network (except references)
for engine_name, engine, network in iter_networks():
if network is None:
network = {}
for attribute_name, attribute_value in default_params.items():
if hasattr(engine, attribute_name):
network[attribute_name] = getattr(engine, attribute_name)
else:
network[attribute_name] = attribute_value
NETWORKS[engine_name] = new_network(network)
elif isinstance(network, dict):
NETWORKS[engine_name] = new_network(network)
# define networks from engines.[i].network (references)
for engine_name, engine, network in iter_networks():
if isinstance(network, str):
NETWORKS[engine_name] = NETWORKS[network]
@atexit.register
def done():
"""Close all HTTP client
Avoid a warning at exit
see https://github.com/encode/httpx/blob/1a6e254f72d9fd5694a1c10a28927e193ab4f76b/httpx/_client.py#L1785
Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__
So Network.aclose is called here using atexit.register
"""
try:
if LOOP:
future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), LOOP)
# wait 3 seconds to close the HTTP clients
future.result(3)
finally:
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = Network()

View file

@ -0,0 +1,66 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Raise exception for an HTTP response is an error.
"""
from searx.exceptions import (SearxEngineCaptchaException, SearxEngineTooManyRequestsException,
SearxEngineAccessDeniedException)
def is_cloudflare_challenge(resp):
if resp.status_code in [429, 503]:
if ('__cf_chl_jschl_tk__=' in resp.text)\
or ('/cdn-cgi/challenge-platform/' in resp.text
and 'orchestrate/jsch/v1' in resp.text
and 'window._cf_chl_enter(' in resp.text):
return True
if resp.status_code == 403 and '__cf_chl_captcha_tk__=' in resp.text:
return True
return False
def is_cloudflare_firewall(resp):
return resp.status_code == 403 and '<span class="cf-error-code">1020</span>' in resp.text
def raise_for_cloudflare_captcha(resp):
if resp.headers.get('Server', '').startswith('cloudflare'):
if is_cloudflare_challenge(resp):
# https://support.cloudflare.com/hc/en-us/articles/200170136-Understanding-Cloudflare-Challenge-Passage-Captcha-
# suspend for 2 weeks
raise SearxEngineCaptchaException(message='Cloudflare CAPTCHA', suspended_time=3600 * 24 * 15)
if is_cloudflare_firewall(resp):
raise SearxEngineAccessDeniedException(message='Cloudflare Firewall', suspended_time=3600 * 24)
def raise_for_recaptcha(resp):
if resp.status_code == 503 \
and '"https://www.google.com/recaptcha/' in resp.text:
raise SearxEngineCaptchaException(message='ReCAPTCHA', suspended_time=3600 * 24 * 7)
def raise_for_captcha(resp):
raise_for_cloudflare_captcha(resp)
raise_for_recaptcha(resp)
def raise_for_httperror(resp):
"""Raise exception for an HTTP response is an error.
Args:
resp (requests.Response): Response to check
Raises:
requests.HTTPError: raise by resp.raise_for_status()
searx.exceptions.SearxEngineAccessDeniedException: raise when the HTTP status code is 402 or 403.
searx.exceptions.SearxEngineTooManyRequestsException: raise when the HTTP status code is 429.
searx.exceptions.SearxEngineCaptchaException: raise when if CATPCHA challenge is detected.
"""
if resp.status_code and resp.status_code >= 400:
raise_for_captcha(resp)
if resp.status_code in (402, 403):
raise SearxEngineAccessDeniedException(message='HTTP error ' + str(resp.status_code),
suspended_time=3600 * 24)
if resp.status_code == 429:
raise SearxEngineTooManyRequestsException()
resp.raise_for_status()