Run a command on a remote server using SSH in Airflow

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (54/100)

In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
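
Besides the connection form in the Airflow UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection id, whose value is a connection URI. Below is a minimal sketch of building such a variable. The helper function is not part of Airflow, and the connection id, user name, password, and host are hypothetical placeholders.

```python
from typing import Tuple
from urllib.parse import quote


def ssh_connection_env_var(conn_id: str, user: str, password: str,
                           host: str, port: int = 22) -> Tuple[str, str]:
    """Return (variable name, URI) for an SSH connection defined via an env var.

    The user name and password are percent-encoded so characters such as
    '@' or ':' do not break the URI.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    uri = f"ssh://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}"
    return name, uri


# Hypothetical values, for illustration only.
name, uri = ssh_connection_env_var("my_ssh_server", "deploy", "p@ss:word", "example.com")
print(f"{name}={uri}")
# AIRFLOW_CONN_MY_SSH_SERVER=ssh://deploy:p%40ss%3Aword@example.com:22
```

Exporting that variable in the environment of the Airflow scheduler and workers should make the connection available under the id my_ssh_server.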

When that part is done, I can create an SSHHook that uses this connection:

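# Airflow 2.x (apache-airflow-providers-ssh); on Airflow 1.10 use:
# from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.providers.ssh.hooks.ssh import SSHHook

# placeholder: replace with the id of the SSH connection defined in Airflow
AIRFLOW_CONNECTION_ID = 'my_ssh_connection'
ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)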

In the next step, I open a new connection and execute the command (in this example, I use touch to create a new file). I have to do it in a try...finally block because the SSHHook does not close the connection automatically when we no longer need it:

ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()
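
The same guarantee can be expressed with contextlib.closing from the standard library, which calls close() on the object when the with block exits, also when an exception is raised inside it. A minimal sketch, assuming get_conn is any zero-argument callable returning the client (with Airflow, ssh.get_conn). create_file is a hypothetical name, and FakeSSHClient only mimics the methods used here so the sketch runs without a server.

```python
from contextlib import closing


class FakeSSHClient:
    """Hypothetical stand-in for the client returned by ssh.get_conn()."""

    def __init__(self, fail=False):
        self.fail = fail
        self.commands = []
        self.closed = False

    def load_system_host_keys(self):
        pass

    def exec_command(self, command):
        if self.fail:
            raise OSError("simulated failure")
        self.commands.append(command)

    def close(self):
        self.closed = True


def create_file(get_conn):
    # closing() calls ssh_client.close() when the block exits, even if
    # exec_command raises, just like the finally clause above.
    with closing(get_conn()) as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')


client = FakeSSHClient()
create_file(lambda: client)
print(client.commands, client.closed)  # ['touch file_name'] True

failing = FakeSSHClient(fail=True)
try:
    create_file(lambda: failing)
except OSError:
    pass
print(failing.closed)  # True
```

With a real hook, the call would be create_file(ssh.get_conn).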

In the try block, I called the load_system_host_keys method to load the OpenSSH known_hosts file, which contains the identifiers (public host keys) of known remote hosts and is used as a rudimentary method of man-in-the-middle attack prevention.

After that, I called exec_command to create a new file. In this example, I don't care about the server response, so I do not assign the values returned by exec_command to any variable.

If I wanted to get a response, I would have to assign the returned values to three variables:

stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
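
A minimal sketch of reading that response, assuming ssh_client is the paramiko client returned by ssh.get_conn(): its stdout and stderr objects have a read() method returning bytes, and stdout.channel.recv_exit_status() waits for the command's exit status. run_command is a hypothetical helper name, and FakeSSHClient only mimics that interface so the sketch runs without a server. Because exec_command does not wait for the remote command to finish, reading the streams and the exit status also ensures the command has completed before the connection is closed.

```python
from types import SimpleNamespace


def run_command(ssh_client, command):
    """Run `command` and return (exit status, stdout text, stderr text)."""
    # exec_command returns the remote command's stdin, stdout, and stderr
    _, stdout, stderr = ssh_client.exec_command(command)
    # read() blocks until the remote side closes the stream and returns bytes
    out = stdout.read().decode()
    err = stderr.read().decode()
    # recv_exit_status() blocks until the remote command has finished
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, out, err


class FakeSSHClient:
    """Hypothetical stand-in for the paramiko client returned by ssh.get_conn()."""

    def __init__(self, out=b"", err=b"", status=0):
        self.out, self.err, self.status = out, err, status

    def exec_command(self, command):
        channel = SimpleNamespace(recv_exit_status=lambda: self.status)
        stdout = SimpleNamespace(read=lambda: self.out, channel=channel)
        stderr = SimpleNamespace(read=lambda: self.err, channel=channel)
        return None, stdout, stderr


print(run_command(FakeSSHClient(out=b"hello\n"), "echo hello"))
# (0, 'hello\n', '')
print(run_command(FakeSSHClient(err=b"permission denied\n", status=1), "touch /x"))
# (1, '', 'permission denied\n')
```

With a real connection, run_command(ssh_client, 'touch file_name') would replace the bare exec_command call inside the try block. This sketch reads stdout fully before stderr, which is fine for commands with small outputs like touch.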

SSHHook in PythonOperator

In this article, I want to show how to use the SSHHook with the PythonOperator. Therefore, I have to put the whole code in a function:

def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        # get_conn() opens the SSH connection and returns the client
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        # SSHHook does not close the connection for us, so close it here
        if ssh_client:
            ssh_client.close()

and define a PythonOperator that calls my function:

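# Airflow 2.x; on Airflow 1.10 use:
# from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator

# `dag` is the DAG object this task belongs to, defined elsewhere in the DAG file
call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)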
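
The function above always creates the same file. If the file name were a task parameter instead (for example, passed through the PythonOperator's op_kwargs argument to a create_the_file(file_name) function), it should be quoted before it is put into the command string, because the SSH server runs that string through the remote user's shell. A minimal sketch, assuming the remote login shell is POSIX-compatible; build_touch_command is a hypothetical helper name.

```python
import shlex


def build_touch_command(file_name):
    """Return a `touch` command with the file name quoted for a POSIX shell."""
    # The SSH server passes the command string to the remote user's shell,
    # so quoting keeps spaces and characters such as ';' literal.
    return f"touch {shlex.quote(file_name)}"


print(build_touch_command("file_name"))      # touch file_name
print(build_touch_command("my report.csv"))  # touch 'my report.csv'
print(build_touch_command("x; rm -rf ~"))    # touch 'x; rm -rf ~'
```

Inside the function, the call would then become ssh_client.exec_command(build_touch_command(file_name)), and the operator would receive op_kwargs={'file_name': 'file_name'}.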

Did you enjoy reading this article?
Would you like to learn more about software craft in data engineering and MLOps?

Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!

Newsletter

Do you enjoy reading my articles?
Subscribe to the newsletter if you don't want to miss the new content, business offers, and free training materials.

Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz