Object Storage

This tutorial shows how to use the Object Storage API to manage objects that reside on object storage, such as S3, GCS, and Azure Blob Storage. The API was introduced in Airflow 2.8.

The tutorial covers a simple pattern that is often used in data engineering and data science workflows: accessing a web API, saving the result, and analyzing it.

Prerequisites

To complete this tutorial, you need a few things:

  • DuckDB, an in-process analytical database, which can be installed by running pip install duckdb.

  • An S3 bucket, along with the Amazon provider including s3fs. You can install the provider package by running pip install apache-airflow-providers-amazon[s3fs]. Alternatively, you can use a different storage provider by installing that provider's package and changing the URL passed to ObjectStoragePath accordingly, for example by replacing s3:// with gs:// for Google Cloud Storage.

  • pandas, which you can install by running pip install pandas.

Creating an ObjectStoragePath

The ObjectStoragePath is a path-like object that represents a path on object storage. It is the fundamental building block of the Object Storage API.

airflow/example_dags/tutorial_objectstorage.py

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

The username part of the URL given to ObjectStoragePath should be a connection ID. The specified connection will be used to obtain the right credentials to access the backend. If it is omitted, the default connection for the backend will be used.

The connection ID can alternatively be passed in with a keyword argument:

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

This is useful when reusing a URL defined for another purpose (e.g. Asset), which generally does not contain a username part. The explicit keyword argument takes precedence over the URL’s username value if both are specified.
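
The precedence rule described above can be sketched with the standard library alone. This is only an illustration of the rule, not Airflow's implementation: resolve_conn_id is a hypothetical helper introduced here, and the "other" connection ID is made up for the example.

```python
from urllib.parse import urlsplit


def resolve_conn_id(url, conn_id=None):
    """Hypothetical helper: choose the connection ID for an object storage URL.

    An explicit keyword argument wins; otherwise the username part of the
    URL is used; otherwise None, meaning "use the backend's default".
    """
    if conn_id is not None:
        return conn_id
    return urlsplit(url).username


# Connection ID taken from the username part of the URL.
from_url = resolve_conn_id("s3://aws_default@airflow-tutorial-data/")

# Connection ID passed as a keyword; the URL has no username part.
from_keyword = resolve_conn_id("s3://airflow-tutorial-data/", conn_id="aws_default")

# Both given: the keyword takes precedence over the URL's username.
both = resolve_conn_id("s3://other@airflow-tutorial-data/", conn_id="aws_default")

# Neither given: fall back to the backend's default connection.
neither = resolve_conn_id("s3://airflow-tutorial-data/")

print(from_url, from_keyword, both, neither)
```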

It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections will not be created until the path is used. This means that you can create the path in the global scope of your DAG and use it in multiple tasks.
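
To see why deferring the connection makes module-level instantiation cheap, here is a minimal sketch of the general lazy-initialization pattern. It is not Airflow's actual code: LazyPath and its counter are hypothetical names used only to make the behavior observable.

```python
from functools import cached_property


class LazyPath:
    """Hypothetical stand-in for a path object that connects lazily."""

    connections_opened = 0  # class-level counter, for demonstration only

    def __init__(self, url):
        # Constructing the object only stores the URL; nothing is contacted.
        self.url = url

    @cached_property
    def fs(self):
        # The (pretend) connection is made on first access, then cached.
        LazyPath.connections_opened += 1
        return f"filesystem for {self.url}"


# Module-level instantiation, as at the top of a DAG file: no connection yet.
base = LazyPath("s3://aws_default@airflow-tutorial-data/")
opened_after_init = LazyPath.connections_opened

# The first real use triggers the connection; later uses reuse it.
_ = base.fs
_ = base.fs
opened_after_use = LazyPath.connections_opened

print(opened_after_init, opened_after_use)
```

Because constructing the object does no I/O in this pattern, parsing the file that defines it stays fast, and the cost is paid only by the code that first touches the backend.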

Saving data to Object Storage

An ObjectStoragePath behaves mostly like a pathlib.Path object. You can use it to save and load data directly to and from object storage. So, a typical flow could look like this:

airflow/example_dags/tutorial_objectstorage.py

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

The get_air_quality_data task calls the API of the Finnish Meteorological Institute to obtain air quality data for the Helsinki region (the Uusimaa area in the request). It creates a pandas DataFrame from the resulting JSON, then writes the data to object storage, converting it to Parquet on the fly.
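
The way the task turns its time window into request parameters can be tried in isolation. The sketch below makes no network request and uses only the standard library; the two timestamps are hypothetical values standing in for what Airflow supplies through kwargs, and only a subset of the fields and parameters is used.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

API = "https://opendata.fmi.fi/timeseries"
fields = ["fmisid", "time", "AQINDEX_PT1H_avg"]  # subset of aq_fields

# Hypothetical window; in the task these values come from kwargs.
start_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
end_time = start_time + timedelta(days=1)

params = {
    "format": "json",
    "producer": "airquality_urban",
    "area": "Uusimaa",
    "param": ",".join(fields),  # the API takes a comma-separated field list
    "starttime": start_time.isoformat(timespec="seconds"),
    "endtime": end_time.isoformat(timespec="seconds"),
    "tz": "UTC",
}

# Encode the parameters into a query string to inspect what would be sent.
url = f"{API}?{urlencode(params)}"
print(url)
```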

The key of the object is generated automatically from the logical date of the task, so we could run this every day and it would create a new object for each day. We concatenate this key with the base path to create the full path to the object. Finally, after writing the object to storage, we return the path to the object. This allows us to use the path in the next task.
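
Since an ObjectStoragePath behaves mostly like a pathlib.Path, the naming and writing steps can be rehearsed locally. The following sketch assumes a temporary local directory as a stand-in for the bucket and writes placeholder bytes rather than real Parquet; object_key and save are hypothetical helpers, not part of the tutorial DAG.

```python
import tempfile
from datetime import date
from pathlib import Path


def object_key(logical_date):
    """Build the per-day object name, mirroring the task's naming scheme."""
    return f"air_quality_{logical_date.strftime('%Y%m%d')}.parquet"


def save(base, logical_date, payload):
    """Write payload under base and return the full path for the next step."""
    base.mkdir(parents=True, exist_ok=True)  # same idea as ensuring the bucket exists
    path = base / object_key(logical_date)  # "/" joins the base path and the key
    with path.open("wb") as file:
        file.write(payload)
    return path


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "airflow-tutorial-data"  # local stand-in for the bucket
    first = save(base, date(2024, 1, 1), b"day one")  # placeholder bytes, not Parquet
    second = save(base, date(2024, 1, 2), b"day two")
    names = sorted(p.name for p in base.iterdir())
    first_content = first.read_bytes()

print(names)
```

Each run date maps to its own object name, so daily runs add new objects instead of overwriting earlier ones.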

Analyzing the data

To understand the data, you typically want to analyze it. DuckDB is a great tool for this. It is an in-process analytical database that lets you run SQL queries on data in memory.

Because the data is already in Parquet format, we can read it with DuckDB's read_parquet function. And because both DuckDB and ObjectStoragePath use fsspec, we can register the backend of the ObjectStoragePath with DuckDB: ObjectStoragePath exposes the backend through its fs property, which we pass to DuckDB's register_filesystem function.

In DuckDB we can then create a table from the data and run a query on it. The query result is returned as a DataFrame, which could be used for further analysis or saved to object storage.

airflow/example_dags/tutorial_objectstorage.py

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data and prints the results.
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

Note that the analyze function does not need to know the original path to the object: the path is passed in as a parameter and obtained through XCom. You do not need to re-instantiate the path object, and the connection details are handled transparently.

Putting it all together

The final DAG, which wraps the tasks so that we can run them, looks like this:

airflow/example_dags/tutorial_objectstorage.py


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data and prints the results.
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()
