Source code for astronomer.providers.sftp.hooks.sftp

from fnmatch import fnmatch
from typing import List, Optional

import asyncssh
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from asgiref.sync import sync_to_async


[docs]class SFTPHookAsync(BaseHook): """ Interact with an SFTP server via asyncssh package :param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server :param host: hostname of the SFTP server :param port: port of the SFTP server :param username: username used when authenticating to the SFTP server :param password: password used when authenticating to the SFTP server Can be left blank if using a key file :param known_hosts: path to the known_hosts file on the local file system If known_hosts is set to the literal "none", then no host verification is performed :param key_file: path to the client key file used for authentication to SFTP server :param passphrase: passphrase used with the key_file for authentication to SFTP server """ conn_name_attr = "ssh_conn_id" default_conn_name = "sftp_default" conn_type = "sftp" hook_name = "SFTP" default_known_hosts = '"~/.ssh/known_hosts"' def __init__( # nosec: B107 self, sftp_conn_id: str = default_conn_name, host: str = "", port: int = 22, username: str = "", password: str = "", known_hosts: str = default_known_hosts, key_file: str = "", passphrase: str = "", ) -> None: self.sftp_conn_id = sftp_conn_id self.host = host self.port = port self.username = username self.password = password self.known_hosts = known_hosts self.key_file = key_file self.passphrase = passphrase async def _get_conn(self) -> asyncssh.SSHClientConnection: """ Asynchronously connect to the SFTP server as an SSH client The following parameters are provided either in the extra json object in the SFTP connection definition - key_file - known_hosts - passphrase """ if self.sftp_conn_id is not None: conn = await sync_to_async(self.get_connection)(self.sftp_conn_id) if conn.extra is not None: extra_options = conn.extra_dejson if "key_file" in extra_options and self.key_file == "": self.key_file = extra_options.get("key_file") if "known_hosts" in extra_options: self.known_hosts = extra_options.get("known_hosts") if "passphrase" in extra_options: self.passphrase = extra_options.get("passphrase") conn_config = { "host": conn.host, "port": conn.port, "username": conn.login, "password": conn.password, } if self.key_file: conn_config.update(client_keys=self.key_file) if self.known_hosts: if self.known_hosts.lower() == "none": conn_config.update(known_hosts=None) else: conn_config.update(known_hosts=self.known_hosts) if self.passphrase: conn_config.update(passphrase=self.passphrase) ssh_client = await asyncssh.connect(**conn_config) return ssh_client
[docs] async def list_directory(self, path: str = "") -> Optional[List[str]]: """Returns a list of files on the SFTP server at the provided path""" ssh_conn = await self._get_conn() sftp_client = await ssh_conn.start_sftp_client() try: files = await sftp_client.listdir(path) return sorted(files) except asyncssh.SFTPNoSuchFile: return None
[docs] async def get_file_by_pattern(self, path: str = "", fnmatch_pattern: str = "") -> str: """ Returns the name of a file matching the file pattern at the provided path, if one exists Otherwise, raises an AirflowException to be handled upstream for deferring """ files_list = await self.list_directory(path) if files_list is None: raise AirflowException(f"No files at path {path} found...") for file in files_list: if not fnmatch(file, fnmatch_pattern): pass else: return file raise AirflowException(f"No files matching file pattern were found at {path} — Deferring")