10. Introduction to Dask DataFrame#
Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency.
Many of the existing methods from the pandas API are available in Dask DataFrame. Check out this section of the documentation to learn more about them. In general, computations that are parallelizable are implemented in Dask DataFrame.
In this lecture, you will learn to use Dask DataFrame to analyze large tabular climate data.
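As a quick illustration of the pandas-style API (a minimal sketch with made-up toy data, not part of the climate dataset used below), operations on a Dask DataFrame look like pandas but stay lazy until compute() is called:
import pandas as pd
import dask.dataframe as dd

# A small pandas DataFrame wrapped as a Dask DataFrame with 2 partitions
pdf = pd.DataFrame({"station": ["A", "A", "B", "B"], "temp": [10, 12, 9, 11]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Familiar pandas syntax; nothing is computed yet
lazy_mean = ddf.groupby("station")["temp"].mean()

# compute() runs the task graph and returns a regular pandas Series
print(lazy_mean.compute())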
10.1. Analyzing Multiple Large CSV Files Using Dask DataFrame#
For this tutorial, we will use the NOAA Global Historical Climatology Network Daily (GHCN-D) data available on AWS S3. You can read more about the data on the Registry of Open Data on AWS here.
More information about the dataset, including the metadata descriptions, is available on NOAA’s website.
GHCN-D contains daily observations over global land areas, with station-based measurements from land-based stations worldwide, about two thirds of which report precipitation only. Some records are more than 175 years old.
This dataset is very large, and to analyze it within Python you need a tool like Dask DataFrame.
10.1.1. Download Data from AWS S3 bucket#
You can download the dataset from the AWS S3 bucket using the following commands. This dataset does not require an AWS account, hence the --no-sign-request flag is passed.
# Download one year of data
! aws s3 cp --no-sign-request s3://noaa-ghcn-pds/csv/by_year/2022.csv .
download: s3://noaa-ghcn-pds/csv/by_year/2022.csv to ./2022.csv
# Download all data since 2020
! aws s3 cp --no-sign-request s3://noaa-ghcn-pds/csv/by_year/ . --recursive --exclude="*" --include="202*"
download: s3://noaa-ghcn-pds/csv/by_year/2023.csv to ./2023.csv
download: s3://noaa-ghcn-pds/csv/by_year/2020.csv to ./2020.csv
download: s3://noaa-ghcn-pds/csv/by_year/2021.csv to ./2021.csv
download: s3://noaa-ghcn-pds/csv/by_year/2022.csv to ./2022.csv
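Alternatively, Dask can read these files directly from S3 without downloading them first. The sketch below assumes the s3fs package is installed; anonymous access mirrors the --no-sign-request flag above.
import dask.dataframe as dd

# Read straight from the public bucket (requires s3fs); anon=True means no AWS credentials are needed
df_s3 = dd.read_csv(
    "s3://noaa-ghcn-pds/csv/by_year/2022.csv",
    dtype={"Q_FLAG": "object"},
    storage_options={"anon": True},
)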
10.1.2. Import Packages#
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Start a local Dask cluster and connect a client to it
cluster = LocalCluster()
client = Client(cluster)

# Displaying the client shows a summary of the cluster and a link to its dashboard
client
Client: Client-fb6cb252-62c6-11ee-81d3-424756c027d2
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Cluster: LocalCluster 48a12102 | Status: running | Using processes: True
Workers: 4 (2 threads and 8.00 GiB memory each) | Total threads: 8 | Total memory: 32.00 GiB
Dashboard: /user/halemohammad@clarku.edu/proxy/8787/status
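The LocalCluster above was created with default settings. If you want to control the number of workers, threads, and memory yourself, you can pass them explicitly (a sketch; the numbers below are illustrative and should match your machine):
from dask.distributed import Client, LocalCluster

# Explicitly sized local cluster; adjust to the cores and memory you actually have
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="8GiB")
client = Client(cluster)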
10.1.3. Read One CSV file#
Let’s first load one CSV file and see how Dask DataFrame works.
df = dd.read_csv("2022.csv", dtype={'Q_FLAG': 'object'})
You can check the number of partitions that Dask selects by default.
df.npartitions
21
To change the number of partitions, you need to define the blocksize in the read_csv function:
df = dd.read_csv("2022.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)
df.npartitions
54
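The blocksize is the target number of bytes per partition (25e6 is roughly 25 MB here). If you already have a Dask DataFrame, you can also change its partitioning afterwards with repartition (a minimal sketch; the variable name df_repart and the partition count are just illustrative):
# Repartitioning is lazy, just like read_csv
df_repart = df.repartition(npartitions=30)
df_repart.npartitions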
The following line is an unnecessary step that you should not normally run. We do it here only to see how Dask loads the data.
df.compute()
|  | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| 0 | AE000041196 | 20220101 | TAVG | 204 | H | NaN | S | NaN |
| 1 | AEM00041194 | 20220101 | TAVG | 211 | H | NaN | S | NaN |
| 2 | AEM00041217 | 20220101 | TAVG | 209 | H | NaN | S | NaN |
| 3 | AEM00041218 | 20220101 | TAVG | 207 | H | NaN | S | NaN |
| 4 | AG000060390 | 20220101 | TAVG | 121 | H | NaN | S | NaN |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 698661 | WF000917530 | 20221231 | TAVG | 283 | H | NaN | S | NaN |
| 698662 | WQW00041606 | 20221231 | TAVG | 278 | H | NaN | S | NaN |
| 698663 | WZ004455110 | 20221231 | TAVG | 203 | H | NaN | S | NaN |
| 698664 | ZI000067775 | 20221231 | TAVG | 206 | H | NaN | S | NaN |
| 698665 | ZI000067975 | 20221231 | TAVG | 202 | H | NaN | S | NaN |

37816959 rows × 8 columns
df
|  | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=54 | object | int64 | object | int64 | object | object | object | float64 |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
As you see, df still shows only the lazy structure. This is because Dask does not store the output of df.compute() back in df. If you need to keep these values, you should instead run df = df.compute().
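Rather than materializing everything with compute(), you can usually inspect a Dask DataFrame more cheaply. Here is a short sketch of common alternatives:
# Cheaper ways to look at the data without loading all partitions into memory
df.head()                                # reads only from the first partition
len(df)                                  # row count, computed in parallel
df["ELEMENT"].value_counts().compute()   # aggregates and returns a small pandas Series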
10.1.4. Read Multiple CSVs#
Here, we will define a new DataFrame, large_df, and load multiple CSV files.
large_df = dd.read_csv("*.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)
large_df.npartitions
196
large_df
|  | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=196 | object | int64 | object | int64 | object | object | object | float64 |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
# This is going to fail, do not run it.
# large_df.compute()
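If you are unsure whether a result will fit in memory, you can estimate its in-memory size first (a sketch; with deep=True the estimate accounts for the string columns, but the estimate itself has to scan the data):
# Estimated memory footprint of the full dataset, in GiB (returns a single number)
estimated_bytes = large_df.memory_usage(deep=True).sum().compute()
print(f"{estimated_bytes / 2**30:.1f} GiB")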
Let’s calculate the mean of each type of observation across the whole dataset.
mean_values = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()
mean_values
Dask Series Structure:
npartitions=1
float64
...
Name: DATA_VALUE, dtype: float64
Dask Name: truediv, 7 graph layers
mean_values.compute()
ELEMENT
ADPT 59.341177
ASLP 10160.615552
ASTP 9706.444546
AWBT 96.504325
AWDR 465.104172
...
DASF 2.545455
MDSF 210.227273
WT10 1.000000
WT18 1.000000
WT17 1.000000
Name: DATA_VALUE, Length: 75, dtype: float64
Next, we will select a station in Worcester, MA and calculate the mean for each observation. You can see the list of all stations on NOAA’s website here.
worcester_df = large_df[large_df["ID"].isin(["US1MAWR0097"])]
worcester_df
|  | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=196 | object | int64 | object | int64 | object | object | object | float64 |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... |
worcester_mean = worcester_df.groupby("ELEMENT")["DATA_VALUE"].mean()
worcester_mean
Dask Series Structure:
npartitions=1
float64
...
Name: DATA_VALUE, dtype: float64
Dask Name: truediv, 11 graph layers
Now, we want to calculate the mean, but this time we are interested in keeping the values in memory. So we will assign the output to a new variable, worcester_mean_values:
worcester_mean_values = worcester_mean.compute()
worcester_mean_values
ELEMENT
PRCP 53.095327
SNOW 2.433884
SNWD 158.750000
WESD 0.000000
WESF 0.000000
Name: DATA_VALUE, dtype: float64
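Since worcester_df is reused for several aggregations, you could also keep it in the workers' memory with persist() so each new aggregation does not re-read the CSV files. This is a sketch and assumes the filtered subset fits in cluster memory; worcester_max is an illustrative extra aggregation.
# persist() keeps the filtered partitions in distributed memory (still a Dask DataFrame)
worcester_df = worcester_df.persist()

# Later aggregations reuse the cached partitions instead of re-reading the CSVs
worcester_max = worcester_df.groupby("ELEMENT")["DATA_VALUE"].max().compute()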
10.1.5. Task: find the station with the highest number of snow days#
In the following, we aim to find the station that has the highest number of snow days across the years 2020-2023:
snow_df = large_df[large_df["ELEMENT"].isin(["SNOW"])]
snow_positive_df = snow_df[snow_df["DATA_VALUE"]>0]
station_counts = snow_positive_df["ID"].value_counts().compute()
station_counts
ID
USW00014755 525
CA002400306 420
US1MIBG0003 412
USC00502610 396
USC00204668 378
...
US1TXMCL043 1
US1NYNS0036 1
US1ALMS0035 1
US1NYNS0016 1
US1NHCR0043 1
Name: count, Length: 20255, dtype: int64
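Since station_counts is already a computed pandas Series sorted in descending order, picking out the winning station is plain pandas (a minimal sketch):
# The station with the most snow days (per the output above, USW00014755 with 525 days)
top_station = station_counts.idxmax()
print(top_station, station_counts.max())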