Run a command on a remote server using SSH in Airflow
In this article, I show how to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.
First, I have to define the SSH connection in Airflow because I will pass the connection parameters using the Airflow connection id instead of defining the host, port, username, and password in the Python code.
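The connection can be created in the Airflow UI, but as a rough sketch of an alternative: Airflow also reads connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased connection id, with a connection URI as the value. The helper names, host, and credentials below are hypothetical and serve only as an illustration.

```python
from urllib.parse import quote


def conn_env_var_name(conn_id):
    """Name of the environment variable Airflow checks for this connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def build_ssh_conn_uri(host, port, login, password):
    """Build an ssh:// connection URI; login and password are percent-encoded
    so that characters such as '@' or '/' do not break the URI."""
    return "ssh://{}:{}@{}:{}".format(
        quote(login, safe=""), quote(password, safe=""), host, port
    )


# Hypothetical values, for illustration only.
print(conn_env_var_name("my_ssh_connection"))
print(build_ssh_conn_uri("example.com", 22, "user", "p@ss/word"))
```

Exporting the printed URI under the printed variable name before starting Airflow should make the connection available under the id my_ssh_connection.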
When that part is done, I can define the function that connects to SSH:
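# In Airflow 2.x, the hook lives in the SSH provider package instead:
# from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.contrib.hooks.ssh_hook import SSHHook

# AIRFLOW_CONNECTION_ID is the id of the SSH connection defined in Airflow
ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)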
In the next step, I open a new connection and execute the command (in this example, I will use touch to create a new file). Creating a new connection, however, is not enough. We have to do it in a try...finally block because the SSHHook does not close it automatically when we no longer need it:
ssh_client = None
try:
    ssh_client = ssh.get_conn()
    ssh_client.load_system_host_keys()
    ssh_client.exec_command('touch file_name')
finally:
    if ssh_client:
        ssh_client.close()
In the try block, I called the load_system_host_keys function to import the OpenSSH known_hosts file, which contains the remote hosts' identifiers and is used as a rudimentary method of preventing man-in-the-middle attacks.
After that, I called the exec_command function to create a new file. In this example, I don't care about the server response, so I do not assign the values returned by exec_command to any variable.
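The same cleanup can be written more compactly. The sketch below is not the approach used in this article. It is an equivalent variant that uses contextlib.closing from the standard library, which calls close() on the client when the block exits, even if the command raises. The function name run_remote_command and its parameters are my own, and the hook is passed in as an argument only to keep the example self-contained.

```python
from contextlib import closing


def run_remote_command(hook, command):
    """Open a connection through the given hook, run the command,
    and always close the client afterwards."""
    # closing() calls ssh_client.close() on exit, like the finally block.
    with closing(hook.get_conn()) as ssh_client:
        ssh_client.load_system_host_keys()
        ssh_client.exec_command(command)
```

With the hook defined earlier, the call would look like run_remote_command(ssh, 'touch file_name').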
If I wanted to get a response, I would have to assign the returned values to three variables:
stdin, stdout, stderr = ssh_client.exec_command('touch file_name')
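As a hedged sketch of what could be done with those three values: the client returned by get_conn is a paramiko SSHClient, so stdout and stderr are file-like objects that return bytes, and stdout.channel.recv_exit_status() blocks until the remote command finishes. The helper name run_and_collect is hypothetical. Reading both streams to the end before asking for the exit status is an assumption that suits commands with little output; a command that writes a lot to stderr may need a different reading strategy.

```python
def run_and_collect(ssh_client, command):
    """Run the command and return (exit_status, stdout_text, stderr_text)."""
    _stdin, stdout, stderr = ssh_client.exec_command(command)
    # Read the output first; read() returns bytes, so decode to text.
    out = stdout.read().decode()
    err = stderr.read().decode()
    # Blocks until the remote process has exited.
    exit_status = stdout.channel.recv_exit_status()
    return exit_status, out, err
```

A non-zero exit status could then be turned into an exception so that the Airflow task fails when the remote command fails.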
SSHHook in PythonOperator
In this article, I want to show how to use the SSHHook with the PythonOperator. Therefore, I have to put the whole code in a function:
def create_the_file():
    ssh = SSHHook(ssh_conn_id=AIRFLOW_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        ssh_client.exec_command('touch file_name')
    finally:
        if ssh_client:
            ssh_client.close()
and define a PythonOperator that calls my function:
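from airflow.operators.python_operator import PythonOperator

# dag is the DAG object this task belongs to, defined elsewhere in the file
call_ssh_task = PythonOperator(
    task_id='call_ssh_task',
    python_callable=create_the_file,
    dag=dag
)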
Bartosz Mikulski
- Data/MLOps engineer by day
- DevRel/copywriter by night
- Python and data engineering trainer
- Conference speaker
- Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
- Twitter: @mikulskibartosz