你可以在 live session 中运行此 notebook Binder ,或在 Github 上查看。

Dask logo\

Dask DataFrame - 并行化的 pandas

看起来和用起来都像 pandas API,但适用于并行和分布式工作流。

其核心是,dask.dataframe 模块实现了一个“块并行”的 DataFrame 对象,它看起来和用起来都像 pandas API,但适用于并行和分布式工作流。一个 Dask DataFrame 由许多内存中的 pandas DataFrame 组成,这些 DataFrame 沿索引分开。对一个 Dask DataFrame 的操作会触发对组成它的 pandas DataFrame 进行许多 pandas 操作,这种方式考虑了潜在的并行性和内存限制。

Dask DataFrame is composed of pandas DataFrames

相关文档

何时使用 dask.dataframe

pandas 非常适合内存中可以容纳的表格数据集。使用 pandas 的经验法则是

“RAM 容量应为数据集大小的 5 到 10 倍”

Wes McKinney (2017) 在 《我讨厌 pandas 的 10 件事》中提到

这里的“数据集大小”是指数据集在磁盘上的大小。

当数据集超出上述规则时,Dask 就变得很有用了。

在本 notebook 中,你将使用纽约市航空公司数据。这个数据集只有大约 200MB,因此你可以在合理的时间内下载它,但是 dask.dataframe 可以扩展到远大于内存的数据集。

创建数据集

创建将在此 notebook 中使用的数据集

[1]:
%run prep.py -d flights

设置本地集群

创建本地 Dask 集群并连接到客户端。暂时不用担心这段代码,你将在分布式 notebook 中学习更多。

[2]:
from dask.distributed import Client

client = Client(n_workers=4)
client
[2]:

客户端

Client-a09c1408-168d-11ee-8f9e-6045bd777373

连接方法: 集群对象 集群类型: distributed.LocalCluster
仪表板: http://127.0.0.1:8787/status

集群信息

Dask 诊断仪表板

Dask Distributed 提供了一个有用的仪表板,用于可视化集群和计算的状态。

如果你在 JupyterLab 或 Binder 上,可以使用 Dask JupyterLab 扩展(你的环境中应该已经安装了)来打开仪表板图: * 点击左侧边栏中的 Dask 徽标 * 点击放大镜图标,它会自动连接到活动仪表板(如果不起作用,你可以在字段中输入/粘贴仪表板链接 http://127.0.0.1:8787) * 点击“任务流”(Task Stream)、“进度条”(Progress Bar)和“工作节点内存”(Worker Memory),它们将在新选项卡中打开这些图 * 重新组织选项卡以适应你的工作流程!

或者,点击上面客户端详细信息中显示的仪表板链接:http://127.0.0.1:8787/status。它将在新的浏览器选项卡中打开仪表板。

读取和处理数据集

让我们读取美国过去几年的一些航班数据。这些数据专门针对从纽约市区域三个机场起飞的航班。

[3]:
import os
import dask

按照惯例,我们将模块 dask.dataframe 导入为 dd,并将相应的 DataFrame 对象称为 ddf

注意:“Dask DataFrame”这个术语有点多重含义。根据上下文,它可以指模块或 DataFrame 对象。为了避免混淆,在本 notebook 中: - dask.dataframe(注意全部小写)指 API,- DataFrame(注意驼峰式大小写)指对象。

以下文件名包含 glob 模式 *,因此路径中匹配该模式的所有文件将被读入同一个 DataFrame

[4]:
import dask.dataframe as dd

ddf = dd.read_csv(
    os.path.join("data", "nycflights", "*.csv"), parse_dates={"Date": [0, 1, 2]}
)
ddf
[4]:
Dask DataFrame 结构
日期 星期几 出发时间 计划出发时间 到达时间 计划到达时间 唯一承运人 航班号 机尾号 实际耗时 计划耗时 飞行时间 到达延误 出发延误 出发地 目的地 距离 滑入时间 滑出时间 已取消 已备降
npartitions=10
datetime64[ns] int64 float64 int64 float64 int64 object int64 float64 float64 int64 float64 float64 float64 object object float64 float64 float64 int64 int64
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask 名称:read-csv,1 个图层

Dask 尚未加载数据,它已完成: - 调查输入路径并发现有十个匹配文件 - 智能地为每个数据块创建了一组作业 - 在本例中,每个原始 CSV 文件对应一个作业

请注意,DataFrame 对象的表示中不包含任何数据 - Dask 仅完成了足以读取第一个文件开头并推断列名和数据类型的工作。

惰性评估

大多数 Dask 集合,包括 Dask DataFrame,都是惰性评估的,这意味着 Dask 会立即构建你的计算逻辑(称为任务图),但只在必要时才“评估”它们。你可以使用 .visualize() 查看此任务图。

你将在 Delayed notebook 中了解更多相关信息,但现在请注意,我们需要调用 .compute() 来触发实际计算。

[5]:
ddf.visualize()
[5]:
_images/01_dataframe_16_0.png

一些函数,如 lenhead,也会触发计算。具体来说,调用 len 会: - 加载实际数据(即,将每个文件加载到 pandas DataFrame 中) - 然后将相应的函数应用于每个 pandas DataFrame(也称为分区) - 合并小计以给出最终总计

[6]:
# load and count number of rows
len(ddf)
[6]:
9990

你可以像在 pandas 中一样查看数据的开头和结尾

[7]:
ddf.head()
[7]:
日期 星期几 出发时间 计划出发时间 到达时间 计划到达时间 唯一承运人 航班号 机尾号 实际耗时 ... 飞行时间 到达延误 出发延误 出发地 目的地 距离 滑入时间 滑出时间 已取消 已备降
0 1990-01-01 1 1621.0 1540 1747.0 1701 US 33 NaN 86.0 ... NaN 46.0 41.0 EWR PIT 319.0 NaN NaN 0 0
1 1990-01-02 2 1547.0 1540 1700.0 1701 US 33 NaN 73.0 ... NaN -1.0 7.0 EWR PIT 319.0 NaN NaN 0 0
2 1990-01-03 3 1546.0 1540 1710.0 1701 US 33 NaN 84.0 ... NaN 9.0 6.0 EWR PIT 319.0 NaN NaN 0 0
3 1990-01-04 4 1542.0 1540 1710.0 1701 US 33 NaN 88.0 ... NaN 9.0 2.0 EWR PIT 319.0 NaN NaN 0 0
4 1990-01-05 5 1549.0 1540 1706.0 1701 US 33 NaN 77.0 ... NaN 5.0 9.0 EWR PIT 319.0 NaN NaN 0 0

5 行 × 21 列

ddf.tail()

# ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

# +----------------+---------+----------+
# | Column         | Found   | Expected |
# +----------------+---------+----------+
# | CRSElapsedTime | float64 | int64    |
# | TailNum        | object  | float64  |
# +----------------+---------+----------+

# The following columns also raised exceptions on conversion:

# - TailNum
#   ValueError("could not convert string to float: 'N54711'")

# Usually this is due to dask's dtype inference failing, and
# *may* be fixed by specifying dtypes manually by adding:

# dtype={'CRSElapsedTime': 'float64',
#        'TailNum': 'object'}

# to the call to `read_csv`/`read_table`.

pandas.read_csv 在推断数据类型之前读取整个文件不同,dask.dataframe.read_csv 只读取文件开头(或使用 glob 时的第一个文件)的样本。然后在读取所有分区时强制执行这些推断的数据类型。

在这种情况下,样本中推断的数据类型不正确。前 n 行没有 CRSElapsedTime 的值(pandas 将其推断为 float),后来发现是字符串(object dtype)。请注意,Dask 会提供有关不匹配的详细错误消息。发生这种情况时,你有几个选项

  • 使用 dtype 关键字直接指定数据类型。这是推荐的解决方案,因为它的错误率最低(显式优于隐式),而且性能最好。

  • 增加 sample 关键字的大小(以字节为单位)

  • 使用 assume_missingdask 假定推断为 int(不允许缺失值)的列实际上是 float(允许缺失值)。在我们的特定情况下,这不适用。

在我们的例子中,我们将使用第一个选项并直接指定有问题的列的数据类型。

[8]:
ddf = dd.read_csv(
    os.path.join("data", "nycflights", "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
)
[9]:
ddf.tail()  # now works
[9]:
日期 星期几 出发时间 计划出发时间 到达时间 计划到达时间 唯一承运人 航班号 机尾号 实际耗时 ... 飞行时间 到达延误 出发延误 出发地 目的地 距离 滑入时间 滑出时间 已取消 已备降
994 1999-01-25 1 632.0 635 803.0 817 CO 437 N27213 91.0 ... 68.0 -14.0 -3.0 EWR RDU 416.0 4.0 19.0 False 0
995 1999-01-26 2 632.0 635 751.0 817 CO 437 N16217 79.0 ... 62.0 -26.0 -3.0 EWR RDU 416.0 3.0 14.0 False 0
996 1999-01-27 3 631.0 635 756.0 817 CO 437 N12216 85.0 ... 66.0 -21.0 -4.0 EWR RDU 416.0 4.0 15.0 False 0
997 1999-01-28 4 629.0 635 803.0 817 CO 437 N26210 94.0 ... 69.0 -14.0 -6.0 EWR RDU 416.0 5.0 20.0 False 0
998 1999-01-29 5 632.0 635 802.0 817 CO 437 N12225 90.0 ... 67.0 -15.0 -3.0 EWR RDU 416.0 5.0 18.0 False 0

5 行 × 21 列

从远程存储读取

如果你正在考虑分布式计算,你的数据可能存储在远程服务(如 Amazon S3 或 Google Cloud Storage)上,并且采用更友好的格式(如 Parquet)。Dask 可以直接从这些远程位置以惰性方式并行读取各种格式的数据。

以下是如何从 Amazon S3 读取纽约市出租车数据的方法

ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.parquet",
)

你还可以利用 Parquet 特有的优化,如列选择和元数据处理,在 Dask 关于处理 Parquet 文件的文档中了解更多信息。

使用 dask.dataframe 进行计算

让我们计算航班延误的最大值。

如果只使用 pandas,我们将遍历每个文件以找到各个最大值,然后找到所有各个最大值的最终最大值。

import pandas as pd

files = os.listdir(os.path.join('data', 'nycflights'))

maxes = []

for file in files:
    df = pd.read_csv(os.path.join('data', 'nycflights', file))
    maxes.append(df.DepDelay.max())

final_max = max(maxes)

dask.dataframe 允许我们编写类似 pandas 的代码,对大于内存的数据集进行并行操作。

[10]:
%%time
result = ddf.DepDelay.max()
result.compute()
CPU times: user 25.2 ms, sys: 29 µs, total: 25.2 ms
Wall time: 84.8 ms
[10]:
409.0

这为我们创建了惰性计算,然后运行它。

注意: Dask 会尽快删除中间结果(例如每个文件的完整 pandas DataFrame)。这意味着你可以处理大于内存的数据集,但是重复计算每次都必须重新加载所有数据。(再次运行上面的代码,它比你预期的快还是慢?)

你可以使用 .visualize() 查看底层任务图

[11]:
# notice the parallelism
result.visualize()
[11]:
_images/01_dataframe_31_0.png

练习

在本节中,你将进行一些 dask.dataframe 计算。如果你熟悉 pandas,那么这些应该会很熟悉。你需要考虑何时调用 .compute()

1. 数据集中有多少行?

提示:你如何检查列表中有多少项?

[12]:
# Your code here
[13]:
len(ddf)
[13]:
9990

2. 总共有多少非取消航班?

提示:使用布尔索引

[14]:
# Your code here
[15]:
len(ddf[~ddf.Cancelled])
[15]:
9383

3. 总共有多少非取消航班从每个机场起飞?

提示:使用groupby

[16]:
# Your code here
[17]:
ddf[~ddf.Cancelled].groupby("Origin").Origin.count().compute()
[17]:
Origin
EWR    4132
JFK    1085
LGA    4166
Name: Origin, dtype: int64

4. 每个机场的平均出发延误是多少?

[18]:
# Your code here
[19]:
ddf.groupby("Origin").DepDelay.mean().compute()
[19]:
Origin
EWR    12.500968
JFK    17.053456
LGA    10.169227
Name: DepDelay, dtype: float64

5. 一周中哪一天的平均出发延误最严重?

[20]:
# Your code here
[21]:
ddf.groupby("DayOfWeek").DepDelay.mean().idxmax().compute()
[21]:
5

6. 假设距离列有错误,你需要给所有值加 1,你会怎么做?

[22]:
# Your code here
[23]:
ddf["Distance"].apply(
    lambda x: x + 1
).compute()  # don't worry about the warning, we'll discuss in the next sections

# OR

(ddf["Distance"] + 1).compute()
/usr/share/miniconda3/envs/dask-tutorial/lib/python3.10/site-packages/dask/dataframe/core.py:4139: UserWarning:
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('Distance', 'float64'))

  warnings.warn(meta_warning(meta))
[23]:
0      320.0
1      320.0
2      320.0
3      320.0
4      320.0
       ...
994    417.0
995    417.0
996    417.0
997    417.0
998    417.0
Name: Distance, Length: 9990, dtype: float64

共享中间结果

在计算上述所有内容时,我们有时会多次执行相同的操作。对于大多数操作,dask.dataframe 会存储参数,从而允许共享重复的计算并仅计算一次。

例如,让我们计算所有非取消航班的出发延误的平均值和标准差。由于 Dask 操作是惰性的,这些值还不是最终结果。它们只是获取结果所需的步骤。

如果你通过两次调用 compute 来计算它们,则不会共享中间计算。

[24]:
non_canceled = ddf[~ddf.Cancelled]
mean_delay = non_canceled.DepDelay.mean()
std_delay = non_canceled.DepDelay.std()
[25]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()
CPU times: user 78.7 ms, sys: 4.18 ms, total: 82.9 ms
Wall time: 237 ms

dask.compute

但让我们尝试将两者都传递给一次 compute 调用。

[26]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)
CPU times: user 54.1 ms, sys: 0 ns, total: 54.1 ms
Wall time: 147 ms

使用 dask.compute 大约只需要一半的时间。这是因为在调用 dask.compute 时,两个结果的任务图会合并,从而允许共享操作只执行一次而不是两次。特别是,使用 dask.compute 只会执行以下操作一次

  • read_csv 的调用

  • 过滤 (df[~df.Cancelled])

  • 一些必要的归约 (sum, count)

要查看多个结果之间的合并任务图是什么样的(以及哪些被共享了),你可以使用 dask.visualize 函数(你可能想使用 filename='graph.pdf' 将图保存到磁盘以便更容易放大查看)

[27]:
dask.visualize(mean_delay, std_delay, engine="cytoscape")
[27]:

.persist()

在使用分布式调度器时(你将在后续 notebook 中了解更多关于调度器的信息),你可以将一些你想经常使用的数据保存在分布式内存中。

persist 生成“Futures”(稍后也会详细介绍)并将它们存储在与你的输出相同的结构中。你可以对任何适合内存的数据或计算使用 persist

如果你只想分析从 JFK 机场起飞的非取消航班的数据,你可以像上一节一样进行两次 compute 调用

[28]:
non_cancelled = ddf[~ddf.Cancelled]
ddf_jfk = non_cancelled[non_cancelled.Origin == "JFK"]
[29]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.sum().compute()
CPU times: user 74.6 ms, sys: 0 ns, total: 74.6 ms
Wall time: 244 ms
[29]:
18503.0

或者,考虑将该数据子集持久化到内存中。

查看“图”(Graph)仪表板图,红色方块表示作为 Futures 存储在内存中的持久化数据。你还会注意到工作节点内存(Worker Memory,另一个仪表板图)消耗的增加。

[30]:
ddf_jfk = ddf_jfk.persist()  # returns back control immediately
[31]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.std().compute()
CPU times: user 83.8 ms, sys: 4.65 ms, total: 88.4 ms
Wall time: 208 ms
[31]:
36.85799641792652

对这些持久化数据的分析更快,因为我们不必重复加载和选择(非取消、从 JFK 出发)操作。

使用 Dask DataFrame 的自定义代码

dask.dataframe 只涵盖了 pandas API 中一小部分但常用功能。

此限制有两个原因

  1. Pandas API 非常庞大

  2. 有些操作确实难以并行化,例如排序。

此外,一些重要的操作,如 set_index,虽然可以工作,但比在 pandas 中慢,因为它们涉及到大量的数据混洗,并可能写入磁盘。

如果你想使用一些尚未(或无法)为 Dask DataFrame 实现的自定义函数怎么办?

你可以在 Dask 问题跟踪器上提交问题,检查该函数实现的可能性,并可以考虑为 Dask 贡献该函数。

如果它是自定义函数或实现起来很复杂,dask.dataframe 提供了一些方法来使将自定义函数应用于 Dask DataFrames 更容易

让我们快速了解一下 map_partitions() 函数

[32]:
help(ddf.map_partitions)
Help on method map_partitions in module dask.dataframe.core:

map_partitions(func, *args, **kwargs) method of dask.dataframe.core.DataFrame instance
    Apply Python function on each DataFrame partition.

    Note that the index and divisions are assumed to remain unchanged.

    Parameters
    ----------
    func : function
        The function applied to each partition. If this function accepts
        the special ``partition_info`` keyword argument, it will receive
        information on the partition's relative location within the
        dataframe.
    args, kwargs :
        Positional and keyword arguments to pass to the function.
        Positional arguments are computed on a per-partition basis, while
        keyword arguments are shared across all partitions. The partition
        itself will be the first positional argument, with all other
        arguments passed *after*. Arguments can be ``Scalar``, ``Delayed``,
        or regular Python objects. DataFrame-like args (both dask and
        pandas) will be repartitioned to align (if necessary) before
        applying the function; see ``align_dataframes`` to control this
        behavior.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This will rename and reorder columns for each partition,
        and will raise an error if this doesn't work,
        but it won't raise if dtypes don't match.
    transform_divisions : bool, default True
        Whether to apply the function onto the divisions and apply those
        transformed divisions to the output.
    align_dataframes : bool, default True
        Whether to repartition DataFrame- or Series-like args
        (both dask and pandas) so their divisions align before applying
        the function. This requires all inputs to have known divisions.
        Single-partition inputs will be split into multiple partitions.

        If False, all inputs must have either the same number of partitions
        or a single partition. Single-partition inputs will be broadcast to
        every partition of multi-partition inputs.
    meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
        An empty ``pd.DataFrame`` or ``pd.Series`` that matches the dtypes
        and column names of the output. This metadata is necessary for
        many algorithms in dask dataframe to work.  For ease of use, some
        alternative inputs are also available. Instead of a ``DataFrame``,
        a ``dict`` of ``{name: dtype}`` or iterable of ``(name, dtype)``
        can be provided (note that the order of the names should match the
        order of the columns). Instead of a series, a tuple of ``(name,
        dtype)`` can be used. If not provided, dask will try to infer the
        metadata. This may lead to unexpected results, so providing
        ``meta`` is recommended. For more information, see
        ``dask.dataframe.utils.make_meta``.

    Examples
    --------
    Given a DataFrame, Series, or Index, such as:

    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
    ...                    'y': [1., 2., 3., 4., 5.]})
    >>> ddf = dd.from_pandas(df, npartitions=2)

    One can use ``map_partitions`` to apply a function on each partition.
    Extra arguments and keywords can optionally be provided, and will be
    passed to the function after the partition.

    Here we apply a function with arguments and keywords to a DataFrame,
    resulting in a Series:

    >>> def myadd(df, a, b=1):
    ...     return df.x + df.y + a + b
    >>> res = ddf.map_partitions(myadd, 1, b=2)
    >>> res.dtype
    dtype('float64')

    Here we apply a function to a Series resulting in a Series:

    >>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure
    >>> res.dtype
    dtype('int64')

    By default, dask tries to infer the output metadata by running your
    provided function on some fake data. This works well in many cases, but
    can sometimes be expensive, or even fail. To avoid this, you can
    manually specify the output metadata with the ``meta`` keyword. This
    can be specified in many forms, for more information see
    ``dask.dataframe.utils.make_meta``.

    Here we specify the output is a Series with no name, and dtype
    ``float64``:

    >>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))

    Here we map a function that takes in a DataFrame, and returns a
    DataFrame with a new column:

    >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
    >>> res.dtypes
    x      int64
    y    float64
    z    float64
    dtype: object

    As before, the output metadata can also be specified manually. This
    time we pass in a ``dict``, as the output is a DataFrame:

    >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
    ...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})

    In the case where the metadata doesn't change, you can also pass in
    the object itself directly:

    >>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)

    Also note that the index and divisions are assumed to remain unchanged.
    If the function you're mapping changes the index/divisions, you'll need
    to clear them afterwards:

    >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP

    Your map function gets information about where it is in the dataframe by
    accepting a special ``partition_info`` keyword argument.

    >>> def func(partition, partition_info=None):
    ...     pass

    This will receive the following information:

    >>> partition_info  # doctest: +SKIP
    {'number': 1, 'division': 3}

    For each argument and keyword arguments that are dask dataframes you will
    receive the number (n) which represents the nth partition of the dataframe
    and the division (the first index value in the partition). If divisions
    are not known (for instance if the index is not sorted) then you will get
    None as the division.

ddf 中的“距离”列当前以英里为单位。假设我们想将单位转换为公里,并且我们有一个通用的辅助函数,如下所示。在这种情况下,我们可以使用 map_partitions 并行地将此函数应用于每个内部 pandas DataFrame

[33]:
def my_custom_converter(df, multiplier=1):
    return df * multiplier


meta = pd.Series(name="Distance", dtype="float64")

distance_km = ddf.Distance.map_partitions(
    my_custom_converter, multiplier=0.6, meta=meta
)
[34]:
distance_km.visualize()
[34]:
_images/01_dataframe_73_0.png
[35]:
distance_km.head()
[35]:
0    191.4
1    191.4
2    191.4
3    191.4
4    191.4
Name: Distance, dtype: float64

什么是 meta?

由于 Dask 是惰性操作的,它并不总是有足够的信息来推断某些操作的输出结构(包括数据类型)。

meta 是关于你的计算输出对 Dask 的一个建议。重要的是,meta 永远不会干预输出结构。Dask 会使用这个 meta,直到它能够确定实际的输出结构。

尽管有很多方法来定义 meta,我们建议使用一个与最终输出结构匹配的小型 pandas Series 或 DataFrame。

关闭本地 Dask 集群

养成一个好习惯是始终关闭你创建的任何 Dask 集群

[36]:
client.shutdown()