How to add an EMR step in Airflow and wait until it finishes running

In this article, I am going to show you three things:

  • how to retrieve the EMR cluster id using the cluster name
  • how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
  • how to define an EmrStepSensor to wait until the EMR step finishes processing

First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow:

from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')

Get cluster id by name

Now, I can list all active EMR clusters, find the one I need by its name, and store its identifier in a variable:

response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # handle the error, for example by failing the task
    raise ValueError(f'Expected exactly one EMR cluster named {EMR_CLUSTER_NAME}')
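
If you need this lookup in more than one place, it may be cleaner to wrap it in a small helper function. Here is a minimal sketch under the same assumptions as above; the get_cluster_id name is just an example, not part of any library:

def get_cluster_id(emr_client, cluster_name):
    # only consider clusters that can still accept steps
    response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

    matching_clusters = [
        cluster for cluster in response['Clusters'] if cluster['Name'] == cluster_name
    ]

    if len(matching_clusters) != 1:
        raise ValueError(f'Expected exactly one EMR cluster named {cluster_name}')

    return matching_clusters[0]['Id']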

Add a step to the EMR cluster

Now, I am going to define a Python function that adds a new step to the EMR cluster. The function will return the cluster id and the step id, so I can run it in a PythonOperator. In that case, Airflow will automatically store the returned value in XCom, so we can use it in the EmrStepSensor later.

Before I begin, I must define the command I want to run. In this example, let's say I want to submit a Spark job stored in a jar file on S3. Remember that an EMR step argument cannot be longer than 10280 characters!

def run_spark_job():
    # here is the code that retrieves the cluster id

    step_args = 'spark-submit --master yarn --deploy-mode client ' \
        '--class name.mikulskibartosz.SparkJob ' \
        's3://bucket_name/spark_job.jar ' \
        '--job_parameter something'

After that, I use the step_args in the Args part of an EMR step definition. Note that Args expects a list of strings, so I split the command on spaces:

step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': step_args.split()
    }
}

Finally, I call the add_job_flow_steps function to add the step to the cluster. Note that I could add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:

step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])

In the last line, I extract the step id from the response and return a tuple that contains both the cluster id and the step id:

return cluster_id, step_ids['StepIds'][0]
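
The function above can be turned into an Airflow task with a PythonOperator. Here is a minimal sketch, assuming the run_spark_job function defined above and a dag object created elsewhere; the task id run_emr_step_task is the one I refer to in the next section:

from airflow.operators.python_operator import PythonOperator

# the returned (cluster_id, step_id) tuple is pushed to XCom as 'return_value'
run_emr_step_task = PythonOperator(
    task_id='run_emr_step_task',
    python_callable=run_spark_job,
    dag=dag
)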

Wait for the result

Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCom. In this case, I can define an EmrStepSensor that waits until the EMR step finishes processing.

If the task that executes the run_spark_job function is called run_emr_step_task, I can retrieve the cluster id by pulling the return_value of that task and selecting the first element of the tuple:

task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]

Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",
    dag=dag
)
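
The sensor also needs an import and has to run after the task that adds the step. A short sketch of the wiring, assuming the Airflow 1.10 contrib import path and the run_emr_step_task task defined earlier:

from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# the sensor can only check the step after it has been added to the cluster
run_emr_step_task >> wait_for_it

If your AWS connection is not the default aws_default, the sensor also accepts an aws_conn_id parameter.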