Using Data Assets for Data-Driven DAG Scheduling

STACKIT Workflows supports Apache Airflow’s Data Assets, letting you build data-driven pipelines: one DAG produces an asset, and downstream DAGs are automatically triggered when that asset changes — no polling, no timers. In this tutorial you’ll learn how to define assets, mark tasks as producers, and schedule consumer DAGs to react to asset updates.

  1. Create a DAG file in your project

    dags/my_data_assets_dag.py

    import json
    import pendulum
    from airflow.sdk import Asset, dag, task
    # ---------------------------------------------------------------------------
    # Data Assets
    #
    # Assets are identified by a URI. The URI is a logical label –
    # Airflow uses it only as a key to link producers to consumers.
    # No polling or external API calls are made against this URI.
    # ---------------------------------------------------------------------------
    orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
    customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")
    # ===========================================================================
    # Producer DAG – updates both assets
    # ===========================================================================
    @dag(
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["demo", "data-assets"],
        dag_id="data_assets_producer",
    )
    def data_assets_producer():
        @task(outlets=[orders_asset])
        def produce_orders():
            orders = [
                {"order_id": 1001, "amount": 250.00, "customer": "Alice"},
                {"order_id": 1002, "amount": 125.50, "customer": "Bob"},
                {"order_id": 1003, "amount": 340.75, "customer": "Charlie"},
            ]
            print(f"Produced {len(orders)} orders:")
            print(json.dumps(orders, indent=2))
            return orders

        @task(outlets=[customers_asset])
        def produce_customers():
            customers = [
                {"name": "Alice", "email": "alice@example.com"},
                {"name": "Bob", "email": "bob@example.com"},
                {"name": "Charlie", "email": "charlie@example.com"},
            ]
            print(f"Produced {len(customers)} customers:")
            print(json.dumps(customers, indent=2))
            return customers

        produce_orders() >> produce_customers()

    data_assets_producer()

    # ===========================================================================
    # Consumer DAG 1 – triggered when the orders asset is updated
    # ===========================================================================
    @dag(
        schedule=orders_asset,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["demo", "data-assets"],
        dag_id="data_assets_consumer_orders",
    )
    def data_assets_consumer_orders():
        @task()
        def process_orders():
            print("New order data detected! Running order processing pipeline...")
            print("Aggregating daily totals...")
            print("Order processing complete.")

        process_orders()

    data_assets_consumer_orders()

    # ===========================================================================
    # Consumer DAG 2 – triggered when BOTH assets have been updated
    # ===========================================================================
    @dag(
        schedule=(orders_asset & customers_asset),
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["demo", "data-assets"],
        dag_id="data_assets_consumer_combined",
    )
    def data_assets_consumer_combined():
        @task()
        def join_orders_and_customers():
            print("Both orders AND customers data updated!")
            print("Joining datasets for the combined analytics report...")
            print("Combined report generated successfully.")

        join_orders_and_customers()

    data_assets_consumer_combined()
    • Assets (orders_asset, customers_asset)

      • Defined by a URI that acts as a logical label. Airflow never connects to or polls this URI — it is purely a key used to link producers to consumers.
      • When a producer task finishes successfully, the Airflow scheduler records the update in its metadata database and triggers any consumer DAGs scheduled on that asset.
    • Producer DAG (data_assets_producer)

      • schedule=None means it is triggered manually or by an external system.
      • outlets=[orders_asset] on produce_orders marks that task as updating orders_asset upon successful completion.
      • outlets=[customers_asset] on produce_customers does the same for customers_asset.
    • Consumer DAG 1 (data_assets_consumer_orders)

      • schedule=orders_asset causes this DAG to run automatically every time orders_asset is updated.
    • Consumer DAG 2 (data_assets_consumer_combined)

      • schedule=(orders_asset & customers_asset) requires both assets to have been updated before the DAG is triggered.
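    Conceptually, the scheduler keeps, per consumer, a record of which required assets have been updated since that consumer's last asset-triggered run; once the record covers every required asset, the DAG fires and the record resets. The plain-Python sketch below illustrates that bookkeeping for the single-asset and AND cases. It is illustrative only, not Airflow's actual implementation, and the `AssetScheduler` class and URIs are invented for the example:

    ```python
    # Illustrative sketch of asset-driven triggering (NOT Airflow's real code).
    # Consumers register the set of asset URIs they need. When a producer task
    # finishes, its outlet URIs are marked as updated; any consumer whose full
    # required set has been seen since its last run fires, then resets.

    class AssetScheduler:
        def __init__(self):
            self.consumers = {}  # dag_id -> frozenset of required asset URIs
            self.pending = {}    # dag_id -> URIs updated since the last run
            self.triggered = []  # DAG runs fired, in order

        def register(self, dag_id, required_uris):
            self.consumers[dag_id] = frozenset(required_uris)
            self.pending[dag_id] = set()

        def asset_updated(self, uri):
            for dag_id, required in self.consumers.items():
                if uri in required:
                    self.pending[dag_id].add(uri)
                    if self.pending[dag_id] == required:  # condition satisfied
                        self.triggered.append(dag_id)
                        self.pending[dag_id] = set()      # reset for next cycle


    sched = AssetScheduler()
    sched.register("consumer_orders", ["s3://bucket/orders"])       # single asset
    sched.register("consumer_combined", ["s3://bucket/orders",
                                         "s3://bucket/customers"])  # both assets

    sched.asset_updated("s3://bucket/orders")     # fires consumer_orders only
    sched.asset_updated("s3://bucket/customers")  # now consumer_combined fires
    print(sched.triggered)  # ['consumer_orders', 'consumer_combined']
    ```

    Note how `consumer_combined` does not fire on the first update: its pending set still lacks the customers URI, mirroring the AND semantics of `schedule=(orders_asset & customers_asset)`.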
  2. Push the DAG file to your environment and trigger the producer in Airflow.

    1. Manually trigger data_assets_producer from the Airflow UI.

    2. Once the producer run completes, observe the automatic downstream triggers:

      • data_assets_consumer_orders starts as soon as produce_orders succeeds.
      • data_assets_consumer_combined starts only after both produce_orders and produce_customers have succeeded.
    3. Open the Datasets (or Data Assets) view in the Airflow UI to inspect the asset lineage graph and see which DAGs are registered as producers and consumers.

    After execution you’ll have seen two scheduling modes powered by Data Assets:

    • Single-asset trigger — a consumer reacts to one asset update
    • Multi-asset trigger — a consumer waits for all required assets to be updated before running
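    Beyond the AND condition shown above, assets also compose with `|` for OR scheduling: the consumer runs whenever either asset is updated. The sketch below is an additional DAG you could append to the same dags/my_data_assets_dag.py; the `data_assets_consumer_any` name is our own, and since it needs a running Airflow deployment, treat it as a sketch rather than a tested addition:

    ```python
    import pendulum
    from airflow.sdk import Asset, dag, task

    # Same logical assets as in the tutorial above.
    orders_asset = Asset("s3://my-stackit-bucket/orders/daily.parquet")
    customers_asset = Asset("s3://my-stackit-bucket/customers/latest.csv")

    @dag(
        # OR condition: fire when EITHER asset is updated.
        schedule=(orders_asset | customers_asset),
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["demo", "data-assets"],
        dag_id="data_assets_consumer_any",
    )
    def data_assets_consumer_any():
        @task()
        def react():
            print("Either orders or customers data was updated.")

        react()

    data_assets_consumer_any()
    ```

    With this DAG deployed, a single run of data_assets_producer would trigger it twice, once per asset update, whereas the AND consumer still runs only once.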