Source code for astronomer.providers.sftp.sensors.sftp
from datetime import timedelta
from typing import Any, Dict, Optional
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
from astronomer.providers.sftp.triggers.sftp import SFTPTrigger
from astronomer.providers.utils.typing_compat import Context
[docs]class SFTPSensorAsync(SFTPSensor):
"""
Polls an SFTP server continuously until a file_pattern is matched at a defined path
:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.
:param timeout: How long, in seconds, the sensor waits for successful before timing out
:param newer_than: DateTime for which the file or file path should be newer than, comparison is inclusive
"""
def __init__(
self,
*,
path: str,
file_pattern: str = "",
timeout: Optional[float] = None,
**kwargs: Any,
) -> None:
self.path = path
self.file_pattern = file_pattern
if timeout is None:
timeout = conf.getfloat("sensors", "default_timeout")
super().__init__(path=path, file_pattern=file_pattern, timeout=timeout, **kwargs)
self.hook = SFTPHookAsync(sftp_conn_id=self.sftp_conn_id) # type: ignore[assignment]
[docs] def execute(self, context: Context) -> None:
"""
Logic that the sensor uses to correctly identify which trigger to
execute, and defer execution as expected.
"""
# Unlike other async sensors, we do not follow the pattern of calling the synchronous self.poke() method before
# deferring here. This is due to the current limitations we have in the synchronous SFTPHook methods.
# The limitations are discovered while being worked upon the ticket
# https://github.com/astronomer/astronomer-providers/issues/1021. They are as follows:
# 1. For host key types of ecdsa, the hook expects the host key to prefixed with 'ssh-' as per the mapping of
# key types defined in it to get the appropriate key constructor for the ecdsa type keys, whereas
# conventionally such keys are not prefixed with 'ssh-'.
# 2. The sync sensor does not support the newer_than field to be passed as a Jinja template value which is of
# string type.
# 3. For file_pattern sensing, the hook implements list_directory() method which returns a list of filenames
# only without the attributes like modified time which is required for the file_pattern sensing when
# newer_than is supplied. This leads to intermittent failures potentially due to throttling by the SFTP
# server as the hook makes multiple calls to the server to get the attributes for each of the files in the
# directory.This limitation is resolved here by instead calling the read_directory() method which returns a
# list of files along with their attributes in a single call.
# We can add back the call to self.poke() before deferring once the above limitations are resolved in the
# sync sensor.
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=SFTPTrigger(
path=self.path,
file_pattern=self.file_pattern,
sftp_conn_id=self.sftp_conn_id,
poke_interval=self.poke_interval,
newer_than=self.newer_than,
),
method_name="execute_complete",
)
[docs] def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event is not None:
if "status" in event and event["status"] == "error":
raise AirflowException(event["message"])
if "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
self.log.info(event["message"])
return None
raise AirflowException("No event received in trigger callback")