Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 3 additions & 190 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5136,7 +5136,6 @@ def register_plugin(
self,
plugin: NannyPlugin | SchedulerPlugin | WorkerPlugin,
name: str | None = None,
idempotent: bool | None = None,
):
"""Register a plugin.

Expand All @@ -5149,23 +5148,11 @@ def register_plugin(
name :
Name for the plugin; if None, a name is taken from the
plugin instance or automatically generated if not present.
idempotent :
Do not re-register if a plugin of the given name already exists.
If None, ``plugin.idempotent`` is taken if defined, False otherwise.
"""
if name is None:
name = _get_plugin_name(plugin)
assert name
if idempotent is not None:
warnings.warn(
"The `idempotent` argument is deprecated and will be removed in a "
"future version. Please mark your plugin as idempotent by setting its "
"`.idempotent` attribute to `True`.",
FutureWarning,
stacklevel=2,
)
else:
idempotent = getattr(plugin, "idempotent", False)
idempotent = getattr(plugin, "idempotent", False)
assert isinstance(idempotent, bool)
return self._register_plugin(plugin, name, idempotent)

Expand Down Expand Up @@ -5229,42 +5216,7 @@ async def _register_scheduler_plugin(
idempotent=idempotent,
)

def register_scheduler_plugin(
self,
plugin: SchedulerPlugin,
name: str | None = None,
idempotent: bool | None = None,
):
"""
Register a scheduler plugin.

.. deprecated:: 2023.9.2
Use :meth:`Client.register_plugin` instead.

See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins

Parameters
----------
plugin : SchedulerPlugin
SchedulerPlugin instance to pass to the scheduler.
name : str
Name for the plugin; if None, a name is taken from the
plugin instance or automatically generated if not present.
idempotent : bool
Do not re-register if a plugin of the given name already exists.
"""
warnings.warn(
"`Client.register_scheduler_plugin` has been deprecated; "
"please `Client.register_plugin` instead",
DeprecationWarning,
stacklevel=2,
)
return cast(OKMessage, self.register_plugin(plugin, name, idempotent))

async def _unregister_scheduler_plugin(self, name):
return await self.scheduler.unregister_scheduler_plugin(name=name)

def unregister_scheduler_plugin(self, name):
def unregister_scheduler_plugin(self, name: str):
"""Unregisters a scheduler plugin

See https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
Expand Down Expand Up @@ -5297,7 +5249,7 @@ def unregister_scheduler_plugin(self, name):
--------
register_scheduler_plugin
"""
return self.sync(self._unregister_scheduler_plugin, name=name)
return self.sync(self.scheduler.unregister_scheduler_plugin, name=name)

def register_worker_callbacks(self, setup=None):
"""
Expand Down Expand Up @@ -5349,145 +5301,6 @@ async def _register_nanny_plugin(
raise exc.with_traceback(tb)
return cast(dict[str, OKMessage], responses)

def register_worker_plugin(
self,
plugin: NannyPlugin | WorkerPlugin,
name: str | None = None,
nanny: bool | None = None,
):
"""
Registers a lifecycle worker plugin for all current and future workers.

.. deprecated:: 2023.9.2
Use :meth:`Client.register_plugin` instead.

This registers a new object to handle setup, task state transitions and
teardown for workers in this cluster. The plugin will instantiate
itself on all currently connected workers. It will also be run on any
worker that connects in the future.

The plugin may include methods ``setup``, ``teardown``, ``transition``,
and ``release_key``. See the
``dask.distributed.WorkerPlugin`` class or the examples below for the
interface and docstrings. It must be serializable with the pickle or
cloudpickle modules.

If the plugin has a ``name`` attribute, or if the ``name=`` keyword is
used then that will control idempotency. If a plugin with that name has
already been registered, then it will be removed and replaced by the new one.

For alternatives to plugins, you may also wish to look into preload
scripts.

Parameters
----------
plugin : WorkerPlugin or NannyPlugin
WorkerPlugin or NannyPlugin instance to register.
name : str, optional
A name for the plugin.
Registering a plugin with the same name will have no effect.
If plugin has no name attribute a random name is used.
nanny : bool, optional
Whether to register the plugin with workers or nannies.

Examples
--------
>>> class MyPlugin(WorkerPlugin):
... def __init__(self, *args, **kwargs):
... pass # the constructor is up to you
... def setup(self, worker: dask.distributed.Worker):
... pass
... def teardown(self, worker: dask.distributed.Worker):
... pass
... def transition(self, key: str, start: str, finish: str,
... **kwargs):
... pass
... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
... pass

>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

You can get access to the plugin with the ``get_worker`` function

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
... worker = get_worker()
... plugin = worker.plugins['my-plugin']
... return plugin.my_state

>>> future = client.run(f)

See Also
--------
distributed.WorkerPlugin
unregister_worker_plugin
"""
warnings.warn(
"`Client.register_worker_plugin` has been deprecated; "
"please use `Client.register_plugin` instead",
DeprecationWarning,
stacklevel=2,
)
if name is None:
name = _get_plugin_name(plugin)

assert name

method: Callable
if isinstance(plugin, WorkerPlugin):
method = self._register_worker_plugin
if nanny is True:
warnings.warn(
"Registering a `WorkerPlugin` as a nanny plugin is not "
"allowed, registering as a worker plugin instead. "
"To register as a nanny plugin, inherit from `NannyPlugin`.",
UserWarning,
stacklevel=2,
)
elif isinstance(plugin, NannyPlugin):
method = self._register_nanny_plugin
if nanny is False:
warnings.warn(
"Registering a `NannyPlugin` as a worker plugin is not "
"allowed, registering as a nanny plugin instead. "
"To register as a worker plugin, inherit from `WorkerPlugin`.",
UserWarning,
stacklevel=2,
)
elif isinstance(plugin, SchedulerPlugin): # type: ignore[unreachable]
if nanny:
warnings.warn(
"Registering a `SchedulerPlugin` as a nanny plugin is not "
"allowed, registering as a scheduler plugin instead. "
"To register as a nanny plugin, inherit from `NannyPlugin`.",
UserWarning,
stacklevel=2,
)
else:
warnings.warn(
"Registering a `SchedulerPlugin` as a worker plugin is not "
"allowed, registering as a scheduler plugin instead. "
"To register as a worker plugin, inherit from `WorkerPlugin`.",
UserWarning,
stacklevel=2,
)
method = self._register_scheduler_plugin
else:
warnings.warn(
"Registering duck-typed plugins has been deprecated. "
"Please make sure your plugin inherits from `NannyPlugin` "
"or `WorkerPlugin`.",
DeprecationWarning,
stacklevel=2,
)
if nanny is True:
method = self._register_nanny_plugin
else:
method = self._register_worker_plugin

return self.sync(method, plugin=plugin, name=name, idempotent=False)

async def _unregister_worker_plugin(self, name, nanny=None):
if nanny:
responses = await self.scheduler.unregister_nanny_plugin(name=name)
Expand Down
5 changes: 2 additions & 3 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from dask.system import CPU_COUNT

from distributed import Client, LocalCluster, Nanny, Worker, get_client
from distributed import Client, LocalCluster, Nanny, Worker, WorkerPlugin, get_client
from distributed.compatibility import LINUX, asyncio_run
from distributed.config import get_loop_factory
from distributed.core import Status
Expand Down Expand Up @@ -1276,12 +1276,11 @@ async def test_connect_to_closed_cluster():
Client(cluster, asynchronous=True)


class MyPlugin:
class MyPlugin(WorkerPlugin):
def setup(self, worker=None):
import my_nonexistent_library # noqa


@pytest.mark.slow
def test_localcluster_start_exception(loop):
with raises_with_cause(
RuntimeError,
Expand Down
115 changes: 0 additions & 115 deletions distributed/diagnostics/tests/test_nanny_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,9 @@
import pytest

from distributed import Nanny, NannyPlugin
from distributed.protocol.pickle import dumps
from distributed.utils_test import captured_logger, gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_worker_plugin_is_deprecated(c, s, a):
class DuckPlugin(NannyPlugin):
def setup(self, nanny):
nanny.foo = 123

def teardown(self, nanny):
pass

n_existing_plugins = len(a.plugins)
assert not hasattr(a, "foo")
with pytest.warns(DeprecationWarning, match="register_worker_plugin.*deprecated"):
await c.register_worker_plugin(DuckPlugin())
assert len(a.plugins) == n_existing_plugins + 1
assert a.foo == 123


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_worker_plugin_typing_over_nanny_keyword(c, s, a):
class DuckPlugin(NannyPlugin):
def setup(self, nanny):
nanny.foo = 123

def teardown(self, nanny):
pass

n_existing_plugins = len(a.plugins)
assert not hasattr(a, "foo")
with (
pytest.warns(UserWarning, match="`NannyPlugin` as a worker plugin"),
pytest.warns(DeprecationWarning, match="please use `Client.register_plugin`"),
):
await c.register_worker_plugin(DuckPlugin(), nanny=False)

assert len(a.plugins) == n_existing_plugins + 1
assert a.foo == 123


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_duck_typed_register_nanny_plugin_is_deprecated(c, s, a):
class DuckPlugin:
def setup(self, nanny):
nanny.foo = 123

def teardown(self, nanny):
pass

n_existing_plugins = len(a.plugins)
assert not hasattr(a, "foo")
with (
pytest.warns(DeprecationWarning, match="duck-typed.*NannyPlugin"),
pytest.warns(DeprecationWarning, match="please use `Client.register_plugin`"),
):
await c.register_worker_plugin(DuckPlugin(), nanny=True)
assert len(a.plugins) == n_existing_plugins + 1
assert a.foo == 123


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_idempotent_plugin(c, s, a):
class IdempotentPlugin(NannyPlugin):
Expand Down Expand Up @@ -107,62 +48,6 @@ def __init__(self, instance=None):
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "second"

third = NonIdempotentPlugin(instance="third")
with pytest.warns(
FutureWarning,
match="`Scheduler.register_nanny_plugin` now requires `idempotent`",
):
await s.register_nanny_plugin(
comm=None, plugin=dumps(third), name="nonidempotentplugin"
)
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "third"


@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny)
async def test_register_plugin_with_idempotent_keyword_is_deprecated(c, s, a):
class NonIdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "nonidempotentplugin"
self.instance = instance
# We want to overrule this
self.idempotent = True

first = NonIdempotentPlugin(instance="first")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(first, idempotent=False)
assert "nonidempotentplugin" in a.plugins

second = NonIdempotentPlugin(instance="second")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(second, idempotent=False)
assert "nonidempotentplugin" in a.plugins
assert a.plugins["nonidempotentplugin"].instance == "second"

class IdempotentPlugin(NannyPlugin):
def __init__(self, instance=None):
self.name = "idempotentplugin"
self.instance = instance
# We want to overrule this
self.idempotent = False

def setup(self, nanny):
if self.instance != "first":
raise RuntimeError(
"Only the first plugin should be started when idempotent is set"
)

first = IdempotentPlugin(instance="first")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(first, idempotent=True)
assert "idempotentplugin" in a.plugins

second = IdempotentPlugin(instance="second")
with pytest.warns(FutureWarning, match="`idempotent` argument is deprecated"):
await c.register_plugin(second, idempotent=True)
assert "idempotentplugin" in a.plugins
assert a.plugins["idempotentplugin"].instance == "first"


class BrokenSetupPlugin(NannyPlugin):
def setup(self, nanny):
Expand Down
Loading
Loading