您可以在 实时会话 中运行此 notebook Binder 或在 Github 上查看它。

Dask logo\

分布式 - 将您的数据和计算分散到集群中

如我们开头所述,Dask 能够使用分布式调度器在多台机器上运行任务。

到目前为止,我们实际上一直在使用分布式调度器进行工作,但仅限于单台机器。

当我们在不传递任何参数的情况下实例化一个 Client() 对象时,它会尝试定位一个 Dask 集群。它会检查您的本地 Dask 配置和环境变量,看是否指定了连接信息。如果没有,它将创建一个 LocalCluster 实例并使用它。

在配置中指定连接信息对于系统管理员向其用户提供访问权限很有用。我们在 Dask Helm Chart for Kubernetes 中执行此操作,该 chart 在 Kubernetes 集群上安装一个多节点 Dask 集群和一个 Jupyter 服务器,并且 Jupyter 已预配置为发现分布式集群。

本地集群

让我们自己探索 LocalCluster 对象,看看它在做什么。

[1]:
from dask.distributed import LocalCluster, Client
[2]:
cluster = LocalCluster()
cluster

创建集群对象将创建一个 Dask 调度器和多个 Dask 工作进程。如果未指定任何参数,它将自动检测系统拥有的 CPU 核心数量和内存大小,并创建相应数量的工作进程以充分利用这些资源。

您也可以自己指定这些参数。让我们看看 docstring(文档字符串),了解可用的选项。

这些参数也可以传递给 ``Client``,如果 ``Client`` 创建一个 ``LocalCluster``,这些参数就会被向下传递。

[3]:
?LocalCluster

我们的集群对象具有属性和方法,我们可以使用它们访问关于集群的信息。例如,我们可以使用 get_logs() 方法获取调度器和所有工作进程的日志输出。

[4]:
cluster.get_logs()
[4]:
集群

调度器

2023-06-29 15:04:32,886 - distributed.scheduler - INFO - State start

2023-06-29 15:04:32,889 - distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:43973

2023-06-29 15:04:32,889 - distributed.scheduler - INFO - dashboard at: 127.0.0.1:8787

2023-06-29 15:04:33,648 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40465', name: 0, status: running, memory: 0, processing: 0>

2023-06-29 15:04:33,650 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40465

2023-06-29 15:04:33,677 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:33157', name: 1, status: running, memory: 0, processing: 0>

2023-06-29 15:04:33,677 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:33157

tcp://127.0.0.1:33157

2023-06-29 15:04:33,384 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33157

2023-06-29 15:04:33,384 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:33157

2023-06-29 15:04:33,384 - distributed.worker - INFO - Worker name: 1

2023-06-29 15:04:33,384 - distributed.worker - INFO - dashboard at: 127.0.0.1:36055

2023-06-29 15:04:33,384 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43973

2023-06-29 15:04:33,384 - distributed.worker - INFO - -------------------------------------------------

2023-06-29 15:04:33,384 - distributed.worker - INFO - Threads: 1

2023-06-29 15:04:33,384 - distributed.worker - INFO - Memory: 3.38 GiB

2023-06-29 15:04:33,384 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-cym14mrp

2023-06-29 15:04:33,384 - distributed.worker - INFO - -------------------------------------------------

2023-06-29 15:04:33,678 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:43973

2023-06-29 15:04:33,678 - distributed.worker - INFO - -------------------------------------------------

tcp://127.0.0.1:40465

2023-06-29 15:04:33,377 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:40465

2023-06-29 15:04:33,377 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:40465

2023-06-29 15:04:33,377 - distributed.worker - INFO - Worker name: 0

2023-06-29 15:04:33,377 - distributed.worker - INFO - dashboard at: 127.0.0.1:36305

2023-06-29 15:04:33,377 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:43973

2023-06-29 15:04:33,377 - distributed.worker - INFO - -------------------------------------------------

2023-06-29 15:04:33,377 - distributed.worker - INFO - Threads: 1

2023-06-29 15:04:33,377 - distributed.worker - INFO - Memory: 3.38 GiB

2023-06-29 15:04:33,377 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-a3z__l3w

2023-06-29 15:04:33,377 - distributed.worker - INFO - -------------------------------------------------

2023-06-29 15:04:33,650 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:43973

2023-06-29 15:04:33,650 - distributed.worker - INFO - -------------------------------------------------

我们可以访问托管 Dask 面板的 URL。

[5]:
cluster.dashboard_link
[5]:
'http://127.0.0.1:8787/status'

为了让 Dask 使用我们的集群,我们仍然需要创建一个 Client 对象,但是由于我们已经创建了一个集群,我们可以将其直接传递给我们的客户端。

[6]:
client = Client(cluster)
client
[6]:

客户端

Client-41bd1dc3-168e-11ee-94c2-6045bd777373

连接方法: 集群对象 集群类型: distributed.LocalCluster
面板: http://127.0.0.1:8787/status

集群信息

[7]:
del client, cluster

通过 SSH 连接远程集群

将工作分配到多台机器的常见方式是通过 SSH。Dask 有一个集群管理器,可以为您处理创建 SSH 连接,它叫做 SSHCluster

from dask.distributed import SSHCluster

构建此集群管理器时,我们需要传递一个地址列表,可以是主机名或 IP 地址,我们将通过 SSH 连接到这些地址,并尝试在其上启动 Dask 调度器或工作进程。

cluster = SSHCluster(["localhost", "hostA", "hostB"])
cluster

当我们创建 SSHCluster 对象时,我们给出了三个主机名的列表。

列表中的第一个主机将用作调度器,所有其他主机将用作工作进程。如果您在同一个网络中,将本地机器设置为调度器,然后使用其他机器作为工作进程也是合理的。

如果您的服务器是远程的,例如在云中,您可能也希望调度器位于远程机器上,以避免网络瓶颈。

可伸缩集群

到目前为止我们看到的两种集群都是固定大小的集群。我们要么在本地运行并使用机器中的所有资源,要么通过 SSH 使用指定数量的其他机器。

对于某些集群管理器,可以通过在代码中调用 cluster.scale(n) 来增加或减少工作进程数量,其中 n 是所需的工作进程数量。或者,您可以让 Dask 通过调用 cluster.adapt(minimum=1, maximum=100) 来动态执行此操作,其中 minimum 和 maximum 是您希望 Dask 遵守的限制。

始终最好将最小工作进程数保持在 1 或以上,因为 Dask 会开始在单个工作进程上运行任务,以便分析任务耗时并推断其认为需要多少额外的工人。获取新的工作进程可能需要时间,具体取决于您的设置,因此将其保持在 1 或以上意味着分析将立即开始。

我们目前支持 KubernetesHadoop/Yarn云平台 以及 包括 PBS、SLURM 和 SGE 在内的批处理系统 的集群管理器。

这些集群管理器允许拥有此类资源访问权限的用户在其上引导 Dask 集群。如果一个机构希望提供一个用户可以从中请求 Dask 集群的中心服务,那么还有 Dask Gateway

集群组件

一个功能正常的 Dask 集群的最低要求是一个调度器进程和一个工作进程。

我们可以通过命令行手动启动这些进程。让我们从调度器开始。

$ dask-scheduler
2022-07-07 14:11:35,661 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,405 - distributed.scheduler - INFO - State start
2022-07-07 14:11:37,408 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,409 - distributed.scheduler - INFO - Clear task state
2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   Scheduler at:   tcp://10.51.100.80:8786
2022-07-07 14:11:37,409 - distributed.scheduler - INFO -   dashboard at:                     :8787

然后,我们可以连接一个工作进程到调度器正在监听的地址。

$ dask-worker tcp://10.51.100.80:8786 --nworkers=auto
2022-07-07 14:12:53,915 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58051'
2022-07-07 14:12:53,922 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58052'
2022-07-07 14:12:53,924 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58053'
2022-07-07 14:12:53,925 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.51.100.80:58054'
2022-07-07 14:12:55,222 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58065
2022-07-07 14:12:55,222 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58065
2022-07-07 14:12:55,223 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58068
2022-07-07 14:12:55,223 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,223 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,223 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,223 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,224 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-hlvac6m5
2022-07-07 14:12:55,225 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58070
2022-07-07 14:12:55,227 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,227 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,228 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,228 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-e1suf_7o
2022-07-07 14:12:55,229 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,231 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58067
2022-07-07 14:12:55,233 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,233 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,234 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,234 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,235 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-oq39ihb4
2022-07-07 14:12:55,236 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,246 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,246 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,249 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,264 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,264 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,267 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,267 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,267 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,269 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,273 - distributed.worker - INFO -       Start worker at:   tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO -          Listening to:   tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO -          dashboard at:         10.51.100.80:58069
2022-07-07 14:12:55,273 - distributed.worker - INFO - Waiting to connect to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,274 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,274 - distributed.worker - INFO -               Threads:                          3
2022-07-07 14:12:55,275 - distributed.worker - INFO -                Memory:                   4.00 GiB
2022-07-07 14:12:55,275 - distributed.worker - INFO -       Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-zfie55ku
2022-07-07 14:12:55,276 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,299 - distributed.worker - INFO -         Registered to:    tcp://10.51.100.80:8786
2022-07-07 14:12:55,300 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,302 - distributed.core - INFO - Starting established connection

然后在 Python 中,我们可以连接一个客户端到这个集群并提交一些工作。

>>> from dask.distributed import Client
>>> client = Client("tcp://10.51.100.80:8786")
>>> client.submit(lambda: 1+1)

我们也可以在 Python 中通过导入集群组件并直接创建它们来完成此操作。

[8]:
from dask.distributed import Scheduler, Worker, Client

async with Scheduler() as scheduler:
    async with Worker(scheduler.address) as worker:
        async with Client(scheduler.address, asynchronous=True) as client:
            print(await client.submit(lambda: 1 + 1))
2
/usr/share/miniconda3/envs/dask-tutorial/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34129 instead
  warnings.warn(

大多数时候,我们无需自己创建这些组件,而是可以依靠集群管理器对象为我们完成此操作。但在某些情况下,能够手动构建集群会很有用。

您可能偶尔会看到引用 Nanny 进程。这是工作进程的一个包装器,用于处理进程被杀死后的重启。当我们通过命令行运行 dask-worker 时,会自动为我们创建一个 nanny。

集群网络

默认情况下,Dask 使用基于自定义 TCP 的远程过程调用协议在进程之间进行通信。调度器和工作进程都监听 TCP 端口进行通信。

当您启动调度器时,它通常监听端口 8786。创建工作进程时,它会监听一个随机的高端口,并在首次连接时将该端口告知调度器。

调度器维护一个包含所有工作进程及其地址的列表,该列表可以通过工作进程访问,因此调度器和任何工作进程都可以随时打开与其他工作进程的连接。连接在不使用时会自动关闭。

Client 只会连接到调度器,并且所有与工作进程的通信都将通过调度器传递。这意味着部署 Dask 集群时,调度器和工作进程通常必须在同一个网络中,并且能够通过 IP 和端口直接相互访问。但客户端可以在任何地方运行,只要它能够访问调度器的通信端口。通常会配置防火墙规则或负载均衡器,以仅提供对调度器端口的访问。

Dask 还支持其他网络协议,例如 TLSwebsocketsUCX

用于安全通信的 TLS/SSL

如果在不受信任的环境中运行,Dask 集群组件可以使用证书进行相互认证和安全通信。您可以自动生成调度器、工作进程和客户端的证书并分发它们,或者您可以生成临时凭据。

一些集群管理器,例如 dask-cloudprovider,在将公共云中的集群暴露到互联网时会自动启用 TLS 并生成一次性证书。

[9]:
from dask.distributed import Scheduler, Worker, Client
from distributed.security import Security

security = Security.temporary()

async with Scheduler(security=security) as scheduler:
    async with Worker(scheduler.address, security=security) as worker:
        async with Client(
            scheduler.address, security=security, asynchronous=True
        ) as client:
            print(await client.submit(lambda: 1 + 1))
2
/usr/share/miniconda3/envs/dask-tutorial/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34081 instead
  warnings.warn(

Websockets

Dask 还可以通过 websockets 而不是 TCP 进行通信。这样做会产生非常小的性能开销,但这使得面板和通信发生在同一个端口上,并且可以被像 nginx 这样的七层代理进行反向代理。这对于某些部署场景是必需的,在这些场景中您无法暴露端口,但可以代理 Web 服务。

UCX

在具有高性能网络(如 Infiniband 或 NVLink)的系统上,Dask 还可以利用 UCX。UCX 提供统一的通信协议,可自动升级通信以使用可用的最快硬件。这对于在配备 Infiniband 的 HPC 系统或具有多个 GPU 工作进程的系统上获得良好性能至关重要。