forked from Icycoide/searxng
[refactor] typification of SearXNG (initial) / result items (part 1)
Typification of SearXNG ======================= This patch introduces the typing of the results. The why and how is described in the documentation, please generate the documentation .. $ make docs.clean docs.live and read the following articles in the "Developer documentation": - result types --> http://0.0.0.0:8000/dev/result_types/index.html The result types are available from the `searx.result_types` module. The following have been implemented so far: - base result type: `searx.result_type.Result` --> http://0.0.0.0:8000/dev/result_types/base_result.html - answer results --> http://0.0.0.0:8000/dev/result_types/answer.html including the type for translations (inspired by #3925). For all other types (which still need to be set up in subsequent PRs), template documentation has been created for the transition period. Doc of the fields used in Templates =================================== The template documentation is the basis for the typing and is the first complete documentation of the results (needed for engine development). It is the "working paper" (the plan) with which further typifications can be implemented in subsequent PRs. - https://github.com/searxng/searxng/issues/357 Answer Templates ================ With the new (sub) types for `Answer`, the templates for the answers have also been revised, `Translation` are now displayed with collapsible entries (inspired by #3925). !en-de dog Plugins & Answerer ================== The implementation for `Plugin` and `Answer` has been revised, see documentation: - Plugin: http://0.0.0.0:8000/dev/plugins/index.html - Answerer: http://0.0.0.0:8000/dev/answerers/index.html With `AnswerStorage` and `AnswerStorage` to manage those items (in follow up PRs, `ArticleStorage`, `InfoStorage` and .. will be implemented) Autocomplete ============ The autocompletion had a bug where the results from `Answer` had not been shown in the past. To test activate autocompletion and try search terms for which we have answerers - statistics: type `min 1 2 3` .. in the completion list you should find an entry like `[de] min(1, 2, 3) = 1` - random: type `random uuid` .. in the completion list, the first item is a random UUID Extended Types ============== SearXNG extends e.g. the request and response types of flask and httpx, a module has been set up for type extensions: - Extended Types --> http://0.0.0.0:8000/dev/extended_types.html Unit-Tests ========== The unit tests have been completely revised. In the previous implementation, the runtime (the global variables such as `searx.settings`) was not initialized before each test, so the runtime environment with which a test ran was always determined by the tests that ran before it. This was also the reason why we sometimes had to observe non-deterministic errors in the tests in the past: - https://github.com/searxng/searxng/issues/2988 is one example for the Runtime issues, with non-deterministic behavior .. - https://github.com/searxng/searxng/pull/3650 - https://github.com/searxng/searxng/pull/3654 - https://github.com/searxng/searxng/pull/3642#issuecomment-2226884469 - https://github.com/searxng/searxng/pull/3746#issuecomment-2300965005 Why msgspec.Struct ================== We have already discussed typing based on e.g. `TypeDict` or `dataclass` in the past: - https://github.com/searxng/searxng/pull/1562/files - https://gist.github.com/dalf/972eb05e7a9bee161487132a7de244d2 - https://github.com/searxng/searxng/pull/1412/files - https://github.com/searxng/searxng/pull/1356 In my opinion, TypeDict is unsuitable because the objects are still dictionaries and not instances of classes / the `dataclass` are classes but ... The `msgspec.Struct` combine the advantages of typing, runtime behaviour and also offer the option of (fast) serializing (incl. type check) the objects. Currently not possible but conceivable with `msgspec`: Outsourcing the engines into separate processes, what possibilities this opens up in the future is left to the imagination! Internally, we have already defined that it is desirable to decouple the development of the engines from the development of the SearXNG core / The serialization of the `Result` objects is a prerequisite for this. HINT: The threads listed above were the template for this PR, even though the implementation here is based on msgspec. They should also be an inspiration for the following PRs of typification, as the models and implementations can provide a good direction. Why just one commit? ==================== I tried to create several (thematically separated) commits, but gave up at some point ... there are too many things to tackle at once / The comprehensibility of the commits would not be improved by a thematic separation. On the contrary, we would have to make multiple changes at the same places and the goal of a change would be vaguely recognizable in the fog of the commits. Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
parent
9079d0cac0
commit
edfbf1e118
143 changed files with 3877 additions and 2118 deletions
|
@ -1,232 +1,68 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring, missing-class-docstring
|
||||
""".. sidebar:: Further reading ..
|
||||
|
||||
import sys
|
||||
from hashlib import sha256
|
||||
from importlib import import_module
|
||||
from os import listdir, makedirs, remove, stat, utime
|
||||
from os.path import abspath, basename, dirname, exists, join
|
||||
from shutil import copyfile
|
||||
from pkgutil import iter_modules
|
||||
from logging import getLogger
|
||||
from typing import List, Tuple
|
||||
- :ref:`plugins admin`
|
||||
- :ref:`SearXNG settings <settings plugins>`
|
||||
- :ref:`builtin plugins`
|
||||
|
||||
from searx import logger, settings
|
||||
Plugins can extend or replace functionality of various components of SearXNG.
|
||||
Here is an example of a very simple plugin that adds a "Hello" into the answer
|
||||
area:
|
||||
|
||||
.. code:: python
|
||||
|
||||
class Plugin: # pylint: disable=too-few-public-methods
|
||||
"""This class is currently never initialized and only used for type hinting."""
|
||||
from flask_babel import gettext as _
|
||||
from searx.plugins import Plugin
|
||||
from searx.result_types import Answer
|
||||
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
default_on: bool
|
||||
js_dependencies: Tuple[str]
|
||||
css_dependencies: Tuple[str]
|
||||
preference_section: str
|
||||
class MyPlugin(Plugin):
|
||||
|
||||
id = "self_info"
|
||||
default_on = True
|
||||
|
||||
logger = logger.getChild("plugins")
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin"))
|
||||
|
||||
required_attrs = (
|
||||
# fmt: off
|
||||
("name", str),
|
||||
("description", str),
|
||||
("default_on", bool)
|
||||
# fmt: on
|
||||
)
|
||||
def post_search(self, request, search):
|
||||
return [ Answer(answer="Hello") ]
|
||||
|
||||
optional_attrs = (
|
||||
# fmt: off
|
||||
("js_dependencies", tuple),
|
||||
("css_dependencies", tuple),
|
||||
("preference_section", str),
|
||||
# fmt: on
|
||||
)
|
||||
Entry points (hooks) define when a plugin runs. Right now only three hooks are
|
||||
implemented. So feel free to implement a hook if it fits the behaviour of your
|
||||
plugin / a plugin doesn't need to implement all the hooks.
|
||||
|
||||
- pre search: :py:obj:`Plugin.pre_search`
|
||||
- post search: :py:obj:`Plugin.post_search`
|
||||
- on each result item: :py:obj:`Plugin.on_result`
|
||||
|
||||
def sha_sum(filename):
|
||||
with open(filename, "rb") as f:
|
||||
file_content_bytes = f.read()
|
||||
return sha256(file_content_bytes).hexdigest()
|
||||
For a coding example have a look at :ref:`self_info plugin`.
|
||||
|
||||
----
|
||||
|
||||
def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
|
||||
dep_path = join(base_path, resource_path)
|
||||
file_name = basename(dep_path)
|
||||
resource_path = join(target_dir, file_name)
|
||||
if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
|
||||
try:
|
||||
copyfile(dep_path, resource_path)
|
||||
# copy atime_ns and mtime_ns, so the weak ETags (generated by
|
||||
# the HTTP server) do not change
|
||||
dep_stat = stat(dep_path)
|
||||
utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
|
||||
except IOError:
|
||||
logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name))
|
||||
sys.exit(3)
|
||||
.. autoclass:: Plugin
|
||||
:members:
|
||||
|
||||
# returning with the web path of the resource
|
||||
return join("plugins/external_plugins", plugin_dir, file_name)
|
||||
.. autoclass:: PluginInfo
|
||||
:members:
|
||||
|
||||
.. autoclass:: PluginStorage
|
||||
:members:
|
||||
|
||||
def prepare_package_resources(plugin, plugin_module_name):
|
||||
plugin_base_path = dirname(abspath(plugin.__file__))
|
||||
.. autoclass:: searx.plugins._core.ModulePlugin
|
||||
:members:
|
||||
:show-inheritance:
|
||||
|
||||
plugin_dir = plugin_module_name
|
||||
target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir)
|
||||
try:
|
||||
makedirs(target_dir, exist_ok=True)
|
||||
except IOError:
|
||||
logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name))
|
||||
sys.exit(3)
|
||||
"""
|
||||
|
||||
resources = []
|
||||
from __future__ import annotations
|
||||
|
||||
if hasattr(plugin, "js_dependencies"):
|
||||
resources.extend(map(basename, plugin.js_dependencies))
|
||||
plugin.js_dependencies = [
|
||||
sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
|
||||
for x in plugin.js_dependencies
|
||||
]
|
||||
__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
|
||||
|
||||
if hasattr(plugin, "css_dependencies"):
|
||||
resources.extend(map(basename, plugin.css_dependencies))
|
||||
plugin.css_dependencies = [
|
||||
sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
|
||||
for x in plugin.css_dependencies
|
||||
]
|
||||
from ._core import PluginInfo, Plugin, PluginStorage
|
||||
|
||||
for f in listdir(target_dir):
|
||||
if basename(f) not in resources:
|
||||
resource_path = join(target_dir, basename(f))
|
||||
try:
|
||||
remove(resource_path)
|
||||
except IOError:
|
||||
logger.critical(
|
||||
"failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name)
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
|
||||
def load_plugin(plugin_module_name, external):
|
||||
# pylint: disable=too-many-branches
|
||||
try:
|
||||
plugin = import_module(plugin_module_name)
|
||||
except (
|
||||
SyntaxError,
|
||||
KeyboardInterrupt,
|
||||
SystemExit,
|
||||
SystemError,
|
||||
ImportError,
|
||||
RuntimeError,
|
||||
) as e:
|
||||
logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
|
||||
sys.exit(3)
|
||||
except BaseException:
|
||||
logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
|
||||
return None
|
||||
|
||||
# difference with searx: use module name instead of the user name
|
||||
plugin.id = plugin_module_name
|
||||
|
||||
#
|
||||
plugin.logger = getLogger(plugin_module_name)
|
||||
|
||||
for plugin_attr, plugin_attr_type in required_attrs:
|
||||
if not hasattr(plugin, plugin_attr):
|
||||
logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
|
||||
sys.exit(3)
|
||||
attr = getattr(plugin, plugin_attr)
|
||||
if not isinstance(attr, plugin_attr_type):
|
||||
type_attr = str(type(attr))
|
||||
logger.critical(
|
||||
'{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(
|
||||
plugin, plugin_attr, type_attr, plugin_attr_type
|
||||
)
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
for plugin_attr, plugin_attr_type in optional_attrs:
|
||||
if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
|
||||
setattr(plugin, plugin_attr, plugin_attr_type())
|
||||
|
||||
if not hasattr(plugin, "preference_section"):
|
||||
plugin.preference_section = "general"
|
||||
|
||||
# query plugin
|
||||
if plugin.preference_section == "query":
|
||||
for plugin_attr in ("query_keywords", "query_examples"):
|
||||
if not hasattr(plugin, plugin_attr):
|
||||
logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin))
|
||||
sys.exit(3)
|
||||
|
||||
if settings.get("enabled_plugins"):
|
||||
# searx compatibility: plugin.name in settings['enabled_plugins']
|
||||
plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]
|
||||
|
||||
# copy resources if this is an external plugin
|
||||
if external:
|
||||
prepare_package_resources(plugin, plugin_module_name)
|
||||
|
||||
logger.debug("%s: loaded", plugin_module_name)
|
||||
|
||||
return plugin
|
||||
|
||||
|
||||
def load_and_initialize_plugin(plugin_module_name, external, init_args):
|
||||
plugin = load_plugin(plugin_module_name, external)
|
||||
if plugin and hasattr(plugin, 'init'):
|
||||
try:
|
||||
return plugin if plugin.init(*init_args) else None
|
||||
except Exception: # pylint: disable=broad-except
|
||||
plugin.logger.exception("Exception while calling init, the plugin is disabled")
|
||||
return None
|
||||
return plugin
|
||||
|
||||
|
||||
class PluginStore:
|
||||
def __init__(self):
|
||||
self.plugins: List[Plugin] = []
|
||||
|
||||
def __iter__(self):
|
||||
yield from self.plugins
|
||||
|
||||
def register(self, plugin):
|
||||
self.plugins.append(plugin)
|
||||
|
||||
def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
|
||||
ret = True
|
||||
for plugin in ordered_plugin_list:
|
||||
if hasattr(plugin, plugin_type):
|
||||
try:
|
||||
ret = getattr(plugin, plugin_type)(*args, **kwargs)
|
||||
if not ret:
|
||||
break
|
||||
except Exception: # pylint: disable=broad-except
|
||||
plugin.logger.exception("Exception while calling %s", plugin_type)
|
||||
return ret
|
||||
|
||||
|
||||
plugins = PluginStore()
|
||||
|
||||
|
||||
def plugin_module_names():
|
||||
yield_plugins = set()
|
||||
|
||||
# embedded plugins
|
||||
for module in iter_modules(path=[dirname(__file__)]):
|
||||
yield (__name__ + "." + module.name, False)
|
||||
yield_plugins.add(module.name)
|
||||
# external plugins
|
||||
for module_name in settings['plugins']:
|
||||
if module_name not in yield_plugins:
|
||||
yield (module_name, True)
|
||||
yield_plugins.add(module_name)
|
||||
STORAGE: PluginStorage = PluginStorage()
|
||||
|
||||
|
||||
def initialize(app):
|
||||
for module_name, external in plugin_module_names():
|
||||
plugin = load_and_initialize_plugin(module_name, external, (app, settings))
|
||||
if plugin:
|
||||
plugins.register(plugin)
|
||||
STORAGE.load_builtins()
|
||||
STORAGE.init(app)
|
||||
|
|
394
searx/plugins/_core.py
Normal file
394
searx/plugins/_core.py
Normal file
|
@ -0,0 +1,394 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=too-few-public-methods,missing-module-docstring
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
|
||||
|
||||
import abc
|
||||
import importlib
|
||||
import logging
|
||||
import pathlib
|
||||
import types
|
||||
import typing
|
||||
import warnings
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import flask
|
||||
|
||||
import searx
|
||||
from searx.utils import load_module
|
||||
from searx.extended_types import SXNG_Request
|
||||
from searx.result_types import Result
|
||||
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from searx.search import SearchWithPlugins
|
||||
|
||||
|
||||
_default = pathlib.Path(__file__).parent
|
||||
log: logging.Logger = logging.getLogger("searx.plugins")
|
||||
|
||||
|
||||
@dataclass
|
||||
class PluginInfo:
|
||||
"""Object that holds informations about a *plugin*, these infos are shown to
|
||||
the user in the Preferences menu.
|
||||
|
||||
To be able to translate the information into other languages, the text must
|
||||
be written in English and translated with :py:obj:`flask_babel.gettext`.
|
||||
"""
|
||||
|
||||
id: str
|
||||
"""The ID-selector in HTML/CSS `#<id>`."""
|
||||
|
||||
name: str
|
||||
"""Name of the *plugin*."""
|
||||
|
||||
description: str
|
||||
"""Short description of the *answerer*."""
|
||||
|
||||
preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general"
|
||||
"""Section (tab/group) in the preferences where this plugin is shown to the
|
||||
user.
|
||||
|
||||
The value ``query`` is reserved for plugins that are activated via a
|
||||
*keyword* as part of a search query, see:
|
||||
|
||||
- :py:obj:`PluginInfo.examples`
|
||||
- :py:obj:`Plugin.keywords`
|
||||
|
||||
Those plugins are shown in the preferences in tab *Special Queries*.
|
||||
"""
|
||||
|
||||
examples: list[str] = field(default_factory=list)
|
||||
"""List of short examples of the usage / of query terms."""
|
||||
|
||||
keywords: list[str] = field(default_factory=list)
|
||||
"""See :py:obj:`Plugin.keywords`"""
|
||||
|
||||
|
||||
class Plugin(abc.ABC):
|
||||
"""Abstract base class of all Plugins."""
|
||||
|
||||
id: typing.ClassVar[str]
|
||||
"""The ID (suffix) in the HTML form."""
|
||||
|
||||
default_on: typing.ClassVar[bool]
|
||||
"""Plugin is enabled/disabled by default."""
|
||||
|
||||
keywords: list[str] = []
|
||||
"""Keywords in the search query that activate the plugin. The *keyword* is
|
||||
the first word in a search query. If a plugin should be executed regardless
|
||||
of the search query, the list of keywords should be empty (which is also the
|
||||
default in the base class for Plugins)."""
|
||||
|
||||
log: logging.Logger
|
||||
"""A logger object, is automatically initialized when calling the
|
||||
constructor (if not already set in the subclass)."""
|
||||
|
||||
info: PluginInfo
|
||||
"""Informations about the *plugin*, see :py:obj:`PluginInfo`."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
for attr in ["id", "default_on"]:
|
||||
if getattr(self, attr, None) is None:
|
||||
raise NotImplementedError(f"plugin {self} is missing attribute {attr}")
|
||||
|
||||
if not self.id:
|
||||
self.id = f"{self.__class__.__module__}.{self.__class__.__name__}"
|
||||
if not getattr(self, "log", None):
|
||||
self.log = log.getChild(self.id)
|
||||
|
||||
def __hash__(self) -> int:
|
||||
"""The hash value is used in :py:obj:`set`, for example, when an object
|
||||
is added to the set. The hash value is also used in other contexts,
|
||||
e.g. when checking for equality to identify identical plugins from
|
||||
different sources (name collisions)."""
|
||||
|
||||
return id(self)
|
||||
|
||||
def __eq__(self, other):
|
||||
"""py:obj:`Plugin` objects are equal if the hash values of the two
|
||||
objects are equal."""
|
||||
|
||||
return hash(self) == hash(other)
|
||||
|
||||
def init(self, app: flask.Flask) -> bool: # pylint: disable=unused-argument
|
||||
"""Initialization of the plugin, the return value decides whether this
|
||||
plugin is active or not. Initialization only takes place once, at the
|
||||
time the WEB application is set up. The base methode always returns
|
||||
``True``, the methode can be overwritten in the inheritances,
|
||||
|
||||
- ``True`` plugin is active
|
||||
- ``False`` plugin is inactive
|
||||
"""
|
||||
return True
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
|
||||
"""Runs BEFORE the search request and returns a boolean:
|
||||
|
||||
- ``True`` to continue the search
|
||||
- ``False`` to stop the search
|
||||
"""
|
||||
return True
|
||||
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
|
||||
"""Runs for each result of each engine and returns a boolean:
|
||||
|
||||
- ``True`` to keep the result
|
||||
- ``False`` to remove the result from the result list
|
||||
|
||||
The ``result`` can be modified to the needs.
|
||||
|
||||
.. hint::
|
||||
|
||||
If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must
|
||||
be changed accordingly:
|
||||
|
||||
.. code:: python
|
||||
|
||||
result["parsed_url"] = urlparse(result["url"])
|
||||
"""
|
||||
return True
|
||||
|
||||
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]:
|
||||
"""Runs AFTER the search request. Can return a list of :py:obj:`Result`
|
||||
objects to be added to the final result list."""
|
||||
return
|
||||
|
||||
|
||||
class ModulePlugin(Plugin):
|
||||
"""A wrapper class for legacy *plugins*.
|
||||
|
||||
.. note::
|
||||
|
||||
For internal use only!
|
||||
|
||||
In a module plugin, the follwing names are mapped:
|
||||
|
||||
- `module.query_keywords` --> :py:obj:`Plugin.keywords`
|
||||
- `module.plugin_id` --> :py:obj:`Plugin.id`
|
||||
- `module.logger` --> :py:obj:`Plugin.log`
|
||||
"""
|
||||
|
||||
_required_attrs = (("name", str), ("description", str), ("default_on", bool))
|
||||
|
||||
def __init__(self, mod: types.ModuleType):
|
||||
"""In case of missing attributes in the module or wrong types are given,
|
||||
a :py:obj:`TypeError` exception is raised."""
|
||||
|
||||
self.module = mod
|
||||
self.id = getattr(self.module, "plugin_id", self.module.__name__)
|
||||
self.log = logging.getLogger(self.module.__name__)
|
||||
self.keywords = getattr(self.module, "query_keywords", [])
|
||||
|
||||
for attr, attr_type in self._required_attrs:
|
||||
if not hasattr(self.module, attr):
|
||||
msg = f"missing attribute {attr}, cannot load plugin"
|
||||
self.log.critical(msg)
|
||||
raise TypeError(msg)
|
||||
if not isinstance(getattr(self.module, attr), attr_type):
|
||||
msg = f"attribute {attr} is not of type {attr_type}"
|
||||
self.log.critical(msg)
|
||||
raise TypeError(msg)
|
||||
|
||||
self.default_on = mod.default_on
|
||||
self.info = PluginInfo(
|
||||
id=self.id,
|
||||
name=self.module.name,
|
||||
description=self.module.description,
|
||||
preference_section=getattr(self.module, "preference_section", None),
|
||||
examples=getattr(self.module, "query_examples", []),
|
||||
keywords=self.keywords,
|
||||
)
|
||||
|
||||
# monkeypatch module
|
||||
self.module.logger = self.log # type: ignore
|
||||
|
||||
super().__init__()
|
||||
|
||||
def init(self, app: flask.Flask) -> bool:
|
||||
if not hasattr(self.module, "init"):
|
||||
return True
|
||||
return self.module.init(app)
|
||||
|
||||
def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
|
||||
if not hasattr(self.module, "pre_search"):
|
||||
return True
|
||||
return self.module.pre_search(request, search)
|
||||
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
|
||||
if not hasattr(self.module, "on_result"):
|
||||
return True
|
||||
return self.module.on_result(request, search, result)
|
||||
|
||||
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]:
|
||||
if not hasattr(self.module, "post_search"):
|
||||
return None
|
||||
return self.module.post_search(request, search)
|
||||
|
||||
|
||||
class PluginStorage:
|
||||
"""A storage for managing the *plugins* of SearXNG."""
|
||||
|
||||
plugin_list: set[Plugin]
|
||||
"""The list of :py:obj:`Plugins` in this storage."""
|
||||
|
||||
legacy_plugins = [
|
||||
"ahmia_filter",
|
||||
"calculator",
|
||||
"hostnames",
|
||||
"oa_doi_rewrite",
|
||||
"tor_check",
|
||||
"tracker_url_remover",
|
||||
"unit_converter",
|
||||
]
|
||||
"""Internal plugins implemented in the legacy style (as module / deprecated!)."""
|
||||
|
||||
def __init__(self):
|
||||
self.plugin_list = set()
|
||||
|
||||
def __iter__(self):
|
||||
|
||||
yield from self.plugin_list
|
||||
|
||||
def __len__(self):
|
||||
return len(self.plugin_list)
|
||||
|
||||
@property
|
||||
def info(self) -> list[PluginInfo]:
|
||||
return [p.info for p in self.plugin_list]
|
||||
|
||||
def load_builtins(self):
|
||||
"""Load plugin modules from:
|
||||
|
||||
- the python packages in :origin:`searx/plugins` and
|
||||
- the external plugins from :ref:`settings plugins`.
|
||||
"""
|
||||
|
||||
for f in _default.iterdir():
|
||||
|
||||
if f.name.startswith("_"):
|
||||
continue
|
||||
|
||||
if f.stem not in self.legacy_plugins:
|
||||
self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin")
|
||||
continue
|
||||
|
||||
# for backward compatibility
|
||||
mod = load_module(f.name, str(f.parent))
|
||||
self.register(ModulePlugin(mod))
|
||||
|
||||
for fqn in searx.get_setting("plugins"): # type: ignore
|
||||
self.register_by_fqn(fqn)
|
||||
|
||||
def register(self, plugin: Plugin):
|
||||
"""Register a :py:obj:`Plugin`. In case of name collision (if two
|
||||
plugins have same ID) a :py:obj:`KeyError` exception is raised.
|
||||
"""
|
||||
|
||||
if plugin in self.plugin_list:
|
||||
msg = f"name collision '{plugin.id}'"
|
||||
plugin.log.critical(msg)
|
||||
raise KeyError(msg)
|
||||
|
||||
self.plugin_list.add(plugin)
|
||||
plugin.log.debug("plugin has been loaded")
|
||||
|
||||
def register_by_fqn(self, fqn: str):
|
||||
"""Register a :py:obj:`Plugin` via its fully qualified class name (FQN).
|
||||
The FQNs of external plugins could be read from a configuration, for
|
||||
example, and registered using this method
|
||||
"""
|
||||
|
||||
mod_name, _, obj_name = fqn.rpartition('.')
|
||||
if not mod_name:
|
||||
# for backward compatibility
|
||||
code_obj = importlib.import_module(fqn)
|
||||
else:
|
||||
mod = importlib.import_module(mod_name)
|
||||
code_obj = getattr(mod, obj_name, None)
|
||||
|
||||
if code_obj is None:
|
||||
msg = f"plugin {fqn} is not implemented"
|
||||
log.critical(msg)
|
||||
raise ValueError(msg)
|
||||
|
||||
if isinstance(code_obj, types.ModuleType):
|
||||
# for backward compatibility
|
||||
warnings.warn(
|
||||
f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning
|
||||
)
|
||||
self.register(ModulePlugin(code_obj))
|
||||
return
|
||||
|
||||
self.register(code_obj())
|
||||
|
||||
def init(self, app: flask.Flask) -> None:
|
||||
"""Calls the method :py:obj:`Plugin.init` of each plugin in this
|
||||
storage. Depending on its return value, the plugin is removed from
|
||||
*this* storage or not."""
|
||||
|
||||
for plg in self.plugin_list.copy():
|
||||
if not plg.init(app):
|
||||
self.plugin_list.remove(plg)
|
||||
|
||||
def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
|
||||
|
||||
ret = True
|
||||
for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
|
||||
try:
|
||||
ret = bool(plugin.pre_search(request=request, search=search))
|
||||
except Exception: # pylint: disable=broad-except
|
||||
plugin.log.exception("Exception while calling pre_search")
|
||||
continue
|
||||
if not ret:
|
||||
# skip this search on the first False from a plugin
|
||||
break
|
||||
return ret
|
||||
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
|
||||
|
||||
ret = True
|
||||
for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
|
||||
try:
|
||||
ret = bool(plugin.on_result(request=request, search=search, result=result))
|
||||
except Exception: # pylint: disable=broad-except
|
||||
plugin.log.exception("Exception while calling on_result")
|
||||
continue
|
||||
if not ret:
|
||||
# ignore this result item on the first False from a plugin
|
||||
break
|
||||
|
||||
return ret
|
||||
|
||||
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None:
|
||||
"""Extend :py:obj:`search.result_container
|
||||
<searx.results.ResultContainer`> with result items from plugins listed
|
||||
in :py:obj:`search.user_plugins <SearchWithPlugins.user_plugins>`.
|
||||
"""
|
||||
|
||||
keyword = None
|
||||
for keyword in search.search_query.query.split():
|
||||
if keyword:
|
||||
break
|
||||
|
||||
for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
|
||||
|
||||
if plugin.keywords:
|
||||
# plugin with keywords: skip plugin if no keyword match
|
||||
if keyword and keyword not in plugin.keywords:
|
||||
continue
|
||||
try:
|
||||
results = plugin.post_search(request=request, search=search) or []
|
||||
except Exception: # pylint: disable=broad-except
|
||||
plugin.log.exception("Exception while calling post_search")
|
||||
continue
|
||||
|
||||
# In case of *plugins* prefix ``plugin:`` is set, see searx.result_types.Result
|
||||
search.result_container.extend(f"plugin: {plugin.id}", results)
|
|
@ -1,27 +1,33 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring
|
||||
|
||||
from __future__ import annotations
|
||||
from hashlib import md5
|
||||
|
||||
import flask
|
||||
|
||||
from searx.data import ahmia_blacklist_loader
|
||||
from searx import get_setting
|
||||
|
||||
|
||||
name = "Ahmia blacklist"
|
||||
description = "Filter out onion results that appear in Ahmia's blacklist. (See https://ahmia.fi/blacklist)"
|
||||
default_on = True
|
||||
preference_section = 'onions'
|
||||
|
||||
ahmia_blacklist = None
|
||||
ahmia_blacklist: list = []
|
||||
|
||||
|
||||
def on_result(_request, _search, result):
|
||||
def on_result(_request, _search, result) -> bool:
|
||||
if not result.get('is_onion') or not result.get('parsed_url'):
|
||||
return True
|
||||
result_hash = md5(result['parsed_url'].hostname.encode()).hexdigest()
|
||||
return result_hash not in ahmia_blacklist
|
||||
|
||||
|
||||
def init(_app, settings):
|
||||
def init(app=flask.Flask) -> bool: # pylint: disable=unused-argument
|
||||
global ahmia_blacklist # pylint: disable=global-statement
|
||||
if not settings['outgoing']['using_tor_proxy']:
|
||||
if not get_setting("outgoing.using_tor_proxy"):
|
||||
# disable the plugin
|
||||
return False
|
||||
ahmia_blacklist = ahmia_blacklist_loader()
|
||||
|
|
|
@ -1,28 +1,27 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Calculate mathematical expressions using ack#eval
|
||||
"""Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval").
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Callable
|
||||
|
||||
import ast
|
||||
import re
|
||||
import operator
|
||||
from multiprocessing import Process, Queue
|
||||
from typing import Callable
|
||||
import multiprocessing
|
||||
|
||||
import flask
|
||||
import babel
|
||||
import babel.numbers
|
||||
from flask_babel import gettext
|
||||
|
||||
from searx.plugins import logger
|
||||
from searx.result_types import Answer
|
||||
|
||||
name = "Basic Calculator"
|
||||
description = gettext("Calculate mathematical expressions via the search bar")
|
||||
default_on = True
|
||||
|
||||
preference_section = 'general'
|
||||
plugin_id = 'calculator'
|
||||
|
||||
logger = logger.getChild(plugin_id)
|
||||
|
||||
operators: dict[type, Callable] = {
|
||||
ast.Add: operator.add,
|
||||
ast.Sub: operator.sub,
|
||||
|
@ -33,11 +32,17 @@ operators: dict[type, Callable] = {
|
|||
ast.USub: operator.neg,
|
||||
}
|
||||
|
||||
# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
|
||||
# the old behavior "fork") but it will not solve the core problem of fork, nor
|
||||
# will it remove the deprecation warnings in py3.12 & py3.13. Issue is
|
||||
# ddiscussed here: https://github.com/searxng/searxng/issues/4159
|
||||
mp_fork = multiprocessing.get_context("fork")
|
||||
|
||||
|
||||
def _eval_expr(expr):
|
||||
"""
|
||||
>>> _eval_expr('2^6')
|
||||
4
|
||||
64
|
||||
>>> _eval_expr('2**6')
|
||||
64
|
||||
>>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
|
||||
|
@ -63,46 +68,49 @@ def _eval(node):
|
|||
raise TypeError(node)
|
||||
|
||||
|
||||
def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
|
||||
try:
|
||||
q.put(func(*args, **kwargs))
|
||||
except:
|
||||
q.put(None)
|
||||
raise
|
||||
|
||||
|
||||
def timeout_func(timeout, func, *args, **kwargs):
|
||||
|
||||
def handler(q: Queue, func, args, **kwargs): # pylint:disable=invalid-name
|
||||
try:
|
||||
q.put(func(*args, **kwargs))
|
||||
except:
|
||||
q.put(None)
|
||||
raise
|
||||
|
||||
que = Queue()
|
||||
p = Process(target=handler, args=(que, func, args), kwargs=kwargs)
|
||||
que = mp_fork.Queue()
|
||||
p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
|
||||
p.start()
|
||||
p.join(timeout=timeout)
|
||||
ret_val = None
|
||||
# pylint: disable=used-before-assignment,undefined-variable
|
||||
if not p.is_alive():
|
||||
ret_val = que.get()
|
||||
else:
|
||||
logger.debug("terminate function after timeout is exceeded")
|
||||
logger.debug("terminate function after timeout is exceeded") # type: ignore
|
||||
p.terminate()
|
||||
p.join()
|
||||
p.close()
|
||||
return ret_val
|
||||
|
||||
|
||||
def post_search(_request, search):
|
||||
def post_search(request, search) -> list[Answer]:
|
||||
results = []
|
||||
|
||||
# only show the result of the expression on the first page
|
||||
if search.search_query.pageno > 1:
|
||||
return True
|
||||
return results
|
||||
|
||||
query = search.search_query.query
|
||||
# in order to avoid DoS attacks with long expressions, ignore long expressions
|
||||
if len(query) > 100:
|
||||
return True
|
||||
return results
|
||||
|
||||
# replace commonly used math operators with their proper Python operator
|
||||
query = query.replace("x", "*").replace(":", "/")
|
||||
|
||||
# use UI language
|
||||
ui_locale = babel.Locale.parse(flask.request.preferences.get_value('locale'), sep='-')
|
||||
ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
|
||||
|
||||
# parse the number system in a localized way
|
||||
def _decimal(match: re.Match) -> str:
|
||||
|
@ -116,15 +124,17 @@ def post_search(_request, search):
|
|||
|
||||
# only numbers and math operators are accepted
|
||||
if any(str.isalpha(c) for c in query):
|
||||
return True
|
||||
return results
|
||||
|
||||
# in python, powers are calculated via **
|
||||
query_py_formatted = query.replace("^", "**")
|
||||
|
||||
# Prevent the runtime from being longer than 50 ms
|
||||
result = timeout_func(0.05, _eval_expr, query_py_formatted)
|
||||
if result is None or result == "":
|
||||
return True
|
||||
result = babel.numbers.format_decimal(result, locale=ui_locale)
|
||||
search.result_container.answers['calculate'] = {'answer': f"{search.search_query.query} = {result}"}
|
||||
return True
|
||||
res = timeout_func(0.05, _eval_expr, query_py_formatted)
|
||||
if res is None or res == "":
|
||||
return results
|
||||
|
||||
res = babel.numbers.format_decimal(res, locale=ui_locale)
|
||||
Answer(results=results, answer=f"{search.search_query.query} = {res}")
|
||||
|
||||
return results
|
||||
|
|
|
@ -1,43 +1,66 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring
|
||||
# pylint: disable=missing-module-docstring, missing-class-docstring
|
||||
from __future__ import annotations
|
||||
import typing
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
import hashlib
|
||||
|
||||
from flask_babel import gettext
|
||||
|
||||
name = "Hash plugin"
|
||||
description = gettext("Converts strings to different hash digests.")
|
||||
default_on = True
|
||||
preference_section = 'query'
|
||||
query_keywords = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
|
||||
query_examples = 'sha512 The quick brown fox jumps over the lazy dog'
|
||||
from searx.plugins import Plugin, PluginInfo
|
||||
from searx.result_types import Answer
|
||||
|
||||
parser_re = re.compile('(md5|sha1|sha224|sha256|sha384|sha512) (.*)', re.I)
|
||||
if typing.TYPE_CHECKING:
|
||||
from searx.search import SearchWithPlugins
|
||||
from searx.extended_types import SXNG_Request
|
||||
|
||||
|
||||
def post_search(_request, search):
|
||||
# process only on first page
|
||||
if search.search_query.pageno > 1:
|
||||
return True
|
||||
m = parser_re.match(search.search_query.query)
|
||||
if not m:
|
||||
# wrong query
|
||||
return True
|
||||
class SXNGPlugin(Plugin):
|
||||
"""Plugin converts strings to different hash digests. The results are
|
||||
displayed in area for the "answers".
|
||||
"""
|
||||
|
||||
function, string = m.groups()
|
||||
if not string.strip():
|
||||
# end if the string is empty
|
||||
return True
|
||||
id = "hash_plugin"
|
||||
default_on = True
|
||||
keywords = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
|
||||
|
||||
# select hash function
|
||||
f = hashlib.new(function.lower())
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
# make digest from the given string
|
||||
f.update(string.encode('utf-8').strip())
|
||||
answer = function + " " + gettext('hash digest') + ": " + f.hexdigest()
|
||||
self.parser_re = re.compile(f"({'|'.join(self.keywords)}) (.*)", re.I)
|
||||
self.info = PluginInfo(
|
||||
id=self.id,
|
||||
name=gettext("Hash plugin"),
|
||||
description=gettext("Converts strings to different hash digests."),
|
||||
examples=["sha512 The quick brown fox jumps over the lazy dog"],
|
||||
preference_section="query",
|
||||
)
|
||||
|
||||
# print result
|
||||
search.result_container.answers.clear()
|
||||
search.result_container.answers['hash'] = {'answer': answer}
|
||||
return True
|
||||
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]:
|
||||
"""Returns a result list only for the first page."""
|
||||
results = []
|
||||
|
||||
if search.search_query.pageno > 1:
|
||||
return results
|
||||
|
||||
m = self.parser_re.match(search.search_query.query)
|
||||
if not m:
|
||||
# wrong query
|
||||
return results
|
||||
|
||||
function, string = m.groups()
|
||||
if not string.strip():
|
||||
# end if the string is empty
|
||||
return results
|
||||
|
||||
# select hash function
|
||||
f = hashlib.new(function.lower())
|
||||
|
||||
# make digest from the given string
|
||||
f.update(string.encode("utf-8").strip())
|
||||
answer = function + " " + gettext("hash digest") + ": " + f.hexdigest()
|
||||
|
||||
Answer(results=results, answer=answer)
|
||||
|
||||
return results
|
||||
|
|
|
@ -91,15 +91,17 @@ something like this:
|
|||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from urllib.parse import urlunparse, urlparse
|
||||
|
||||
from flask_babel import gettext
|
||||
|
||||
from searx import settings
|
||||
from searx.plugins import logger
|
||||
from searx.settings_loader import get_yaml_cfg
|
||||
|
||||
|
||||
name = gettext('Hostnames plugin')
|
||||
description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname')
|
||||
default_on = False
|
||||
|
@ -107,16 +109,15 @@ preference_section = 'general'
|
|||
|
||||
plugin_id = 'hostnames'
|
||||
|
||||
logger = logger.getChild(plugin_id)
|
||||
parsed = 'parsed_url'
|
||||
_url_fields = ['iframe_src', 'audio_src']
|
||||
|
||||
|
||||
def _load_regular_expressions(settings_key):
|
||||
def _load_regular_expressions(settings_key) -> dict | set | None:
|
||||
setting_value = settings.get(plugin_id, {}).get(settings_key)
|
||||
|
||||
if not setting_value:
|
||||
return {}
|
||||
return None
|
||||
|
||||
# load external file with configuration
|
||||
if isinstance(setting_value, str):
|
||||
|
@ -128,20 +129,20 @@ def _load_regular_expressions(settings_key):
|
|||
if isinstance(setting_value, dict):
|
||||
return {re.compile(p): r for (p, r) in setting_value.items()}
|
||||
|
||||
return {}
|
||||
return None
|
||||
|
||||
|
||||
replacements = _load_regular_expressions('replace')
|
||||
removables = _load_regular_expressions('remove')
|
||||
high_priority = _load_regular_expressions('high_priority')
|
||||
low_priority = _load_regular_expressions('low_priority')
|
||||
replacements: dict = _load_regular_expressions('replace') or {} # type: ignore
|
||||
removables: set = _load_regular_expressions('remove') or set() # type: ignore
|
||||
high_priority: set = _load_regular_expressions('high_priority') or set() # type: ignore
|
||||
low_priority: set = _load_regular_expressions('low_priority') or set() # type: ignore
|
||||
|
||||
|
||||
def _matches_parsed_url(result, pattern):
|
||||
return parsed in result and pattern.search(result[parsed].netloc)
|
||||
|
||||
|
||||
def on_result(_request, _search, result):
|
||||
def on_result(_request, _search, result) -> bool:
|
||||
for pattern, replacement in replacements.items():
|
||||
if _matches_parsed_url(result, pattern):
|
||||
# logger.debug(result['url'])
|
||||
|
|
|
@ -1,18 +1,21 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring
|
||||
|
||||
from __future__ import annotations
|
||||
import re
|
||||
from urllib.parse import urlparse, parse_qsl
|
||||
|
||||
from flask_babel import gettext
|
||||
|
||||
from searx import settings
|
||||
|
||||
|
||||
regex = re.compile(r'10\.\d{4,9}/[^\s]+')
|
||||
|
||||
name = gettext('Open Access DOI rewrite')
|
||||
description = gettext('Avoid paywalls by redirecting to open-access versions of publications when available')
|
||||
default_on = False
|
||||
preference_section = 'general'
|
||||
preference_section = 'general/doi_resolver'
|
||||
|
||||
|
||||
def extract_doi(url):
|
||||
|
@ -34,8 +37,9 @@ def get_doi_resolver(preferences):
|
|||
return doi_resolvers[selected_resolver]
|
||||
|
||||
|
||||
def on_result(request, _search, result):
|
||||
if 'parsed_url' not in result:
|
||||
def on_result(request, _search, result) -> bool:
|
||||
|
||||
if not result.parsed_url:
|
||||
return True
|
||||
|
||||
doi = extract_doi(result['parsed_url'])
|
||||
|
|
|
@ -1,32 +1,57 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring,invalid-name
|
||||
# pylint: disable=missing-module-docstring, missing-class-docstring
|
||||
from __future__ import annotations
|
||||
import typing
|
||||
|
||||
import re
|
||||
from flask_babel import gettext
|
||||
|
||||
from searx.botdetection._helpers import get_real_ip
|
||||
from searx.result_types import Answer
|
||||
|
||||
name = gettext('Self Information')
|
||||
description = gettext('Displays your IP if the query is "ip" and your user agent if the query contains "user agent".')
|
||||
default_on = True
|
||||
preference_section = 'query'
|
||||
query_keywords = ['user-agent']
|
||||
query_examples = ''
|
||||
from . import Plugin, PluginInfo
|
||||
|
||||
# "ip" or "my ip" regex
|
||||
ip_regex = re.compile('^ip$|my ip', re.IGNORECASE)
|
||||
|
||||
# Self User Agent regex
|
||||
ua_regex = re.compile('.*user[ -]agent.*', re.IGNORECASE)
|
||||
if typing.TYPE_CHECKING:
|
||||
from searx.search import SearchWithPlugins
|
||||
from searx.extended_types import SXNG_Request
|
||||
|
||||
|
||||
def post_search(request, search):
|
||||
if search.search_query.pageno > 1:
|
||||
return True
|
||||
if ip_regex.search(search.search_query.query):
|
||||
ip = get_real_ip(request)
|
||||
search.result_container.answers['ip'] = {'answer': gettext('Your IP is: ') + ip}
|
||||
elif ua_regex.match(search.search_query.query):
|
||||
ua = request.user_agent
|
||||
search.result_container.answers['user-agent'] = {'answer': gettext('Your user-agent is: ') + ua.string}
|
||||
return True
|
||||
class SXNGPlugin(Plugin):
|
||||
"""Simple plugin that displays information about user's request, including
|
||||
the IP or HTTP User-Agent. The information is displayed in area for the
|
||||
"answers".
|
||||
"""
|
||||
|
||||
id = "self_info"
|
||||
default_on = True
|
||||
keywords = ["ip", "user-agent"]
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.ip_regex = re.compile(r"^ip", re.IGNORECASE)
|
||||
self.ua_regex = re.compile(r"^user-agent", re.IGNORECASE)
|
||||
|
||||
self.info = PluginInfo(
|
||||
id=self.id,
|
||||
name=gettext("Self Information"),
|
||||
description=gettext(
|
||||
"""Displays your IP if the query is "ip" and your user agent if the query is "user-agent"."""
|
||||
),
|
||||
preference_section="query",
|
||||
)
|
||||
|
||||
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list[Answer]:
|
||||
"""Returns a result list only for the first page."""
|
||||
results = []
|
||||
|
||||
if search.search_query.pageno > 1:
|
||||
return results
|
||||
|
||||
if self.ip_regex.search(search.search_query.query):
|
||||
Answer(results=results, answer=gettext("Your IP is: ") + get_real_ip(request))
|
||||
|
||||
if self.ua_regex.match(search.search_query.query):
|
||||
Answer(results=results, answer=gettext("Your user-agent is: ") + str(request.user_agent))
|
||||
|
||||
return results
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""A plugin to check if the ip address of the request is a Tor exit-node if the
|
||||
user searches for ``tor-check``. It fetches the tor exit node list from
|
||||
https://check.torproject.org/exit-addresses and parses all the IPs into a list,
|
||||
then checks if the user's IP address is in it.
|
||||
:py:obj:`url_exit_list` and parses all the IPs into a list, then checks if the
|
||||
user's IP address is in it.
|
||||
|
||||
Enable in ``settings.yml``:
|
||||
|
||||
|
@ -14,10 +14,15 @@ Enable in ``settings.yml``:
|
|||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from flask_babel import gettext
|
||||
from httpx import HTTPError
|
||||
|
||||
from searx.network import get
|
||||
from searx.result_types import Answer
|
||||
|
||||
|
||||
default_on = False
|
||||
|
||||
|
@ -42,27 +47,28 @@ query_examples = ''
|
|||
# Regex for exit node addresses in the list.
|
||||
reg = re.compile(r"(?<=ExitAddress )\S+")
|
||||
|
||||
url_exit_list = "https://check.torproject.org/exit-addresses"
|
||||
"""URL to load Tor exit list from."""
|
||||
|
||||
def post_search(request, search):
|
||||
|
||||
def post_search(request, search) -> list[Answer]:
|
||||
results = []
|
||||
|
||||
if search.search_query.pageno > 1:
|
||||
return True
|
||||
return results
|
||||
|
||||
if search.search_query.query.lower() == "tor-check":
|
||||
|
||||
# Request the list of tor exit nodes.
|
||||
try:
|
||||
resp = get("https://check.torproject.org/exit-addresses")
|
||||
node_list = re.findall(reg, resp.text)
|
||||
resp = get(url_exit_list)
|
||||
node_list = re.findall(reg, resp.text) # type: ignore
|
||||
|
||||
except HTTPError:
|
||||
# No answer, return error
|
||||
search.result_container.answers["tor"] = {
|
||||
"answer": gettext(
|
||||
"Could not download the list of Tor exit-nodes from: https://check.torproject.org/exit-addresses"
|
||||
)
|
||||
}
|
||||
return True
|
||||
msg = gettext("Could not download the list of Tor exit-nodes from")
|
||||
Answer(results=results, answer=f"{msg} {url_exit_list}")
|
||||
return results
|
||||
|
||||
x_forwarded_for = request.headers.getlist("X-Forwarded-For")
|
||||
|
||||
|
@ -72,20 +78,11 @@ def post_search(request, search):
|
|||
ip_address = request.remote_addr
|
||||
|
||||
if ip_address in node_list:
|
||||
search.result_container.answers["tor"] = {
|
||||
"answer": gettext(
|
||||
"You are using Tor and it looks like you have this external IP address: {ip_address}".format(
|
||||
ip_address=ip_address
|
||||
)
|
||||
)
|
||||
}
|
||||
else:
|
||||
search.result_container.answers["tor"] = {
|
||||
"answer": gettext(
|
||||
"You are not using Tor and you have this external IP address: {ip_address}".format(
|
||||
ip_address=ip_address
|
||||
)
|
||||
)
|
||||
}
|
||||
msg = gettext("You are using Tor and it looks like you have the external IP address")
|
||||
Answer(results=results, answer=f"{msg} {ip_address}")
|
||||
|
||||
return True
|
||||
else:
|
||||
msg = gettext("You are not using Tor and you have the external IP address")
|
||||
Answer(results=results, answer=f"{msg} {ip_address}")
|
||||
|
||||
return results
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from urllib.parse import urlunparse, parse_qsl, urlencode
|
||||
|
||||
|
@ -19,24 +21,24 @@ default_on = True
|
|||
preference_section = 'privacy'
|
||||
|
||||
|
||||
def on_result(_request, _search, result):
|
||||
if 'parsed_url' not in result:
|
||||
def on_result(_request, _search, result) -> bool:
|
||||
|
||||
parsed_url = getattr(result, "parsed_url", None)
|
||||
if not parsed_url:
|
||||
return True
|
||||
|
||||
query = result['parsed_url'].query
|
||||
|
||||
if query == "":
|
||||
if parsed_url.query == "":
|
||||
return True
|
||||
parsed_query = parse_qsl(query)
|
||||
|
||||
parsed_query = parse_qsl(parsed_url.query)
|
||||
changes = 0
|
||||
for i, (param_name, _) in enumerate(list(parsed_query)):
|
||||
for reg in regexes:
|
||||
if reg.match(param_name):
|
||||
parsed_query.pop(i - changes)
|
||||
changes += 1
|
||||
result['parsed_url'] = result['parsed_url']._replace(query=urlencode(parsed_query))
|
||||
result['url'] = urlunparse(result['parsed_url'])
|
||||
result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query))
|
||||
result.url = urlunparse(result.parsed_url)
|
||||
break
|
||||
|
||||
return True
|
||||
|
|
|
@ -18,11 +18,14 @@ Enable in ``settings.yml``:
|
|||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import re
|
||||
import babel.numbers
|
||||
|
||||
from flask_babel import gettext, get_locale
|
||||
|
||||
from searx import data
|
||||
from searx.result_types import Answer
|
||||
|
||||
|
||||
name = "Unit converter plugin"
|
||||
|
@ -171,16 +174,16 @@ def symbol_to_si():
|
|||
return SYMBOL_TO_SI
|
||||
|
||||
|
||||
def _parse_text_and_convert(search, from_query, to_query):
|
||||
def _parse_text_and_convert(from_query, to_query) -> str | None:
|
||||
|
||||
# pylint: disable=too-many-branches, too-many-locals
|
||||
|
||||
if not (from_query and to_query):
|
||||
return
|
||||
return None
|
||||
|
||||
measured = re.match(RE_MEASURE, from_query, re.VERBOSE)
|
||||
if not (measured and measured.group('number'), measured.group('unit')):
|
||||
return
|
||||
return None
|
||||
|
||||
# Symbols are not unique, if there are several hits for the from-unit, then
|
||||
# the correct one must be determined by comparing it with the to-unit
|
||||
|
@ -198,7 +201,7 @@ def _parse_text_and_convert(search, from_query, to_query):
|
|||
target_list.append((si_name, from_si, orig_symbol))
|
||||
|
||||
if not (source_list and target_list):
|
||||
return
|
||||
return None
|
||||
|
||||
source_to_si = target_from_si = target_symbol = None
|
||||
|
||||
|
@ -212,7 +215,7 @@ def _parse_text_and_convert(search, from_query, to_query):
|
|||
target_symbol = target[2]
|
||||
|
||||
if not (source_to_si and target_from_si):
|
||||
return
|
||||
return None
|
||||
|
||||
_locale = get_locale() or 'en_US'
|
||||
|
||||
|
@ -239,25 +242,28 @@ def _parse_text_and_convert(search, from_query, to_query):
|
|||
else:
|
||||
result = babel.numbers.format_decimal(value, locale=_locale, format='#,##0.##########;-#')
|
||||
|
||||
search.result_container.answers['conversion'] = {'answer': f'{result} {target_symbol}'}
|
||||
return f'{result} {target_symbol}'
|
||||
|
||||
|
||||
def post_search(_request, search):
|
||||
def post_search(_request, search) -> list[Answer]:
|
||||
results = []
|
||||
|
||||
# only convert between units on the first page
|
||||
if search.search_query.pageno > 1:
|
||||
return True
|
||||
return results
|
||||
|
||||
query = search.search_query.query
|
||||
query_parts = query.split(" ")
|
||||
|
||||
if len(query_parts) < 3:
|
||||
return True
|
||||
return results
|
||||
|
||||
for query_part in query_parts:
|
||||
for keyword in CONVERT_KEYWORDS:
|
||||
if query_part == keyword:
|
||||
from_query, to_query = query.split(keyword, 1)
|
||||
_parse_text_and_convert(search, from_query.strip(), to_query.strip())
|
||||
return True
|
||||
target_val = _parse_text_and_convert(from_query.strip(), to_query.strip())
|
||||
if target_val:
|
||||
Answer(results=results, answer=target_val)
|
||||
|
||||
return True
|
||||
return results
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue