diff --git a/distributed/deploy/__init__.py b/distributed/deploy/__init__.py index 6c3996606f1..ffbeb6a8c64 100644 --- a/distributed/deploy/__init__.py +++ b/distributed/deploy/__init__.py @@ -1,13 +1,8 @@ from __future__ import annotations -from contextlib import suppress - from distributed.deploy.adaptive import Adaptive from distributed.deploy.cluster import Cluster from distributed.deploy.local import LocalCluster from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.ssh import SSHCluster from distributed.deploy.subprocess import SubprocessCluster - -with suppress(ImportError): - from distributed.deploy.ssh import SSHCluster diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 4471a7bc4a3..3ffb1af3228 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -12,7 +12,7 @@ from tornado.ioloop import IOLoop import dask.config -from dask.utils import _deprecated, format_bytes, parse_timedelta, typename +from dask.utils import format_bytes, parse_timedelta, typename from dask.widgets import get_template from distributed.compatibility import PeriodicCallback @@ -348,10 +348,6 @@ def get_logs(self, cluster=True, scheduler=True, workers=True): self._get_logs, cluster=cluster, scheduler=scheduler, workers=workers ) - @_deprecated(use_instead="get_logs") - def logs(self, *args, **kwargs): - return self.get_logs(*args, **kwargs) - def get_client(self): """Return client for the cluster diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 5e4067824a2..e090f6649e0 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -55,8 +55,6 @@ class LocalCluster(SpecCluster): Use a falsey value like False or None for no change. host: string Host address on which the scheduler will listen, defaults to only localhost - ip: string - Deprecated. See ``host`` above. dashboard_address: str Address on which to listen for the Bokeh diagnostics server like 'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'. @@ -69,8 +67,6 @@ class LocalCluster(SpecCluster): Address on which to listen for the Bokeh worker diagnostics server like 'localhost:8787' or '0.0.0.0:8787'. Defaults to None which disables the dashboard. Use ':0' for a random port. - diagnostics_port: int - Deprecated. See dashboard_address. asynchronous: bool (False by default) Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use. @@ -120,14 +116,13 @@ def __init__( threads_per_worker=None, processes=None, loop=None, - start=None, + start=None, # deprecated host=None, - ip=None, + ip=None, # deprecated scheduler_port=0, silence_logs=logging.WARN, dashboard_address=":8787", worker_dashboard_address=None, - diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, @@ -142,23 +137,19 @@ def __init__( **worker_kwargs, ): if ip is not None: - # In the future we should warn users about this move - # warnings.warn("The ip keyword has been moved to host") - host = ip - - if diagnostics_port is not None: warnings.warn( - "diagnostics_port has been deprecated. " - "Please use `dashboard_address=` instead" + "The `ip` parameter has been deprecated. Please use `host` instead", + DeprecationWarning, + stacklevel=2, ) - dashboard_address = diagnostics_port + host = ip - if threads_per_worker == 0: + if start is not None: warnings.warn( - "Setting `threads_per_worker` to 0 has been deprecated. " - "Please set to None or to a specific int." + "The `start` parameter has been deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, ) - threads_per_worker = None if "dashboard" in worker_kwargs: warnings.warn( diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index dceefff871b..cb463b00bb5 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -1,10 +1,10 @@ from __future__ import annotations +import datetime import logging import os import sys import traceback -import warnings from queue import Queue from threading import Thread from time import sleep @@ -330,7 +330,7 @@ def __init__( scheduler_port, worker_addrs, nthreads=0, - n_workers=None, + n_workers=1, ssh_username=None, ssh_port=22, ssh_private_key=None, @@ -342,40 +342,16 @@ def __init__( nanny_port=None, remote_dask_worker="distributed.cli.dask_worker", local_directory=None, - **kwargs, ): self.scheduler_addr = scheduler_addr self.scheduler_port = scheduler_port self.nthreads = nthreads - nprocs = kwargs.pop("nprocs", None) - if kwargs: - raise TypeError( - f"__init__() got an unexpected keyword argument {', '.join(kwargs.keys())}" - ) - if nprocs is not None and n_workers is not None: - raise ValueError( - "Both nprocs and n_workers were specified. Use n_workers only." - ) - elif nprocs is not None: - warnings.warn( - "The nprocs argument will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - n_workers = nprocs - elif n_workers is None: - n_workers = 1 - self.n_workers = n_workers - self.ssh_username = ssh_username self.ssh_port = ssh_port self.ssh_private_key = ssh_private_key - self.nohost = nohost - self.remote_python = remote_python - self.memory_limit = memory_limit self.worker_port = worker_port self.nanny_port = nanny_port @@ -383,8 +359,6 @@ def __init__( self.local_directory = local_directory # Generate a universal timestamp to use for log files - import datetime - if logdir is not None: logdir = os.path.join( logdir, @@ -420,24 +394,6 @@ def __init__( def _start(self): pass - @property - def nprocs(self): - warnings.warn( - "The nprocs attribute will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - return self.n_workers - - @nprocs.setter - def nprocs(self, value): - warnings.warn( - "The nprocs attribute will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - self.n_workers = value - @property def scheduler_address(self): return f"{self.scheduler_addr}:{self.scheduler_port}" diff --git a/distributed/deploy/ssh.py b/distributed/deploy/ssh.py index 3d203cda172..5e41407c10b 100644 --- a/distributed/deploy/ssh.py +++ b/distributed/deploy/ssh.py @@ -71,7 +71,6 @@ def __init__( # type: ignore[no-untyped-def] address: str, connect_options: dict, kwargs: dict, - worker_module="deprecated", worker_class="distributed.Nanny", remote_python=None, loop=None, @@ -79,50 +78,21 @@ def __init__( # type: ignore[no-untyped-def] ): super().__init__() - if worker_module != "deprecated": - raise ValueError( - "worker_module has been deprecated in favor of worker_class. " - "Please specify a Python class rather than a CLI module." + if loop is not None: + warnings.warn( + "The `loop` parameter has been deprecated and has no effect.", + DeprecationWarning, + stacklevel=2, ) self.address = address self.scheduler = scheduler self.worker_class = worker_class self.connect_options = connect_options - self.kwargs = copy.copy(kwargs) self.name = name self.remote_python = remote_python - if kwargs.get("nprocs") is not None and kwargs.get("n_workers") is not None: - raise ValueError( - "Both nprocs and n_workers were specified. Use n_workers only." - ) - elif kwargs.get("nprocs") is not None: - warnings.warn( - "The nprocs argument will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - self.n_workers = self.kwargs.pop("nprocs", 1) - else: - self.n_workers = self.kwargs.pop("n_workers", 1) - - @property - def nprocs(self): - warnings.warn( - "The nprocs attribute will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - return self.n_workers - - @nprocs.setter - def nprocs(self, value): - warnings.warn( - "The nprocs attribute will be removed in a future release. It has been " - "renamed to n_workers.", - FutureWarning, - ) - self.n_workers = value + self.kwargs = copy.copy(kwargs) + self.n_workers = self.kwargs.pop("n_workers", 1) async def start(self): try: @@ -291,7 +261,6 @@ def SSHCluster( connect_options: dict | list[dict] | None = None, worker_options: dict | None = None, scheduler_options: dict | None = None, - worker_module: str = "deprecated", worker_class: str = "distributed.Nanny", remote_python: str | list[str] | None = None, **kwargs: Any, @@ -387,12 +356,6 @@ def SSHCluster( worker_options = worker_options or {} scheduler_options = scheduler_options or {} - if worker_module != "deprecated": - raise ValueError( - "worker_module has been deprecated in favor of worker_class. " - "Please specify a Python class rather than a CLI module." - ) - if set(kwargs) & old_cluster_kwargs: from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index f4b422af62e..708d952f1a3 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1060,22 +1060,6 @@ async def test_repr(memory_limit): assert "memory" not in text -@gen_test() -async def test_threads_per_worker_set_to_0(): - with pytest.warns( - Warning, match="Setting `threads_per_worker` to 0 has been deprecated." - ): - async with LocalCluster( - n_workers=2, - processes=False, - threads_per_worker=0, - asynchronous=True, - dashboard_address=":0", - ) as cluster: - assert len(cluster.workers) == 2 - assert all(w.state.nthreads < CPU_COUNT for w in cluster.workers.values()) - - @pytest.mark.parametrize("temporary", [True, False]) @gen_test() async def test_capture_security(temporary): diff --git a/distributed/deploy/tests/test_old_ssh.py b/distributed/deploy/tests/test_old_ssh.py index cd844cca912..de59ae8a359 100644 --- a/distributed/deploy/tests/test_old_ssh.py +++ b/distributed/deploy/tests/test_old_ssh.py @@ -36,50 +36,3 @@ def test_cluster(loop): while len(e.ncores()) != 3: sleep(0.01) assert time() < start + 5 - - -def test_old_ssh_nprocs_renamed_to_n_workers(): - with pytest.warns(FutureWarning, match="renamed to n_workers"): - with SSHCluster( - scheduler_addr="127.0.0.1", - scheduler_port=8687, - worker_addrs=["127.0.0.1", "127.0.0.1"], - nprocs=2, - ) as c: - assert c.n_workers == 2 - - -def test_nprocs_attribute_is_deprecated(): - with SSHCluster( - scheduler_addr="127.0.0.1", - scheduler_port=8687, - worker_addrs=["127.0.0.1", "127.0.0.1"], - ) as c: - assert c.n_workers == 1 - with pytest.warns(FutureWarning, match="renamed to n_workers"): - assert c.nprocs == 1 - with pytest.warns(FutureWarning, match="renamed to n_workers"): - c.nprocs = 3 - - assert c.n_workers == 3 - - -def test_old_ssh_n_workers_with_nprocs_is_an_error(): - with pytest.raises(ValueError, match="Both nprocs and n_workers"): - SSHCluster( - scheduler_addr="127.0.0.1", - scheduler_port=8687, - worker_addrs=(), - nprocs=2, - n_workers=2, - ) - - -def test_extra_kwargs_is_an_error(): - with pytest.raises(TypeError, match="unexpected keyword argument"): - SSHCluster( - scheduler_addr="127.0.0.1", - scheduler_port=8687, - worker_addrs=["127.0.0.1", "127.0.0.1"], - unknown_kwarg=None, - ) diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index ce39d20bb1d..f95f67f6280 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -304,14 +304,6 @@ async def test_get_logs(): assert set(logs) == {w} -@gen_test() -async def test_logs_deprecated(): - async with SpecCluster(asynchronous=True, scheduler=scheduler) as cluster: - with pytest.warns(FutureWarning, match="get_logs"): - logs = await cluster.logs() - assert logs["Scheduler"] - - @gen_test() async def test_scheduler_info(): async with SpecCluster( diff --git a/distributed/deploy/tests/test_ssh.py b/distributed/deploy/tests/test_ssh.py index 6e4d469dcde..c14d6d9ef71 100644 --- a/distributed/deploy/tests/test_ssh.py +++ b/distributed/deploy/tests/test_ssh.py @@ -82,59 +82,6 @@ async def test_n_workers(): assert "SSH" in repr(cluster) -@gen_test() -async def test_nprocs_attribute_is_deprecated(): - async with SSHCluster( - ["127.0.0.1"] * 2, - connect_options=dict(known_hosts=None), - asynchronous=True, - scheduler_options={"idle_timeout": "5s"}, - worker_options={"death_timeout": "5s"}, - ) as cluster: - assert len(cluster.workers) == 1 - worker = cluster.workers[0] - assert worker.n_workers == 1 - with pytest.warns(FutureWarning, match="renamed to n_workers"): - assert worker.nprocs == 1 - with pytest.warns(FutureWarning, match="renamed to n_workers"): - worker.nprocs = 3 - - assert worker.n_workers == 3 - - -@gen_test() -async def test_ssh_nprocs_renamed_to_n_workers(): - with pytest.warns(FutureWarning, match="renamed to n_workers"): - async with SSHCluster( - ["127.0.0.1"] * 3, - connect_options=dict(known_hosts=None), - asynchronous=True, - scheduler_options={"idle_timeout": "5s"}, - worker_options={"death_timeout": "5s", "nprocs": 2}, - ) as cluster: - assert len(cluster.workers) == 2 - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(4) - - -@gen_test() -async def test_ssh_n_workers_with_nprocs_is_an_error(): - cluster = SSHCluster( - ["127.0.0.1"] * 3, - connect_options=dict(known_hosts=None), - asynchronous=True, - scheduler_options={}, - worker_options={"n_workers": 2, "nprocs": 2}, - ) - try: - with pytest.raises(ValueError, match="Both nprocs and n_workers"): - async with cluster: - pass - finally: - # FIXME: SSHCluster leaks if SSHCluster.__aenter__ raises - await cluster.close() - - @gen_test() async def test_keywords(): async with SSHCluster(