It is relatively easy to start a new YARN application in Airflow, but how can we check whether it has finished running? In this tutorial, I show how to configure the PythonSensor to access the YARN cluster using SSH, list all the running applications, and check whether it still runs.

First, we have to define a PythonSensor. In this example, I use an Airflow template to pass the YARN command to the Python function. Later, I will check whether the list of YARN applications contains that string:

wait_for_yarn = PythonSensor(
        'yarn_app': 'here is the YARN command'

I have to define the the_python_function, which should return True when the application has finished running or False if it is still being executed:

def the_python_function(yarn_app: str):

In the function, we have to connect to the EMR cluster using SSH and call the yarn application --list command:

def the_python_function(yarn_app: str):
    ssh = SSHHook(ssh_conn_id=EMR_CONNECTION_ID)
    ssh_client = None
        ssh_client = ssh.get_conn()
        stdin, stdout, stderr = ssh_client.exec_command(f'yarn application --list')
        output_lines = stdout.readlines()

        ... # here we will process the output_lines

        if ssh_client:

The yarn application --list command returns YARN applications that have not finished (or failed) yet. Therefore, now, I have to iterate over the output_lines to check whether they contain the yarn_app. If such a line exists, the YARN application is still running, and the sensor function returns False:

# put this between "output_lines = stdout.readlines()" and "finally"

for line in output_lines:
    if yarn_app in line:
        return False

In most cases, we will want to distinguish between an application that finished successfully and those that failed or were killed. Because of that, we must call the yarn application command again to retrieve the failed commands and check whether the list contains the yarn_app. In this case, the function will raise an exception and fail the Airflow task:

# put this after the previous code snippet but before "finally"

stdin, stdout, stderr = ssh_client.exec_command(f'yarn application --list -appStates FAILED,KILLED')
output_lines = stdout.readlines()

for line in output_lines:
    if yarn_app in line:
        raise Exception(f'{yarn_app} has failed')

If the function has not returned False or raised an exception yet, it means that the application finished successfully, and we must return True:

# this can be the last line of the function, outside the try...finally block

return True
Older post

How to use WHEN CASE queires in AWS Athena

Using conditions in AWS Athena queries

Newer post

How to check whether a regular expression matches a string in Hive

What is the equivalent of Athena/Presto regexp_like in Hive