Source code for astronomer.providers.sftp.hooks.sftp
from __future__ import annotations
import os.path
import warnings
from datetime import datetime
from fnmatch import fnmatch
from typing import Sequence
import asyncssh
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from asgiref.sync import sync_to_async
[docs]
class SFTPHookAsync(BaseHook):
    """
    Asynchronous SFTP hook built on ``asyncssh``.

    This class is deprecated and will be removed in 2.0.0.
    Use :class:`~airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead.
    """

    # NOTE(review): the constructor parameter is named ``sftp_conn_id`` while this
    # attribute says ``ssh_conn_id`` -- confirm which name is intended.
    conn_name_attr = "ssh_conn_id"
    # Used as the default value of the constructor's ``sftp_conn_id`` parameter.
    default_conn_name = "sftp_default"
    conn_type = "sftp"
    hook_name = "SFTP"
    # Used as the default value of the constructor's ``known_hosts`` parameter.
    default_known_hosts = "~/.ssh/known_hosts"
def __init__(  # nosec: B107
    self,
    sftp_conn_id: str = default_conn_name,
    host: str = "",
    port: int = 22,
    username: str = "",
    password: str = "",
    known_hosts: str = default_known_hosts,
    key_file: str = "",
    passphrase: str = "",
    private_key: str = "",
) -> None:
    """
    Store the settings used later to open the asynchronous SSH/SFTP connection.

    Instantiating the hook emits a ``DeprecationWarning`` because this class is
    scheduled for removal in 2.0.0.

    :param sftp_conn_id: id of the Airflow connection looked up when connecting
    :param host: remote host, stored on the instance
    :param port: remote port, stored on the instance
    :param username: login name, stored on the instance
    :param password: login password, stored on the instance
    :param known_hosts: path to a known-hosts file; a leading ``~`` is expanded
    :param key_file: path to a private key file used as the client key
    :param passphrase: passphrase protecting the private key
    :param private_key: private key material, imported when connecting
    """
    super().__init__()
    # DeprecationWarning + stacklevel=2 so the warning is categorised correctly
    # and points at the code instantiating the hook rather than at this line.
    warnings.warn(
        "This class is deprecated and will be removed in 2.0.0. "
        "Use `airflow.providers.sftp.hooks.sftp.SFTPHookAsync` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.sftp_conn_id = sftp_conn_id
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    # May later be replaced by ``bytes`` (inline host key) when extras are parsed.
    self.known_hosts: bytes | str = os.path.expanduser(known_hosts)
    self.key_file = key_file
    self.passphrase = passphrase
    self.private_key = private_key
def _parse_extras(self, conn: Connection) -> None:
    """
    Parse extra fields from the connection into instance fields.

    Recognised keys of ``conn.extra_dejson``: ``key_file``, ``known_hosts``,
    ``passphrase`` (or ``private_key_passphrase``), ``private_key``,
    ``host_key`` and ``no_host_key_check``.

    :param conn: Airflow connection whose extras are read
    :raises ValueError: if ``no_host_key_check`` is true while ``host_key`` is also given
    """
    extra_options = conn.extra_dejson

    # A key file passed to the constructor takes precedence over the extras.
    if "key_file" in extra_options and self.key_file == "":
        self.key_file = extra_options["key_file"]

    # NOTE(review): this compares the (already ``expanduser``-ed) attribute with the
    # unexpanded default, so the extras value is applied whenever the two differ.
    # Behaviour is kept as-is; confirm the intended precedence.
    if "known_hosts" in extra_options and self.known_hosts != self.default_known_hosts:
        self.known_hosts = extra_options["known_hosts"]

    # ``("passphrase" or "private_key_passphrase") in extra_options`` only ever tested
    # "passphrase"; check both keys explicitly, "passphrase" first for compatibility.
    for passphrase_key in ("passphrase", "private_key_passphrase"):
        if passphrase_key in extra_options:
            self.passphrase = extra_options[passphrase_key]
            break

    if "private_key" in extra_options:
        self.private_key = extra_options["private_key"]

    host_key = extra_options.get("host_key")
    no_host_key_check = extra_options.get("no_host_key_check")

    if no_host_key_check is not None:
        # Accept booleans as well as strings such as "True"/"true".
        no_host_key_check = str(no_host_key_check).lower() == "true"
        if host_key is not None and no_host_key_check:
            raise ValueError("Host key check was skipped, but `host_key` value was given")
        if no_host_key_check:
            self.log.warning(
                "No Host Key Verification. This won't protect against Man-In-The-Middle attacks"
            )
            self.known_hosts = "none"

    if host_key is not None:
        # Inline known-hosts entry ("<host> <key>") stored as bytes.
        self.known_hosts = f"{conn.host} {host_key}".encode()
async def _get_conn(self) -> asyncssh.SSHClientConnection:
    """
    Asynchronously connect to the SFTP server as an SSH client.

    Host, port, username and password are read from the Airflow connection
    identified by ``self.sftp_conn_id``. When that connection has extras, they
    are parsed first and may supply:

        - key_file
        - known_hosts
        - passphrase
        - private_key
        - host_key
        - no_host_key_check

    :return: the connection returned by ``asyncssh.connect``
    """
    # ``get_connection`` is synchronous, so it is wrapped to be awaitable here.
    conn = await sync_to_async(self.get_connection)(self.sftp_conn_id)
    if conn.extra is not None:
        self._parse_extras(conn)

    conn_config = {
        "host": conn.host,
        "port": conn.port,
        "username": conn.login,
        "password": conn.password,
    }
    if self.key_file:
        conn_config.update(client_keys=self.key_file)
    if self.known_hosts:
        # ``known_hosts`` may be bytes (inline host key); only the *string*
        # "none" means that no known-hosts data should be passed.
        if isinstance(self.known_hosts, str) and self.known_hosts.lower() == "none":
            conn_config.update(known_hosts=None)
        else:
            conn_config.update(known_hosts=self.known_hosts)
    if self.private_key:
        # An explicit private key replaces any ``client_keys`` set from ``key_file``.
        _private_key = asyncssh.import_private_key(self.private_key, self.passphrase)
        conn_config.update(client_keys=[_private_key])
    if self.passphrase:
        conn_config.update(passphrase=self.passphrase)

    return await asyncssh.connect(**conn_config)
[docs]
async def list_directory(self, path: str = "") -> list[str] | None:
    """
    Return the sorted names of the entries at ``path`` on the SFTP server.

    :param path: remote directory to list
    :return: sorted list of names, or ``None`` if ``asyncssh.SFTPNoSuchFile`` is raised
    """
    # ``async with`` closes the SSH connection on exit; without it every call
    # left a connection open.
    async with await self._get_conn() as ssh_conn:
        sftp_client = await ssh_conn.start_sftp_client()
        try:
            files = await sftp_client.listdir(path)
        except asyncssh.SFTPNoSuchFile:
            return None
        return sorted(files)
[docs]
async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:
    """
    Return the entries at ``path`` on the SFTP server along with their attributes.

    :param path: remote directory to read
    :return: the result of ``readdir``, or ``None`` if ``asyncssh.SFTPNoSuchFile`` is raised
    """
    # ``async with`` closes the SSH connection on exit; without it every call
    # left a connection open.
    async with await self._get_conn() as ssh_conn:
        sftp_client = await ssh_conn.start_sftp_client()
        try:
            return await sftp_client.readdir(path)
        except asyncssh.SFTPNoSuchFile:
            return None
[docs]
async def get_files_and_attrs_by_pattern(
    self, path: str = "", fnmatch_pattern: str = ""
) -> Sequence[asyncssh.sftp.SFTPName]:
    """
    Return the directory entries (with attributes) at ``path`` whose file name
    matches ``fnmatch_pattern`` (e.g. ``*.pdf``).

    :param path: remote directory to read
    :param fnmatch_pattern: shell-style pattern applied to each entry's file name
    :return: the matching entries, in the order ``read_directory`` returned them
    :raises FileNotFoundError: if ``read_directory`` returns ``None`` for ``path``
    """
    entries = await self.read_directory(path)
    if entries is None:
        raise FileNotFoundError(f"No files at path {path!r} found...")

    matches = []
    for entry in entries:
        # File names are passed through ``str`` before matching.
        if fnmatch(str(entry.filename), fnmatch_pattern):
            matches.append(entry)
    return matches
[docs]
async def get_files_by_pattern(self, path: str = "", fnmatch_pattern: str = "") -> list[str]:
    """
    Return the names at ``path`` that match ``fnmatch_pattern``.

    :param path: remote directory to list
    :param fnmatch_pattern: shell-style pattern applied to each name
    :return: the matching names, in the order ``list_directory`` returned them
    :raises AirflowException: if ``list_directory`` returns ``None`` for ``path``
    """
    names = await self.list_directory(path)
    if names is None:
        raise AirflowException(f"No files at path {path} found...")
    return list(filter(lambda name: fnmatch(name, fnmatch_pattern), names))
[docs]
async def get_mod_time(self, path: str) -> str:
    """
    Open an async SFTP connection and return the last modification time of ``path``.

    :param path: full path to the remote file
    :return: the modification time formatted as ``%Y%m%d%H%M%S``
    :raises AirflowException: if ``asyncssh.SFTPNoSuchFile`` is raised for ``path``
    """
    # ``async with`` closes the SSH connection on exit; without it every call
    # left a connection open.
    async with await self._get_conn() as ssh_conn:
        sftp_client = await ssh_conn.start_sftp_client()
        try:
            ftp_mdtm = await sftp_client.stat(path)
        except asyncssh.SFTPNoSuchFile as err:
            # Chain the cause so the original SFTP error stays in the traceback.
            raise AirflowException("No files matching") from err
        modified_time = ftp_mdtm.mtime
        # ``fromtimestamp`` without a tz yields a naive datetime in local time.
        mod_time = datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S")  # type: ignore[arg-type]
        self.log.info("Found File %s last modified: %s", str(path), str(mod_time))
        return mod_time