mirror of
https://github.com/searxng/searxng.git
synced 2025-07-15 01:09:21 +02:00
Some checks are pending
Documentation / Release (push) Waiting to run
Integration / Python 3.11 (push) Waiting to run
Integration / Python 3.10 (push) Waiting to run
Integration / Python 3.12 (push) Waiting to run
Integration / Python 3.13 (push) Waiting to run
Integration / Python 3.9 (push) Waiting to run
Integration / Theme (push) Waiting to run
It's possible that `SyntaxError` or `TypeError` instances are thrown when we can't evaluate a query, simply because it's not a math expression. In this case, it should just be skipped, i.e. the calculator plugin doesn't return any result instead of forwarding the exception.
220 lines
6.5 KiB
Python
220 lines
6.5 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
|
|
|
|
from __future__ import annotations
|
|
import typing
|
|
|
|
import ast
|
|
import math
|
|
import re
|
|
import operator
|
|
import multiprocessing
|
|
|
|
import babel
|
|
import babel.numbers
|
|
from flask_babel import gettext
|
|
|
|
from searx.result_types import EngineResults
|
|
from searx.plugins import Plugin, PluginInfo
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from searx.search import SearchWithPlugins
|
|
from searx.extended_types import SXNG_Request
|
|
from searx.plugins import PluginCfg
|
|
|
|
|
|
class SXNGPlugin(Plugin):
|
|
"""Plugin converts strings to different hash digests. The results are
|
|
displayed in area for the "answers".
|
|
"""
|
|
|
|
id = "calculator"
|
|
|
|
def __init__(self, plg_cfg: "PluginCfg") -> None:
|
|
super().__init__(plg_cfg)
|
|
|
|
self.info = PluginInfo(
|
|
id=self.id,
|
|
name=gettext("Basic Calculator"),
|
|
description=gettext("Calculate mathematical expressions via the search bar"),
|
|
preference_section="general",
|
|
)
|
|
|
|
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
|
|
results = EngineResults()
|
|
|
|
# only show the result of the expression on the first page
|
|
if search.search_query.pageno > 1:
|
|
return results
|
|
|
|
query = search.search_query.query
|
|
# in order to avoid DoS attacks with long expressions, ignore long expressions
|
|
if len(query) > 100:
|
|
return results
|
|
|
|
# replace commonly used math operators with their proper Python operator
|
|
query = query.replace("x", "*").replace(":", "/")
|
|
|
|
# use UI language
|
|
ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
|
|
|
|
# parse the number system in a localized way
|
|
def _decimal(match: re.Match) -> str:
|
|
val = match.string[match.start() : match.end()]
|
|
val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
|
|
return str(val)
|
|
|
|
decimal = ui_locale.number_symbols["latn"]["decimal"]
|
|
group = ui_locale.number_symbols["latn"]["group"]
|
|
query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
|
|
|
|
# in python, powers are calculated via **
|
|
query_py_formatted = query.replace("^", "**")
|
|
|
|
# Prevent the runtime from being longer than 50 ms
|
|
res = timeout_func(0.05, _eval_expr, query_py_formatted)
|
|
if res is None or res[0] == "":
|
|
return results
|
|
|
|
res, is_boolean = res
|
|
if is_boolean:
|
|
res = "True" if res != 0 else "False"
|
|
else:
|
|
res = babel.numbers.format_decimal(res, locale=ui_locale)
|
|
results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
|
|
|
|
return results
|
|
|
|
|
|
def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
|
|
"""
|
|
2 < 3 becomes ops=[ast.Lt] and values=[2,3]
|
|
2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4]
|
|
"""
|
|
for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name
|
|
if isinstance(op, ast.Eq) and a == b:
|
|
continue
|
|
if isinstance(op, ast.NotEq) and a != b:
|
|
continue
|
|
if isinstance(op, ast.Lt) and a < b:
|
|
continue
|
|
if isinstance(op, ast.LtE) and a <= b:
|
|
continue
|
|
if isinstance(op, ast.Gt) and a > b:
|
|
continue
|
|
if isinstance(op, ast.GtE) and a >= b:
|
|
continue
|
|
|
|
# Ignore impossible ops:
|
|
# * ast.Is
|
|
# * ast.IsNot
|
|
# * ast.In
|
|
# * ast.NotIn
|
|
|
|
# the result is False for a and b and operation op
|
|
return 0
|
|
# the results for all the ops are True
|
|
return 1
|
|
|
|
|
|
operators: dict[type, typing.Callable] = {
|
|
ast.Add: operator.add,
|
|
ast.Sub: operator.sub,
|
|
ast.Mult: operator.mul,
|
|
ast.Div: operator.truediv,
|
|
ast.Pow: operator.pow,
|
|
ast.BitXor: operator.xor,
|
|
ast.BitOr: operator.or_,
|
|
ast.BitAnd: operator.and_,
|
|
ast.USub: operator.neg,
|
|
ast.RShift: operator.rshift,
|
|
ast.LShift: operator.lshift,
|
|
ast.Mod: operator.mod,
|
|
ast.Compare: _compare,
|
|
}
|
|
|
|
|
|
math_constants = {
|
|
'e': math.e,
|
|
'pi': math.pi,
|
|
}
|
|
|
|
|
|
# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
|
|
# the old behavior "fork") but it will not solve the core problem of fork, nor
|
|
# will it remove the deprecation warnings in py3.12 & py3.13. Issue is
|
|
# ddiscussed here: https://github.com/searxng/searxng/issues/4159
|
|
mp_fork = multiprocessing.get_context("fork")
|
|
|
|
|
|
def _eval_expr(expr):
|
|
"""
|
|
Evaluates the given textual expression.
|
|
|
|
Returns a tuple of (numericResult, isBooleanResult).
|
|
|
|
>>> _eval_expr('2^6')
|
|
64, False
|
|
>>> _eval_expr('2**6')
|
|
64, False
|
|
>>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
|
|
-5.0, False
|
|
>>> _eval_expr('1 < 3')
|
|
1, True
|
|
>>> _eval_expr('5 < 3')
|
|
0, True
|
|
>>> _eval_expr('17 == 11+1+5 == 7+5+5')
|
|
1, True
|
|
"""
|
|
try:
|
|
root_expr = ast.parse(expr, mode='eval').body
|
|
return _eval(root_expr), isinstance(root_expr, ast.Compare)
|
|
|
|
except (SyntaxError, TypeError, ZeroDivisionError):
|
|
# Expression that can't be evaluated (i.e. not a math expression)
|
|
return "", False
|
|
|
|
|
|
def _eval(node):
|
|
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
|
|
return node.value
|
|
|
|
if isinstance(node, ast.BinOp):
|
|
return operators[type(node.op)](_eval(node.left), _eval(node.right))
|
|
|
|
if isinstance(node, ast.UnaryOp):
|
|
return operators[type(node.op)](_eval(node.operand))
|
|
|
|
if isinstance(node, ast.Compare):
|
|
return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators])
|
|
|
|
if isinstance(node, ast.Name) and node.id in math_constants:
|
|
return math_constants[node.id]
|
|
|
|
raise TypeError(node)
|
|
|
|
|
|
def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
|
|
try:
|
|
q.put(func(*args, **kwargs))
|
|
except:
|
|
q.put(None)
|
|
raise
|
|
|
|
|
|
def timeout_func(timeout, func, *args, **kwargs):
|
|
|
|
que = mp_fork.Queue()
|
|
p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
|
|
p.start()
|
|
p.join(timeout=timeout)
|
|
ret_val = None
|
|
# pylint: disable=used-before-assignment,undefined-variable
|
|
if not p.is_alive():
|
|
ret_val = que.get()
|
|
else:
|
|
logger.debug("terminate function after timeout is exceeded") # type: ignore
|
|
p.terminate()
|
|
p.join()
|
|
p.close()
|
|
return ret_val
|