Source code for astronomer.providers.amazon.aws.sensors.redshift_cluster

import warnings
from datetime import timedelta
from typing import Any, Dict, Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

from astronomer.providers.amazon.aws.triggers.redshift_cluster import (
    RedshiftClusterSensorTrigger,
)
from astronomer.providers.utils.typing_compat import Context


[docs]class RedshiftClusterSensorAsync(RedshiftClusterSensor): """ Waits for a Redshift cluster to reach a specific status. :param cluster_identifier: The identifier for the cluster being pinged.\ :param target_status: The cluster status desired. """ def __init__( self, *, poll_interval: float = 5, **kwargs: Any, ): # TODO: Remove once deprecated if poll_interval: self.poke_interval = poll_interval warnings.warn( "Argument `poll_interval` is deprecated and will be removed " "in a future release. Please use `poke_interval` instead.", DeprecationWarning, stacklevel=2, ) super().__init__(**kwargs)
[docs] def execute(self, context: Context) -> None: """Check for the target_status and defers using the trigger""" self.defer( timeout=timedelta(seconds=self.timeout), trigger=RedshiftClusterSensorTrigger( task_id=self.task_id, aws_conn_id=self.aws_conn_id, cluster_identifier=self.cluster_identifier, target_status=self.target_status, poke_interval=self.poke_interval, ), method_name="execute_complete", )
[docs] def execute_complete(self, context: Context, event: Optional[Dict[Any, Any]] = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event: if "status" in event and event["status"] == "error": msg = "{}: {}".format(event["status"], event["message"]) raise AirflowException(msg) if "status" in event and event["status"] == "success": self.log.info("%s completed successfully.", self.task_id) self.log.info( "Cluster Identifier %s is in %s state", self.cluster_identifier, self.target_status ) return None self.log.info("%s completed successfully.", self.task_id) return None