期物 - 非阻塞式分布式计算
目录
您可以在实时会话中运行此 notebook 或在 Github 上查看。
期物 - 非阻塞式分布式计算¶
以并行、急切和非阻塞的方式提交任意函数进行计算。
futures
接口(派生自内置的 concurrent.futures
)提供细粒度的实时执行,适用于定制场景。我们可以使用一组输入提交单个函数进行评估,或者使用 submit()
和 map()
在一系列输入上进行评估。调用会立即返回,得到一个或多个期物,其状态开始时为“待处理”,随后变为“已完成”。这不会阻塞本地 Python 会话。
这是期物和 delayed 之间的重要区别。两者都可以用于支持任意任务调度,但 delayed 是惰性的(它只构建一个图),而期物是急切的。使用期物,一旦输入可用且计算资源可用,计算就会立即开始。
相关文档
[1]:
from dask.distributed import Client
client = Client(n_workers=4)
client
[1]:
客户端
客户端-438c7a7e-168e-11ee-95a8-6045bd777373
连接方法: Cluster 对象 | 集群类型: distributed.LocalCluster |
控制面板: http://127.0.0.1:8787/status |
集群信息
LocalCluster
2169d44d
控制面板: http://127.0.0.1:8787/status | 工作节点 4 |
总线程数 4 | 总内存: 6.77 GiB |
状态: 运行中 | 使用进程: True |
调度器信息
调度器
调度器-fd680bfe-39cd-493b-863f-300abd2194b8
通信: tcp://127.0.0.1:40391 | 工作节点 4 |
控制面板: http://127.0.0.1:8787/status | 总线程数 4 |
启动时间: 刚刚 | 总内存: 6.77 GiB |
工作节点
工作节点:0
通信: tcp://127.0.0.1:45743 | 总线程数 1 |
控制面板: http://127.0.0.1:42129/status | 内存: 1.69 GiB |
Nanny: tcp://127.0.0.1:46213 | |
本地目录: /tmp/dask-worker-space/worker-6cwpop4r |
工作节点:1
通信: tcp://127.0.0.1:36935 | 总线程数 1 |
控制面板: http://127.0.0.1:41037/status | 内存: 1.69 GiB |
Nanny: tcp://127.0.0.1:45393 | |
本地目录: /tmp/dask-worker-space/worker-40gikf42 |
工作节点:2
通信: tcp://127.0.0.1:33221 | 总线程数 1 |
控制面板: http://127.0.0.1:39629/status | 内存: 1.69 GiB |
Nanny: tcp://127.0.0.1:37017 | |
本地目录: /tmp/dask-worker-space/worker-bs0ecu2z |
工作节点:3
通信: tcp://127.0.0.1:45833 | 总线程数 1 |
控制面板: http://127.0.0.1:45981/status | 内存: 1.69 GiB |
Nanny: tcp://127.0.0.1:38217 | |
本地目录: /tmp/dask-worker-space/worker-2x_wok1o |
典型工作流程¶
这与我们在 delayed 教程中看到的工作流程相同。它是循环式的,数据不一定是数组或 dataframe。以下示例概述了一个读取-转换-写入流程
def process_file(filename):
data = read_a_file(filename)
data = do_a_transformation(data)
destination = f"results/{filename}"
write_out_data(data, destination)
return destination
futures = []
for filename in filenames:
future = client.submit(process_file, filename)
futures.append(future)
futures
基础知识¶
就像我们在 delayed 教程中所做的那样,让我们创建一些示例函数,inc
和 add
,它们会暂停一段时间以模拟工作。然后我们计算正常运行这些函数所需的时间。
[2]:
from time import sleep
def inc(x):
sleep(1)
return x + 1
def double(x):
sleep(2)
return 2 * x
def add(x, y):
sleep(1)
return x + y
我们可以在本地运行这些函数
[3]:
inc(1)
[3]:
2
或者我们可以提交它们到 Dask 集群上远程运行。这会立即返回一个期物,指向正在进行的计算,最终指向存储的结果。
[4]:
future = client.submit(inc, 1) # returns immediately with pending future
future
[4]:
如果您等待一秒,然后再次检查期物,您会看到它已经完成。
[5]:
future
[5]:
您可以使用 .result()
方法阻塞计算并收集结果。
[6]:
future.result()
[6]:
2
等待期物的其他方式¶
from dask.distributed import wait, progress
progress(future)
在当前 notebook 中显示进度条,而无需转到控制面板。这个进度条也是异步的,不会阻塞其他代码的执行。
wait(future)
会阻塞并强制 notebook 等待,直到由 future
指向的计算完成。但是请注意,如果 inc()
的结果已存在于集群中,现在执行计算将不花费时间,因为 Dask 注意到我们正在请求一个它已经知道的计算结果。稍后详细介绍。
client.compute
¶
通常,任何使用 .compute()
或 dask.compute()
执行的 Dask 操作都可以转而使用 client.compute()
提交进行异步执行。
这是来自 delayed 教程的一个示例
[7]:
import dask
@dask.delayed
def inc(x):
sleep(1)
return x + 1
@dask.delayed
def add(x, y):
sleep(1)
return x + y
x = inc(1)
y = inc(2)
z = add(x, y)
到目前为止,我们有一个正常的 dask.delayed
输出。当我们将 z
传递给 client.compute
时,我们得到一个期物,并且 Dask 开始评估任务图。
[8]:
# notice the difference from z.compute()
# notice that this cell completes immediately
future = client.compute(z)
future
[8]:
[9]:
future.result() # waits until result is ready
[9]:
5
使用期物时,计算会移向数据,而不是反过来,并且客户端在本地 Python 会话中无需看到中间值。
client.submit
¶
client.submit
接受一个函数和参数,将它们推送到集群,返回一个表示待计算结果的 Future
。该函数被传递给一个工作进程进行评估。这看起来很像执行上面的 client.compute()
,不同之处在于现在我们将函数和参数直接传递给集群。
[10]:
def inc(x):
sleep(1)
return x + 1
future_x = client.submit(inc, 1)
future_y = client.submit(inc, 2)
future_z = client.submit(sum, [future_x, future_y])
future_z
[10]:
[11]:
future_z.result() # waits until result is ready
[11]:
5
client.submit
的参数可以是普通的 Python 函数和对象、来自其他 submit 操作的期物或 dask.delayed
对象。
每个期物都表示集群持有或正在评估的结果。因此,我们可以控制中间值的缓存——当一个期物不再被引用时,其值会被遗忘。在上面的解决方案中,每个函数调用都持有一个期物。如果我们选择提交需要这些结果的新任务,这些结果就不需要重新评估。
我们可以使用 client.scatter()
将本地会话中的数据显式传递到集群中,但通常最好构建在工作节点内部完成数据加载的函数,这样就不需要序列化和通信数据。Dask 中的大多数加载函数,例如 dd.read_csv
,都是这样工作的。类似地,我们通常不希望 gather()
那些内存中过大的结果。
示例:偶尔失败的任务¶
让我们想象一个有时会失败的任务。在处理输入数据时可能会遇到这种情况,例如有时文件格式错误,或者请求超时。
[12]:
from random import random
def flaky_inc(i):
if random() < 0.2:
raise ValueError("You hit the error!")
return i + 1
如果您反复运行这个函数,它有时会失败。
>>> flaky_inc(2)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [65], in <cell line: 1>()
----> 1 flaky_inc(2)
Input In [61], in flaky_inc(i)
3 def flaky_inc(i):
4 if random() < 0.5:
----> 5 raise ValueError("You hit the error!")
6 return i + 1
ValueError: You hit the error!
我们可以使用 client.map
在一系列输入上运行此函数。
[13]:
futures = client.map(flaky_inc, range(10))
注意看,即使有些计算失败了,单元格也返回了。我们可以逐个检查这些期物,找出失败的那些。
[14]:
for i, future in enumerate(futures):
print(i, future.status)
0 pending
1 error
2 pending
3 error
4 pending
5 pending
6 pending
7 pending
8 pending
9 pending
2023-06-29 15:04:44,470 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-f6da6cd306b8486204902a3870a6fdae
Function: flaky_inc
args: (3)
kwargs: {}
Exception: "ValueError('You hit the error!')"
2023-06-29 15:04:44,471 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-1cecb32b6e65e2400b20e991211b2b2f
Function: flaky_inc
args: (1)
kwargs: {}
Exception: "ValueError('You hit the error!')"
2023-06-29 15:04:44,480 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-8ddb78078a71f0888743d87b0702a4dd
Function: flaky_inc
args: (0)
kwargs: {}
Exception: "ValueError('You hit the error!')"
2023-06-29 15:04:44,486 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-075e632c974e750d32844520adb2855a
Function: flaky_inc
args: (2)
kwargs: {}
Exception: "ValueError('You hit the error!')"
您可以重新运行那些特定的期物,尝试让任务成功完成
[15]:
futures[5].retry()
[16]:
for i, future in enumerate(futures):
print(i, future.status)
0 error
1 error
2 error
3 error
4 finished
5 lost
6 finished
7 finished
8 finished
9 finished
在偶尔失败的情况下,一种更简洁的重试方式是在 client.compute
、client.submit
或 client.map
方法中设置重试次数。
注意:在此示例中,我们还需要设置 pure=False
,以让 Dask 知道函数的参数不能完全确定其输出。
[17]:
futures = client.map(flaky_inc, range(10), retries=5, pure=False)
future_z = client.submit(sum, futures)
future_z.result()
2023-06-29 15:04:44,530 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-9
Function: flaky_inc
args: (9)
kwargs: {}
Exception: "ValueError('You hit the error!')"
2023-06-29 15:04:44,531 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-8
Function: flaky_inc
args: (8)
kwargs: {}
Exception: "ValueError('You hit the error!')"
2023-06-29 15:04:44,537 - distributed.worker - WARNING - Compute Failed
Key: flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-9
Function: flaky_inc
args: (9)
kwargs: {}
Exception: "ValueError('You hit the error!')"
[17]:
55
您会看到很多警告,但计算最终应该会成功。
为何使用期物?¶
期物 API 提供了一种工作提交风格,可以轻松模拟 map/reduce 范式。如果您熟悉它,那么期物可能是进入 Dask 的最简单入口点。
期物的另一个巨大优势是,以期物表示的中间结果可以传递给新任务,而无需将数据从集群拉取到本地。可以设置新操作来处理前一个任务的输出,即使这些任务尚未开始。