# Source code for astronomer.providers.microsoft.azure.triggers.wasb

import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent

from astronomer.providers.microsoft.azure.hooks.wasb import WasbHookAsync


class WasbBlobSensorTrigger(BaseTrigger):
    """
    WasbBlobSensorTrigger is fired as deferred class with params to run the task in trigger worker to check for
    existence of the given blob in the provided container.

    :param container_name: name of the container in which the blob should be searched for
    :param blob_name: name of the blob to check existence for
    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
    :param poke_interval: polling period in seconds to check for the status
    :param public_read: whether an anonymous public read access should be used. Default is False
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        public_read: bool = False,
        poke_interval: float = 5.0,
    ):
        super().__init__()
        self.container_name = container_name
        self.blob_name = blob_name
        self.wasb_conn_id = wasb_conn_id
        self.poke_interval = poke_interval
        self.public_read = public_read

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """
        Serializes WasbBlobSensorTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` tuple; ``kwargs`` mirrors the ``__init__`` parameters
        """
        return (
            "astronomer.providers.microsoft.azure.triggers.wasb.WasbBlobSensorTrigger",
            {
                "container_name": self.container_name,
                "blob_name": self.blob_name,
                "wasb_conn_id": self.wasb_conn_id,
                "poke_interval": self.poke_interval,
                "public_read": self.public_read,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Makes async connection to Azure WASB and polls for existence of the given blob name.

        Yields a single ``TriggerEvent``: ``{"status": "success", ...}`` once
        ``check_for_blob_async`` returns a truthy value, or ``{"status": "error", ...}``
        carrying ``str(exception)`` if anything raises while connecting or polling.
        """
        hook = WasbHookAsync(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
        try:
            # Keep one client open for the whole polling session instead of
            # re-entering the context manager on every poke.
            async with hook.blob_service_client:
                while True:
                    blob_exists = await hook.check_for_blob_async(
                        container_name=self.container_name,
                        blob_name=self.blob_name,
                    )
                    if blob_exists:
                        message = f"Blob {self.blob_name} found in container {self.container_name}."
                        yield TriggerEvent({"status": "success", "message": message})
                        # Nothing more to emit after success; leave the loop and close the client.
                        return
                    # Lazy %-style logging: the message is only rendered if INFO is enabled.
                    self.log.info(
                        "Blob %s not available yet in container %s. Sleeping for %s seconds",
                        self.blob_name,
                        self.container_name,
                        self.poke_interval,
                    )
                    await asyncio.sleep(self.poke_interval)
        except Exception as e:
            # Boundary handler: report the failure as an event rather than
            # letting the exception escape the generator.
            yield TriggerEvent({"status": "error", "message": str(e)})
class WasbPrefixSensorTrigger(BaseTrigger):
    """
    WasbPrefixSensorTrigger is fired as a deferred class with params to run the task in trigger worker.
    It checks for the existence of a blob with the given prefix in the provided container.

    :param container_name: name of the container in which the blob should be searched for
    :param prefix: prefix of the blob to check existence for
    :param include: specifies one or more additional datasets to include in the
        response. Options include: ``snapshots``, ``metadata``, ``uncommittedblobs``,
        ``copy``, ``deleted``
    :param delimiter: filters objects based on the delimiter (for e.g '.csv')
    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
    :param poke_interval: polling period in seconds to check for the status
    :param public_read: whether an anonymous public read access should be used. Default is False
    """

    def __init__(
        self,
        container_name: str,
        prefix: str,
        include: Optional[List[str]] = None,
        delimiter: Optional[str] = "/",
        wasb_conn_id: str = "wasb_default",
        public_read: bool = False,
        poke_interval: float = 5.0,
    ):
        super().__init__()
        self.container_name = container_name
        self.prefix = prefix
        self.include = include
        self.delimiter = delimiter
        self.wasb_conn_id = wasb_conn_id
        self.poke_interval = poke_interval
        self.public_read = public_read

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """
        Serializes WasbPrefixSensorTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` tuple; ``kwargs`` mirrors the ``__init__`` parameters
        """
        return (
            "astronomer.providers.microsoft.azure.triggers.wasb.WasbPrefixSensorTrigger",
            {
                "container_name": self.container_name,
                "prefix": self.prefix,
                "include": self.include,
                "delimiter": self.delimiter,
                "wasb_conn_id": self.wasb_conn_id,
                "poke_interval": self.poke_interval,
                "public_read": self.public_read,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Makes async connection to Azure WASB and polls for existence of a blob with given prefix.

        Yields a single ``TriggerEvent``: ``{"status": "success", ...}`` once
        ``check_for_prefix_async`` returns a truthy value, or ``{"status": "error", ...}``
        carrying ``str(exception)`` if anything raises while connecting or polling.
        """
        hook = WasbHookAsync(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
        try:
            # Keep one client open for the whole polling session instead of
            # re-entering the context manager on every poke.
            async with hook.blob_service_client:
                while True:
                    prefix_exists = await hook.check_for_prefix_async(
                        container_name=self.container_name,
                        prefix=self.prefix,
                        include=self.include,
                        delimiter=self.delimiter,
                    )
                    if prefix_exists:
                        message = f"Prefix {self.prefix} found in container {self.container_name}."
                        yield TriggerEvent({"status": "success", "message": message})
                        # Nothing more to emit after success; leave the loop and close the client.
                        return
                    # Lazy %-style logging: the message is only rendered if INFO is enabled.
                    self.log.info(
                        "Prefix %s not available yet in container %s. Sleeping for %s seconds",
                        self.prefix,
                        self.container_name,
                        self.poke_interval,
                    )
                    await asyncio.sleep(self.poke_interval)
        except Exception as e:
            # Boundary handler: report the failure as an event rather than
            # letting the exception escape the generator.
            yield TriggerEvent({"status": "error", "message": str(e)})