Implementing Cross Task Communication (XCom) in STACKIT Workflows
Diese Seite ist noch nicht in deiner Sprache verfügbar. Englische Seite aufrufen
Airflow’s TaskFlow API integrates seamlessly with XCom. You can pass data automatically between tasks or explicitly pull it when needed. In this tutorial you’ll learn how to use XCom to share values between tasks in two ways and insert a transformation step in between.
-
Create a DAG in your project
dags/my_xcom_dag.pyfrom datetime import datetimefrom airflow.decorators import dag, task@dag(dag_id="02_xcom",start_date=datetime(2024, 1, 1),schedule=None,catchup=False,tags=["demo", "airflow", "xcom"],)def xcom_taskflow_demo():@task(task_id="produce_value")def produce() -> str:value = "data-from-upstream"print(f"[produce] Producing value: {value}")return value@task(task_id="consume_via_param")def consume_via_param(transformed_value: str):print(f"[consume_via_param] Received via param: {transformed_value}")@task(task_id="consume_via_pull")def consume_via_pull():from airflow.operators.python import get_current_contextti = get_current_context()["ti"]pulled = ti.xcom_pull(task_ids="produce_value", key="return_value")print(f"[consume_via_pull] Pulled from XCom: {pulled}")# Automatic pass chainproduced = produce()consume_via_param(produced)# Explicit pull chainpull_task = consume_via_pull()produced >> pull_taskdag = xcom_taskflow_demo()How it works
Section titled “How it works”produce_valuereturns a string; TaskFlow stores it in XCom under keyreturn_value.consume_via_paramgets the produced value automatically as a function argument.consume_via_pullreads the original value explicitly viati.xcom_pull(...).
-
Push the DAG to your environment and trigger it in Airflow.
-
Open the task logs:
consume_via_paramprints the uppercased value.consume_via_pullprints the original value.
-
In the Airflow UI, inspect the XCom entry for
produce_value.
After execution you’ll have seen both ways of working with XCom:
- Automatic pass (via function arguments)
- Explicit pull (via
xcom_pull)
-