Source code for astronomer.providers.dbt.cloud.operators.dbt
from __future__ import annotations
import warnings
from typing import Any
from airflow import AirflowException
from airflow.exceptions import AirflowFailException
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from astronomer.providers.utils.typing_compat import Context
[docs]
class DbtCloudRunJobOperatorAsync(DbtCloudRunJobOperator):
    """
    Deprecated, always-deferrable variant of ``DbtCloudRunJobOperator``.

    This class is deprecated.
    Use :class:`~airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobOperator`
    and set the `deferrable` param to `True` instead.

    Instantiating this class emits a ``DeprecationWarning`` and constructs the
    parent operator with ``deferrable=True``. The only behavior it adds on top
    of the parent is the ``execute_complete`` override, which fails the task
    without retrying when the job run was cancelled.
    """

    # Deprecation metadata: the flag, and the import line of the replacement class.
    is_deprecated = True
    post_deprecation_replacement = (
        "from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator"
    )

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Warn about the deprecation and build the parent operator in deferrable mode.

        :param args: positional arguments forwarded unchanged to the parent operator.
        :param kwargs: keyword arguments forwarded to the parent operator; any
            ``deferrable`` value supplied by the caller is replaced with ``True``.
        """
        warnings.warn(
            (
                "This class is deprecated. "
                "Use `airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobOperator` "
                "and set `deferrable` param to `True` instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        # Set the flag inside kwargs instead of passing it as an extra keyword:
        # `super().__init__(*args, deferrable=True, **kwargs)` raises
        # "TypeError: got multiple values for keyword argument 'deferrable'"
        # as soon as the caller also passes `deferrable`.
        kwargs["deferrable"] = True
        super().__init__(*args, **kwargs)

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Callback for when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was
        successful.

        :param context: task context passed by the caller; not used by this method.
        :param event: payload emitted by the trigger. The keys ``"status"``,
            ``"message"`` and ``"run_id"`` are read from it.
        :return: ``event["run_id"]`` converted to ``int``.
        :raises AirflowFailException: if ``event["status"]`` is ``"cancelled"``.
        :raises AirflowException: if ``event["status"]`` is ``"error"``.
        """
        # We handle the case where the job run is cancelled a bit differently than the OSS operator.
        # Essentially, we do not want to retry the task if the job run is cancelled, whereas the OSS operator will
        # retry the task if the job run is cancelled. This has been specifically handled here differently based upon
        # the feedback from a user. And hence, while we are deprecating this operator, we are not changing the behavior
        # of the `execute_complete` method. We can check if the wider OSS community wants this behavior to be changed
        # in the future as it is here, and then we can remove this override.
        status = event["status"]
        if status == "cancelled":
            # `%s` already stringifies its argument, so no explicit str() is needed.
            self.log.info("Job run %s has been cancelled.", event["run_id"])
            self.log.info("Task will not be retried.")
            raise AirflowFailException(event["message"])
        if status == "error":
            raise AirflowException(event["message"])

        # Any other status is treated as success: log the message and return the run id.
        self.log.info(event["message"])
        return int(event["run_id"])