How to add an EMR step in Airflow and wait until it finishes running

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (57/100)

In this article, I am going to show you three things:

  • how to retrieve the EMR cluster id using the cluster name
  • how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
  • how to define an EmrStepSensor that waits until the EMR step finishes processing

First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow:

from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')

Get cluster id by name

Now, I can list all active EMR clusters, find the one that I need using its name, and store its identifier in a variable:

response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

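if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # fail the task when the cluster is missing or the name is ambiguous
    raise ValueError(
        f'Expected exactly one active cluster named {EMR_CLUSTER_NAME}, found {len(matching_clusters)}'
    )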
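
A single list_clusters call returns only one page of results, so an account with many clusters may need pagination. Below is a minimal sketch of the same lookup written as a function that uses the boto3 paginator. The function name find_cluster_id and its parameters are my own choices for illustration, not part of Airflow or boto3:

```python
def find_cluster_id(emr_client, cluster_name, states=('RUNNING', 'WAITING')):
    """Return the id of the single active cluster with the given name."""
    # the paginator follows the Marker token between pages for us
    paginator = emr_client.get_paginator('list_clusters')
    matching_ids = [
        cluster['Id']
        for page in paginator.paginate(ClusterStates=list(states))
        for cluster in page['Clusters']
        if cluster['Name'] == cluster_name
    ]
    if len(matching_ids) != 1:
        raise ValueError(
            f'Expected exactly one active cluster named {cluster_name!r}, '
            f'found {len(matching_ids)}'
        )
    return matching_ids[0]
```

With the client created earlier, the call would look like find_cluster_id(emr_client, EMR_CLUSTER_NAME).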

Add a step to the EMR cluster

Now, I am going to define a Python function that adds a new step to the EMR cluster. The function returns the cluster id and the step id. When it runs in a PythonOperator, Airflow automatically stores the returned value in XCom, so we can use it in the EmrStepSensor later.

Before I begin, I must define the command I want to run. In this example, I pretend that I want to submit a Spark job stored in a jar file on S3. Remember that the EMR step arguments cannot be longer than 10280 characters!

def run_spark_job():
    # here is the code that retrieves the cluster id

    # boto3 expects the step arguments as a list of strings, not one string
    step_args = [
        'spark-submit', '--master', 'yarn', '--deploy-mode', 'client',
        '--class', 'name.mikulskibartosz.SparkJob',
        's3://bucket_name/spark_job.jar',
        '--job_parameter', 'something',
    ]
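
Because of the length limit mentioned above, it may be worth checking the arguments before submitting the step. Here is a rough sketch of such a check. The helper name check_step_args_length is hypothetical, and I assume that the limit applies to the arguments joined with single spaces, which may not match exactly how AWS counts the characters:

```python
MAX_STEP_ARGS_LENGTH = 10280  # the limit quoted in this article


def check_step_args_length(step_args, limit=MAX_STEP_ARGS_LENGTH):
    """Return the total length of the arguments or raise ValueError.

    Accepts either a single command string or a list of arguments.
    Assumption: the limit counts the arguments joined with single spaces.
    """
    if isinstance(step_args, str):
        total_length = len(step_args)
    else:
        total_length = len(' '.join(step_args))

    if total_length > limit:
        raise ValueError(
            f'EMR step arguments are {total_length} characters long, '
            f'the limit is {limit}'
        )
    return total_length
```

Failing early in the Airflow task gives a clearer error message than a rejected API call.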

After that, I use the step_args in the Args part of an EMR step definition:

step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': step_args
    }
}

Finally, I use the add_job_flow_steps function to add the step to the cluster. Note that I can add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:

step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])

In the last line, I extract the step id from the step_ids and return a tuple that contains both the cluster id and the step id:

return cluster_id, step_ids['StepIds'][0]
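
The fragments above can be put together into one function. The sketch below is only one possible arrangement: the name submit_step and its parameters are my own, and I pass the EMR client as an argument so the function is easy to test without AWS. boto3 expects Args to be a list of strings, so the sketch splits a command given as a single string:

```python
import shlex


def submit_step(emr_client, cluster_id, step_name, command):
    """Add one step to the cluster and return (cluster_id, step_id)."""
    # accept either a command string or an already split list of arguments
    args = shlex.split(command) if isinstance(command, str) else list(command)

    step_configuration = {
        'Name': step_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': args,
        },
    }

    response = emr_client.add_job_flow_steps(
        JobFlowId=cluster_id, Steps=[step_configuration]
    )
    # one step was submitted, so the response contains exactly one step id
    return cluster_id, response['StepIds'][0]
```

In a DAG, a small wrapper that creates the client, looks up the cluster id, and calls submit_step can be passed as the python_callable of a PythonOperator.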

Wait for the result

Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCOM. In this case, I can define an EmrStepSensor that pauses the DAG until the EMR step finishes processing.

If the task that executes the run_spark_job function is called run_emr_step_task, I can retrieve the cluster id by requesting the return_value of that task and selecting the first element of the tuple:

task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]

Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:

from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",
    dag=dag
)
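
To show what waiting for a step means, here is a simplified polling loop built on the describe_step call of the EMR client. This is not the sensor's actual implementation, only a sketch of the idea. The function name wait_for_step, the poke_interval parameter, and the injectable sleep argument are my own assumptions:

```python
import time

# step states after which the step will never complete successfully
FAILED_STATES = {'CANCELLED', 'FAILED', 'INTERRUPTED'}


def wait_for_step(emr_client, cluster_id, step_id, poke_interval=60, sleep=time.sleep):
    """Poll the step state until it completes; raise if it ends in a failed state."""
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response['Step']['Status']['State']

        if state == 'COMPLETED':
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f'EMR step {step_id} ended in state {state}')

        # PENDING, RUNNING, or CANCEL_PENDING: wait and ask again
        sleep(poke_interval)
```

In a DAG, I would still prefer the sensor because Airflow handles the timeout and retries for me.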

Bartosz Mikulski

  • MLOps engineer by day
  • AI and data engineering consultant by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz
  • Mastodon: @mikulskibartosz@mathstodon.xyz