12. Introduction to Dask DataFrame#

Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency.

Figure: a Dask DataFrame is composed of many pandas DataFrames partitioned along the index.

Many of the existing methods from the pandas API are available in Dask DataFrame. Check out this section of the documentation to learn more about them. In general, computations that are parallelizable are implemented in Dask DataFrame.
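
For example, a familiar pandas operation such as groupby builds a lazy task graph on a Dask DataFrame and only runs when you call compute(). Below is a minimal sketch with made-up data (not part of the climate analysis):

import pandas as pd
import dask.dataframe as dd

# A tiny pandas DataFrame split into two partitions (illustrative data only)
pdf = pd.DataFrame({"station": ["A", "A", "B", "B"], "value": [1.0, 2.0, 3.0, 4.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Familiar pandas-style syntax; nothing is computed yet
result = ddf.groupby("station")["value"].mean()

# compute() triggers the parallel execution and returns a pandas Series
print(result.compute())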

In this lecture, you will learn to use Dask DataFrame to analyze large tabular climate data.

12.1. Analyzing Multiple Large CSV Files Using Dask DataFrame#

For this tutorial, we will use the NOAA Global Historical Climatology Network Daily (GHCN-D) data available on AWS S3. You can read more about the data on the Registry of Open Data on AWS here.

More information about the dataset, including the metadata descriptions, is available on NOAA’s website.

GHCN-D contains daily observations over global land areas, collected from land-based stations worldwide, about two thirds of which report precipitation only. Some records are more than 175 years old.

This dataset is very large, and analyzing it in Python requires Dask DataFrame.

12.1.1. Download Data from AWS S3 bucket#

You can download the dataset from the AWS S3 bucket using the following commands. This dataset does not require an AWS account, hence the UNSIGNED value is passed to the S3 client configuration.

import boto3
import os
from botocore import UNSIGNED
from botocore.client import Config

def download_s3_objects_no_auth(bucket_name, download_path, prefix, substring, aws_region = None):
    """
    Download all objects from a public S3 bucket (no authentication) that contain a specific prefix and substring in their keys.

    Parameters
    ----------
    bucket_name : str
        The name of the S3 bucket.
    download_path : str
        Local directory where the files will be downloaded.
    prefix : str
        Characters that are required to be at the beginning of the S3 object keys.
    substring : str
        The substring to search for in the S3 object keys.
    aws_region : str, optional
        AWS region where the S3 bucket is located.

    Returns
    -------
    None
        Downloads all the files that match the search criteria into download_path.
    """
    # Initialize the S3 client with no request signing (public bucket)
    s3_client = boto3.client('s3', config = Config(signature_version=UNSIGNED), region_name = aws_region)

    # Ensure the download path exists
    if not os.path.exists(download_path):
        os.makedirs(download_path)

    # List all objects in the bucket that contain the prefix
    response = s3_client.list_objects_v2(Bucket = bucket_name, Prefix = prefix)
    if 'Contents' not in response:
        print(f"No objects found in the bucket '{bucket_name}'.")
        return

    # Loop through objects and download those that contain the substring
    for obj in response['Contents']:
        key = obj['Key']
        if substring in key:
            local_filename = os.path.join(download_path, key.split('/')[-1])
            if not os.path.exists(local_filename):
                print(f"Downloading {key} to {local_filename}...")
                s3_client.download_file(bucket_name, key, local_filename)
                print(f"Downloaded: {local_filename}")
            else:
                print(f"Object {key} exists at {local_filename}. Download skipped.")
download_s3_objects_no_auth(bucket_name="noaa-ghcn-pds", download_path="./data", prefix="csv/by_year/", substring="202")
Object csv/by_year/2020.csv exists at ./data/2020.csv. Download skipped.
Object csv/by_year/2021.csv exists at ./data/2021.csv. Download skipped.
Object csv/by_year/2022.csv exists at ./data/2022.csv. Download skipped.
Object csv/by_year/2023.csv exists at ./data/2023.csv. Download skipped.
Object csv/by_year/2024.csv exists at ./data/2024.csv. Download skipped.
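
As an aside, if you prefer not to download the files at all, dd.read_csv can also read directly from S3, assuming the s3fs package is installed; anon=True corresponds to the unsigned access used above. This is only a sketch and is not used in the rest of the lecture:

import dask.dataframe as dd

# Read the by-year CSVs straight from the public bucket (no credentials needed)
ddf = dd.read_csv(
    "s3://noaa-ghcn-pds/csv/by_year/202*.csv",
    dtype={"Q_FLAG": "object"},
    storage_options={"anon": True},
    blocksize=25e6,
)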

12.1.2. Import Packages#

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
print(client.dashboard_link)
http://127.0.0.1:8787/status
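
By default, LocalCluster sizes itself based on the cores and memory of your machine. If you want to control the resources explicitly, you can pass arguments such as n_workers, threads_per_worker, and memory_limit; the numbers below are only illustrative:

from dask.distributed import Client, LocalCluster

# Illustrative sizing; adjust to your machine
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)
print(client.dashboard_link)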

12.1.3. Read One CSV file#

Let’s first load one CSV file and see how Dask DataFrame works.

df = dd.read_csv("./data/2023.csv", dtype = {"Q_FLAG" : "object"})

You can check the number of partitions that Dask selects by default.

df.npartitions
16

To change the number of partitions, you need to specify the blocksize argument in the read_csv function:

df = dd.read_csv("./data/2023.csv", dtype = {"Q_FLAG" : "object"}, blocksize=25e6)
df.npartitions
43
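
If you need a different number of partitions later, you do not have to re-read the CSV; repartition builds a new Dask DataFrame from the existing one. A sketch, with an arbitrary target of 16 partitions:

# Repartition without re-reading the file; 16 is an arbitrary choice here
df16 = df.repartition(npartitions=16)
df16.npartitions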

The following line is an unnecessary step and you should not run it as a matter of course: it will try to load all the data into memory, which can easily exceed your memory limit.

# DO NOT RUN
# df.compute()
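
If you want a rough estimate of how much memory the full DataFrame would need before deciding whether compute() is safe, you can ask Dask lazily. This is a sketch; the estimate is approximate, especially for string columns:

# Estimated in-memory size, reduced to a single number and printed in GB
nbytes = df.memory_usage(deep=True).sum().compute()
print(f"Approximately {nbytes / 1e9:.1f} GB")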

But if you would like to look at a small portion of the data, you can compute a few rows of the DataFrame as follows:

df.loc[:100, :].compute()
ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
0 AGM00060430 20230101 TMIN 101 NaN NaN S NaN
1 AGM00060430 20230101 PRCP 0 NaN NaN S NaN
2 AGM00060430 20230101 TAVG 148 H NaN S NaN
3 AGM00060437 20230101 TMIN 87 NaN NaN S NaN
4 AGM00060437 20230101 PRCP 0 NaN NaN S NaN
... ... ... ... ... ... ... ... ...
96 USC00424467 20231011 SNWD 0 NaN NaN 7 1700.0
97 USC00424508 20231011 TMAX 256 NaN NaN 7 1700.0
98 USC00424508 20231011 TMIN 56 NaN NaN 7 1700.0
99 USC00424508 20231011 TOBS 194 NaN NaN 7 1700.0
100 USC00424508 20231011 PRCP 0 NaN NaN 7 1700.0

4343 rows × 8 columns
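
Note that each partition produced by read_csv has its own index starting at 0, so the .loc[:100, :] selection above is applied within every partition; that is why the result has 4343 rows (43 partitions × 101 rows) rather than 101. A lighter-weight way to peek at the data is head(), which by default only pulls rows from the first partition:

# Inspect a few rows cheaply; only the first partition is read
df.head(10)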

df
Dask DataFrame Structure:
ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=43
string int64 string int64 string string string float64
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
Dask Name: read_csv, 1 expression

As you see, df is still a lazy Dask DataFrame. This is because Dask does not store the output of df.compute() back in df. If you need to keep the computed values, run df = df.compute() instead.
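
If the data fits in your cluster's memory but you want to keep working with a Dask DataFrame rather than a pandas one, persist() is a common middle ground: it materializes the partitions in worker memory while the collection remains a Dask DataFrame. A sketch (optional for this lecture):

# Cache the partitions in the workers' memory; df remains a Dask DataFrame,
# but later operations start from the cached data instead of re-reading the CSV
df = df.persist()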

12.1.4. Read Multiple CSVs#

Here, we will define a new DataFrame, large_df, and load multiple CSV files.

large_df = dd.read_csv("./data/*.csv", dtype = {"Q_FLAG" : "object"}, blocksize=25e6)
large_df.npartitions
252
large_df
Dask DataFrame Structure:
ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=252
string int64 string int64 string string string float64
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
Dask Name: read_csv, 1 expression

Let’s calculate the mean of each type of observation in the whole dataset.

mean_values = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()
mean_values
Dask Series Structure:
npartitions=1
    float64
        ...
Dask Name: getitem, 4 expressions
Expr=((ReadCSV(03735e5)[['ELEMENT', 'DATA_VALUE']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='DATA_VALUE'))['DATA_VALUE']
mean_values.compute()
ELEMENT
SN56     65.149429
TAVG    109.644308
WDF5    200.575220
SN31    117.909843
SX53    101.728783
           ...    
PSUN     42.392204
SN32    143.052831
SN55     80.291063
SX51    167.196816
DASF      2.545455
Name: DATA_VALUE, Length: 75, dtype: float64
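
Keep in mind that DATA_VALUE is reported in the units documented in the GHCN-D metadata; temperature elements, for example, are stored in tenths of degrees Celsius. A quick sanity check on the temperature means (check the metadata descriptions linked above for the exact units of each element):

# Store the computed means, then convert the temperature elements from
# tenths of degrees Celsius to degrees Celsius
element_means = mean_values.compute()
print(element_means[["TMAX", "TMIN", "TAVG"]] / 10)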

Next, we will select a station in Worcester, MA, and calculate the mean for each observation type. You can see the list of all stations on NOAA’s website here.

worcester_df = large_df[large_df["ID"].isin(["US1MAWR0097"])]
worcester_df
Dask DataFrame Structure:
ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
npartitions=252
string int64 string int64 string string string float64
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ...
Dask Name: getitem, 5 expressions
worcester_mean = worcester_df.groupby("ELEMENT")["DATA_VALUE"].mean()
worcester_mean
Dask Series Structure:
npartitions=1
    float64
        ...
Dask Name: getitem, 8 expressions
Expr=(((Filter(frame=ReadCSV(03735e5), predicate=Isin(frame=ReadCSV(03735e5)['ID'], values=_DelayedExpr(Delayed('delayed-1e99729721b067d61cb17db688716cf6')))))[['ELEMENT', 'DATA_VALUE']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='DATA_VALUE'))['DATA_VALUE']

Now we want to calculate the mean, but this time keep the values in memory. So we will assign the output to a new variable, worcester_mean_values:

worcester_mean_values = worcester_mean.compute()
worcester_mean_values
ELEMENT
SNWD    158.750000
SNOW      2.433884
PRCP     53.095327
WESD      0.000000
WESF      0.000000
Name: DATA_VALUE, dtype: float64

12.1.5. Exercise: find the station with the highest number of snow days#

Write a function that receives the name of an observation type (e.g. PRCP) from the NOAA GHCN-D dataset and returns the ID of the station(s) with the highest number of days on which the target observation was greater than 0.

Use this function to find the station that has the highest number of snow days across years 2020-2024.

def highest_observation_days(ddf, var):
    # Keep only the rows for the requested observation type (e.g. "SNOW")
    var_df = ddf[ddf["ELEMENT"].isin([var])]
    # Keep only the days on which that observation was positive
    var_df_positive = var_df[var_df["DATA_VALUE"] > 0]
    # Count the remaining rows (days) per station; compute() returns a sorted pandas Series
    station_counts = var_df_positive["ID"].value_counts(sort=True).compute()

    print(f"Station {station_counts.index[0]} has the highest number of days with a positive {var} value at {station_counts.iloc[0]} days")
highest_observation_days(large_df, "SNOW")
Station USW00014755 has the highest number of days with a positive SNOW value at 663 days
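
Since the exercise asks for the ID of the station(s), a variant that returns every station tied for the maximum count, instead of only printing the first one, could look like the following sketch:

def stations_with_most_days(ddf, var):
    """Return the ID(s) of the station(s) with the most days where var > 0."""
    var_df = ddf[ddf["ELEMENT"] == var]
    positive_days = var_df[var_df["DATA_VALUE"] > 0]
    station_counts = positive_days["ID"].value_counts().compute()
    # Keep every station that ties for the maximum number of days
    return station_counts[station_counts == station_counts.max()].index.tolist()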