Source code for astronomer.providers.sftp.triggers.sftp
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple
from airflow.exceptions import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
[docs]class SFTPTrigger(BaseTrigger):
"""
Trigger that fires when either the path on the SFTP server does not exist,
or when there are no files matching the file pattern at the path
:param path: The path on the SFTP server to search for a file matching the file pattern.
Authentication method used in the SFTP connection must have access to this path
:param file_pattern: Pattern to be used for matching against the list of files at the path above.
Uses the fnmatch module from std library to perform the matching.
:param sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
:param poke_interval: How often, in seconds, to check for the existence of the file on the SFTP server
"""
def __init__(
self,
path: str,
file_pattern: str = "",
sftp_conn_id: str = "sftp_default",
poke_interval: float = 5,
) -> None:
super().__init__()
self.path = path
self.file_pattern = file_pattern
self.sftp_conn_id = sftp_conn_id
self.poke_interval = poke_interval
[docs] def serialize(self) -> Tuple[str, Dict[str, Any]]:
"""Serializes SFTPTrigger arguments and classpath"""
return (
"astronomer.providers.sftp.triggers.sftp.SFTPTrigger",
{
"path": self.path,
"file_pattern": self.file_pattern,
"sftp_conn_id": self.sftp_conn_id,
"poke_interval": self.poke_interval,
},
)
[docs] async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
"""
Makes a series of asynchronous calls to sftp servers via async sftp hook. It yields a Trigger if
file matching file pattern exists at the specified path, otherwise it throws an exception.
"""
hook = self._get_async_hook()
exc = None
while True:
try:
file_returned_by_hook = await hook.get_file_by_pattern(
path=self.path, fnmatch_pattern=self.file_pattern
)
yield TriggerEvent({"status": "success", "message": f"Sensed file: {file_returned_by_hook}"})
except AirflowException:
await asyncio.sleep(self.poke_interval)
except Exception as e:
exc = e
# Break loop to avoid infinite retries on terminal failure
break
yield TriggerEvent({"status": "error", "message": exc})
def _get_async_hook(self) -> SFTPHookAsync:
return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)