Using Data Assets for Data-Driven DAG Scheduling
STACKIT Workflows supports Apache Airflow’s Data Assets, letting you build data-driven pipelines: one DAG produces an asset, and downstream DAGs are automatically triggered when that asset changes — no polling, no timers. In this tutorial you’ll learn how to define assets, mark tasks as producers, and schedule consumer DAGs to react to asset updates.
- Create a DAG file in your project

dags/my_data_assets_dag.py

```python
import json

import pendulum

from airflow.sdk import Asset, dag, task

# ---------------------------------------------------------------------------
# Data Assets
#
# Assets are identified by a URI. The URI is a logical label –
# Airflow uses it only as a key to link producers to consumers.
# No polling or external API calls are made against this URI.
# ---------------------------------------------------------------------------
orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")


# ===========================================================================
# Producer DAG – updates both assets
# ===========================================================================
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
    dag_id="data_assets_producer",
)
def data_assets_producer():
    @task(outlets=[orders_asset])
    def produce_orders():
        orders = [
            {"order_id": 1001, "amount": 250.00, "customer": "Alice"},
            {"order_id": 1002, "amount": 125.50, "customer": "Bob"},
            {"order_id": 1003, "amount": 340.75, "customer": "Charlie"},
        ]
        print(f"Produced {len(orders)} orders:")
        print(json.dumps(orders, indent=2))
        return orders

    @task(outlets=[customers_asset])
    def produce_customers():
        customers = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"},
        ]
        print(f"Produced {len(customers)} customers:")
        print(json.dumps(customers, indent=2))
        return customers

    produce_orders() >> produce_customers()


data_assets_producer()


# ===========================================================================
# Consumer DAG 1 – triggered when the orders asset is updated
# ===========================================================================
@dag(
    schedule=orders_asset,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
    dag_id="data_assets_consumer_orders",
)
def data_assets_consumer_orders():
    @task()
    def process_orders():
        print("New order data detected! Running order processing pipeline...")
        print("Aggregating daily totals...")
        print("Order processing complete.")

    process_orders()


data_assets_consumer_orders()


# ===========================================================================
# Consumer DAG 2 – triggered when BOTH assets have been updated
# ===========================================================================
@dag(
    schedule=(orders_asset & customers_asset),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo", "data-assets"],
    dag_id="data_assets_consumer_combined",
)
def data_assets_consumer_combined():
    @task()
    def join_orders_and_customers():
        print("Both orders AND customers data updated!")
        print("Joining datasets for the combined analytics report...")
        print("Combined report generated successfully.")

    join_orders_and_customers()


data_assets_consumer_combined()
```

How it works
- Assets (`orders_asset`, `customers_asset`)
  - Each asset is defined by a URI that acts as a logical label. Airflow never connects to or polls this URI; it is purely a key used to link producers to consumers.
  - When a producer task finishes successfully, the Airflow scheduler records the update in its metadata database and triggers any consumer DAGs scheduled on that asset.
- Producer DAG (`data_assets_producer`)
  - `schedule=None` means it is triggered manually or by an external system.
  - `outlets=[orders_asset]` on `produce_orders` marks that task as updating `orders_asset` upon successful completion.
  - `outlets=[customers_asset]` on `produce_customers` does the same for `customers_asset`.
- Consumer DAG 1 (`data_assets_consumer_orders`)
  - `schedule=orders_asset` causes this DAG to run automatically every time `orders_asset` is updated.
- Consumer DAG 2 (`data_assets_consumer_combined`)
  - `schedule=(orders_asset & customers_asset)` requires both assets to have been updated before the DAG is triggered.
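The bookkeeping behind these two trigger modes can be sketched in plain Python. This is an illustrative simulation only, not Airflow's actual scheduler code: the scheduler tracks which subscribed assets have been updated since a consumer's last asset-triggered run and fires the consumer once its condition is satisfied.

```python
# Illustrative simulation (NOT Airflow internals): single-asset vs.
# AND-combined asset schedules.
class AssetTracker:
    def __init__(self):
        self.subscriptions = {}  # consumer dag_id -> frozenset of required asset URIs
        self.pending = {}        # consumer dag_id -> URIs updated since its last trigger

    def subscribe(self, dag_id, required_assets):
        self.subscriptions[dag_id] = frozenset(required_assets)
        self.pending[dag_id] = set()

    def record_update(self, asset_uri):
        """Called when a producer task listing this asset in its outlets succeeds."""
        triggered = []
        for dag_id, required in self.subscriptions.items():
            if asset_uri in required:
                self.pending[dag_id].add(asset_uri)
            if self.pending[dag_id] == required:  # condition fully satisfied
                triggered.append(dag_id)
                self.pending[dag_id] = set()      # reset for the next cycle
        return triggered


tracker = AssetTracker()
tracker.subscribe("consumer_orders", ["s3://bucket/orders.parquet"])
tracker.subscribe("consumer_combined",
                  ["s3://bucket/orders.parquet", "s3://bucket/customers.csv"])

# Orders updated: the single-asset consumer fires, the combined one still waits.
print(tracker.record_update("s3://bucket/orders.parquet"))   # ['consumer_orders']
# Customers updated: now both assets have been seen, so the combined consumer fires.
print(tracker.record_update("s3://bucket/customers.csv"))    # ['consumer_combined']
```

The reset after each trigger mirrors the important detail that a combined schedule waits for all assets to be updated again before the next run.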
- Push the DAG file to your environment and trigger the producer in Airflow.
- Manually trigger `data_assets_producer` from the Airflow UI.
- Once the producer run completes, observe the automatic downstream triggers:
  - `data_assets_consumer_orders` starts as soon as `produce_orders` succeeds.
  - `data_assets_consumer_combined` starts only after both `produce_orders` and `produce_customers` have succeeded.
- Open the Assets view (called Datasets in older Airflow versions) in the Airflow UI to inspect the asset lineage graph and see which DAGs are registered as producers and consumers.
After execution you’ll have seen two scheduling modes powered by Data Assets:
- Single-asset trigger — a consumer reacts to one asset update
- Multi-asset trigger — a consumer waits for all required assets to be updated before running
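To confirm the automatic consumer runs outside the UI, you can query the standard Airflow REST API. The sketch below is a minimal helper assuming bearer-token authentication and the Airflow 3 `/api/v2` prefix; your deployment's base URL, auth method, and API version are assumptions you should adjust.

```python
# Minimal sketch: list recent runs of a DAG via the Airflow REST API.
# Base URL, token, and the /api/v2 prefix are deployment-specific assumptions.
import json
import urllib.request

API_PREFIX = "/api/v2"  # Airflow 3; older deployments expose /api/v1


def dag_runs_url(base_url: str, dag_id: str, limit: int = 5) -> str:
    """Build the list-DAG-runs endpoint URL for a given DAG."""
    return f"{base_url.rstrip('/')}{API_PREFIX}/dags/{dag_id}/dagRuns?limit={limit}"


def latest_runs(base_url: str, dag_id: str, token: str):
    """Fetch recent runs of a DAG and return the decoded 'dag_runs' list."""
    req = urllib.request.Request(
        dag_runs_url(base_url, dag_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["dag_runs"]


# Hypothetical usage (placeholder host and token):
# for run in latest_runs("https://<your-workflows-endpoint>", "data_assets_consumer_orders", "<token>"):
#     print(run["dag_run_id"], run["state"])
```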