10. Introduction to Dask DataFrame#

Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency.

Many of the existing methods from the pandas API are available in Dask DataFrame. Check out this section of the documentation to learn more about them. In general, computations that can be parallelized are implemented in Dask DataFrame.
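To see what this means in practice, here is a minimal sketch (using a small made-up pandas DataFrame) of how a Dask DataFrame is built from pandas partitions, and how familiar pandas methods stay lazy until you call compute():

import pandas as pd
import dask.dataframe as dd

# A small toy pandas DataFrame for illustration
pdf = pd.DataFrame({"station": ["A", "B", "A", "B"], "value": [1, 2, 3, 4]})

# Split it into 2 partitions; each partition is itself a pandas DataFrame
ddf = dd.from_pandas(pdf, npartitions=2)

# Familiar pandas syntax builds a lazy task graph...
lazy_mean = ddf.groupby("station")["value"].mean()

# ...which only executes when compute() is called
print(lazy_mean.compute())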

In this lecture, you will learn to use Dask DataFrame to analyze large tabular climate data.

10.1. Analyzing Multiple Large CSV Files Using Dask DataFrame#

For this tutorial, we will use the NOAA Global Historical Climatology Network Daily (GHCN-D) data available on AWS S3. You can read more about the data on the Registry of Open Data on AWS here.

More information about the dataset, including the metadata descriptions, is available on NOAA’s website.

GHCN-D contains daily observations over global land areas. It contains measurements from land-based stations worldwide, about two-thirds of which report precipitation only. Some records are more than 175 years old.

This dataset is very large, and to analyze it in Python you need to use Dask DataFrame.

10.1.1. Download Data from AWS S3 Bucket#

You can download the dataset from the AWS S3 bucket using the following commands. This dataset does not require an AWS account (hence the --no-sign-request flag).

# Download one year of data
! aws s3 cp --no-sign-request s3://noaa-ghcn-pds/csv/by_year/2022.csv .
download: s3://noaa-ghcn-pds/csv/by_year/2022.csv to ./2022.csv   
# Download all data since 2020
! aws s3 cp --no-sign-request s3://noaa-ghcn-pds/csv/by_year/ . --recursive --exclude="*" --include="202*"
download: s3://noaa-ghcn-pds/csv/by_year/2023.csv to ./2023.csv   
download: s3://noaa-ghcn-pds/csv/by_year/2020.csv to ./2020.csv 
download: s3://noaa-ghcn-pds/csv/by_year/2021.csv to ./2021.csv
download: s3://noaa-ghcn-pds/csv/by_year/2022.csv to ./2022.csv
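If you would rather not download the files first, Dask can also read them directly from S3. A minimal sketch, assuming the s3fs package is installed (anon=True requests the same unsigned access as --no-sign-request):

import dask.dataframe as dd

# Read directly from the public bucket with anonymous (unsigned) access
df = dd.read_csv(
    "s3://noaa-ghcn-pds/csv/by_year/2022.csv",
    dtype={"Q_FLAG": "object"},
    storage_options={"anon": True},
)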

10.1.2. Import Packages#

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Start a local cluster and connect a client to it; displaying the
# client shows a link to the diagnostic dashboard
cluster = LocalCluster()
client = Client(cluster)
client

Client: Client-fb6cb252-62c6-11ee-81d3-424756c027d2
Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: /user/halemohammad@clarku.edu/proxy/8787/status
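By default, LocalCluster chooses the number of workers and the memory limit based on your machine. You can also set them explicitly; the values below are only an example to adapt to your hardware:

# Example configuration -- tune these numbers to your machine
cluster = LocalCluster(
    n_workers=4,            # number of worker processes
    threads_per_worker=2,   # threads within each worker
    memory_limit="4GB",     # memory cap per worker
)
client = Client(cluster)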

10.1.3. Read One CSV file#

Let’s first load one CSV file and see how Dask DataFrame works.

df = dd.read_csv("2022.csv", dtype={'Q_FLAG': 'object'})

You can check the number of partitions that Dask selects by default.

df.npartitions
21

To change the number of partitions, you can set the blocksize argument in the read_csv function:

df = dd.read_csv("2022.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)
df.npartitions
54
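The partition count is roughly the file size divided by the blocksize (54 partitions at 25 MB each suggests a file of about 1.35 GB). As a quick sanity check:

import os

# Approximate number of partitions = file size / blocksize
print(os.path.getsize("2022.csv") / 25e6)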

The following step is unnecessary and you should not do it routinely, since it loads the entire dataset into memory. We run it here only to see how Dask loads the data.

df.compute()
ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
0 AE000041196 20220101 TAVG 204 H NaN S NaN
1 AEM00041194 20220101 TAVG 211 H NaN S NaN
2 AEM00041217 20220101 TAVG 209 H NaN S NaN
3 AEM00041218 20220101 TAVG 207 H NaN S NaN
4 AG000060390 20220101 TAVG 121 H NaN S NaN
... ... ... ... ... ... ... ... ...
698661 WF000917530 20221231 TAVG 283 H NaN S NaN
698662 WQW00041606 20221231 TAVG 278 H NaN S NaN
698663 WZ004455110 20221231 TAVG 203 H NaN S NaN
698664 ZI000067775 20221231 TAVG 206 H NaN S NaN
698665 ZI000067975 20221231 TAVG 202 H NaN S NaN

37816959 rows × 8 columns

df
Dask DataFrame Structure:
                    ID   DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=54
                object  int64  object      int64 object object object  float64
                   ...    ...     ...        ...    ...    ...    ...      ...
...                ...    ...     ...        ...    ...    ...    ...      ...
                   ...    ...     ...        ...    ...    ...    ...      ...
Dask Name: read-csv, 1 graph layer

As you can see, df is still a lazy Dask DataFrame. This is because Dask does not store the output of df.compute() back into df. If you need to keep the computed values, run df = df.compute() instead.
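When you want the results materialized but kept distributed across the cluster, rather than pulled into a single pandas object, a related option is persist():

# persist() runs the computation and keeps the partitions in cluster
# memory, while the object remains a Dask DataFrame
df_persisted = df.persist()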

10.1.4. Read Multiple CSVs#

Here, we will define a new DataFrame and load multiple CSV files at once using a glob pattern.

large_df = dd.read_csv("*.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)
large_df.npartitions
196
large_df
Dask DataFrame Structure:
                    ID   DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=196
                object  int64  object      int64 object object object  float64
                   ...    ...     ...        ...    ...    ...    ...      ...
...                ...    ...     ...        ...    ...    ...    ...      ...
                   ...    ...     ...        ...    ...    ...    ...      ...
Dask Name: read-csv, 1 graph layer
# This is going to fail, do not run it. 
# large_df.compute()
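Instead of materializing everything, you can inspect small pieces of the data cheaply:

# head() only reads from the first partition, so it is cheap
large_df.head()

# len() scans all partitions but only aggregates per-partition counts,
# so it does not hold the whole dataset in memory
print(len(large_df))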

Let’s calculate the mean of each type of observation across the whole dataset.

mean_values = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()
mean_values
Dask Series Structure:
npartitions=1
    float64
        ...
Name: DATA_VALUE, dtype: float64
Dask Name: truediv, 7 graph layers
mean_values.compute()
ELEMENT
ADPT       59.341177
ASLP    10160.615552
ASTP     9706.444546
AWBT       96.504325
AWDR      465.104172
            ...     
DASF        2.545455
MDSF      210.227273
WT10        1.000000
WT18        1.000000
WT17        1.000000
Name: DATA_VALUE, Length: 75, dtype: float64
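If you need several statistics at once, you can compute them in a single pass with agg; a small sketch using mean and count:

# Compute multiple aggregations in one task graph
stats = large_df.groupby("ELEMENT")["DATA_VALUE"].agg(["mean", "count"])
stats.compute()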

Next, we will select a station in Worcester, MA and calculate the mean of each observation type. You can see the list of all stations on NOAA’s website here.

worcester_df = large_df[large_df["ID"].isin(["US1MAWR0097"])]
worcester_df
Dask DataFrame Structure:
                    ID   DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=196
                object  int64  object      int64 object object object  float64
                   ...    ...     ...        ...    ...    ...    ...      ...
...                ...    ...     ...        ...    ...    ...    ...      ...
                   ...    ...     ...        ...    ...    ...    ...      ...
Dask Name: getitem, 5 graph layers
worcester_mean = worcester_df.groupby("ELEMENT")["DATA_VALUE"].mean()
worcester_mean
Dask Series Structure:
npartitions=1
    float64
        ...
Name: DATA_VALUE, dtype: float64
Dask Name: truediv, 11 graph layers

Now, we want to calculate the mean but keep the resulting values in memory, so we will assign the output to a new variable, worcester_mean_values:

worcester_mean_values = worcester_mean.compute()
worcester_mean_values
ELEMENT
PRCP     53.095327
SNOW      2.433884
SNWD    158.750000
WESD      0.000000
WESF      0.000000
Name: DATA_VALUE, dtype: float64

10.1.5. Task: find the station with the highest number of snow days#

In the following, we aim to find the station with the highest number of snow days across the years 2020-2023:

# Keep only snowfall observations
snow_df = large_df[large_df["ELEMENT"].isin(["SNOW"])]

# Keep only days with snowfall greater than zero
snow_positive_df = snow_df[snow_df["DATA_VALUE"] > 0]

# Count snow days per station; value_counts() sorts in descending order
station_counts = snow_positive_df["ID"].value_counts().compute()
station_counts
ID
USW00014755    525
CA002400306    420
US1MIBG0003    412
USC00502610    396
USC00204668    378
              ... 
US1TXMCL043      1
US1NYNS0036      1
US1ALMS0035      1
US1NYNS0016      1
US1NHCR0043      1
Name: count, Length: 20255, dtype: int64
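Since value_counts() returns counts in descending order, the station with the most snow days is the first entry; idxmax() makes that explicit (station_counts is a plain pandas Series after compute()):

# Station with the highest number of snow days in 2020-2023
print(station_counts.idxmax(), station_counts.max())  # USW00014755, 525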