Source code for astronomer.providers.dbt.cloud.sensors.dbt

import time
from typing import Any, Dict

from airflow import AirflowException
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

from astronomer.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from astronomer.providers.utils.typing_compat import Context


[docs]class DbtCloudJobRunSensorAsync(DbtCloudJobRunSensor): """ Checks the status of a dbt Cloud job run. .. seealso:: For more information on sync Sensor DbtCloudJobRunSensor, take a look at the guide:: :ref:`howto/operator:DbtCloudJobRunSensor` :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud. :param run_id: The job run identifier. :param account_id: The dbt Cloud account identifier. :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days. """ def __init__( self, *, poll_interval: float = 5, timeout: float = 60 * 60 * 24 * 7, **kwargs: Any, ): self.poll_interval = poll_interval self.timeout = timeout super().__init__(**kwargs)
[docs] def execute(self, context: "Context") -> None: """Defers trigger class to poll for state of the job run until it reaches a failure state or success state""" end_time = time.time() + self.timeout self.defer( timeout=self.execution_timeout, trigger=DbtCloudRunJobTrigger( run_id=self.run_id, conn_id=self.dbt_cloud_conn_id, account_id=self.account_id, poll_interval=self.poll_interval, end_time=end_time, ), method_name="execute_complete", )
[docs] def execute_complete(self, context: "Context", event: Dict[str, Any]) -> int: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event["status"] in ["error", "cancelled"]: raise AirflowException(event["message"]) self.log.info(event["message"]) return int(event["run_id"])