
How to use Variables and XCom in Apache Airflow?

Apache Airflow is often described as "cron on steroids". It is gaining popularity as a tool for ETL orchestration (scheduling, managing, and monitoring tasks). Tasks are organized into a Directed Acyclic Graph (DAG) and can exchange information with one another. In this post you will learn how to use Variables and XCom in Apache Airflow.

The Environment

For Apache Airflow, the puckel/docker-airflow image works well. I usually use its docker-compose-LocalExecutor.yml variant:

sudo docker-compose -f docker-compose-LocalExecutor.yml up -d

Why Variables and XCom?

Variables and XCom both act like variables within the Apache Airflow environment, but they serve different purposes.

Variables are a kind of global variable. If a value is used by many DAGs (and you don’t want to edit N files if you change it), consider adding it to Variables.
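One detail worth knowing: without `serialize_json=True`, Airflow stores a Variable's value as a string in its metadata database, which is why the task later in this post wraps `Variable.get` in `int()`. The sketch below mimics that get-with-default / set round trip using a hypothetical in-memory `FakeVariableStore` class. It is not Airflow code, only an illustration under that assumption.

```python
_NO_DEFAULT = object()  # sentinel so that None can still be a valid default


class FakeVariableStore:
    """Hypothetical in-memory stand-in for Airflow Variables (not the real API)."""

    def __init__(self):
        self._data = {}

    def get(self, key, default_var=_NO_DEFAULT):
        # A stored value is always a string; a missing key falls back to the
        # default, which is returned unchanged (so it may be an int).
        if key in self._data:
            return self._data[key]
        if default_var is not _NO_DEFAULT:
            return default_var
        raise KeyError(f"Variable {key} does not exist")

    def set(self, key, value):
        # Mirror the string column in the metadata database.
        self._data[key] = str(value)


store = FakeVariableStore()
first = store.get("my_iterator", default_var=0)   # int 0: the key is not set yet
store.set("my_iterator", first + 1)
second = store.get("my_iterator", default_var=0)  # str "1": read back as text
print(repr(first), repr(second), int(second) + 1)
```

Because the first read returns the integer default and later reads return strings, casting with `int()` keeps the arithmetic correct on every run.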

XCom (short for "cross-communication") is a mechanism for passing messages between tasks. Each XCom consists of a key, a value, a timestamp, and the IDs of the task and DAG that produced it. Any object that can be pickled can be used as an XCom value.
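To make the description above concrete, here is a small model of one XCom entry with the fields the text lists, plus a helper that checks whether a value can be pickled. `XComRecord` and `is_picklable` are hypothetical names for illustration; the real XCom table contains further columns.

```python
import pickle
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class XComRecord:
    """Hypothetical model of the fields listed above for one XCom entry."""
    key: str
    value: Any
    task_id: str
    dag_id: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def is_picklable(value: Any) -> bool:
    """Return True if pickle can serialize the value (the XCom requirement)."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


record = XComRecord("important_xcom_value", "something_important_0",
                    task_id="setup_var_and_xcom", dag_id="var_xcom_example")
print(is_picklable(record.value))  # a plain string pickles fine
print(is_picklable(lambda x: x))   # lambdas cannot be pickled by the stdlib pickle
```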

Let's say you wrote an Apache Spark application that saves its output to a directory on HDFS/S3. The path must be passed to the application as an argument, and it is generated by a Python script your teammate wrote. Later, subsequent tasks read this data and do something with it. The same path has to travel through every one of these steps. This is exactly what XCom is for 😁.
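The scenario above boils down to one task producing a path and several tasks consuming it. The sketch below models that hand-off with a plain dictionary keyed by `(task_id, key)` in place of XCom. The bucket, `my_app.py`, and all function and task names are hypothetical; in Airflow the dictionary would be the metadata database and the functions would be operators.

```python
from datetime import date

# Hypothetical in-memory XCom: (task_id, key) -> value.
xcom = {}


def build_output_path(run_date: date) -> str:
    # Hypothetical "teammate's script": derive a dated output directory.
    return f"s3://my-bucket/spark-output/{run_date:%Y-%m-%d}/"


def generate_path(run_date: date) -> None:
    # Producer task: push the path once.
    xcom[("generate_path", "output_path")] = build_output_path(run_date)


def spark_job() -> str:
    # Consumer 1: the Spark application receives the path as an argument.
    path = xcom[("generate_path", "output_path")]
    return f"spark-submit my_app.py --output {path}"


def downstream_reader() -> str:
    # Consumer 2: a later task reads from the same location.
    path = xcom[("generate_path", "output_path")]
    return f"reading results from {path}"


generate_path(date(2020, 11, 22))
print(spark_job())
print(downstream_reader())
```

Only the short path string is exchanged; the data itself stays on HDFS/S3.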

WARNING! Do not use XCom to send large amounts of data! XCom values are stored in the metadata database used by Apache Airflow. Pushing giant NumPy objects through XCom is a common example of misuse.
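A simple guard against this mistake is to measure the pickled size of a value before pushing it and to push a reference (such as a file path) instead of the data. In the sketch below, `MAX_XCOM_BYTES` is a hypothetical threshold: the real practical limit depends on the database backend and column type, so treat the number as an assumption. A list of floats stands in for a large NumPy array to keep the example dependency-free.

```python
import pickle

# Hypothetical limit; the real safe size depends on the metadata database.
MAX_XCOM_BYTES = 48 * 1024


def check_xcom_size(value) -> int:
    """Return the pickled size of value, raising if it exceeds MAX_XCOM_BYTES."""
    size = len(pickle.dumps(value))
    if size > MAX_XCOM_BYTES:
        raise ValueError(f"XCom value is {size} bytes; store the data elsewhere "
                         "and push its path instead")
    return size


big_array = [float(i) for i in range(100_000)]      # stands in for a large array
path = "s3://my-bucket/features/2020-11-22.parquet"  # hypothetical location

print(check_xcom_size(path))
try:
    check_xcom_size(big_array)
except ValueError as err:
    print(err)
```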

Example

The DAG's tasks are simple:

  • Read a value from Variables (creating it if it does not exist)
  • Derive another value from it and push it to XCom
  • Increment the value from Variables and save it back
  • Get the current date with BashOperator and push it to XCom
  • Print both values in the console of a remote machine using SSHOperator
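The steps above can be simulated in plain Python to show how the values flow between tasks and how the counter changes from one DAG run to the next. This is a sketch under simplifying assumptions: a dictionary stands in for Variables (holding strings), a fresh dictionary per run stands in for XCom, and the date format only imitates the output of `date` seen in the logs later in the post.

```python
from datetime import datetime

# Hypothetical stand-in for Variables: survives across DAG runs, stores strings.
variables = {}


def run_dag_once(now: datetime) -> str:
    xcom = {}  # XComs belong to a single DAG run

    # Task 1: read (or default) the iterator, push a derived value, increment.
    iterator = int(variables.get("my_iterator", 0))
    xcom[("setup_var_and_xcom", "important_xcom_value")] = f"something_important_{iterator}"
    variables["my_iterator"] = str(iterator + 1)

    # Task 2: the last stdout line of `date` is stored under 'return_value'.
    xcom[("get_date", "return_value")] = now.strftime("%a %d %b %Y %I:%M:%S %p UTC")

    # Task 3: build the command the SSH task would run on the remote machine.
    value = xcom[("setup_var_and_xcom", "important_xcom_value")]
    stamp = xcom[("get_date", "return_value")]
    return f'echo "{value} {stamp}"'


for day in (22, 23):
    print(run_dag_once(datetime(2020, 11, day, 18, 22, 56)))
```

Each run pushes the current counter value and then increments it, so consecutive runs print `something_important_0`, `something_important_1`, and so on.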

Preparation of Variables and XCom

Variables are accessed through the Variable class, specifically its get and set methods. For XCom you need the task context (provide_context=True), which exposes the task instance (ti); its xcom_push method saves values to XCom. Below is the code of the first task.

...
def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value=f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)
...
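Since the callable only touches `kwargs['ti']` and `Variable`, its logic can be checked without a running Airflow instance. The sketch below passes a `unittest.mock.MagicMock` as the task instance and swaps in a hypothetical `FakeVariable` class (a simplified stand-in, not part of Airflow) for `airflow.models.Variable`.

```python
from unittest import mock


class FakeVariable:
    """Hypothetical stand-in for airflow.models.Variable (stores strings)."""
    _store = {}

    @classmethod
    def get(cls, key, default_var=None):
        return cls._store.get(key, default_var)

    @classmethod
    def set(cls, key, value):
        cls._store[key] = str(value)


Variable = FakeVariable  # in the real DAG file this name comes from airflow.models


def setup_var_and_xcom(**kwargs):
    # Same body as the task above.
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value=f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)


fake_ti = mock.MagicMock()
setup_var_and_xcom(ti=fake_ti)
fake_ti.xcom_push.assert_called_once_with(key='important_xcom_value',
                                          value='something_important_0')
print(FakeVariable._store)  # {'my_iterator': '1'}
```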

Passing the BashOperator result to XCom

According to the documentation, setting the operator's xcom_push argument to True pushes the last line written to stdout to XCom.

...
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)
...
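The "last line of stdout" behaviour can be approximated with the standard library: read the process output line by line and keep only the final line. This is a sketch of the idea, not the operator's actual source. To keep it portable, a Python one-liner that prints three lines stands in for `date`, and the helper name `run_and_capture_last_line` is hypothetical.

```python
import subprocess
import sys


def run_and_capture_last_line(cmd):
    """Run cmd and return its last output line, as xcom_push=True would store it."""
    last_line = ""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
        for raw_line in iter(proc.stdout.readline, b""):
            last_line = raw_line.decode("utf-8").rstrip()
    # Leaving the with-block waits for the process, so returncode is set here.
    if proc.returncode != 0:
        raise RuntimeError(f"command failed with exit code {proc.returncode}")
    return last_line


cmd = [sys.executable, "-c", "print('first'); print('second'); print('last')"]
print(run_and_capture_last_line(cmd))  # -> last
```

Only the final line survives, so a command that prints a multi-line report followed by a single summary line is a good fit for this mechanism.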

Displaying XCom in the remote console

Here we use SSHOperator combined with Jinja templating to inject the values previously pushed to XCom. One value is pulled by its key, the other by the ID of the task that pushed it.
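To make the two lookups concrete, here is a simplified model of `ti.xcom_pull` over a list of in-memory rows. It assumes the default key is `return_value` (the key under which BashOperator's output is stored) and that a key-only pull matches that key from any task in the run. The rows and the function are illustrative stand-ins, not Airflow's implementation.

```python
from typing import Any, Optional

RETURN_KEY = "return_value"  # default key for a task's returned/pushed output

# Hypothetical XCom rows for one DAG run: (task_id, key, value).
rows = [
    ("setup_var_and_xcom", "important_xcom_value", "something_important_6"),
    ("get_date", RETURN_KEY, "Sun 22 Nov 2020 06:22:56 PM UTC"),
]


def xcom_pull(task_ids: Optional[str] = None, key: str = RETURN_KEY) -> Any:
    """Simplified pull: filter by key, and by task id if one is given."""
    matches = [value for task_id, k, value in rows
               if k == key and (task_ids is None or task_id == task_ids)]
    return matches[-1] if matches else None


command = 'echo "{} {}"'.format(
    xcom_pull(key="important_xcom_value"),  # matched by key, from any task
    xcom_pull(task_ids="get_date"),         # matched by task id, default key
)
print(command)
```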

...
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag
)
...
The connection ID used here (ssh_default) has to be configured beforehand.
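Besides the UI, Airflow can pick up a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (conn ID in upper case) whose value is a connection URI. The sketch below builds such a variable for `ssh_default`; the user, password, and host are hypothetical, and special characters in credentials are percent-encoded so the URI stays parseable.

```python
from urllib.parse import quote


def ssh_conn_uri(user: str, password: str, host: str, port: int = 22) -> str:
    """Build an ssh:// connection URI with percent-encoded credentials."""
    return f"ssh://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"


def conn_env_var_name(conn_id: str) -> str:
    # Airflow looks for AIRFLOW_CONN_ followed by the conn ID in upper case.
    return f"AIRFLOW_CONN_{conn_id.upper()}"


name = conn_env_var_name("ssh_default")
uri = ssh_conn_uri("airflow", "p@ss word", "remote-host")  # hypothetical credentials
print(f"{name}={uri}")
# AIRFLOW_CONN_SSH_DEFAULT=ssh://airflow:p%40ss%20word@remote-host:22
```

The resulting line could then be added to the environment of the Airflow containers in the docker-compose file.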

Results

All transmitted values were correctly displayed in the last task.

[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful!
[2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC"
[2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC
[2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259
[2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0

The created XComs are visible in the Apache Airflow UI.


The same applies to Variables.


Code

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "Maciej Szymczyk",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 27),
    "email": ["maciej@wiadrodanych.pl"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG("var_xcom_example",
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False)


def setup_var_and_xcom(**kwargs):
    # The task instance (ti) is available because provide_context=True.
    ti = kwargs['ti']
    # Variables come back as strings, so cast; default to 0 on the first run.
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value=f'something_important_{iterator}')
    # Increment the shared counter for the next run.
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)

# xcom_push=True pushes the last line of stdout (the date) to XCom.
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)

# Jinja renders both XCom values before the command runs on the remote host.
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag
)

t1 >> t2 >> t3

Conclusions

This is how you can build a simple DAG in Apache Airflow that uses XCom and Variables. In the future I'll try to show a more realistic example (e.g. with Apache Spark).

Software Developer, Big Data Engineer, Blogger (https://wiadrodanych.pl), Amateur Cyclist & Triathlete, @maciej_szymczyk
