How to add an EMR step in Airflow and wait until it finishes running
In this article, I am going to show you three things:
- how to retrieve the EMR cluster id using the cluster name
- how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
- how to define an EmrStepSensor to wait until the EMR step finishes processing
First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow:
from airflow.contrib.hooks.aws_hook import AwsHook
aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')
Get cluster id by name
Now, I can list all active EMR clusters, find the one I need using its name, and store its identifier in a variable:
response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])
matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # handle the error, for example by failing the task
    raise ValueError(f'Expected exactly one cluster named {EMR_CLUSTER_NAME}')
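If you prefer, the lookup can be wrapped in a small helper function so the run_spark_job function defined below can call it. This is only a sketch; the name get_cluster_id_by_name is mine, not part of boto3 or Airflow:
def get_cluster_id_by_name(emr_client, cluster_name):
    # only clusters in these states can still accept new steps
    response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])
    matching_clusters = [
        cluster for cluster in response['Clusters'] if cluster['Name'] == cluster_name
    ]
    if len(matching_clusters) != 1:
        raise ValueError(f'Expected exactly one cluster named {cluster_name}, found {len(matching_clusters)}')
    return matching_clusters[0]['Id']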
Add a step to the EMR cluster
Now, I am going to define a Python function that adds a new step to the EMR cluster. The function will return the cluster id and the step id, so it is possible to use it in a PythonOperator. In that case, the operator will automatically store the returned values in XCom, so we can use them in the EmrStepSensor later.
Before I begin, I must define the command I want to run. In this example, let's assume I want to submit a Spark job stored in a jar file on S3. Remember that EMR step arguments cannot be longer than 10280 characters!
def run_spark_job():
    # here is the code that retrieves the cluster id
    step_args = f'spark-submit --master yarn --deploy-mode client ' \
                f'--class name.mikulskibartosz.SparkJob ' \
                f's3://bucket_name/spark_job.jar ' \
                f'--job_parameter something'
After that, I use the step_args in the Args part of an EMR step definition. Because the Args field must be a list of strings, I split the command on whitespace:
step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        # Args must be a list of strings, so split the command on whitespace
        'Args': step_args.split()
    }
}
Finally, I use the add_job_flow_steps function to add the step to the cluster. Note that I could add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:
step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])
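The add_job_flow_steps call returns a dictionary with a StepIds list that contains one id per submitted step, so the step_ids variable holds something like this (the id below is made up):
{'StepIds': ['s-2AXXXXXXXXXXX']}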
In the last line, I extract the step id from the step_ids response and return a tuple that contains both the cluster id and the step id:
return cluster_id, step_ids['StepIds'][0]
Wait for the result
Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCom. In this case, I can define an EmrStepSensor that pauses the DAG until the EMR step finishes processing.
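Such an operator could look like the sketch below. I am assuming the Airflow 1.x import path (consistent with the contrib AwsHook used above) and the task id run_emr_step_task that the templates below refer to:
from airflow.operators.python_operator import PythonOperator

# runs the function defined above; its return value is pushed to XCom as 'return_value'
run_emr_step_task = PythonOperator(
    task_id='run_emr_step_task',
    python_callable=run_spark_job,
    dag=dag
)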
If the task that executes the run_spark_job function is called run_emr_step_task, I can retrieve the cluster id by requesting the return_value of that task and selecting the first element of the tuple:
task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]
Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",
    aws_conn_id=AWS_CONNECTION_ID,  # same connection as the AwsHook above
    dag=dag
)
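Finally, the sensor has to run after the task that submits the step, so I set the dependency between the two tasks (using the task variables from the sketches above):
run_emr_step_task >> wait_for_it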