# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING
from airflow.sdk import (
DAG,
Asset,
CronPartitionTimetable,
DailyMapper,
HourlyMapper,
IdentityMapper,
PartitionedAssetTimetable,
ProductMapper,
YearlyMapper,
asset,
task,
)
# Source asset for Team A's raw player statistics; it is declared as the
# outlet of the ``ingest_team_a_player_stats`` Dag.
team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
# Curated asset holding the merged per-team statistics; it is declared as the
# outlet of the ``clean_and_combine_player_stats`` Dag.
combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")
with DAG(
    dag_id="ingest_team_a_player_stats",
    # Cron expression "0 * * * *": minute 0 of every hour, evaluated in UTC.
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
):
    """Produce hourly partitioned stats for Team A."""

    @task(outlets=[team_a_player_stats])
    def ingest_team_a_stats():
        """Materialize Team A player statistics for the current hourly partition."""
        pass

    # Calling the decorated function inside the ``with`` block adds the task to this Dag.
    ingest_team_a_stats()
@asset(
    uri="file://incoming/player-stats/team_b.csv",
    # Cron expression "15 * * * *": minute 15 of every hour, evaluated in UTC.
    schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
)
def team_b_player_stats():
    """Produce hourly partitioned stats for Team B."""
    pass
@asset(
    uri="file://incoming/player-stats/team_c.csv",
    # Cron expression "30 * * * *": minute 30 of every hour, evaluated in UTC.
    schedule=CronPartitionTimetable("30 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
)
def team_c_player_stats():
    """Produce hourly partitioned stats for Team C."""
    pass
with DAG(
    dag_id="clean_and_combine_player_stats",
    schedule=PartitionedAssetTimetable(
        # The three team assets are combined with ``&`` into one asset condition.
        assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
        # No per-asset mapper is configured, so HourlyMapper is the default here.
        default_partition_mapper=HourlyMapper(),
    ),
    catchup=False,
    tags=["player-stats", "cleanup"],
):
    """
    Combine hourly partitions from Team A, B and C into a single curated dataset.

    This Dag demonstrates multi-asset partition alignment using HourlyMapper.
    """

    @task(outlets=[combined_player_stats])
    def combine_player_stats(dag_run=None):
        """
        Merge the aligned hourly partitions into a combined dataset.

        :param dag_run: The current Dag run; presumably injected by Airflow from
            the task context at execution time, hence the ``None`` default.
        """
        # ``TYPE_CHECKING`` is False at runtime, so this assert never executes;
        # it only lets static type checkers treat ``dag_run`` as not None below.
        if TYPE_CHECKING:
            assert dag_run
        print(dag_run.partition_key)

    combine_player_stats()
@asset(
    uri="file://analytics/player-stats/computed-player-odds.csv",
    # Falls back to IdentityMapper because no partition mapper is specified.
    # If another temporal mapper (e.g., HourlyMapper) is used here instead,
    # make sure its input_format is changed accordingly, since the incoming
    # partition_key is in "%Y-%m-%dT%H" format rather than a valid timestamp.
    schedule=PartitionedAssetTimetable(assets=combined_player_stats),
    tags=["player-stats", "odds"],
)
def compute_player_odds():
    """
    Compute player odds from the combined hourly statistics.

    This asset is partition-aware and triggered by the combined stats asset.
    """
    pass
with DAG(
    dag_id="player_odds_quality_check_wont_ever_to_trigger",
    schedule=PartitionedAssetTimetable(
        # Team B is referenced by name via ``Asset.ref`` rather than by object.
        assets=(combined_player_stats & team_a_player_stats & Asset.ref(name="team_b_player_stats")),
        partition_mapper_config={
            combined_player_stats: YearlyMapper(),  # incompatible on purpose
            team_a_player_stats: HourlyMapper(),
            Asset.ref(name="team_b_player_stats"): HourlyMapper(),
        },
    ),
    catchup=False,
    tags=["player-stats", "odds"],
):
    """
    Demonstrate a partition mapper mismatch scenario.

    The configured partition mappers transform partition keys into formats
    that never match ("%Y" vs. "%Y-%m-%dT%H"), so the Dag will never trigger.
    """

    # Placeholder task: per the scenario described above, it is never expected to run.
    @task
    def check_partition_alignment():
        pass

    check_partition_alignment()
# Source asset for regional sales data; it is declared as the outlet of the
# ``ingest_regional_sales`` Dag and consumed by ``aggregate_regional_sales``.
regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")
with DAG(
    dag_id="ingest_regional_sales",
    # Cron expression "0 * * * *": minute 0 of every hour, evaluated in UTC.
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["sales", "ingestion"],
):
    """Produce hourly regional sales data with composite partition keys."""

    @task(outlets=[regional_sales])
    def ingest_sales():
        """Ingest regional sales data partitioned by region and time."""
        pass

    # Calling the decorated function inside the ``with`` block adds the task to this Dag.
    ingest_sales()
with DAG(
    dag_id="aggregate_regional_sales",
    schedule=PartitionedAssetTimetable(
        assets=regional_sales,
        # One mapper per key segment, in order: region first, then timestamp.
        default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
    ),
    catchup=False,
    tags=["sales", "aggregation"],
):
    """
    Aggregate regional sales using ProductMapper.

    The ProductMapper splits the composite key "region|timestamp" and applies
    IdentityMapper to the region segment and DailyMapper to the timestamp segment,
    aligning hourly partitions to daily granularity per region.
    """

    @task
    def aggregate_sales(dag_run=None):
        """
        Aggregate sales data for the matched region-day partition.

        :param dag_run: The current Dag run; presumably injected by Airflow from
            the task context at execution time, hence the ``None`` default.
        """
        # ``TYPE_CHECKING`` is False at runtime, so this assert never executes;
        # it only lets static type checkers treat ``dag_run`` as not None below.
        if TYPE_CHECKING:
            assert dag_run
        print(dag_run.partition_key)

    aggregate_sales()