How to check whether a YARN application has finished
It is relatively easy to start a new YARN application in Airflow, but how can we check whether it has finished running? In this tutorial, I show how to configure the PythonSensor to access the YARN cluster using SSH, list the running applications, and check whether the application we started is still among them.
First, we have to define a PythonSensor
. In this example, I use an Airflow template to pass the YARN command to the Python function. Later, I will check whether the list of YARN applications contains that string:
# In Airflow 2: from airflow.sensors.python import PythonSensor
# (in Airflow 1.10 it lives in airflow.contrib.sensors.python_sensor)
wait_for_yarn = PythonSensor(
    task_id='wait_for_yarn',
    python_callable=the_python_function,
    op_kwargs={
        'yarn_app': 'here is the YARN command'
    },
    timeout=YARN_TIMEOUT,
    dag=dag
)
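Conceptually, the PythonSensor keeps calling the function until it returns True or the timeout elapses. A toy sketch of that behavior, assuming nothing about Airflow internals (poll and its parameters are illustrative stand-ins, not the real implementation):

```python
import time

# Toy stand-in for the sensor's polling loop (not Airflow code): the callable
# is invoked repeatedly until it returns True or the timeout elapses.
def poll(callable_, timeout, interval=0.01):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if callable_():
            return True
        time.sleep(interval)
    raise TimeoutError('sensor timed out')

# A callable that "finishes" on the third poke:
states = iter([False, False, True])
result = poll(lambda: next(states), timeout=1.0)
```

The real sensor also accepts a poke_interval and a mode, but the essential contract is the same: the callable decides, on every poke, whether the sensor may succeed.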
Next, I have to define the_python_function, which should return True when the application has finished running or False if it is still being executed:
def the_python_function(yarn_app: str):
    ...
In the function, we have to connect to the EMR cluster using SSH and call the yarn application --list
command:
# In Airflow 2 with the SSH provider:
# from airflow.providers.ssh.hooks.ssh import SSHHook
# (in Airflow 1.10: from airflow.contrib.hooks.ssh_hook import SSHHook)
def the_python_function(yarn_app: str):
    ssh = SSHHook(ssh_conn_id=EMR_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        stdin, stdout, stderr = ssh_client.exec_command('yarn application --list')
        output_lines = stdout.readlines()
        ...  # here we will process the output_lines
    finally:
        if ssh_client:
            ssh_client.close()
The yarn application --list command returns YARN applications that have not finished (or failed) yet. Therefore, I now have to iterate over the output_lines to check whether any of them contains the yarn_app. If such a line exists, the YARN application is still running, and the sensor function returns False:
# put this between "output_lines = stdout.readlines()" and "finally"
for line in output_lines:
    if yarn_app in line:
        return False
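This substring check can be exercised on its own, without an SSH connection. A small sketch, assuming a tab-separated output format like the one yarn typically prints (the helper name and the sample lines below are fabricated for illustration):

```python
# The substring check from the snippet above, extracted into a hypothetical
# helper so it can be tested in isolation.
def is_still_running(output_lines, yarn_app):
    return any(yarn_app in line for line in output_lines)

# Fabricated sample of `yarn application --list` output
# (tab-separated columns such as Application-Id, Application-Name, State):
sample = [
    "Total number of applications:1\n",
    "Application-Id\tApplication-Name\tState\n",
    "application_1600000000000_0042\tmy-spark-job\tRUNNING\n",
]

still_running = is_still_running(sample, 'my-spark-job')
```

Note that a plain substring match is deliberately loose: it works whether yarn_app is an application id or an application name, as long as the value is unique enough not to match unrelated lines.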
In most cases, we will want to distinguish between an application that finished successfully and one that failed or was killed. Because of that, we must call the yarn application command again to retrieve the failed and killed applications and check whether that list contains the yarn_app. In this case, the function raises an exception and fails the Airflow task:
# put this after the previous code snippet but before "finally"
stdin, stdout, stderr = ssh_client.exec_command('yarn application --list -appStates FAILED,KILLED')
output_lines = stdout.readlines()
for line in output_lines:
    if yarn_app in line:
        raise Exception(f'{yarn_app} has failed')
If the function has not returned False or raised an exception yet, it means that the application finished successfully, and we must return True:
# this can be the last line of the function, outside the try...finally block
return True
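The three outcomes above can also be captured in one pure helper (check_yarn_state is a hypothetical name, not part of the tutorial's code) that takes both command outputs as arguments, so the decision logic is testable without SSH:

```python
# Hypothetical helper combining the three outcomes: still running -> False,
# failed or killed -> exception, otherwise -> finished successfully.
def check_yarn_state(running_lines, failed_lines, yarn_app):
    if any(yarn_app in line for line in running_lines):
        return False  # still running: the sensor keeps poking
    if any(yarn_app in line for line in failed_lines):
        raise Exception(f'{yarn_app} has failed')
    return True  # neither running nor failed/killed: success
```

With this shape, the sensor function only gathers the two outputs over SSH and delegates the decision, which keeps the SSH plumbing and the business logic separate.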