How to add a manual step to an Airflow DAG using the JiraOperator

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (34/100)

How can we add a human action in the middle of an Airflow DAG? It is not an everyday use case, but it is a useful one. Occasionally, we need a human confirmation before executing code that may destroy data or our reputation.

For example, we may have a DAG that prepares a newsletter. The DAG’s last task sends it to the subscribers, but we want to wait until a manager approves the content before we send anything.

We can also wait for a manual step when we implement personal data deletion. Our DAG may gather all of the data to be removed, make a list of affected datasets, and send it to a person for final approval before everything gets deleted.

In all of those situations, we can use the JiraOperator to create a Jira ticket and a Jira sensor to wait until the ticket’s status changes to whatever value we use as confirmation.

Creating a new issue

First, we have to create a new ticket. For this, we import the JiraOperator, which gives us access to the Jira Python SDK.

from airflow.contrib.operators.jira_operator import JiraOperator

issue_dict = {
    'project': {'id': 123},
    'summary': 'Confirmation required',
    'description': 'Some description',
    'issuetype': {'name': 'Request'},
}

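# Assumes that your version of the API returns an Issue object, not a dictionary.
# The contrib JiraOperator calls result_processor with the task context and the Jira result.
def extract_issue_key(context, issue):
    return issue.key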

create_jira_issue = JiraOperator(
    task_id='get_human_approval',
    jira_conn_id='connection_id',
    jira_method='create_issue',
    jira_method_args=issue_dict,
    result_processor=extract_issue_key
)

The operator returns whatever our function produces, so the extracted issue key ends up in the XCom result of this task under the default return_value key.
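To see what the result processor does without running Airflow, here is a minimal sketch. It assumes the operator calls the function with the task context and the Jira result, as the contrib JiraOperator does in the versions I know. The `fake_issue` object and its values are made up for illustration.

```python
from types import SimpleNamespace


def extract_issue_key(context, issue):
    """Reduce the Jira issue to a plain string that can be stored in XCom."""
    return issue.key


# Hypothetical stand-in for the Issue object returned by create_issue
fake_issue = SimpleNamespace(id="10042", key="NEWS-7")

# An empty dict stands in for the Airflow task context
xcom_value = extract_issue_key({}, fake_issue)
print(xcom_value)  # NEWS-7
```

Returning a plain string instead of the whole Issue object keeps the XCom value small and easy to serialize.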

Waiting for the status

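In the next step, we wait until the issue has the desired status. For this, we use the JiraTicketSensor, a subclass of JiraSensor that accepts the ticket id, the field to check, and the expected value:

from airflow.contrib.sensors.jira_sensor import JiraTicketSensor

sensor = JiraTicketSensor(
    task_id='check_if_approved',
    jira_conn_id='connection_id',
    # pull the value returned by the get_human_approval task
    ticket_id="{{ task_instance.xcom_pull(task_ids='get_human_approval', key='return_value') }}",
    field='status',
    expected_value='Done'
)

# the sensor must run after the task that creates the issue
create_jira_issue >> sensor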
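To make the waiting logic concrete, here is a simplified sketch of the kind of comparison a ticket sensor performs every time it checks the issue. It is an illustration, not the sensor's actual source code. The `is_approved` function and the stand-in issue objects are hypothetical.

```python
from types import SimpleNamespace


def is_approved(issue, field="status", expected_value="Done"):
    """Return True when the issue field matches the expected value (case-insensitive)."""
    field_value = getattr(issue.fields, field, None)
    if field_value is None:
        return False
    # Jira represents a status as an object with a `name`; plain strings are compared directly
    name = getattr(field_value, "name", field_value)
    return str(name).lower() == expected_value.lower()


# Hypothetical stand-ins for issues fetched from Jira
open_issue = SimpleNamespace(fields=SimpleNamespace(status=SimpleNamespace(name="In Progress")))
done_issue = SimpleNamespace(fields=SimpleNamespace(status=SimpleNamespace(name="Done")))

print(is_approved(open_issue))  # False
print(is_approved(done_issue))  # True
```

Until the check passes, the sensor task does not succeed, so the tasks downstream of it do not start.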

Bartosz Mikulski

  • MLOps engineer by day
  • AI and data engineering consultant by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz
  • Mastodon: @mikulskibartosz@mathstodon.xyz