How to add an EMR step in Airflow and wait until it finishes running

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (57/100)

In this article, I am going to show you three things:

  • how to retrieve the EMR cluster id using the cluster name
  • how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
  • how to define an EmrStepSensor to wait until the EMR step finishes processing

First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow:

from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')

Get cluster id by name

Now, I can list all active EMR clusters, find the one that I need using its name, and store its identifier in a variable:

response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # handle the error
    raise ValueError(f'Expected exactly one cluster named {EMR_CLUSTER_NAME}')

Add a step to the EMR cluster

Now, I am going to define a Python function that adds a new step to the EMR cluster. The function returns the cluster id and the step id, so it can be used in a PythonOperator, which automatically stores the returned value in XCOM. That way, we can use both identifiers in the EmrStepSensor later.

Before I begin, I must define the command I want to run. In this example, suppose I want to submit a Spark job packaged as a jar file stored on S3. Remember that the EMR step arguments cannot be longer than 10280 characters!

def run_spark_job():
    # here is the code that retrieves the cluster id

    step_args = f'spark-submit --master yarn --deploy-mode client ' \
        f'--class name.mikulskibartosz.SparkJob ' \
        f's3://bucket_name/spark_job.jar ' \
        f'--job_parameter something'

After that, I use the step_args in the Args part of an EMR step definition. Note that boto3 expects Args to be a list of strings, so I split the command:

step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        # Args must be a list of strings, so split the spark-submit command
        'Args': step_args.split()
    }
}

Finally, I use the add_job_flow_steps function to add the step to the cluster. Note that I can add multiple steps at once because the function accepts a list of steps. In this example, however, I define only one step:

step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])

In the last line, I extract the step id from the add_job_flow_steps response and return a tuple that contains both the cluster id and the step id:

return cluster_id, step_ids['StepIds'][0]
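
For reference, here is how all of the snippets above could fit together in one function. Treat it as a sketch: EMR_CLUSTER_NAME, AWS_CONNECTION_ID, the jar location, and the spark-submit parameters are placeholders that you have to replace with your own values:

from airflow.contrib.hooks.aws_hook import AwsHook


def run_spark_job():
    # create a boto3 EMR client using the AWS connection configured in Airflow
    aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
    emr_client = aws_hook.get_client_type('emr')

    # find the cluster id using the cluster name
    response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])
    matching_clusters = list(
        filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
    )
    if len(matching_clusters) == 1:
        cluster_id = matching_clusters[0]['Id']
    else:
        # fail if the cluster cannot be identified unambiguously
        raise ValueError(f'Expected exactly one cluster named {EMR_CLUSTER_NAME}')

    # define the spark-submit command and the EMR step
    step_args = 'spark-submit --master yarn --deploy-mode client ' \
        '--class name.mikulskibartosz.SparkJob ' \
        's3://bucket_name/spark_job.jar ' \
        '--job_parameter something'

    step_configuration = {
        'Name': 'step_name',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': step_args.split()
        }
    }

    # add the step and return both ids, so they end up in XCOM
    step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])
    return cluster_id, step_ids['StepIds'][0]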

Wait for the result

Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCOM. In this case, I can define an EmrStepSensor that pauses the DAG until the EMR step finishes processing.
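
Such a task could be defined like this (a sketch; I assume that the dag object and the run_spark_job function defined above already exist):

from airflow.operators.python_operator import PythonOperator

run_emr_step_task = PythonOperator(
    task_id='run_emr_step_task',
    python_callable=run_spark_job,
    dag=dag
)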

If the task that executes the run_spark_job function is called run_emr_step_task, I can retrieve the cluster id by requesting the return_value of that task and selecting the first element of the tuple:

task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]

Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:

from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",
    dag=dag
)
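
The sensor has to run after the task that adds the step, so the two tasks need to be chained in the DAG, for example:

run_emr_step_task >> wait_for_it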
