From 0fd7326edfc61cd623915aed389bb0768fda14f0 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 28 Apr 2026 14:14:39 +0100 Subject: [PATCH] Clean up deprecations in register_plugin Cleans up #8342 --- distributed/client.py | 193 +----------------- distributed/deploy/tests/test_local.py | 5 +- .../diagnostics/tests/test_nanny_plugin.py | 115 ----------- .../tests/test_scheduler_plugin.py | 123 +---------- .../diagnostics/tests/test_worker_plugin.py | 114 ----------- distributed/nanny.py | 11 +- distributed/scheduler.py | 32 +-- distributed/worker.py | 9 +- 8 files changed, 13 insertions(+), 589 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 0d41254a044..e8d1c884f21 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -5136,7 +5136,6 @@ def register_plugin( self, plugin: NannyPlugin | SchedulerPlugin | WorkerPlugin, name: str | None = None, - idempotent: bool | None = None, ): """Register a plugin. @@ -5149,23 +5148,11 @@ def register_plugin( name : Name for the plugin; if None, a name is taken from the plugin instance or automatically generated if not present. - idempotent : - Do not re-register if a plugin of the given name already exists. - If None, ``plugin.idempotent`` is taken if defined, False otherwise. """ if name is None: name = _get_plugin_name(plugin) assert name - if idempotent is not None: - warnings.warn( - "The `idempotent` argument is deprecated and will be removed in a " - "future version. Please mark your plugin as idempotent by setting its " - "`.idempotent` attribute to `True`.", - FutureWarning, - stacklevel=2, - ) - else: - idempotent = getattr(plugin, "idempotent", False) + idempotent = getattr(plugin, "idempotent", False) assert isinstance(idempotent, bool) return self._register_plugin(plugin, name, idempotent) @@ -5229,42 +5216,7 @@ async def _register_scheduler_plugin( idempotent=idempotent, ) - def register_scheduler_plugin( - self, - plugin: SchedulerPlugin, - name: str | None = None, - idempotent: bool | None = None, - ): - """ - Register a scheduler plugin. - - .. deprecated:: 2023.9.2 - Use :meth:`Client.register_plugin` instead. - - See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins - - Parameters - ---------- - plugin : SchedulerPlugin - SchedulerPlugin instance to pass to the scheduler. - name : str - Name for the plugin; if None, a name is taken from the - plugin instance or automatically generated if not present. - idempotent : bool - Do not re-register if a plugin of the given name already exists. - """ - warnings.warn( - "`Client.register_scheduler_plugin` has been deprecated; " - "please `Client.register_plugin` instead", - DeprecationWarning, - stacklevel=2, - ) - return cast(OKMessage, self.register_plugin(plugin, name, idempotent)) - - async def _unregister_scheduler_plugin(self, name): - return await self.scheduler.unregister_scheduler_plugin(name=name) - - def unregister_scheduler_plugin(self, name): + def unregister_scheduler_plugin(self, name: str): """Unregisters a scheduler plugin See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins @@ -5297,7 +5249,7 @@ def unregister_scheduler_plugin(self, name): -------- register_scheduler_plugin """ - return self.sync(self._unregister_scheduler_plugin, name=name) + return self.sync(self.scheduler.unregister_scheduler_plugin, name=name) def register_worker_callbacks(self, setup=None): """ @@ -5349,145 +5301,6 @@ async def _register_nanny_plugin( raise exc.with_traceback(tb) return cast(dict[str, OKMessage], responses) - def register_worker_plugin( - self, - plugin: NannyPlugin | WorkerPlugin, - name: str | None = None, - nanny: bool | None = None, - ): - """ - Registers a lifecycle worker plugin for all current and future workers. - - .. deprecated:: 2023.9.2 - Use :meth:`Client.register_plugin` instead. - - This registers a new object to handle setup, task state transitions and - teardown for workers in this cluster. The plugin will instantiate - itself on all currently connected workers. It will also be run on any - worker that connects in the future. - - The plugin may include methods ``setup``, ``teardown``, ``transition``, - and ``release_key``. See the - ``dask.distributed.WorkerPlugin`` class or the examples below for the - interface and docstrings. It must be serializable with the pickle or - cloudpickle modules. - - If the plugin has a ``name`` attribute, or if the ``name=`` keyword is - used then that will control idempotency. If a plugin with that name has - already been registered, then it will be removed and replaced by the new one. - - For alternatives to plugins, you may also wish to look into preload - scripts. - - Parameters - ---------- - plugin : WorkerPlugin or NannyPlugin - WorkerPlugin or NannyPlugin instance to register. - name : str, optional - A name for the plugin. - Registering a plugin with the same name will have no effect. - If plugin has no name attribute a random name is used. - nanny : bool, optional - Whether to register the plugin with workers or nannies. - - Examples - -------- - >>> class MyPlugin(WorkerPlugin): - ... def __init__(self, *args, **kwargs): - ... pass # the constructor is up to you - ... def setup(self, worker: dask.distributed.Worker): - ... pass - ... def teardown(self, worker: dask.distributed.Worker): - ... pass - ... def transition(self, key: str, start: str, finish: str, - ... **kwargs): - ... pass - ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): - ... pass - - >>> plugin = MyPlugin(1, 2, 3) - >>> client.register_plugin(plugin) - - You can get access to the plugin with the ``get_worker`` function - - >>> client.register_plugin(other_plugin, name='my-plugin') - >>> def f(): - ... worker = get_worker() - ... plugin = worker.plugins['my-plugin'] - ... return plugin.my_state - - >>> future = client.run(f) - - See Also - -------- - distributed.WorkerPlugin - unregister_worker_plugin - """ - warnings.warn( - "`Client.register_worker_plugin` has been deprecated; " - "please use `Client.register_plugin` instead", - DeprecationWarning, - stacklevel=2, - ) - if name is None: - name = _get_plugin_name(plugin) - - assert name - - method: Callable - if isinstance(plugin, WorkerPlugin): - method = self._register_worker_plugin - if nanny is True: - warnings.warn( - "Registering a `WorkerPlugin` as a nanny plugin is not " - "allowed, registering as a worker plugin instead. " - "To register as a nanny plugin, inherit from `NannyPlugin`.", - UserWarning, - stacklevel=2, - ) - elif isinstance(plugin, NannyPlugin): - method = self._register_nanny_plugin - if nanny is False: - warnings.warn( - "Registering a `NannyPlugin` as a worker plugin is not " - "allowed, registering as a nanny plugin instead. " - "To register as a worker plugin, inherit from `WorkerPlugin`.", - UserWarning, - stacklevel=2, - ) - elif isinstance(plugin, SchedulerPlugin): # type: ignore[unreachable] - if nanny: - warnings.warn( - "Registering a `SchedulerPlugin` as a nanny plugin is not " - "allowed, registering as a scheduler plugin instead. " - "To register as a nanny plugin, inherit from `NannyPlugin`.", - UserWarning, - stacklevel=2, - ) - else: - warnings.warn( - "Registering a `SchedulerPlugin` as a worker plugin is not " - "allowed, registering as a scheduler plugin instead. " - "To register as a worker plugin, inherit from `WorkerPlugin`.", - UserWarning, - stacklevel=2, - ) - method = self._register_scheduler_plugin - else: - warnings.warn( - "Registering duck-typed plugins has been deprecated. " - "Please make sure your plugin inherits from `NannyPlugin` " - "or `WorkerPlugin`.", - DeprecationWarning, - stacklevel=2, - ) - if nanny is True: - method = self._register_nanny_plugin - else: - method = self._register_worker_plugin - - return self.sync(method, plugin=plugin, name=name, idempotent=False) - async def _unregister_worker_plugin(self, name, nanny=None): if nanny: responses = await self.scheduler.unregister_nanny_plugin(name=name) diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index f4b422af62e..706baeb2a44 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -13,7 +13,7 @@ from dask.system import CPU_COUNT -from distributed import Client, LocalCluster, Nanny, Worker, get_client +from distributed import Client, LocalCluster, Nanny, Worker, WorkerPlugin, get_client from distributed.compatibility import LINUX, asyncio_run from distributed.config import get_loop_factory from distributed.core import Status @@ -1276,12 +1276,11 @@ async def test_connect_to_closed_cluster(): Client(cluster, asynchronous=True) -class MyPlugin: +class MyPlugin(WorkerPlugin): def setup(self, worker=None): import my_nonexistent_library # noqa -@pytest.mark.slow def test_localcluster_start_exception(loop): with raises_with_cause( RuntimeError, diff --git a/distributed/diagnostics/tests/test_nanny_plugin.py b/distributed/diagnostics/tests/test_nanny_plugin.py index 3c481dce26b..4a7c15d2131 100644 --- a/distributed/diagnostics/tests/test_nanny_plugin.py +++ b/distributed/diagnostics/tests/test_nanny_plugin.py @@ -5,68 +5,9 @@ import pytest from distributed import Nanny, NannyPlugin -from distributed.protocol.pickle import dumps from distributed.utils_test import captured_logger, gen_cluster -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_worker_plugin_is_deprecated(c, s, a): - class DuckPlugin(NannyPlugin): - def setup(self, nanny): - nanny.foo = 123 - - def teardown(self, nanny): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with pytest.warns(DeprecationWarning, match="register_worker_plugin.*deprecated"): - await c.register_worker_plugin(DuckPlugin()) - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_worker_plugin_typing_over_nanny_keyword(c, s, a): - class DuckPlugin(NannyPlugin): - def setup(self, nanny): - nanny.foo = 123 - - def teardown(self, nanny): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with ( - pytest.warns(UserWarning, match="`NannyPlugin` as a worker plugin"), - pytest.warns(DeprecationWarning, match="please use `Client.register_plugin`"), - ): - await c.register_worker_plugin(DuckPlugin(), nanny=False) - - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_duck_typed_register_nanny_plugin_is_deprecated(c, s, a): - class DuckPlugin: - def setup(self, nanny): - nanny.foo = 123 - - def teardown(self, nanny): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with ( - pytest.warns(DeprecationWarning, match="duck-typed.*NannyPlugin"), - pytest.warns(DeprecationWarning, match="please use `Client.register_plugin`"), - ): - await c.register_worker_plugin(DuckPlugin(), nanny=True) - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - @gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) async def test_register_idempotent_plugin(c, s, a): class IdempotentPlugin(NannyPlugin): @@ -107,62 +48,6 @@ def __init__(self, instance=None): assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" - third = NonIdempotentPlugin(instance="third") - with pytest.warns( - FutureWarning, - match="`Scheduler.register_nanny_plugin` now requires `idempotent`", - ): - await s.register_nanny_plugin( - comm=None, plugin=dumps(third), name="nonidempotentplugin" - ) - assert "nonidempotentplugin" in a.plugins - assert a.plugins["nonidempotentplugin"].instance == "third" - - -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): - class NonIdempotentPlugin(NannyPlugin): - def __init__(self, instance=None): - self.name = "nonidempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = True - - first = NonIdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=False) - assert "nonidempotentplugin" in a.plugins - - second = NonIdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=False) - assert "nonidempotentplugin" in a.plugins - assert a.plugins["nonidempotentplugin"].instance == "second" - - class IdempotentPlugin(NannyPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = False - - def setup(self, nanny): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in a.plugins - - second = IdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in a.plugins - assert a.plugins["idempotentplugin"].instance == "first" - class BrokenSetupPlugin(NannyPlugin): def setup(self, nanny): diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index d5205242912..29f74948de2 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -4,7 +4,7 @@ import pytest -from distributed import Nanny, Scheduler, SchedulerPlugin, Worker +from distributed import Scheduler, SchedulerPlugin, Worker from distributed.protocol.pickle import dumps from distributed.utils_test import captured_logger, gen_cluster, gen_test, inc @@ -386,22 +386,6 @@ def start(self, scheduler): assert n_plugins == len(s.plugins) -@gen_cluster(client=True) -async def test_register_scheduler_plugin_deprecated(c, s, a, b): - class Dummy(SchedulerPlugin): - name = "Dummy" - - def start(self, scheduler): - scheduler.foo = "bar" - - assert not hasattr(s, "foo") - with pytest.warns( - DeprecationWarning, match="register_scheduler_plugin.*deprecated" - ): - await c.register_scheduler_plugin(Dummy()) - assert s.foo == "bar" - - @gen_cluster(nthreads=[]) async def test_unregister_scheduler_plugin(s): class Plugin(SchedulerPlugin): @@ -569,46 +553,6 @@ def update_graph( # type: ignore assert plugin.success -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_scheduler_plugin_in_register_worker_plugin_overrides(c, s, a): - class DuckPlugin(SchedulerPlugin): - def start(self, scheduler): - scheduler.foo = 123 - - def stop(self, scheduler): - pass - - n_existing_plugins = len(s.plugins) - assert not hasattr(s, "foo") - with ( - pytest.warns(UserWarning, match="`SchedulerPlugin` as a worker plugin"), - pytest.warns(DeprecationWarning, match="use `Client.register_plugin` instead"), - ): - await c.register_worker_plugin(DuckPlugin(), nanny=False) - assert len(s.plugins) == n_existing_plugins + 1 - assert s.foo == 123 - - -@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) -async def test_scheduler_plugin_in_register_worker_plugin_overrides_nanny(c, s, a): - class DuckPlugin(SchedulerPlugin): - def start(self, scheduler): - scheduler.foo = 123 - - def stop(self, scheduler): - pass - - n_existing_plugins = len(s.plugins) - assert not hasattr(s, "foo") - with ( - pytest.warns(UserWarning, match="`SchedulerPlugin` as a nanny plugin"), - pytest.warns(DeprecationWarning, match="use `Client.register_plugin` instead"), - ): - await c.register_worker_plugin(DuckPlugin(), nanny=True) - assert len(s.plugins) == n_existing_plugins + 1 - assert s.foo == 123 - - @gen_cluster(client=True, nthreads=[]) async def test_register_idempotent_plugin(c, s): class IdempotentPlugin(SchedulerPlugin): @@ -649,62 +593,6 @@ def __init__(self, instance=None): assert "nonidempotentplugin" in s.plugins assert s.plugins["nonidempotentplugin"].instance == "second" - third = NonIdempotentPlugin(instance="third") - with pytest.warns( - FutureWarning, - match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", - ): - await s.register_scheduler_plugin( - plugin=dumps(third), name="nonidempotentplugin" - ) - assert "nonidempotentplugin" in s.plugins - assert s.plugins["nonidempotentplugin"].instance == "third" - - -@gen_cluster(client=True, nthreads=[]) -async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s): - class NonIdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "nonidempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = True - - first = NonIdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=False) - assert "nonidempotentplugin" in s.plugins - - second = NonIdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=False) - assert "nonidempotentplugin" in s.plugins - assert s.plugins["nonidempotentplugin"].instance == "second" - - class IdempotentPlugin(SchedulerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = False - - def start(self, scheduler): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in s.plugins - - second = IdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in s.plugins - assert s.plugins["idempotentplugin"].instance == "first" - @gen_cluster(nthreads=[]) async def test_register_idempotent_plugins_directly(s): @@ -744,12 +632,3 @@ def __init__(self, instance=None): await s.register_scheduler_plugin(plugin=dumps(second), idempotent=False) assert "nonidempotentplugin" in s.plugins assert s.plugins["nonidempotentplugin"].instance == "second" - - third = NonIdempotentPlugin(instance="third") - with pytest.warns( - FutureWarning, - match="`Scheduler.register_scheduler_plugin` now requires `idempotent`", - ): - await s.register_scheduler_plugin(plugin=dumps(third)) - assert "nonidempotentplugin" in s.plugins - assert s.plugins["nonidempotentplugin"].instance == "third" diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 001576afe33..23b69ef7bc9 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -7,7 +7,6 @@ import pytest from distributed import Worker, WorkerPlugin -from distributed.protocol.pickle import dumps from distributed.utils_test import async_poll_for, captured_logger, gen_cluster, inc @@ -272,63 +271,6 @@ def teardown(self, worker): assert w.bar == 456 -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_worker_plugin_is_deprecated(c, s, a): - class DuckPlugin(WorkerPlugin): - def setup(self, worker): - worker.foo = 123 - - def teardown(self, worker): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with pytest.warns(DeprecationWarning, match="register_worker_plugin.*deprecated"): - await c.register_worker_plugin(DuckPlugin()) - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_worker_plugin_typing_over_nanny_keyword(c, s, a): - class DuckPlugin(WorkerPlugin): - def setup(self, worker): - worker.foo = 123 - - def teardown(self, worker): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with ( - pytest.warns(UserWarning, match="`WorkerPlugin` as a nanny plugin"), - pytest.warns(DeprecationWarning, match="use `Client.register_plugin` instead"), - ): - await c.register_worker_plugin(DuckPlugin(), nanny=True) - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_duck_typed_register_worker_plugin_is_deprecated(c, s, a): - class DuckPlugin: - def setup(self, worker): - worker.foo = 123 - - def teardown(self, worker): - pass - - n_existing_plugins = len(a.plugins) - assert not hasattr(a, "foo") - with ( - pytest.warns(DeprecationWarning, match="duck-typed.*WorkerPlugin"), - pytest.warns(DeprecationWarning, match="use `Client.register_plugin` instead"), - ): - await c.register_worker_plugin(DuckPlugin()) - assert len(a.plugins) == n_existing_plugins + 1 - assert a.foo == 123 - - @gen_cluster(client=True, nthreads=[("", 1)]) async def test_register_idempotent_plugin(c, s, a): class IdempotentPlugin(WorkerPlugin): @@ -369,62 +311,6 @@ def __init__(self, instance=None): assert "nonidempotentplugin" in a.plugins assert a.plugins["nonidempotentplugin"].instance == "second" - third = NonIdempotentPlugin(instance="third") - with pytest.warns( - FutureWarning, - match="`Scheduler.register_worker_plugin` now requires `idempotent`", - ): - await s.register_worker_plugin( - comm=None, plugin=dumps(third), name="nonidempotentplugin" - ) - assert "nonidempotentplugin" in a.plugins - assert a.plugins["nonidempotentplugin"].instance == "third" - - -@gen_cluster(client=True, nthreads=[("", 1)]) -async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a): - class NonIdempotentPlugin(WorkerPlugin): - def __init__(self, instance=None): - self.name = "nonidempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = True - - first = NonIdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=False) - assert "nonidempotentplugin" in a.plugins - - second = NonIdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=False) - assert "nonidempotentplugin" in a.plugins - assert a.plugins["nonidempotentplugin"].instance == "second" - - class IdempotentPlugin(WorkerPlugin): - def __init__(self, instance=None): - self.name = "idempotentplugin" - self.instance = instance - # We want to overrule this - self.idempotent = False - - def setup(self, worker): - if self.instance != "first": - raise RuntimeError( - "Only the first plugin should be started when idempotent is set" - ) - - first = IdempotentPlugin(instance="first") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(first, idempotent=True) - assert "idempotentplugin" in a.plugins - - second = IdempotentPlugin(instance="second") - with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"): - await c.register_plugin(second, idempotent=True) - assert "idempotentplugin" in a.plugins - assert a.plugins["idempotentplugin"].instance == "first" - class BrokenSetupPlugin(WorkerPlugin): def setup(self, worker): diff --git a/distributed/nanny.py b/distributed/nanny.py index 4d54666b63c..a9422e65c06 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -15,7 +15,7 @@ from collections.abc import Callable, Collection from inspect import isawaitable from queue import Empty -from typing import ClassVar, Literal, cast +from typing import ClassVar, Literal from toolz import merge from tornado.ioloop import IOLoop @@ -462,14 +462,7 @@ async def plugin_add( ) -> ErrorMessage | OKMessage: if isinstance(plugin, bytes): plugin = pickle.loads(plugin) - if not isinstance(plugin, NannyPlugin): - warnings.warn( - "Registering duck-typed plugins has been deprecated. " - "Please make sure your plugin inherits from `NannyPlugin`.", - DeprecationWarning, - stacklevel=2, - ) - plugin = cast(NannyPlugin, plugin) + assert isinstance(plugin, NannyPlugin) if name is None: name = _get_plugin_name(plugin) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b7c5aabacb7..06b6bda1011 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6295,17 +6295,10 @@ async def register_scheduler_plugin( self, plugin: bytes | SchedulerPlugin, name: str | None = None, - idempotent: bool | None = None, + *, + idempotent: bool, ) -> None: """Register a plugin on the scheduler.""" - if idempotent is None: - warnings.warn( - "The signature of `Scheduler.register_scheduler_plugin` now requires " - "`idempotent`. Not including `idempotent` in the signature will no longer " - "be supported in future versions.", - FutureWarning, - ) - idempotent = False if not isinstance(plugin, SchedulerPlugin): plugin = loads(plugin) assert isinstance(plugin, SchedulerPlugin) @@ -8173,18 +8166,10 @@ def stop_task_metadata(self, name: str | None = None) -> dict: return {"metadata": plugin.metadata, "state": plugin.state} async def register_worker_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None + self, comm: None, plugin: bytes, name: str, *, idempotent: bool ) -> dict[str, OKMessage]: """Registers a worker plugin on all running and future workers""" logger.info("Registering Worker plugin %s", name) - if idempotent is None: - warnings.warn( - "The signature of `Scheduler.register_worker_plugin` now requires " - "`idempotent`. Not including `idempotent` in the signature will no longer " - "be supported in future versions.", - FutureWarning, - ) - idempotent = False if name in self.worker_plugins and idempotent: return {} @@ -8208,20 +8193,11 @@ async def unregister_worker_plugin( return responses async def register_nanny_plugin( - self, comm: None, plugin: bytes, name: str, idempotent: bool | None = None + self, comm: None, plugin: bytes, name: str, idempotent: bool ) -> dict[str, OKMessage]: """Registers a nanny plugin on all running and future nannies""" logger.info("Registering Nanny plugin %s", name) - if idempotent is None: - warnings.warn( - "The signature of `Scheduler.register_nanny_plugin` now requires " - "`idempotent`. Not including `idempotent` in the signature will no longer " - "be supported in future versions.", - FutureWarning, - ) - idempotent = False - if name in self.nanny_plugins and idempotent: return {} diff --git a/distributed/worker.py b/distributed/worker.py index 79f24b22da6..a5d8cf48c4c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1872,14 +1872,7 @@ async def plugin_add( ) -> ErrorMessage | OKMessage: if isinstance(plugin, bytes): plugin = pickle.loads(plugin) - if not isinstance(plugin, WorkerPlugin): - warnings.warn( - "Registering duck-typed plugins has been deprecated. " - "Please make sure your plugin subclasses `WorkerPlugin`.", - DeprecationWarning, - stacklevel=2, - ) - plugin = cast(WorkerPlugin, plugin) + assert isinstance(plugin, WorkerPlugin) if name is None: name = _get_plugin_name(plugin)