How to check whether a YARN application has finished

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (77/100)

It is relatively easy to start a new YARN application from Airflow, but how can we check whether it has finished running? In this tutorial, I show how to configure a PythonSensor that connects to the YARN cluster over SSH, lists the running applications, and checks whether our application is still among them.

First, we have to define a PythonSensor. In this example, I use an Airflow template to pass the YARN command to the Python function. Later, I will check whether the list of YARN applications contains that string:

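# Import path for Airflow 2.x; older versions keep the sensor in a different module
from airflow.sensors.python import PythonSensor

# YARN_TIMEOUT and dag are defined elsewhere in the DAG file
wait_for_yarn = PythonSensor(
    task_id='wait_for_yarn',
    python_callable=the_python_function,
    op_kwargs={
        'yarn_app': 'here is the YARN command'
    },
    timeout=YARN_TIMEOUT,
    dag=dag
)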

Next, I have to define the_python_function, which should return True when the application has finished running or False if it is still being executed:

def the_python_function(yarn_app: str):
    ...

In the function, we have to connect to the EMR cluster using SSH and call the yarn application --list command:

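# Import path for Airflow 2.x with the SSH provider package installed
from airflow.providers.ssh.hooks.ssh import SSHHook

def the_python_function(yarn_app: str):
    # EMR_CONNECTION_ID is the ID of an Airflow SSH connection to the cluster
    ssh = SSHHook(ssh_conn_id=EMR_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        # list the applications that are still active
        stdin, stdout, stderr = ssh_client.exec_command('yarn application --list')
        output_lines = stdout.readlines()

        ... # here we will process the output_lines

    finally:
        # always close the SSH connection, even if an exception was raised
        if ssh_client:
            ssh_client.close()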
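
One thing the function above does not cover: if the yarn command itself fails (for example, because the ResourceManager is unreachable), stdout is empty, which looks exactly like "no running applications". A minimal sketch of a guard against that case follows. The helper name ensure_command_succeeded is hypothetical. With the Paramiko client returned by the hook, the exit status should be available from stdout.channel.recv_exit_status() and the error text from stderr.read().decode().

```python
def ensure_command_succeeded(command: str, exit_status: int, stderr_text: str) -> None:
    """Raise if a remote command exited with a non-zero status.

    Hypothetical helper: the caller is expected to pass the exit status and
    the decoded stderr of the command it has just executed over SSH.
    """
    if exit_status != 0:
        raise RuntimeError(
            f'"{command}" exited with status {exit_status}: {stderr_text.strip()}'
        )
```

Calling it right after each exec_command makes the Airflow task fail loudly instead of treating an empty list as a finished application.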

By default, the yarn application --list command returns only the applications that are still active (submitted, accepted, or running), not the ones that have already finished, failed, or been killed. Therefore, I iterate over the output_lines to check whether any of them contains the yarn_app. If such a line exists, the YARN application is still running, and the sensor function returns False:

# put this between "output_lines = stdout.readlines()" and "finally"

for line in output_lines:
    if yarn_app in line:
        return False
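
The substring check is simple, but it also matches when yarn_app is only a part of another application's name (for example, "etl" matches "etl_backfill"). If that is a concern, a stricter variant can compare the name column exactly. The sketch below rests on an assumption about the output format: data rows are tab-separated, start with an ID such as application_1600000000000_0001, and carry the application name in the second column. The function name listed_app_names is hypothetical.

```python
from typing import Iterable, List


def listed_app_names(output_lines: Iterable[str]) -> List[str]:
    """Extract application names from the lines printed by the YARN list command.

    Assumption: data rows are tab-separated, the first column is the
    application ID (application_...), and the second column is the name.
    Summary and header lines do not match that pattern and are skipped.
    """
    names = []
    for line in output_lines:
        columns = [column.strip() for column in line.split('\t')]
        if len(columns) > 1 and columns[0].startswith('application_'):
            names.append(columns[1])
    return names
```

With this helper, the check inside the sensor function becomes "if yarn_app in listed_app_names(output_lines): return False", which compares whole names instead of substrings.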

In most cases, we will want to distinguish between an application that finished successfully and one that failed or was killed. Because of that, we must call the yarn application command again to retrieve the failed and killed applications and check whether that list contains the yarn_app. If it does, the function raises an exception and fails the Airflow task:

# put this after the previous code snippet but before "finally"

stdin, stdout, stderr = ssh_client.exec_command('yarn application --list -appStates FAILED,KILLED')
output_lines = stdout.readlines()

for line in output_lines:
    if yarn_app in line:
        raise Exception(f'{yarn_app} has failed')

If the function has not returned False or raised an exception yet, it means that the application finished successfully, and we must return True:

# this can be the last line of the function, outside the try...finally block

return True
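
To see the three outcomes in one place, here is a minimal sketch of the decision logic separated from the SSH plumbing, so that it can be tried without a cluster. The names yarn_app_finished and run_command are hypothetical: run_command stands for any callable that executes a command on the cluster and returns the lines of its standard output, for example a small wrapper around ssh_client.exec_command and stdout.readlines().

```python
from typing import Callable, List

ACTIVE_COMMAND = 'yarn application --list'
FAILED_COMMAND = 'yarn application --list -appStates FAILED,KILLED'


def yarn_app_finished(run_command: Callable[[str], List[str]], yarn_app: str) -> bool:
    """Return False while the app is active, True once it is done, raise if it failed.

    run_command is a hypothetical callable: it runs a command on the cluster
    and returns the lines of its standard output.
    """
    # Step 1: the app is still listed among the active applications, keep waiting
    if any(yarn_app in line for line in run_command(ACTIVE_COMMAND)):
        return False

    # Step 2: the app is listed among the failed or killed ones, fail the task
    if any(yarn_app in line for line in run_command(FAILED_COMMAND)):
        raise RuntimeError(f'{yarn_app} has failed')

    # Step 3: neither active nor failed, so it finished successfully
    return True
```

Passing the command runner as an argument is a design choice made for this sketch: the same logic can be exercised in a unit test with a fake runner that returns canned output.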

Bartosz Mikulski
Bartosz Mikulski * MLOps Engineer / data engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group
