Source code for astronomer.providers.sftp.triggers.sftp
from __future__ import annotations
import asyncio
import warnings
from datetime import datetime
from typing import Any, AsyncIterator
from airflow.exceptions import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.timezone import convert_to_utc
from dateutil.parser import parse as parse_date
from astronomer.providers.sftp.hooks.sftp import SFTPHookAsync
[docs]
class SFTPTrigger(BaseTrigger):
"""
This class is deprecated and will be removed in 2.0.0.
Use :class: `~airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead.
"""
def __init__(
self,
path: str,
file_pattern: str = "",
sftp_conn_id: str = "sftp_default",
newer_than: datetime | str | None = None,
poke_interval: float = 5,
) -> None:
warnings.warn(
"This class is deprecated and will be removed in 2.0.0. "
"Use `airflow.providers.sftp.triggers.sftp.SFTPTrigger` instead."
)
super().__init__()
self.path = path
self.file_pattern = file_pattern
self.sftp_conn_id = sftp_conn_id
self.newer_than = newer_than
self.poke_interval = poke_interval
[docs]
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes SFTPTrigger arguments and classpath"""
return (
"astronomer.providers.sftp.triggers.sftp.SFTPTrigger",
{
"path": self.path,
"file_pattern": self.file_pattern,
"sftp_conn_id": self.sftp_conn_id,
"newer_than": self.newer_than,
"poke_interval": self.poke_interval,
},
)
[docs]
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Makes a series of asynchronous calls to sftp servers via async sftp hook. It yields a Trigger
- If file matching file pattern exists at the specified path return it,
- If file pattern was not provided, it looks directly into the specific path which was provided.
- If newer then datetime was provided it looks for the file path last modified time and
check whether the last modified time is greater, if true return file if false it polls again.
"""
hook = self._get_async_hook()
exc = None
if isinstance(self.newer_than, str):
self.newer_than = parse_date(self.newer_than)
_newer_than = convert_to_utc(self.newer_than) if self.newer_than else None
while True:
try:
if self.file_pattern:
files_returned_by_hook = await hook.get_files_and_attrs_by_pattern(
path=self.path, fnmatch_pattern=self.file_pattern
)
files_sensed = []
for file in files_returned_by_hook:
if _newer_than:
if file.attrs.mtime is None:
continue
mod_time = datetime.fromtimestamp(float(file.attrs.mtime)).strftime(
"%Y%m%d%H%M%S"
)
mod_time_utc = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
if _newer_than <= mod_time_utc:
files_sensed.append(file.filename)
else:
files_sensed.append(file.filename)
if files_sensed:
yield TriggerEvent(
{
"status": "success",
"message": f"Sensed {len(files_sensed)} files: {files_sensed}",
}
)
else:
mod_time = await hook.get_mod_time(self.path)
if _newer_than:
mod_time_utc = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
if _newer_than <= mod_time_utc:
yield TriggerEvent({"status": "success", "message": f"Sensed file: {self.path}"})
else:
yield TriggerEvent({"status": "success", "message": f"Sensed file: {self.path}"})
await asyncio.sleep(self.poke_interval)
except AirflowException:
await asyncio.sleep(self.poke_interval)
except Exception as e:
exc = e
# Break loop to avoid infinite retries on terminal failure
break
yield TriggerEvent({"status": "error", "message": exc})
def _get_async_hook(self) -> SFTPHookAsync:
return SFTPHookAsync(sftp_conn_id=self.sftp_conn_id)