Implementation der Cross-Task-Communication (XCom) in STACKIT Workflows
Die TaskFlow-API von Airflow lässt sich nahtlos in XCom integrieren. Sie können Daten automatisch zwischen Tasks übergeben oder bei Bedarf explizit abrufen. In diesem Tutorial lernen Sie, wie Sie XCom verwenden, um Werte auf zwei Arten zwischen Tasks zu teilen und einen Transformationsschritt dazwischen einzufügen.
-
Erstellen Sie einen DAG in Ihrem Projekt
dags/my_xcom_dag.pyfrom datetime import datetimefrom airflow.decorators import dag, task@dag(dag_id="02_xcom",start_date=datetime(2024, 1, 1),schedule=None,catchup=False,tags=["demo", "airflow", "xcom"],)def xcom_taskflow_demo():@task(task_id="produce_value")def produce() -> str:value = "data-from-upstream"print(f"[produce] Producing value: {value}")return value@task(task_id="consume_via_param")def consume_via_param(transformed_value: str):print(f"[consume_via_param] Received via param: {transformed_value}")@task(task_id="consume_via_pull")def consume_via_pull():from airflow.operators.python import get_current_contextti = get_current_context()["ti"]pulled = ti.xcom_pull(task_ids="produce_value", key="return_value")print(f"[consume_via_pull] Pulled from XCom: {pulled}")# Automatische Übergabeketteproduced = produce()consume_via_param(produced)# Explizite Pull-Kettepull_task = consume_via_pull()produced >> pull_taskdag = xcom_taskflow_demo()Funktionsweise
Abschnitt betitelt „Funktionsweise“produce_valuegibt einen String zurück; TaskFlow speichert diesen in XCom unter dem Keyreturn_value.consume_via_paramerhält den erzeugten Wert automatisch als Funktionsargument.consume_via_pullliest den ursprünglichen Wert explizit überti.xcom_pull(...)aus.
-
Pushen Sie den DAG in Ihre Umgebung und triggern Sie ihn in Airflow.
-
Öffnen Sie die Task-Logs:
consume_via_paramgibt den in Großbuchstaben umgewandelten Wert aus.consume_via_pullgibt den ursprünglichen Wert aus.
-
Überprüfen Sie in der Airflow-Benutzeroberfläche den XCom-Eintrag für
produce_value.
-
Nach der Ausführung haben Sie beide Arten der Arbeit mit XCom gesehen:
- Automatische Übergabe (über Funktionsargumente)
- Expliziter Abruf (über
xcom_pull)