[format.python] initial formatting of the python code

This patch was generated by black [1]::

    make format.python

[1] https://github.com/psf/black

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
Markus Heiser 2021-12-27 09:26:22 +01:00
parent fcdc2c2cd2
commit 3d96a9839a
184 changed files with 2800 additions and 2836 deletions

View file

@ -146,11 +146,7 @@ STATS_SORT_PARAMETERS = {
}
# Flask app
app = Flask(
__name__,
static_folder=settings['ui']['static_path'],
template_folder=templates_path
)
app = Flask(__name__, static_folder=settings['ui']['static_path'], template_folder=templates_path)
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
@ -171,14 +167,10 @@ _category_names = (
gettext('news'),
gettext('map'),
gettext('onions'),
gettext('science')
gettext('science'),
)
_simple_style = (
gettext('auto'),
gettext('light'),
gettext('dark')
)
_simple_style = (gettext('auto'), gettext('light'), gettext('dark'))
#
timeout_text = gettext('timeout')
@ -214,11 +206,15 @@ exception_classname_to_text = {
# monkey patch for flask_babel.get_translations
_flask_babel_get_translations = flask_babel.get_translations
def _get_translations():
if has_request_context() and request.form.get('use-translation') == 'oc':
babel_ext = flask_babel.current_app.extensions['babel']
return Translations.load(next(babel_ext.translation_directories), 'oc')
return _flask_babel_get_translations()
flask_babel.get_translations = _get_translations
@ -286,13 +282,10 @@ def code_highlighter(codelines, language=None):
line_code_start = line
# new codeblock is detected
if last_line is not None and\
last_line + 1 != line:
if last_line is not None and last_line + 1 != line:
# highlight last codepart
formatter = HtmlFormatter(
linenos='inline', linenostart=line_code_start, cssclass="code-highlight"
)
formatter = HtmlFormatter(linenos='inline', linenostart=line_code_start, cssclass="code-highlight")
html_code = html_code + highlight(tmp_code, lexer, formatter)
# reset conditions for next codepart
@ -355,16 +348,9 @@ def proxify(url):
url_params = dict(mortyurl=url.encode())
if settings['result_proxy'].get('key'):
url_params['mortyhash'] = hmac.new(
settings['result_proxy']['key'],
url.encode(),
hashlib.sha256
).hexdigest()
url_params['mortyhash'] = hmac.new(settings['result_proxy']['key'], url.encode(), hashlib.sha256).hexdigest()
return '{0}?{1}'.format(
settings['result_proxy']['url'],
urlencode(url_params)
)
return '{0}?{1}'.format(settings['result_proxy']['url'], urlencode(url_params))
def image_proxify(url):
@ -377,10 +363,12 @@ def image_proxify(url):
if url.startswith('data:image/'):
# 50 is an arbitrary number to get only the beginning of the image.
partial_base64 = url[len('data:image/'):50].split(';')
if len(partial_base64) == 2 \
and partial_base64[0] in ['gif', 'png', 'jpeg', 'pjpeg', 'webp', 'tiff', 'bmp']\
and partial_base64[1].startswith('base64,'):
partial_base64 = url[len('data:image/') : 50].split(';')
if (
len(partial_base64) == 2
and partial_base64[0] in ['gif', 'png', 'jpeg', 'pjpeg', 'webp', 'tiff', 'bmp']
and partial_base64[1].startswith('base64,')
):
return url
return None
@ -389,8 +377,7 @@ def image_proxify(url):
h = new_hmac(settings['server']['secret_key'], url.encode())
return '{0}?{1}'.format(url_for('image_proxy'),
urlencode(dict(url=url.encode(), h=h)))
return '{0}?{1}'.format(url_for('image_proxy'), urlencode(dict(url=url.encode(), h=h)))
def get_translations():
@ -412,7 +399,8 @@ def _get_enable_categories(all_categories):
disabled_engines = request.preferences.engines.get_disabled()
enabled_categories = set(
# pylint: disable=consider-using-dict-items
category for engine_name in engines
category
for engine_name in engines
for category in engines[engine_name].categories
if (engine_name, category) not in disabled_engines
)
@ -423,10 +411,7 @@ def get_pretty_url(parsed_url):
path = parsed_url.path
path = path[:-1] if len(path) > 0 and path[-1] == '/' else path
path = path.replace("/", " ")
return [
parsed_url.scheme + "://" + parsed_url.netloc,
path
]
return [parsed_url.scheme + "://" + parsed_url.netloc, path]
def render(template_name, override_theme=None, **kwargs):
@ -448,7 +433,7 @@ def render(template_name, override_theme=None, **kwargs):
kwargs['categories'] = _get_enable_categories(kwargs['all_categories'])
# i18n
kwargs['language_codes'] = [ l for l in languages if l[0] in settings['search']['languages'] ]
kwargs['language_codes'] = [l for l in languages if l[0] in settings['search']['languages']]
kwargs['translations'] = json.dumps(get_translations(), separators=(',', ':'))
locale = request.preferences.get_value('locale')
@ -458,12 +443,11 @@ def render(template_name, override_theme=None, **kwargs):
kwargs['rtl'] = True
if 'current_language' not in kwargs:
kwargs['current_language'] = match_language(
request.preferences.get_value('language'), settings['search']['languages'] )
request.preferences.get_value('language'), settings['search']['languages']
)
# values from settings
kwargs['search_formats'] = [
x for x in settings['search']['formats'] if x != 'html'
]
kwargs['search_formats'] = [x for x in settings['search']['formats'] if x != 'html']
kwargs['instance_name'] = get_setting('general.instance_name')
kwargs['searx_version'] = VERSION_STRING
kwargs['searx_git_url'] = GIT_URL
@ -477,9 +461,7 @@ def render(template_name, override_theme=None, **kwargs):
kwargs['proxify_results'] = settings.get('result_proxy', {}).get('proxify_results', True)
kwargs['get_result_template'] = get_result_template
kwargs['opensearch_url'] = (
url_for('opensearch')
+ '?'
+ urlencode({'method': kwargs['method'], 'autocomplete': kwargs['autocomplete']})
url_for('opensearch') + '?' + urlencode({'method': kwargs['method'], 'autocomplete': kwargs['autocomplete']})
)
# scripts from plugins
@ -495,8 +477,7 @@ def render(template_name, override_theme=None, **kwargs):
kwargs['styles'].add(css)
start_time = default_timer()
result = render_template(
'{}/{}'.format(kwargs['theme'], template_name), **kwargs)
result = render_template('{}/{}'.format(kwargs['theme'], template_name), **kwargs)
request.render_time += default_timer() - start_time # pylint: disable=assigning-non-slot
return result
@ -541,7 +522,7 @@ def pre_request():
# language is defined neither in settings nor in preferences
# use browser headers
if not preferences.get_value("language"):
language = _get_browser_language(request, settings['search']['languages'])
language = _get_browser_language(request, settings['search']['languages'])
preferences.parse_dict({"language": language})
# locale is defined neither in settings nor in preferences
@ -555,8 +536,7 @@ def pre_request():
allowed_plugins = preferences.plugins.get_enabled()
disabled_plugins = preferences.plugins.get_disabled()
for plugin in plugins:
if ((plugin.default_on and plugin.id not in disabled_plugins)
or plugin.id in allowed_plugins):
if (plugin.default_on and plugin.id not in disabled_plugins) or plugin.id in allowed_plugins:
request.user_plugins.append(plugin)
@ -573,17 +553,20 @@ def add_default_headers(response):
@app.after_request
def post_request(response):
total_time = default_timer() - request.start_time
timings_all = ['total;dur=' + str(round(total_time * 1000, 3)),
'render;dur=' + str(round(request.render_time * 1000, 3))]
timings_all = [
'total;dur=' + str(round(total_time * 1000, 3)),
'render;dur=' + str(round(request.render_time * 1000, 3)),
]
if len(request.timings) > 0:
timings = sorted(request.timings, key=lambda v: v['total'])
timings_total = [
'total_' + str(i) + '_' + v['engine'] + ';dur=' + str(round(v['total'] * 1000, 3))
'total_' + str(i) + '_' + v['engine'] + ';dur=' + str(round(v['total'] * 1000, 3))
for i, v in enumerate(timings)
]
timings_load = [
'load_' + str(i) + '_' + v['engine'] + ';dur=' + str(round(v['load'] * 1000, 3))
for i, v in enumerate(timings) if v.get('load')
for i, v in enumerate(timings)
if v.get('load')
]
timings_all = timings_all + timings_total + timings_load
response.headers.add('Server-Timing', ', '.join(timings_all))
@ -592,10 +575,7 @@ def post_request(response):
def index_error(output_format, error_message):
if output_format == 'json':
return Response(
json.dumps({'error': error_message}),
mimetype='application/json'
)
return Response(json.dumps({'error': error_message}), mimetype='application/json')
if output_format == 'csv':
response = Response('', mimetype='application/csv')
cont_disp = 'attachment;Filename=searx.csv'
@ -678,9 +658,7 @@ def search():
raw_text_query = None
result_container = None
try:
search_query, raw_text_query, _, _ = get_search_query_from_webapp(
request.preferences, request.form
)
search_query, raw_text_query, _, _ = get_search_query_from_webapp(request.preferences, request.form)
# search = Search(search_query) # without plugins
search = SearchWithPlugins(search_query, request.user_plugins, request) # pylint: disable=redefined-outer-name
@ -736,10 +714,9 @@ def search():
if hours == 0:
result['publishedDate'] = gettext('{minutes} minute(s) ago').format(minutes=minutes)
else:
result['publishedDate'] = gettext(
'{hours} hour(s), {minutes} minute(s) ago').format(
hours=hours, minutes=minutes
)
result['publishedDate'] = gettext('{hours} hour(s), {minutes} minute(s) ago').format(
hours=hours, minutes=minutes
)
else:
result['publishedDate'] = format_date(result['publishedDate'])
@ -752,11 +729,9 @@ def search():
'corrections': list(result_container.corrections),
'infoboxes': result_container.infoboxes,
'suggestions': list(result_container.suggestions),
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines)
'unresponsive_engines': __get_translated_errors(result_container.unresponsive_engines),
}
response = json.dumps(
x, default = lambda item: list(item) if isinstance(item, set) else item
)
response = json.dumps(x, default=lambda item: list(item) if isinstance(item, set) else item)
return Response(response, mimetype='application/json')
if output_format == 'csv':
@ -800,21 +775,17 @@ def search():
# suggestions: use RawTextQuery to get the suggestion URLs with the same bang
suggestion_urls = list(
map(
lambda suggestion: {
'url': raw_text_query.changeQuery(suggestion).getFullQuery(),
'title': suggestion
},
result_container.suggestions
))
lambda suggestion: {'url': raw_text_query.changeQuery(suggestion).getFullQuery(), 'title': suggestion},
result_container.suggestions,
)
)
correction_urls = list(
map(
lambda correction: {
'url': raw_text_query.changeQuery(correction).getFullQuery(),
'title': correction
},
result_container.corrections
))
lambda correction: {'url': raw_text_query.changeQuery(correction).getFullQuery(), 'title': correction},
result_container.corrections,
)
)
return render(
# fmt: off
@ -899,9 +870,7 @@ def autocompleter():
language = language.split('-')[0]
# run autocompletion
raw_results = search_autocomplete(
request.preferences.get_value('autocomplete'), sug_prefix, language
)
raw_results = search_autocomplete(request.preferences.get_value('autocomplete'), sug_prefix, language)
for result in raw_results:
# attention: this loop will change raw_text_query object and this is
# the reason why the sug_prefix was stored before (see above)
@ -952,16 +921,11 @@ def preferences():
allowed_plugins = request.preferences.plugins.get_enabled()
# stats for preferences page
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
)
)
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
engines_by_category = {}
for c in categories: # pylint: disable=consider-using-dict-items
for c in categories: # pylint: disable=consider-using-dict-items
engines_by_category[c] = [e for e in categories[c] if e.name in filtered_engines]
# sort the engines alphabetically since the order in settings.yml is meaningless.
list.sort(engines_by_category[c], key=lambda e: e.name)
@ -996,8 +960,9 @@ def preferences():
reliabilities = {}
engine_errors = get_engine_errors(filtered_engines)
checker_results = checker_get_result()
checker_results = checker_results['engines'] \
if checker_results['status'] == 'ok' and 'engines' in checker_results else {}
checker_results = (
checker_results['engines'] if checker_results['status'] == 'ok' and 'engines' in checker_results else {}
)
for _, e in filtered_engines.items():
checker_result = checker_results.get(e.name, {})
checker_success = checker_result.get('success', True)
@ -1089,10 +1054,7 @@ def _is_selected_language_supported(engine, preferences): # pylint: disable=red
if language == 'all':
return True
x = match_language(
language,
getattr(engine, 'supported_languages', []),
getattr(engine, 'language_aliases', {}),
None
language, getattr(engine, 'supported_languages', []), getattr(engine, 'language_aliases', {}), None
)
return bool(x)
@ -1121,15 +1083,9 @@ def image_proxy():
'DNT': '1',
}
set_context_network_name('image_proxy')
resp, stream = http_stream(
method = 'GET',
url = url,
headers = request_headers
)
resp, stream = http_stream(method='GET', url=url, headers=request_headers)
content_length = resp.headers.get('Content-Length')
if (content_length
and content_length.isdigit()
and int(content_length) > maximum_size ):
if content_length and content_length.isdigit() and int(content_length) > maximum_size:
return 'Max size', 400
if resp.status_code != 200:
@ -1165,15 +1121,8 @@ def image_proxy():
logger.debug('Exception while closing response', e)
try:
headers = dict_subset(
resp.headers,
{'Content-Type', 'Content-Encoding', 'Content-Length', 'Length'}
)
response = Response(
stream,
mimetype=resp.headers['Content-Type'],
headers=headers,
direct_passthrough=True)
headers = dict_subset(resp.headers, {'Content-Type', 'Content-Encoding', 'Content-Length', 'Length'})
response = Response(stream, mimetype=resp.headers['Content-Type'], headers=headers, direct_passthrough=True)
response.call_on_close(close_stream)
return response
except httpx.HTTPError:
@ -1189,11 +1138,11 @@ def engine_descriptions():
for engine, description in ENGINE_DESCRIPTIONS.get(locale, {}).items():
result[engine] = description
for engine, description in result.items():
if len(description) ==2 and description[1] == 'ref':
if len(description) == 2 and description[1] == 'ref':
ref_engine, ref_lang = description[0].split(':')
description = ENGINE_DESCRIPTIONS[ref_lang][ref_engine]
if isinstance(description, str):
description = [ description, 'wikipedia' ]
description = [description, 'wikipedia']
result[engine] = description
return jsonify(result)
@ -1204,11 +1153,7 @@ def stats():
sort_order = request.args.get('sort', default='name', type=str)
selected_engine_name = request.args.get('engine', default=None, type=str)
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
))
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
if selected_engine_name:
if selected_engine_name not in filtered_engines:
selected_engine_name = None
@ -1217,8 +1162,7 @@ def stats():
checker_results = checker_get_result()
checker_results = (
checker_results['engines']
if checker_results['status'] == 'ok' and 'engines' in checker_results else {}
checker_results['engines'] if checker_results['status'] == 'ok' and 'engines' in checker_results else {}
)
engine_stats = get_engines_stats(filtered_engines)
@ -1256,11 +1200,7 @@ def stats():
@app.route('/stats/errors', methods=['GET'])
def stats_errors():
filtered_engines = dict(
filter(
lambda kv: (kv[0], request.preferences.validate_token(kv[1])),
engines.items()
))
filtered_engines = dict(filter(lambda kv: (kv[0], request.preferences.validate_token(kv[1])), engines.items()))
result = get_engine_errors(filtered_engines)
return jsonify(result)
@ -1273,13 +1213,16 @@ def stats_checker():
@app.route('/robots.txt', methods=['GET'])
def robots():
return Response("""User-agent: *
return Response(
"""User-agent: *
Allow: /
Allow: /about
Disallow: /stats
Disallow: /preferences
Disallow: /*?*q=*
""", mimetype='text/plain')
""",
mimetype='text/plain',
)
@app.route('/opensearch.xml', methods=['GET'])
@ -1293,34 +1236,21 @@ def opensearch():
if request.headers.get('User-Agent', '').lower().find('webkit') >= 0:
method = 'get'
ret = render(
'opensearch.xml',
opensearch_method=method,
override_theme='__common__'
)
ret = render('opensearch.xml', opensearch_method=method, override_theme='__common__')
resp = Response(
response = ret,
status = 200,
mimetype = "application/opensearchdescription+xml"
)
resp = Response(response=ret, status=200, mimetype="application/opensearchdescription+xml")
return resp
@app.route('/favicon.ico')
def favicon():
return send_from_directory(
os.path.join(
app.root_path,
settings['ui']['static_path'],
'themes',
get_current_theme_name(),
'img'
),
os.path.join(app.root_path, settings['ui']['static_path'], 'themes', get_current_theme_name(), 'img'),
'favicon.png',
mimetype = 'image/vnd.microsoft.icon'
mimetype='image/vnd.microsoft.icon',
)
@app.route('/clear_cookies')
def clear_cookies():
resp = make_response(redirect(url_for('index', _external=True)))
@ -1341,43 +1271,47 @@ def config():
if isinstance(engine.supported_languages, dict):
supported_languages = list(engine.supported_languages.keys())
_engines.append({
'name': name,
'categories': engine.categories,
'shortcut': engine.shortcut,
'enabled': not engine.disabled,
'paging': engine.paging,
'language_support': engine.language_support,
'supported_languages': supported_languages,
'safesearch': engine.safesearch,
'time_range_support': engine.time_range_support,
'timeout': engine.timeout
})
_engines.append(
{
'name': name,
'categories': engine.categories,
'shortcut': engine.shortcut,
'enabled': not engine.disabled,
'paging': engine.paging,
'language_support': engine.language_support,
'supported_languages': supported_languages,
'safesearch': engine.safesearch,
'time_range_support': engine.time_range_support,
'timeout': engine.timeout,
}
)
_plugins = []
for _ in plugins:
_plugins.append({'name': _.name, 'enabled': _.default_on})
return jsonify({
'categories': list(categories.keys()),
'engines': _engines,
'plugins': _plugins,
'instance_name': settings['general']['instance_name'],
'locales': LOCALE_NAMES,
'default_locale': settings['ui']['default_locale'],
'autocomplete': settings['search']['autocomplete'],
'safe_search': settings['search']['safe_search'],
'default_theme': settings['ui']['default_theme'],
'version': VERSION_STRING,
'brand': {
'CONTACT_URL': get_setting('general.contact_url'),
'GIT_URL': GIT_URL,
'GIT_BRANCH': GIT_BRANCH,
'DOCS_URL': get_setting('brand.docs_url'),
},
'doi_resolvers': list(settings['doi_resolvers'].keys()),
'default_doi_resolver': settings['default_doi_resolver'],
})
return jsonify(
{
'categories': list(categories.keys()),
'engines': _engines,
'plugins': _plugins,
'instance_name': settings['general']['instance_name'],
'locales': LOCALE_NAMES,
'default_locale': settings['ui']['default_locale'],
'autocomplete': settings['search']['autocomplete'],
'safe_search': settings['search']['safe_search'],
'default_theme': settings['ui']['default_theme'],
'version': VERSION_STRING,
'brand': {
'CONTACT_URL': get_setting('general.contact_url'),
'GIT_URL': GIT_URL,
'GIT_BRANCH': GIT_BRANCH,
'DOCS_URL': get_setting('brand.docs_url'),
},
'doi_resolvers': list(settings['doi_resolvers'].keys()),
'default_doi_resolver': settings['default_doi_resolver'],
}
)
@app.errorhandler(404)
@ -1388,9 +1322,7 @@ def page_not_found(_e):
# see https://flask.palletsprojects.com/en/1.1.x/cli/
# True if "FLASK_APP=searx/webapp.py FLASK_ENV=development flask run"
flask_run_development = (
os.environ.get("FLASK_APP") is not None
and os.environ.get("FLASK_ENV") == 'development'
and is_flask_run_cmdline()
os.environ.get("FLASK_APP") is not None and os.environ.get("FLASK_ENV") == 'development' and is_flask_run_cmdline()
)
# True if reload feature is activated of werkzeug, False otherwise (including uwsgi, etc..)
@ -1399,30 +1331,23 @@ flask_run_development = (
werkzeug_reloader = flask_run_development or (searx_debug and __name__ == "__main__")
# initialize the engines except on the first run of the werkzeug server.
if (not werkzeug_reloader
or (werkzeug_reloader
and os.environ.get("WERKZEUG_RUN_MAIN") == "true") ):
if not werkzeug_reloader or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"):
plugin_initialize(app)
search_initialize(enable_checker=True, check_network=True)
def run():
logger.debug(
'starting webserver on %s:%s',
settings['server']['bind_address'],
settings['server']['port']
)
logger.debug('starting webserver on %s:%s', settings['server']['bind_address'], settings['server']['port'])
app.run(
debug = searx_debug,
use_debugger = searx_debug,
port = settings['server']['port'],
host = settings['server']['bind_address'],
threaded = True,
extra_files = [
get_default_settings_path()
],
debug=searx_debug,
use_debugger=searx_debug,
port=settings['server']['port'],
host=settings['server']['bind_address'],
threaded=True,
extra_files=[get_default_settings_path()],
)
application = app
patch_application(app)