Zum Inhalt springen

Implementation der Cross-Task-Communication (XCom) in STACKIT Workflows

Die TaskFlow-API von Airflow lässt sich nahtlos in XCom integrieren. Sie können Daten automatisch zwischen Tasks übergeben oder bei Bedarf explizit abrufen. In diesem Tutorial lernen Sie, wie Sie XCom verwenden, um Werte auf zwei Arten zwischen Tasks zu teilen und einen Transformationsschritt dazwischen einzufügen.

  1. Erstellen Sie einen DAG in Ihrem Projekt

    dags/my_xcom_dag.py

    from datetime import datetime
    from airflow.decorators import dag, task
    @dag(
    dag_id="02_xcom",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    tags=["demo", "airflow", "xcom"],
    )
    def xcom_taskflow_demo():
    @task(task_id="produce_value")
    def produce() -> str:
    value = "data-from-upstream"
    print(f"[produce] Producing value: {value}")
    return value
    @task(task_id="consume_via_param")
    def consume_via_param(transformed_value: str):
    print(f"[consume_via_param] Received via param: {transformed_value}")
    @task(task_id="consume_via_pull")
    def consume_via_pull():
    from airflow.operators.python import get_current_context
    ti = get_current_context()["ti"]
    pulled = ti.xcom_pull(task_ids="produce_value", key="return_value")
    print(f"[consume_via_pull] Pulled from XCom: {pulled}")
    # Automatische Übergabekette
    produced = produce()
    consume_via_param(produced)
    # Explizite Pull-Kette
    pull_task = consume_via_pull()
    produced >> pull_task
    dag = xcom_taskflow_demo()
    • produce_value gibt einen String zurück; TaskFlow speichert diesen in XCom unter dem Key return_value.
    • consume_via_param erhält den erzeugten Wert automatisch als Funktionsargument.
    • consume_via_pull liest den ursprünglichen Wert explizit über ti.xcom_pull(...) aus.
  2. Pushen Sie den DAG in Ihre Umgebung und triggern Sie ihn in Airflow.

    • Öffnen Sie die Task-Logs:

      • consume_via_param gibt den in Großbuchstaben umgewandelten Wert aus.
      • consume_via_pull gibt den ursprünglichen Wert aus.
    • Überprüfen Sie in der Airflow-Benutzeroberfläche den XCom-Eintrag für produce_value.

Nach der Ausführung haben Sie beide Arten der Arbeit mit XCom gesehen:

  • Automatische Übergabe (über Funktionsargumente)
  • Expliziter Abruf (über xcom_pull)