12. Introduction to Dask DataFrame#
Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency.
Many of the existing methods from the pandas API are available in Dask DataFrame. Check out this section of the documentation to learn more about them. In general, computations that can be parallelized are implemented in Dask DataFrame.
In this lecture, you will learn to use Dask DataFrame to analyze large tabular climate data.
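As a quick illustration of these ideas before we work with real data, here is a minimal sketch (with made-up toy data) that builds a Dask DataFrame from a pandas DataFrame and shows that operations stay lazy until you call compute():
import pandas as pd
import dask.dataframe as dd

# A tiny pandas DataFrame (toy data, for illustration only)
pdf = pd.DataFrame({"station": ["A", "A", "B", "B"], "value": [1, 2, 3, 4]})

# Split it into 2 partitions; each partition is itself a pandas DataFrame
ddf = dd.from_pandas(pdf, npartitions=2)

# This builds a task graph but does not run anything yet
lazy_mean = ddf.groupby("station")["value"].mean()

# compute() runs the graph (in parallel) and returns a pandas Series
print(lazy_mean.compute())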
12.1. Analyzing Multiple Large CSV files using Dask DataFrame#
For this tutorial, we will use the NOAA Global Historical Climatology Network Daily (GHCN-D) data available on AWS S3. You can read more about the data on Registry of Open Data on AWS here.
More information about the dataset, including the metadata descriptions, is available on NOAA’s website.
GHCN-D contains daily observations over global land areas, collected from land-based stations worldwide, about two thirds of which are for precipitation measurement only. Some data are more than 175 years old.
This dataset is very large, and to analyze it within Python you need an out-of-core tool such as Dask DataFrame.
12.1.1. Download Data from AWS S3 bucket#
You can download the dataset from the AWS S3 bucket using the following commands. This dataset does not require an AWS account, hence the UNSIGNED signature configuration should be passed to the S3 client.
import boto3
import os
from botocore import UNSIGNED
from botocore.client import Config
def download_s3_objects_no_auth(bucket_name, download_path, prefix, substring, aws_region=None):
    """
    Download all objects from a public S3 bucket (no authentication) that contain a specific prefix and substring in their keys.

    Parameters
    ----------
    bucket_name : str
        The name of the S3 bucket.
    download_path : str
        Local directory where the files will be downloaded.
    prefix : str
        Characters that are required to be at the beginning of the S3 object keys.
    substring : str
        The substring to search for in the S3 object keys.
    aws_region : str
        AWS region where the S3 bucket is located (optional).

    Returns
    -------
    Downloads all the files that match the search criteria into the download_path.
    """
    # Initialize the S3 client with no request signing (public bucket)
    s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED), region_name=aws_region)

    # Ensure the download path exists
    if not os.path.exists(download_path):
        os.makedirs(download_path)

    # List all objects in the bucket that contain the prefix
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    if 'Contents' not in response:
        print(f"No objects found in the bucket '{bucket_name}'.")
        return

    # Loop through objects and download those that contain the substring
    for obj in response['Contents']:
        key = obj['Key']
        if substring in key:
            local_filename = os.path.join(download_path, key.split('/')[-1])
            if not os.path.exists(local_filename):
                print(f"Downloading {key} to {local_filename}...")
                s3_client.download_file(bucket_name, key, local_filename)
                print(f"Downloaded: {local_filename}")
            else:
                print(f"Object {key} exists at {local_filename}. Download skipped.")
download_s3_objects_no_auth(bucket_name="noaa-ghcn-pds", download_path="./data", prefix="csv/by_year/", substring="202")
Object csv/by_year/2020.csv exists at ./data/2020.csv. Download skipped.
Object csv/by_year/2021.csv exists at ./data/2021.csv. Download skipped.
Object csv/by_year/2022.csv exists at ./data/2022.csv. Download skipped.
Object csv/by_year/2023.csv exists at ./data/2023.csv. Download skipped.
Object csv/by_year/2024.csv exists at ./data/2024.csv. Download skipped.
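One caveat about the helper above: a single list_objects_v2 call returns at most 1,000 keys. The csv/by_year/ prefix is small enough that this is not a problem here, but if you adapt the function to a larger prefix, a paginator-based sketch like the following (using the same public bucket) handles any number of objects:
import boto3
from botocore import UNSIGNED
from botocore.client import Config

# Create an anonymous S3 client for the public bucket
s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED))

# A paginator transparently issues as many list requests as needed
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="noaa-ghcn-pds", Prefix="csv/by_year/"):
    for obj in page.get("Contents", []):
        print(obj["Key"])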
12.1.2. Import Packages#
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
print(client.dashboard_link)
http://127.0.0.1:8787/status
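By default, LocalCluster picks the number of workers and threads based on your machine. If you want explicit control over the cluster size, you could instead create it as below; the numbers are purely illustrative and should be adjusted to your hardware:
# Illustrative sizing; adjust to your machine
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)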
12.1.3. Read One CSV file#
Let’s first load one CSV file and see how Dask DataFrame works.
df = dd.read_csv("./data/2023.csv", dtype = {"Q_FLAG" : "object"})
You can check the number of partitions that Dask selects by default.
df.npartitions
16
To change the number of partitions, you need to define the blocksize in the read_csv function:
df = dd.read_csv("./data/2023.csv", dtype = {"Q_FLAG" : "object"}, blocksize=25e6)
df.npartitions
43
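If you are curious how the rows are distributed, map_partitions applies a function to each underlying pandas DataFrame; using len gives the row count per partition. This is just an optional sanity check:
# Number of rows in each partition (one value per partition)
rows_per_partition = df.map_partitions(len).compute()
print(rows_per_partition)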
The following line is an unnecessary step, and you should not run it routinely: it tries to load all the data into memory, which can easily exceed your memory limit.
# DO NOT RUN
# df.compute()
But if you would like to work with a small portion of the data, you can load a few rows of the dataframe as follows:
df.loc[:100, :].compute()
| | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| 0 | AGM00060430 | 20230101 | TMIN | 101 | NaN | NaN | S | NaN |
| 1 | AGM00060430 | 20230101 | PRCP | 0 | NaN | NaN | S | NaN |
| 2 | AGM00060430 | 20230101 | TAVG | 148 | H | NaN | S | NaN |
| 3 | AGM00060437 | 20230101 | TMIN | 87 | NaN | NaN | S | NaN |
| 4 | AGM00060437 | 20230101 | PRCP | 0 | NaN | NaN | S | NaN |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 96 | USC00424467 | 20231011 | SNWD | 0 | NaN | NaN | 7 | 1700.0 |
| 97 | USC00424508 | 20231011 | TMAX | 256 | NaN | NaN | 7 | 1700.0 |
| 98 | USC00424508 | 20231011 | TMIN | 56 | NaN | NaN | 7 | 1700.0 |
| 99 | USC00424508 | 20231011 | TOBS | 194 | NaN | NaN | 7 | 1700.0 |
| 100 | USC00424508 | 20231011 | PRCP | 0 | NaN | NaN | 7 | 1700.0 |

4343 rows × 8 columns
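Note that the result has 4343 rows rather than 101: each partition read from the CSV gets its own integer index starting at 0, and since the divisions are unknown, .loc[:100, :] returns the rows labeled 0–100 from all 43 partitions (43 × 101 = 4343). Another inexpensive way to peek at the data is head(), which by default reads only from the first partition and returns a pandas DataFrame:
# Reads only the first partition and returns the first 10 rows as a pandas DataFrame
df.head(10)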
df
Dask DataFrame Structure:

| | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=43 | | | | | | | | |
| | string | int64 | string | int64 | string | string | string | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... |
As you see, df is still a lazy Dask DataFrame with no values stored in it. This is because Dask does not store the output of df.compute() back in df. If you need to keep the computed values, you should instead run df = df.compute().
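If the data fits in the memory of your Dask workers, another option is persist(): it keeps df as a Dask DataFrame but materializes its partitions in distributed memory, so subsequent computations reuse the cached partitions instead of re-reading the CSV file:
# Materialize the partitions in the workers' memory; df remains a Dask DataFrame
df = df.persist()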
12.1.4. Read Multiple CSVs#
Here, we will define a new df and load multiple CSV files.
large_df = dd.read_csv("./data/*.csv", dtype = {"Q_FLAG" : "object"}, blocksize=25e6)
large_df.npartitions
252
large_df
Dask DataFrame Structure:

| | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=252 | | | | | | | | |
| | string | int64 | string | int64 | string | string | string | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... |
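When reading many files it can be useful to know which file each row came from. dd.read_csv has an include_path_column option that adds the source file path as an extra column; this is optional and not used in the rest of this lecture:
# Adds a column (named "path" by default) with the source CSV file of each row
large_df_with_path = dd.read_csv(
    "./data/*.csv",
    dtype={"Q_FLAG": "object"},
    blocksize=25e6,
    include_path_column=True,
)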
Let’s calculate the mean of each type of observation over the whole dataset.
mean_values = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()
mean_values
Dask Series Structure:
npartitions=1
float64
...
Dask Name: getitem, 4 expressions
Expr=((ReadCSV(03735e5)[['ELEMENT', 'DATA_VALUE']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='DATA_VALUE'))['DATA_VALUE']
mean_values.compute()
ELEMENT
SN56 65.149429
TAVG 109.644308
WDF5 200.575220
SN31 117.909843
SX53 101.728783
...
PSUN 42.392204
SN32 143.052831
SN55 80.291063
SX51 167.196816
DASF 2.545455
Name: DATA_VALUE, Length: 75, dtype: float64
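If you need more than one statistic, you can build them all lazily and pass them to dask.compute together, so that Dask can share the underlying file reads between them. For example:
import dask

# Build several aggregations lazily, then compute them together in one pass
element_mean = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()
element_count = large_df.groupby("ELEMENT")["DATA_VALUE"].count()
mean_result, count_result = dask.compute(element_mean, element_count)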
Next, we will select a station in Worcester, MA, and calculate the mean of each observation type recorded there. You can see the list of all stations on NOAA’s website here.
worcester_df = large_df[large_df["ID"].isin(["US1MAWR0097"])]
worcester_df
Dask DataFrame Structure:

| | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
|---|---|---|---|---|---|---|---|---|
| npartitions=252 | | | | | | | | |
| | string | int64 | string | int64 | string | string | string | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... |
worcester_mean = worcester_df.groupby("ELEMENT")["DATA_VALUE"].mean()
worcester_mean
Dask Series Structure:
npartitions=1
float64
...
Dask Name: getitem, 8 expressions
Expr=(((Filter(frame=ReadCSV(03735e5), predicate=Isin(frame=ReadCSV(03735e5)['ID'], values=_DelayedExpr(Delayed('delayed-1e99729721b067d61cb17db688716cf6')))))[['ELEMENT', 'DATA_VALUE']]).mean(observed=False, chunk_kwargs={'numeric_only': False}, aggregate_kwargs={'numeric_only': False}, _slice='DATA_VALUE'))['DATA_VALUE']
Now, we want to calculate the mean, but this time we are interested in keeping the values in memory. So we will assign the output to a new variable, worcester_mean_values:
worcester_mean_values = worcester_mean.compute()
worcester_mean_values
ELEMENT
SNWD 158.750000
SNOW 2.433884
PRCP 53.095327
WESD 0.000000
WESF 0.000000
Name: DATA_VALUE, dtype: float64
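As an optional extension (not part of the original workflow), you could also look at how a single observation type varies over time for this station. The sketch below parses the DATE column (stored as YYYYMMDD integers) and computes the mean daily PRCP value per year; the YEAR column is introduced here purely for illustration:
# Keep only the precipitation records for the Worcester station
worcester_prcp = worcester_df[worcester_df["ELEMENT"] == "PRCP"]

# Parse the YYYYMMDD integers into datetimes and extract the year (illustrative YEAR column)
worcester_prcp = worcester_prcp.assign(
    YEAR=dd.to_datetime(worcester_prcp["DATE"].astype(str), format="%Y%m%d").dt.year
)

# Mean daily PRCP value per year (triggers the computation)
print(worcester_prcp.groupby("YEAR")["DATA_VALUE"].mean().compute())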
12.1.5. Exercise: find the station with the highest number of snow days#
Write a function that receives the name of an observation type (e.g. PRCP) from the NOAA GHCN-D dataset, and returns the ID of the station(s) with the highest number of days on which the target observation was greater than 0.
Use this function to find the station that has the highest number of snow days across years 2020-2024.
def highest_observation_days(ddf, var):
    # Keep only the rows for the target observation type
    var_df = ddf[ddf["ELEMENT"].isin([var])]
    # Keep only the days with a positive value
    var_df_positive = var_df[var_df["DATA_VALUE"] > 0]
    # Count the number of such days per station, sorted in descending order
    station_counts = var_df_positive["ID"].value_counts(sort=True).compute()
    print(f"Station {station_counts.index[0]} has the highest number of days with a positive {var} value at {station_counts.iloc[0]} days")
highest_observation_days(large_df, "SNOW")
Station USW00014755 has the highest number of days with a positive SNOW value at 663 days
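When you are finished, it is good practice to shut down the client and the local cluster so that the worker processes are released:
# Shut down the Dask client and the local cluster
client.close()
cluster.close()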