您可以在实时会话中运行此 notebook Binder在 Github 上查看。

Dask logo\

期物 - 非阻塞式分布式计算

以并行、急切和非阻塞的方式提交任意函数进行计算。

futures 接口(派生自内置的 concurrent.futures)提供细粒度的实时执行,适用于定制场景。我们可以使用一组输入提交单个函数进行评估,或者使用 submit()map() 在一系列输入上进行评估。调用会立即返回,得到一个或多个期物,其状态开始时为“待处理”,随后变为“已完成”。这不会阻塞本地 Python 会话。

这是期物和 delayed 之间的重要区别。两者都可以用于支持任意任务调度,但 delayed 是惰性的(它只构建一个图),而期物是急切的。使用期物,一旦输入可用且计算资源可用,计算就会立即开始。

相关文档

[1]:
from dask.distributed import Client

client = Client(n_workers=4)
client
[1]:

客户端

客户端-438c7a7e-168e-11ee-95a8-6045bd777373

连接方法: Cluster 对象 集群类型: distributed.LocalCluster
控制面板: http://127.0.0.1:8787/status

集群信息

典型工作流程

这与我们在 delayed 教程中看到的工作流程相同。它是循环式的,数据不一定是数组或 dataframe。以下示例概述了一个读取-转换-写入流程

def process_file(filename):
    data = read_a_file(filename)
    data = do_a_transformation(data)
    destination = f"results/{filename}"
    write_out_data(data, destination)
    return destination

futures = []
for filename in filenames:
    future = client.submit(process_file, filename)
    futures.append(future)

futures

基础知识

就像我们在 delayed 教程中所做的那样,让我们创建一些示例函数,incadd,它们会暂停一段时间以模拟工作。然后我们计算正常运行这些函数所需的时间。

[2]:
from time import sleep


def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(2)
    return 2 * x


def add(x, y):
    sleep(1)
    return x + y

我们可以在本地运行这些函数

[3]:
inc(1)
[3]:
2

或者我们可以提交它们到 Dask 集群上远程运行。这会立即返回一个期物,指向正在进行的计算,最终指向存储的结果。

[4]:
future = client.submit(inc, 1)  # returns immediately with pending future
future
[4]:
期物:inc 状态: 待处理, 类型: NoneType, 键: inc-79e98188eecd6b29183b3ce9d7faa7fb

如果您等待一秒,然后再次检查期物,您会看到它已经完成。

[5]:
future
[5]:
期物:inc 状态: 待处理, 类型: NoneType, 键: inc-79e98188eecd6b29183b3ce9d7faa7fb

您可以使用 .result() 方法阻塞计算并收集结果。

[6]:
future.result()
[6]:
2

等待期物的其他方式

from dask.distributed import wait, progress
progress(future)

当前 notebook 中显示进度条,而无需转到控制面板。这个进度条也是异步的,不会阻塞其他代码的执行。

wait(future)

会阻塞并强制 notebook 等待,直到由 future 指向的计算完成。但是请注意,如果 inc() 的结果已存在于集群中,现在执行计算将不花费时间,因为 Dask 注意到我们正在请求一个它已经知道的计算结果。稍后详细介绍。

收集结果的其他方式

client.gather(futures)

从多个期物收集结果。

client.compute

通常,任何使用 .compute()dask.compute() 执行的 Dask 操作都可以转而使用 client.compute() 提交进行异步执行。

这是来自 delayed 教程的一个示例

[7]:
import dask


@dask.delayed
def inc(x):
    sleep(1)
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)
    return x + y


x = inc(1)
y = inc(2)
z = add(x, y)

到目前为止,我们有一个正常的 dask.delayed 输出。当我们将 z 传递给 client.compute 时,我们得到一个期物,并且 Dask 开始评估任务图。

[8]:
# notice the difference from z.compute()
# notice that this cell completes immediately
future = client.compute(z)
future
[8]:
期物:add 状态: 待处理, 类型: NoneType, 键: add-d841513e-069c-4143-83f5-d414e8c6075b
[9]:
future.result()  # waits until result is ready
[9]:
5

使用期物时,计算会移向数据,而不是反过来,并且客户端在本地 Python 会话中无需看到中间值。

client.submit

client.submit 接受一个函数和参数,将它们推送到集群,返回一个表示待计算结果的 Future。该函数被传递给一个工作进程进行评估。这看起来很像执行上面的 client.compute(),不同之处在于现在我们将函数和参数直接传递给集群。

[10]:
def inc(x):
    sleep(1)
    return x + 1


future_x = client.submit(inc, 1)
future_y = client.submit(inc, 2)
future_z = client.submit(sum, [future_x, future_y])
future_z
[10]:
期物:sum 状态: 待处理, 类型: NoneType, 键: sum-9d5470cab76844721b4f83e7f4f83d03
[11]:
future_z.result()  # waits until result is ready
[11]:
5

client.submit 的参数可以是普通的 Python 函数和对象、来自其他 submit 操作的期物或 dask.delayed 对象。

每个期物都表示集群持有或正在评估的结果。因此,我们可以控制中间值的缓存——当一个期物不再被引用时,其值会被遗忘。在上面的解决方案中,每个函数调用都持有一个期物。如果我们选择提交需要这些结果的新任务,这些结果就不需要重新评估。

我们可以使用 client.scatter() 将本地会话中的数据显式传递到集群中,但通常最好构建在工作节点内部完成数据加载的函数,这样就不需要序列化和通信数据。Dask 中的大多数加载函数,例如 dd.read_csv,都是这样工作的。类似地,我们通常不希望 gather() 那些内存中过大的结果。

示例:偶尔失败的任务

让我们想象一个有时会失败的任务。在处理输入数据时可能会遇到这种情况,例如有时文件格式错误,或者请求超时。

[12]:
from random import random


def flaky_inc(i):
    if random() < 0.2:
        raise ValueError("You hit the error!")
    return i + 1

如果您反复运行这个函数,它有时会失败。

>>> flaky_inc(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [65], in <cell line: 1>()
----> 1 flaky_inc(2)

Input In [61], in flaky_inc(i)
      3 def flaky_inc(i):
      4     if random() < 0.5:
----> 5         raise ValueError("You hit the error!")
      6     return i + 1

ValueError: You hit the error!

我们可以使用 client.map 在一系列输入上运行此函数。

[13]:
futures = client.map(flaky_inc, range(10))

注意看,即使有些计算失败了,单元格也返回了。我们可以逐个检查这些期物,找出失败的那些。

[14]:
for i, future in enumerate(futures):
    print(i, future.status)
0 pending
1 error
2 pending
3 error
4 pending
5 pending
6 pending
7 pending
8 pending
9 pending
2023-06-29 15:04:44,470 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-f6da6cd306b8486204902a3870a6fdae
Function:  flaky_inc
args:      (3)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2023-06-29 15:04:44,471 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-1cecb32b6e65e2400b20e991211b2b2f
Function:  flaky_inc
args:      (1)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2023-06-29 15:04:44,480 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-8ddb78078a71f0888743d87b0702a4dd
Function:  flaky_inc
args:      (0)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2023-06-29 15:04:44,486 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-075e632c974e750d32844520adb2855a
Function:  flaky_inc
args:      (2)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

您可以重新运行那些特定的期物,尝试让任务成功完成

[15]:
futures[5].retry()
[16]:
for i, future in enumerate(futures):
    print(i, future.status)
0 error
1 error
2 error
3 error
4 finished
5 lost
6 finished
7 finished
8 finished
9 finished

在偶尔失败的情况下,一种更简洁的重试方式是在 client.computeclient.submitclient.map 方法中设置重试次数。

注意:在此示例中,我们还需要设置 pure=False,以让 Dask 知道函数的参数不能完全确定其输出。

[17]:
futures = client.map(flaky_inc, range(10), retries=5, pure=False)
future_z = client.submit(sum, futures)
future_z.result()
2023-06-29 15:04:44,530 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-9
Function:  flaky_inc
args:      (9)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2023-06-29 15:04:44,531 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-8
Function:  flaky_inc
args:      (8)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

2023-06-29 15:04:44,537 - distributed.worker - WARNING - Compute Failed
Key:       flaky_inc-e4337c25-d1ab-4a6e-8506-4014bea0c0d8-9
Function:  flaky_inc
args:      (9)
kwargs:    {}
Exception: "ValueError('You hit the error!')"

[17]:
55

您会看到很多警告,但计算最终应该会成功。

为何使用期物?

期物 API 提供了一种工作提交风格,可以轻松模拟 map/reduce 范式。如果您熟悉它,那么期物可能是进入 Dask 的最简单入口点。

期物的另一个巨大优势是,以期物表示的中间结果可以传递给新任务,而无需将数据从集群拉取到本地。可以设置新操作来处理前一个任务的输出,即使这些任务尚未开始。