
How to use Variables and XCom in Apache Airflow?
Apache Airflow is often described as CRON on steroids. It is gaining popularity as a tool for ETL orchestration (scheduling, managing and monitoring tasks). Tasks are defined as a Directed Acyclic Graph (DAG), within which they exchange information. In this entry you will learn how to use Variables and XCom in Apache Airflow.
The Environment
For Apache Airflow, the puckel/docker-airflow image works well. Most often I use the docker-compose-LocalExecutor.yml variant.
sudo docker-compose -f docker-compose-LocalExecutor.yml up -d
Why Variables and XCom?
Variables and XCom are two mechanisms for storing and passing values within the Apache Airflow environment.
Variables are a kind of global variable. If a value is used by many DAGs (and you don’t want to edit N files whenever it changes), consider adding it to Variables.
XCom (short for cross-communication) are messages that allow tasks to exchange data. Each one is defined by a key, a value, a timestamp and a task/DAG id. Any object that can be pickled can be used as an XCom.
Let’s say you wrote an application in Apache Spark which saves the results of its work in a directory on HDFS/S3. The path must be passed as an argument to this application, and it is generated by a Python script that your teammate wrote. Later on, subsequent tasks read this data and do something with it. A parameter with a path circulates everywhere. This is what XCom is for 😁.
WARNING! Do not use XCom to send large amounts of data! These values are stored in the database used by Apache Airflow. Pushing giant NumPy objects through XCom is a common case of incorrect use.
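The usual remedy is to move the heavy payload to external storage and pass only its location through XCom. A minimal sketch of that pattern (the function and key names below are illustrative, not from this article; `ti.xcom_push` is the same call used later in the DAG):

```python
import json
import os
import tempfile

def produce_report(ti=None):
    """Build a 'large' payload, persist it outside Airflow's metadata
    database, and hand downstream tasks only the path via XCom."""
    rows = [{"id": i, "value": i * i} for i in range(1000)]
    # Write the data to durable storage (a temp file here; HDFS/S3 in practice)...
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as f:
        json.dump(rows, f)
    # ...and push only the small path string to XCom, never the payload itself.
    if ti is not None:
        ti.xcom_push(key="report_path", value=path)
    return path
```

Downstream tasks then pull `report_path` and load the data themselves.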
Example
DAG’s tasks are simple:
- Fetch (or, if it does not exist, generate) a value from Variables
- Create another value from it and push it to XCom
- Increment the Variables value and save it
- Fetch the date with BashOperator and add it to XCom
- Display both values in the console on a remote machine using SSHOperator
Preparation of Variables and XCom
Variables support is available through the Variable class, specifically its get and set methods. In the case of XCom, you need the task context (provide_context=True). The xcom_push method is used to save values to XCom. Below is the code of the first task.
...
def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value=f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)
...
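A downstream task could also pull this value in Python rather than in a Jinja template. A hedged sketch (the function name is made up and this task is not part of the article's DAG):

```python
def consume_xcom(**kwargs):
    # The task instance arrives in the context when provide_context=True.
    ti = kwargs['ti']
    # Pull the value pushed above by its key; task_ids could be passed as
    # well to scope the lookup to a specific upstream task.
    important = ti.xcom_pull(key='important_xcom_value')
    print(f'received: {important}')
    return important
```

In the DAG this would be wired as another PythonOperator (again with provide_context=True) placed after t1.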
Passing the BashOperator result to XCom
According to the documentation, the xcom_push argument of the operator allows you to save the last line of stdout to XCom.
...
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)
...
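Roughly speaking, with xcom_push=True the operator captures the command's stdout and stores its last line under the default XCom key return_value, which is why the final task below can pull it by task id alone. A stdlib re-creation of that "last line" behaviour (a sketch, not Airflow's actual implementation):

```python
import subprocess

def last_stdout_line(command):
    # Run the command through a shell and capture its output,
    # mimicking what BashOperator does internally.
    out = subprocess.run(
        command, shell=True, capture_output=True, text=True, check=True
    ).stdout
    # Only the final line of stdout ends up in XCom.
    return out.strip().splitlines()[-1]
```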
Displaying XCom in the remote console
In this case we will use SSHOperator in combination with Jinja templating to inject the values previously added to XCom into the command. One value is pulled by key, the other by task id.
...
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag
)
...

Effects
All transmitted values were correctly displayed in the last task.

[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful!
[2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC"
[2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC
[2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259
[2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0
Created XComs are visible in the Apache Airflow UI.

The same applies to Variables.

Code
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

default_args = {
    "owner": "Maciej Szymczyk",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 27),
    "email": ["maciej@wiadrodanych.pl"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG("var_xcom_example",
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False)

def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value=f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)

t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)

t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag
)

t1 >> t2 >> t3
Conclusions
And this is how you can create a simple DAG in Apache Airflow using XCom and Variables. In the future I’ll try to give a more real-life example (e.g. with Apache Spark).