How to add a manual step to an Airflow DAG using the JiraOperator
How can we add a human action in the middle of an Airflow DAG? This is not an everyday use case, but it does come up. Occasionally, we need a human confirmation before executing code that could destroy data or damage our reputation.
For example, we may have a DAG that prepares a newsletter. The DAG’s last task sends it to the subscribers, but we want to wait until a manager approves the content before we send anything.
We can also wait for a manual step when we implement personal data deletion. Our DAG may gather all of the data to be removed, make a list of affected datasets, and send it to a person for final approval before anything gets deleted.
In all of those situations, we can use the JiraOperator to create a Jira ticket and the JiraSensor to wait until the ticket’s status changes to whatever value we use as confirmation.
Creating a new issue
First, we have to create a new ticket. For this, we import the JiraOperator, which gives us access to the Jira Python SDK.
from airflow.contrib.operators.jira_operator import JiraOperator

# Fields of the ticket to create; passed to create_issue as keyword arguments
issue_dict = {
    'project': {'id': 123},
    'summary': 'Confirmation required',
    'description': 'Some description',
    'issuetype': {'name': 'Request'},
}


# assuming that your version of the API returns an Issue, not a dictionary;
# the operator calls the processor with the task context and the Jira result
def extract_issue_key(context, issue):
    return issue.key


create_jira_issue = JiraOperator(
    task_id='get_human_approval',
    jira_conn_id='connection_id',
    jira_method='create_issue',
    jira_method_args=issue_dict,
    result_processor=extract_issue_key,
)
The issue key extracted by the function we provided will end up in the XCom result of this operator.
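The result processor is plain Python, so it can be checked without a Jira server. The sketch below assumes that the contrib JiraOperator calls the processor with the task context first and the Jira result second, and it covers both cases mentioned above: an Issue-like object and a dictionary. The function name `issue_key_from_result` and the stand-in values are hypothetical.

```python
from types import SimpleNamespace


def issue_key_from_result(context, issue):
    """Return the issue key from an Issue-like object or from a dictionary.

    Assumption: the operator passes (context, jira_result) to the processor.
    """
    if isinstance(issue, dict):
        return issue["key"]
    return issue.key


# Hypothetical stand-ins for what create_issue might return.
issue_object = SimpleNamespace(id="10001", key="NEWS-42")
issue_as_dict = {"id": "10001", "key": "NEWS-42"}

key_from_object = issue_key_from_result({}, issue_object)
key_from_dict = issue_key_from_result({}, issue_as_dict)
```

Whatever the processor returns becomes the operator's return value, so it should be a small, serializable value such as a string rather than the whole Issue object.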
Waiting for the status
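In the next step, we wait until the issue has the desired status using a JiraTicketSensor, a subclass of JiraSensor that checks a single field of a ticket:
from airflow.contrib.sensors.jira_sensor import JiraTicketSensor

sensor = JiraTicketSensor(
    task_id='check_if_approved',
    jira_conn_id='connection_id',
    # pull the value returned by the get_human_approval task
    ticket_id="{{ task_instance.xcom_pull(task_ids='get_human_approval', key='return_value') }}",
    field='status',
    expected_value='Done',
)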
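If more than one status should count as a confirmation, the comparison can live in a small function. The following is only a sketch, assuming the ticket sensor accepts a custom checker through a `field_checker_func` argument and calls it with the task context and the fetched issue. `APPROVED_STATUSES`, `is_approved`, and the stand-in issue objects are hypothetical.

```python
from types import SimpleNamespace

# Hypothetical set of status names that count as a confirmation.
APPROVED_STATUSES = {"done", "approved"}


def is_approved(context, issue):
    """Return True when the issue's status name is one of the approved values."""
    status = getattr(issue.fields, "status", None)
    # The status is usually a resource with a `name` attribute;
    # fall back to the raw value if a plain string is returned instead.
    name = getattr(status, "name", status)
    return isinstance(name, str) and name.lower() in APPROVED_STATUSES


def fake_issue(status_name):
    """Build a minimal stand-in for a Jira issue (test data only)."""
    status = SimpleNamespace(name=status_name)
    return SimpleNamespace(fields=SimpleNamespace(status=status))


approved = is_approved({}, fake_issue("Approved"))
still_open = is_approved({}, fake_issue("In Progress"))
```

Keeping the check in a separate function makes it easy to test the approval rule before the DAG ever talks to Jira.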
Bartosz Mikulski
- Data/MLOps engineer by day
- DevRel/copywriter by night
- Python and data engineering trainer
- Conference speaker
- Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
- Twitter: @mikulskibartosz