[refactor] migrate plugins from "module" to class SXNGPlugin

This patch brings two major changes:

- ``Result.filter_urls(..)`` to pass a filter function for URL fields
- The ``enabled_plugins:`` section in SearXNG's settings do no longer exists.

To understand plugin development compile documentation:

    $ make docs.clean docs.live

and read http://0.0.0.0:8000/dev/plugins/development.html

There is no longer a distinction between built-in and external plugin, all
plugins are registered via the settings in the ``plugins:`` section.

In SearXNG, plugins can be registered via a fully qualified class name.  A
configuration (`PluginCfg`) can be transferred to the plugin, e.g. to activate
it by default / *opt-in* or *opt-out* from user's point of view.

built-in plugins
================

The built-in plugins are all located in the namespace `searx.plugins`.

.. code:: yaml

    plugins:

      searx.plugins.calculator.SXNGPlugin:
        active: true

      searx.plugins.hash_plugin.SXNGPlugin:
        active: true

      searx.plugins.self_info.SXNGPlugin:
        active: true

      searx.plugins.tracker_url_remover.SXNGPlugin:
        active: true

      searx.plugins.unit_converter.SXNGPlugin:
        active: true

      searx.plugins.ahmia_filter.SXNGPlugin:
        active: true

      searx.plugins.hostnames.SXNGPlugin:
        active: true

      searx.plugins.oa_doi_rewrite.SXNGPlugin:
        active: false

      searx.plugins.tor_check.SXNGPlugin:
        active: false

external plugins
================

SearXNG supports *external plugins* / there is no need to install one, SearXNG
runs out of the box.

- Only show green hosted results: https://github.com/return42/tgwf-searx-plugins/

To get a developer installation in a SearXNG developer environment:

.. code:: sh

   $ git clone git@github.com:return42/tgwf-searx-plugins.git
   $ ./manage pyenv.cmd python -m \
         pip install -e tgwf-searx-plugins

To register the plugin in SearXNG add ``only_show_green_results.SXNGPlugin`` to
the ``plugins:``:

.. code:: yaml

    plugins:
      # ...
      only_show_green_results.SXNGPlugin:
        active: false

Result.filter_urls(..)
======================

The ``Result.filter_urls(..)`` can be used to filter and/or modify URL fields.
In the following example, the filter function ``my_url_filter``:

.. code:: python

   def my_url_filter(result, field_name, url_src) -> bool | str:
       if "google" in url_src:
           return False              # remove URL field from result
       if "facebook" in url_src:
           new_url = url_src.replace("facebook", "fb-dummy")
           return new_url            # return modified URL
       return True                   # leave URL in field unchanged

is applied to all URL fields in the :py:obj:`Plugin.on_result` hook:

.. code:: python

   class MyUrlFilter(Plugin):
       ...
       def on_result(self, request, search, result) -> bool:
           result.filter_urls(my_url_filter)
           return True

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
Markus Heiser 2025-03-20 07:47:38 +01:00 committed by Markus Heiser
parent d36da0a6c3
commit 50f92779bd
23 changed files with 816 additions and 607 deletions

View file

@ -3,29 +3,8 @@
- :ref:`plugins admin`
- :ref:`SearXNG settings <settings plugins>`
- :ref:`builtin plugins`
Plugins can extend or replace functionality of various components of SearXNG.
Here is an example of a very simple plugin that adds a "Hello" into the answer
area:
.. code:: python
from flask_babel import gettext as _
from searx.plugins import Plugin
from searx.result_types import Answer
class MyPlugin(Plugin):
id = "self_info"
default_on = True
def __init__(self):
super().__init__()
info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin"))
def post_search(self, request, search):
return [ Answer(answer="Hello") ]
Entry points (hooks) define when a plugin runs. Right now only three hooks are
implemented. So feel free to implement a hook if it fits the behaviour of your
@ -35,9 +14,72 @@ plugin / a plugin doesn't need to implement all the hooks.
- post search: :py:obj:`Plugin.post_search`
- on each result item: :py:obj:`Plugin.on_result`
For a coding example have a look at :ref:`self_info plugin`.
Below you will find some examples, for more coding examples have a look at the
built-in plugins :origin:`searx/plugins/` or `Only show green hosted results`_.
----
.. _Only show green hosted results:
https://github.com/return42/tgwf-searx-plugins/
Add Answer example
==================
Here is an example of a very simple plugin that adds a "Hello World" into the
answer area:
.. code:: python
from flask_babel import gettext as _
from searx.plugins import Plugin
from searx.result_types import Answer
class MyPlugin(Plugin):
id = "hello world"
def __init__(self, plg_cfg):
super().__init__(plg_cfg)
self.info = PluginInfo(id=self.id, name=_("Hello"), description=_("demo plugin"))
def post_search(self, request, search):
return [ Answer(answer="Hello World") ]
.. _filter urls example:
Filter URLs example
===================
.. sidebar:: Further reading ..
- :py:obj:`Result.filter_urls(..) <searx.result_types._base.Result.filter_urls>`
The :py:obj:`Result.filter_urls(..) <searx.result_types._base.Result.filter_urls>`
can be used to filter and/or modify URL fields. In the following example, the
filter function ``my_url_filter``:
.. code:: python
def my_url_filter(result, field_name, url_src) -> bool | str:
if "google" in url_src:
return False # remove URL field from result
if "facebook" in url_src:
new_url = url_src.replace("facebook", "fb-dummy")
return new_url # return modified URL
return True # leave URL in field unchanged
is applied to all URL fields in the :py:obj:`Plugin.on_result` hook:
.. code:: python
class MyUrlFilter(Plugin):
...
def on_result(self, request, search, result) -> bool:
result.filter_urls(my_url_filter)
return True
Implementation
==============
.. autoclass:: Plugin
:members:
@ -48,21 +90,21 @@ For a coding example have a look at :ref:`self_info plugin`.
.. autoclass:: PluginStorage
:members:
.. autoclass:: searx.plugins._core.ModulePlugin
.. autoclass:: PluginCfg
:members:
:show-inheritance:
"""
from __future__ import annotations
__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
__all__ = ["PluginInfo", "Plugin", "PluginStorage", "PluginCfg"]
from ._core import PluginInfo, Plugin, PluginStorage
import searx
from ._core import PluginInfo, Plugin, PluginStorage, PluginCfg
STORAGE: PluginStorage = PluginStorage()
def initialize(app):
STORAGE.load_builtins()
STORAGE.load_settings(searx.get_setting("plugins"))
STORAGE.init(app)

View file

@ -3,31 +3,24 @@
from __future__ import annotations
__all__ = ["PluginInfo", "Plugin", "PluginStorage"]
__all__ = ["PluginInfo", "Plugin", "PluginCfg", "PluginStorage"]
import abc
import importlib
import inspect
import logging
import pathlib
import types
import re
import typing
import warnings
from dataclasses import dataclass, field
import flask
import searx
from searx.utils import load_module
from searx.extended_types import SXNG_Request
from searx.result_types import Result
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
import flask
_default = pathlib.Path(__file__).parent
log: logging.Logger = logging.getLogger("searx.plugins")
@ -69,14 +62,17 @@ class PluginInfo:
"""See :py:obj:`Plugin.keywords`"""
ID_REGXP = re.compile("[a-z][a-z0-9].*")
class Plugin(abc.ABC):
"""Abstract base class of all Plugins."""
id: str = ""
"""The ID (suffix) in the HTML form."""
default_on: bool = False
"""Plugin is enabled/disabled by default."""
active: typing.ClassVar[bool]
"""Plugin is enabled/disabled by default (:py:obj:`PluginCfg.active`)."""
keywords: list[str] = []
"""Keywords in the search query that activate the plugin. The *keyword* is
@ -93,19 +89,28 @@ class Plugin(abc.ABC):
fqn: str = ""
def __init__(self) -> None:
def __init__(self, plg_cfg: PluginCfg) -> None:
super().__init__()
if not self.fqn:
self.fqn = self.__class__.__mro__[0].__module__
for attr in ["id", "default_on"]:
# names from the configuration
for n, v in plg_cfg.__dict__.items():
setattr(self, n, v)
# names that must be set by the plugin implementation
for attr in [
"id",
]:
if getattr(self, attr, None) is None:
raise NotImplementedError(f"plugin {self} is missing attribute {attr}")
if not self.id:
self.id = f"{self.__class__.__module__}.{self.__class__.__name__}"
if not ID_REGXP.match(self.id):
raise ValueError(f"plugin ID {self.id} contains invalid character (use lowercase ASCII)")
if not getattr(self, "log", None):
self.log = log.getChild(self.id)
pkg_name = inspect.getmodule(self.__class__).__package__ # type: ignore
self.log = logging.getLogger(f"{pkg_name}.{self.id}")
def __hash__(self) -> int:
"""The hash value is used in :py:obj:`set`, for example, when an object
@ -121,7 +126,7 @@ class Plugin(abc.ABC):
return hash(self) == hash(other)
def init(self, app: flask.Flask) -> bool: # pylint: disable=unused-argument
def init(self, app: "flask.Flask") -> bool: # pylint: disable=unused-argument
"""Initialization of the plugin, the return value decides whether this
plugin is active or not. Initialization only takes place once, at the
time the WEB application is set up. The base methode always returns
@ -151,7 +156,8 @@ class Plugin(abc.ABC):
.. hint::
If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must
If :py:obj:`Result.url <searx.result_types._base.Result.url>` is modified,
:py:obj:`Result.parsed_url <searx.result_types._base.Result.parsed_url>` must
be changed accordingly:
.. code:: python
@ -161,81 +167,24 @@ class Plugin(abc.ABC):
return True
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]:
"""Runs AFTER the search request. Can return a list of :py:obj:`Result`
objects to be added to the final result list."""
"""Runs AFTER the search request. Can return a list of
:py:obj:`Result <searx.result_types._base.Result>` objects to be added to the
final result list."""
return
class ModulePlugin(Plugin):
"""A wrapper class for legacy *plugins*.
@dataclass
class PluginCfg:
"""Settings of a plugin.
.. note::
.. code:: yaml
For internal use only!
In a module plugin, the follwing names are mapped:
- `module.query_keywords` --> :py:obj:`Plugin.keywords`
- `module.plugin_id` --> :py:obj:`Plugin.id`
- `module.logger` --> :py:obj:`Plugin.log`
mypackage.mymodule.MyPlugin:
active: true
"""
_required_attrs = (("name", str), ("description", str), ("default_on", bool))
def __init__(self, mod: types.ModuleType, fqn: str):
"""In case of missing attributes in the module or wrong types are given,
a :py:obj:`TypeError` exception is raised."""
self.fqn = fqn
self.module = mod
self.id = getattr(self.module, "plugin_id", self.module.__name__)
self.log = logging.getLogger(self.module.__name__)
self.keywords = getattr(self.module, "query_keywords", [])
for attr, attr_type in self._required_attrs:
if not hasattr(self.module, attr):
msg = f"missing attribute {attr}, cannot load plugin"
self.log.critical(msg)
raise TypeError(msg)
if not isinstance(getattr(self.module, attr), attr_type):
msg = f"attribute {attr} is not of type {attr_type}"
self.log.critical(msg)
raise TypeError(msg)
self.default_on = mod.default_on
self.info = PluginInfo(
id=self.id,
name=self.module.name,
description=self.module.description,
preference_section=getattr(self.module, "preference_section", None),
examples=getattr(self.module, "query_examples", []),
keywords=self.keywords,
)
# monkeypatch module
self.module.logger = self.log # type: ignore
super().__init__()
def init(self, app: flask.Flask) -> bool:
if not hasattr(self.module, "init"):
return True
return self.module.init(app)
def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool:
if not hasattr(self.module, "pre_search"):
return True
return self.module.pre_search(request, search)
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
if not hasattr(self.module, "on_result"):
return True
return self.module.on_result(request, search, result)
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]:
if not hasattr(self.module, "post_search"):
return None
return self.module.post_search(request, search)
active: bool = False
"""Plugin is active by default and the user can *opt-out* in the preferences."""
class PluginStorage:
@ -244,22 +193,10 @@ class PluginStorage:
plugin_list: set[Plugin]
"""The list of :py:obj:`Plugins` in this storage."""
legacy_plugins = [
"ahmia_filter",
"calculator",
"hostnames",
"oa_doi_rewrite",
"tor_check",
"tracker_url_remover",
"unit_converter",
]
"""Internal plugins implemented in the legacy style (as module / deprecated!)."""
def __init__(self):
self.plugin_list = set()
def __iter__(self):
yield from self.plugin_list
def __len__(self):
@ -267,102 +204,42 @@ class PluginStorage:
@property
def info(self) -> list[PluginInfo]:
return [p.info for p in self.plugin_list]
def load_builtins(self):
"""Load plugin modules from:
def load_settings(self, cfg: dict[str, dict]):
"""Load plugins configured in SearXNG's settings :ref:`settings
plugins`."""
- the python packages in :origin:`searx/plugins` and
- the external plugins from :ref:`settings plugins`.
"""
for fqn, plg_settings in cfg.items():
cls = None
mod_name, cls_name = fqn.rsplit('.', 1)
try:
mod = importlib.import_module(mod_name)
cls = getattr(mod, cls_name, None)
except Exception as exc: # pylint: disable=broad-exception-caught
log.exception(exc)
for f in _default.iterdir():
if f.name.startswith("_"):
continue
if f.stem not in self.legacy_plugins:
self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin")
continue
# for backward compatibility
mod = load_module(f.name, str(f.parent))
self.register(ModulePlugin(mod, f"searx.plugins.{f.stem}"))
for fqn in searx.get_setting("plugins"): # type: ignore
self.register_by_fqn(fqn)
if cls is None:
msg = f"plugin {fqn} is not implemented"
raise ValueError(msg)
plg = cls(PluginCfg(**plg_settings))
self.register(plg)
def register(self, plugin: Plugin):
"""Register a :py:obj:`Plugin`. In case of name collision (if two
plugins have same ID) a :py:obj:`KeyError` exception is raised.
"""
if plugin in self.plugin_list:
if plugin in [p.id for p in self.plugin_list]:
msg = f"name collision '{plugin.id}'"
plugin.log.critical(msg)
raise KeyError(msg)
if not plugin.fqn.startswith("searx.plugins."):
self.plugin_list.add(plugin)
plugin.log.debug("plugin has been registered")
return
# backward compatibility for the enabled_plugins setting
# https://docs.searxng.org/admin/settings/settings_plugins.html#enabled-plugins-internal
en_plgs: list[str] | None = searx.get_setting("enabled_plugins") # type:ignore
if en_plgs is None:
# enabled_plugins not listed in the /etc/searxng/settings.yml:
# check default_on before register ..
if plugin.default_on:
self.plugin_list.add(plugin)
plugin.log.debug("builtin plugin has been registered by SearXNG's defaults")
return
plugin.log.debug("builtin plugin is not registered by SearXNG's defaults")
return
if plugin.info.name not in en_plgs:
# enabled_plugins listed in the /etc/searxng/settings.yml,
# but this plugin is not listed in:
plugin.log.debug("builtin plugin is not registered by maintainer's settings")
return
# if the plugin is in enabled_plugins, then it is on by default.
plugin.default_on = True
self.plugin_list.add(plugin)
plugin.log.debug("builtin plugin is registered by maintainer's settings")
plugin.log.debug("plugin has been loaded")
def register_by_fqn(self, fqn: str):
"""Register a :py:obj:`Plugin` via its fully qualified class name (FQN).
The FQNs of external plugins could be read from a configuration, for
example, and registered using this method
"""
mod_name, _, obj_name = fqn.rpartition('.')
if not mod_name:
# for backward compatibility
code_obj = importlib.import_module(fqn)
else:
mod = importlib.import_module(mod_name)
code_obj = getattr(mod, obj_name, None)
if code_obj is None:
msg = f"plugin {fqn} is not implemented"
log.critical(msg)
raise ValueError(msg)
if isinstance(code_obj, types.ModuleType):
# for backward compatibility
warnings.warn(
f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning
)
self.register(ModulePlugin(code_obj, fqn))
return
self.register(code_obj())
def init(self, app: flask.Flask) -> None:
def init(self, app: "flask.Flask") -> None:
"""Calls the method :py:obj:`Plugin.init` of each plugin in this
storage. Depending on its return value, the plugin is removed from
*this* storage or not."""

View file

@ -1,34 +1,51 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
from __future__ import annotations
import typing
from hashlib import md5
import flask
from flask_babel import gettext
from searx.data import ahmia_blacklist_loader
from searx import get_setting
from searx.plugins import Plugin, PluginInfo
name = "Ahmia blacklist"
description = "Filter out onion results that appear in Ahmia's blacklist. (See https://ahmia.fi/blacklist)"
default_on = True
preference_section = 'onions'
if typing.TYPE_CHECKING:
import flask
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.result_types import Result
from searx.plugins import PluginCfg
ahmia_blacklist: list = []
def on_result(_request, _search, result) -> bool:
if not getattr(result, 'is_onion', None) or not getattr(result, 'parsed_url', None):
class SXNGPlugin(Plugin):
"""Filter out onion results that appear in Ahmia's blacklist (See https://ahmia.fi/blacklist)."""
id = "ahmia_filter"
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Ahmia blacklist"),
description=gettext("Filter out onion results that appear in Ahmia's blacklist."),
preference_section="general",
)
def on_result(
self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result
) -> bool: # pylint: disable=unused-argument
if not getattr(result, "is_onion", False) or not getattr(result, "parsed_url", False):
return True
result_hash = md5(result["parsed_url"].hostname.encode()).hexdigest()
return result_hash not in ahmia_blacklist
def init(self, app: "flask.Flask") -> bool: # pylint: disable=unused-argument
global ahmia_blacklist # pylint: disable=global-statement
if not get_setting("outgoing.using_tor_proxy"):
# disable the plugin
return False
ahmia_blacklist = ahmia_blacklist_loader()
return True
result_hash = md5(result['parsed_url'].hostname.encode()).hexdigest()
return result_hash not in ahmia_blacklist
def init(app=flask.Flask) -> bool: # pylint: disable=unused-argument
global ahmia_blacklist # pylint: disable=global-statement
if not get_setting("outgoing.using_tor_proxy"):
# disable the plugin
return False
ahmia_blacklist = ahmia_blacklist_loader()
return True

View file

@ -1,9 +1,9 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval").
"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval").
"""
from __future__ import annotations
from typing import Callable
import typing
import ast
import re
@ -15,14 +15,78 @@ import babel.numbers
from flask_babel import gettext
from searx.result_types import EngineResults
from searx.plugins import Plugin, PluginInfo
name = "Basic Calculator"
description = gettext("Calculate mathematical expressions via the search bar")
default_on = True
preference_section = 'general'
plugin_id = 'calculator'
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.plugins import PluginCfg
operators: dict[type, Callable] = {
class SXNGPlugin(Plugin):
"""Plugin converts strings to different hash digests. The results are
displayed in area for the "answers".
"""
id = "calculator"
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Basic Calculator"),
description=gettext("Calculate mathematical expressions via the search bar"),
preference_section="general",
)
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
results = EngineResults()
# only show the result of the expression on the first page
if search.search_query.pageno > 1:
return results
query = search.search_query.query
# in order to avoid DoS attacks with long expressions, ignore long expressions
if len(query) > 100:
return results
# replace commonly used math operators with their proper Python operator
query = query.replace("x", "*").replace(":", "/")
# use UI language
ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
# parse the number system in a localized way
def _decimal(match: re.Match) -> str:
val = match.string[match.start() : match.end()]
val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
return str(val)
decimal = ui_locale.number_symbols["latn"]["decimal"]
group = ui_locale.number_symbols["latn"]["group"]
query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
# only numbers and math operators are accepted
if any(str.isalpha(c) for c in query):
return results
# in python, powers are calculated via **
query_py_formatted = query.replace("^", "**")
# Prevent the runtime from being longer than 50 ms
res = timeout_func(0.05, _eval_expr, query_py_formatted)
if res is None or res == "":
return results
res = babel.numbers.format_decimal(res, locale=ui_locale)
results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
return results
operators: dict[type, typing.Callable] = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
@ -92,49 +156,3 @@ def timeout_func(timeout, func, *args, **kwargs):
p.join()
p.close()
return ret_val
def post_search(request, search) -> EngineResults:
results = EngineResults()
# only show the result of the expression on the first page
if search.search_query.pageno > 1:
return results
query = search.search_query.query
# in order to avoid DoS attacks with long expressions, ignore long expressions
if len(query) > 100:
return results
# replace commonly used math operators with their proper Python operator
query = query.replace("x", "*").replace(":", "/")
# use UI language
ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
# parse the number system in a localized way
def _decimal(match: re.Match) -> str:
val = match.string[match.start() : match.end()]
val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
return str(val)
decimal = ui_locale.number_symbols["latn"]["decimal"]
group = ui_locale.number_symbols["latn"]["group"]
query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
# only numbers and math operators are accepted
if any(str.isalpha(c) for c in query):
return results
# in python, powers are calculated via **
query_py_formatted = query.replace("^", "**")
# Prevent the runtime from being longer than 50 ms
res = timeout_func(0.05, _eval_expr, query_py_formatted)
if res is None or res == "":
return results
res = babel.numbers.format_decimal(res, locale=ui_locale)
results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
return results

View file

@ -14,6 +14,7 @@ from searx.result_types import EngineResults
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.plugins import PluginCfg
class SXNGPlugin(Plugin):
@ -22,11 +23,10 @@ class SXNGPlugin(Plugin):
"""
id = "hash_plugin"
default_on = True
keywords = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
def __init__(self):
super().__init__()
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.parser_re = re.compile(f"({'|'.join(self.keywords)}) (.*)", re.I)
self.info = PluginInfo(

View file

@ -1,19 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=too-many-branches
# pylint: disable=too-many-branches, unused-argument
"""
.. attention::
The **"Hostname replace"** plugin has been replace by **"Hostnames
plugin"**, see :pull:`3463` & :pull:`3552`.
The **Hostnames plugin** can be enabled by adding it to the
``enabled_plugins`` **list** in the ``setting.yml`` like so.
.. code:: yaml
enabled_plugins:
- 'Hostnames plugin'
...
During the initialization phase, the plugin checks whether a ``hostnames:``
configuration exists. If this is not the case, the plugin is not included
in the PluginStorage (it is not available for selection).
- ``hostnames.replace``: A **mapping** of regular expressions to hostnames to be
replaced by other hostnames.
@ -92,6 +83,7 @@ something like this:
"""
from __future__ import annotations
import typing
import re
from urllib.parse import urlunparse, urlparse
@ -99,84 +91,114 @@ from urllib.parse import urlunparse, urlparse
from flask_babel import gettext
from searx import settings
from searx.result_types._base import MainResult, LegacyResult
from searx.settings_loader import get_yaml_cfg
from searx.plugins import Plugin, PluginInfo
from ._core import log
if typing.TYPE_CHECKING:
import flask
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.result_types import Result
from searx.plugins import PluginCfg
name = gettext('Hostnames plugin')
description = gettext('Rewrite hostnames, remove results or prioritize them based on the hostname')
default_on = False
preference_section = 'general'
plugin_id = 'hostnames'
parsed = 'parsed_url'
_url_fields = ['iframe_src', 'audio_src']
REPLACE: dict[re.Pattern, str] = {}
REMOVE: set = set()
HIGH: set = set()
LOW: set = set()
def _load_regular_expressions(settings_key) -> dict | set | None:
setting_value = settings.get(plugin_id, {}).get(settings_key)
class SXNGPlugin(Plugin):
"""Rewrite hostnames, remove results or prioritize them."""
if not setting_value:
return None
id = "hostnames"
# load external file with configuration
if isinstance(setting_value, str):
setting_value = get_yaml_cfg(setting_value)
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Hostnames plugin"),
description=gettext("Rewrite hostnames, remove results or prioritize them based on the hostname"),
preference_section="general",
)
if isinstance(setting_value, list):
return {re.compile(r) for r in setting_value}
def on_result(self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result) -> bool:
if isinstance(setting_value, dict):
return {re.compile(p): r for (p, r) in setting_value.items()}
for pattern in REMOVE:
if result.parsed_url and pattern.search(result.parsed_url.netloc):
# if the link (parsed_url) of the result match, then remove the
# result from the result list, in any other case, the result
# remains in the list / see final "return True" below.
# log.debug("FIXME: remove [url/parsed_url] %s %s", pattern.pattern, result.url)
return False
return None
result.filter_urls(filter_url_field)
if isinstance(result, (MainResult, LegacyResult)):
for pattern in LOW:
if result.parsed_url and pattern.search(result.parsed_url.netloc):
result.priority = "low"
replacements: dict = _load_regular_expressions('replace') or {} # type: ignore
removables: set = _load_regular_expressions('remove') or set() # type: ignore
high_priority: set = _load_regular_expressions('high_priority') or set() # type: ignore
low_priority: set = _load_regular_expressions('low_priority') or set() # type: ignore
for pattern in HIGH:
if result.parsed_url and pattern.search(result.parsed_url.netloc):
result.priority = "high"
return True
def _matches_parsed_url(result, pattern):
return result[parsed] and (parsed in result and pattern.search(result[parsed].netloc))
def init(self, app: "flask.Flask") -> bool: # pylint: disable=unused-argument
global REPLACE, REMOVE, HIGH, LOW # pylint: disable=global-statement
def on_result(_request, _search, result) -> bool:
for pattern, replacement in replacements.items():
if _matches_parsed_url(result, pattern):
# logger.debug(result['url'])
result[parsed] = result[parsed]._replace(netloc=pattern.sub(replacement, result[parsed].netloc))
result['url'] = urlunparse(result[parsed])
# logger.debug(result['url'])
for url_field in _url_fields:
if not getattr(result, url_field, None):
continue
url_src = urlparse(result[url_field])
if pattern.search(url_src.netloc):
url_src = url_src._replace(netloc=pattern.sub(replacement, url_src.netloc))
result[url_field] = urlunparse(url_src)
for pattern in removables:
if _matches_parsed_url(result, pattern):
if not settings.get(self.id):
# Remove plugin, if there isn't a "hostnames:" setting
return False
for url_field in _url_fields:
if not getattr(result, url_field, None):
continue
REPLACE = self._load_regular_expressions("replace") or {} # type: ignore
REMOVE = self._load_regular_expressions("remove") or set() # type: ignore
HIGH = self._load_regular_expressions("high_priority") or set() # type: ignore
LOW = self._load_regular_expressions("low_priority") or set() # type: ignore
url_src = urlparse(result[url_field])
if pattern.search(url_src.netloc):
del result[url_field]
return True
for pattern in low_priority:
if _matches_parsed_url(result, pattern):
result['priority'] = 'low'
def _load_regular_expressions(self, settings_key) -> dict[re.Pattern, str] | set | None:
setting_value = settings.get(self.id, {}).get(settings_key)
for pattern in high_priority:
if _matches_parsed_url(result, pattern):
result['priority'] = 'high'
if not setting_value:
return None
# load external file with configuration
if isinstance(setting_value, str):
setting_value = get_yaml_cfg(setting_value)
if isinstance(setting_value, list):
return {re.compile(r) for r in setting_value}
if isinstance(setting_value, dict):
return {re.compile(p): r for (p, r) in setting_value.items()}
return None
def filter_url_field(result: "Result|LegacyResult", field_name: str, url_src: str) -> bool | str:
"""Returns bool ``True`` to use URL unchanged (``False`` to ignore URL).
If URL should be modified, the returned string is the new URL to use."""
if not url_src:
log.debug("missing a URL in field %s", field_name)
return True
url_src_parsed = urlparse(url=url_src)
for pattern in REMOVE:
if pattern.search(url_src_parsed.netloc):
return False
for pattern, replacement in REPLACE.items():
if pattern.search(url_src_parsed.netloc):
new_url = url_src_parsed._replace(netloc=pattern.sub(replacement, url_src_parsed.netloc))
new_url = urlunparse(new_url)
return new_url
return True

View file

@ -1,54 +1,90 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring
from __future__ import annotations
import typing
import re
from urllib.parse import urlparse, parse_qsl
from urllib.parse import parse_qsl
from flask_babel import gettext
from searx import get_setting
from searx.plugins import Plugin, PluginInfo
from searx.extended_types import sxng_request
from searx import settings
from ._core import log
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.result_types import Result, LegacyResult
from searx.plugins import PluginCfg
ahmia_blacklist: list = []
def filter_url_field(result: "Result|LegacyResult", field_name: str, url_src: str) -> bool | str:
"""Returns bool ``True`` to use URL unchanged (``False`` to ignore URL).
If URL should be modified, the returned string is the new URL to use."""
if field_name != "url":
return True # use it unchanged
doi = extract_doi(result.parsed_url)
if doi and len(doi) < 50:
for suffix in ("/", ".pdf", ".xml", "/full", "/meta", "/abstract"):
doi = doi.removesuffix(suffix)
new_url = get_doi_resolver() + doi
if "doi" not in result:
result["doi"] = doi
log.debug("oa_doi_rewrite: [URL field: %s] %s -> %s", field_name, url_src, new_url)
return new_url # use new url
return True # use it unchanged
class SXNGPlugin(Plugin):
"""Avoid paywalls by redirecting to open-access."""
id = "oa_doi_rewrite"
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Open Access DOI rewrite"),
description=gettext("Avoid paywalls by redirecting to open-access versions of publications when available"),
preference_section="general",
)
def on_result(
self,
request: "SXNG_Request",
search: "SearchWithPlugins",
result: "Result",
) -> bool: # pylint: disable=unused-argument
if result.parsed_url:
result.filter_urls(filter_url_field)
return True
regex = re.compile(r'10\.\d{4,9}/[^\s]+')
name = gettext('Open Access DOI rewrite')
description = gettext('Avoid paywalls by redirecting to open-access versions of publications when available')
default_on = False
preference_section = 'general/doi_resolver'
def extract_doi(url):
match = regex.search(url.path)
if match:
return match.group(0)
m = regex.search(url.path)
if m:
return m.group(0)
for _, v in parse_qsl(url.query):
match = regex.search(v)
if match:
return match.group(0)
m = regex.search(v)
if m:
return m.group(0)
return None
def get_doi_resolver(preferences):
doi_resolvers = settings['doi_resolvers']
selected_resolver = preferences.get_value('doi_resolver')[0]
def get_doi_resolver() -> str:
doi_resolvers = get_setting("doi_resolvers")
selected_resolver = sxng_request.preferences.get_value('doi_resolver')[0]
if selected_resolver not in doi_resolvers:
selected_resolver = settings['default_doi_resolver']
selected_resolver = get_setting("default_doi_resolver")
return doi_resolvers[selected_resolver]
def on_result(request, _search, result) -> bool:
if not result.parsed_url:
return True
doi = extract_doi(result['parsed_url'])
if doi and len(doi) < 50:
for suffix in ('/', '.pdf', '.xml', '/full', '/meta', '/abstract'):
if doi.endswith(suffix):
doi = doi[: -len(suffix)]
result['url'] = get_doi_resolver(request.preferences) + doi
result['parsed_url'] = urlparse(result['url'])
if 'doi' not in result:
result['doi'] = doi
return True

View file

@ -14,6 +14,7 @@ from . import Plugin, PluginInfo
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from . import PluginCfg
class SXNGPlugin(Plugin):
@ -23,11 +24,10 @@ class SXNGPlugin(Plugin):
"""
id = "self_info"
default_on = True
keywords = ["ip", "user-agent"]
def __init__(self):
super().__init__()
def __init__(self, plg_cfg: "PluginCfg"):
super().__init__(plg_cfg)
self.ip_regex = re.compile(r"^ip", re.IGNORECASE)
self.ua_regex = re.compile(r"^user-agent", re.IGNORECASE)

View file

@ -3,47 +3,24 @@
user searches for ``tor-check``. It fetches the tor exit node list from
:py:obj:`url_exit_list` and parses all the IPs into a list, then checks if the
user's IP address is in it.
Enable in ``settings.yml``:
.. code:: yaml
enabled_plugins:
..
- 'Tor check plugin'
"""
from __future__ import annotations
import typing
import re
from flask_babel import gettext
from httpx import HTTPError
from searx.network import get
from searx.plugins import Plugin, PluginInfo
from searx.result_types import EngineResults
from searx.botdetection import get_real_ip
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.plugins import PluginCfg
default_on = False
name = gettext("Tor check plugin")
'''Translated name of the plugin'''
description = gettext(
"This plugin checks if the address of the request is a Tor exit-node, and"
" informs the user if it is; like check.torproject.org, but from SearXNG."
)
'''Translated description of the plugin.'''
preference_section = 'query'
'''The preference section where the plugin is shown.'''
query_keywords = ['tor-check']
'''Query keywords shown in the preferences.'''
query_examples = ''
'''Query examples shown in the preferences.'''
# Regex for exit node addresses in the list.
reg = re.compile(r"(?<=ExitAddress )\S+")
@ -52,33 +29,51 @@ url_exit_list = "https://check.torproject.org/exit-addresses"
"""URL to load Tor exit list from."""
def post_search(request, search) -> EngineResults:
results = EngineResults()
class SXNGPlugin(Plugin):
"""Rewrite hostnames, remove results or prioritize them."""
if search.search_query.pageno > 1:
return results
id = "tor_check"
keywords = ["tor-check"]
if search.search_query.query.lower() == "tor-check":
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Tor check plugin"),
description=gettext(
"This plugin checks if the address of the request is a Tor exit-node, and"
" informs the user if it is; like check.torproject.org, but from SearXNG."
),
preference_section="query",
)
# Request the list of tor exit nodes.
try:
resp = get(url_exit_list)
node_list = re.findall(reg, resp.text) # type: ignore
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
results = EngineResults()
except HTTPError:
# No answer, return error
msg = gettext("Could not download the list of Tor exit-nodes from")
results.add(results.types.Answer(answer=f"{msg} {url_exit_list}"))
if search.search_query.pageno > 1:
return results
real_ip = get_real_ip(request)
if search.search_query.query.lower() == "tor-check":
if real_ip in node_list:
msg = gettext("You are using Tor and it looks like you have the external IP address")
results.add(results.types.Answer(answer=f"{msg} {real_ip}"))
# Request the list of tor exit nodes.
try:
resp = get(url_exit_list)
node_list = re.findall(reg, resp.text) # type: ignore
else:
msg = gettext("You are not using Tor and you have the external IP address")
results.add(results.types.Answer(answer=f"{msg} {real_ip}"))
except HTTPError:
# No answer, return error
msg = gettext("Could not download the list of Tor exit-nodes from")
results.add(results.types.Answer(answer=f"{msg} {url_exit_list}"))
return results
return results
real_ip = get_real_ip(request)
if real_ip in node_list:
msg = gettext("You are using Tor and it looks like you have the external IP address")
results.add(results.types.Answer(answer=f"{msg} {real_ip}"))
else:
msg = gettext("You are not using Tor and you have the external IP address")
results.add(results.types.Answer(answer=f"{msg} {real_ip}"))
return results

View file

@ -2,12 +2,21 @@
# pylint: disable=missing-module-docstring
from __future__ import annotations
import typing
import re
from urllib.parse import urlunparse, parse_qsl, urlencode
from flask_babel import gettext
from searx.plugins import Plugin, PluginInfo
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.result_types import Result
from searx.plugins import PluginCfg
regexes = {
re.compile(r'utm_[^&]+'),
re.compile(r'(wkey|wemail)[^&]*'),
@ -15,30 +24,35 @@ regexes = {
re.compile(r'&$'),
}
name = gettext('Tracker URL remover')
description = gettext('Remove trackers arguments from the returned URL')
default_on = True
preference_section = 'privacy'
class SXNGPlugin(Plugin):
"""Remove trackers arguments from the returned URL"""
def on_result(_request, _search, result) -> bool:
id = "tracker_url_remover"
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Tracker URL remover"),
description=gettext("Remove trackers arguments from the returned URL"),
preference_section="privacy",
)
def on_result(
self, request: "SXNG_Request", search: "SearchWithPlugins", result: Result
) -> bool: # pylint: disable=unused-argument
if not result.parsed_url:
return True
parsed_query: list[tuple[str, str]] = parse_qsl(result.parsed_url.query)
for name_value in list(parsed_query):
param_name = name_value[0]
for reg in regexes:
if reg.match(param_name):
parsed_query.remove(name_value)
result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query))
result.url = urlunparse(result.parsed_url)
break
parsed_url = getattr(result, "parsed_url", None)
if not parsed_url:
return True
if parsed_url.query == "":
return True
parsed_query = parse_qsl(parsed_url.query)
changes = 0
for i, (param_name, _) in enumerate(list(parsed_query)):
for reg in regexes:
if reg.match(param_name):
parsed_query.pop(i - changes)
changes += 1
result.parsed_url = result.parsed_url._replace(query=urlencode(parsed_query))
result.url = urlunparse(result.parsed_url)
break
return True

View file

@ -7,36 +7,74 @@ converters, each converter is one item in the list (compare
:py:obj:`ADDITIONAL_UNITS`). If the symbols are ambiguous, the matching units
of measurement are evaluated. The weighting in the evaluation results from the
sorting of the :py:obj:`list of unit converters<symbol_to_si>`.
Enable in ``settings.yml``:
.. code:: yaml
enabled_plugins:
..
- 'Unit converter plugin'
"""
from __future__ import annotations
import typing
import re
import babel.numbers
from flask_babel import gettext, get_locale
from searx import data
from searx.plugins import Plugin, PluginInfo
from searx.result_types import EngineResults
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from searx.plugins import PluginCfg
name = "Unit converter plugin"
description = gettext("Convert between units")
default_on = True
plugin_id = "unit_converter"
preference_section = "general"
name = ""
description = gettext("")
plugin_id = ""
preference_section = ""
CONVERT_KEYWORDS = ["in", "to", "as"]
class SXNGPlugin(Plugin):
"""Convert between units. The result is displayed in area for the
"answers".
"""
id = "unit_converter"
def __init__(self, plg_cfg: "PluginCfg") -> None:
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("Unit converter plugin"),
description=gettext("Convert between units"),
preference_section="general",
)
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
results = EngineResults()
# only convert between units on the first page
if search.search_query.pageno > 1:
return results
query = search.search_query.query
query_parts = query.split(" ")
if len(query_parts) < 3:
return results
for query_part in query_parts:
for keyword in CONVERT_KEYWORDS:
if query_part == keyword:
from_query, to_query = query.split(keyword, 1)
target_val = _parse_text_and_convert(from_query.strip(), to_query.strip())
if target_val:
results.add(results.types.Answer(answer=target_val))
return results
# inspired from https://stackoverflow.com/a/42475086
RE_MEASURE = r'''
(?P<sign>[-+]?) # +/- or nothing for positive
@ -243,27 +281,3 @@ def _parse_text_and_convert(from_query, to_query) -> str | None:
result = babel.numbers.format_decimal(value, locale=_locale, format='#,##0.##########;-#')
return f'{result} {target_symbol}'
def post_search(_request, search) -> EngineResults:
results = EngineResults()
# only convert between units on the first page
if search.search_query.pageno > 1:
return results
query = search.search_query.query
query_parts = query.split(" ")
if len(query_parts) < 3:
return results
for query_part in query_parts:
for keyword in CONVERT_KEYWORDS:
if query_part == keyword:
from_query, to_query = query.split(keyword, 1)
target_val = _parse_text_and_convert(from_query.strip(), to_query.strip())
if target_val:
results.add(results.types.Answer(answer=target_val))
return results