Source code for astronomer.providers.dbt.cloud.operators.dbt
import time
from typing import Any, Dict
from airflow import AirflowException
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from astronomer.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from astronomer.providers.utils.typing_compat import Context
[docs]class DbtCloudRunJobOperatorAsync(DbtCloudRunJobOperator):
"""
Executes a dbt Cloud job asynchronously. Trigger the dbt cloud job via worker to dbt and with run id in response
poll for the status in trigger.
.. seealso::
For more information on sync Operator DbtCloudRunJobOperator, take a look at the guide:
:ref:`howto/operator:DbtCloudRunJobOperator`
:param dbt_cloud_conn_id: The connection ID for connecting to dbt Cloud.
:param job_id: The ID of a dbt Cloud job.
:param account_id: Optional. The ID of a dbt Cloud account.
:param trigger_reason: Optional Description of the reason to trigger the job. Dbt requires the trigger reason while
making an API. if it is not provided uses the default reasons.
:param steps_override: Optional. List of dbt commands to execute when triggering the job instead of those
configured in dbt Cloud.
:param schema_override: Optional. Override the destination schema in the configured target for this job.
:param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
:param check_interval: Time in seconds to check on a job run's status. Defaults to 60 seconds.
:param additional_run_config: Optional. Any additional parameters that should be included in the API
request when triggering the job.
:return: The ID of the triggered dbt Cloud job run.
"""
[docs] def execute(self, context: "Context") -> None:
"""Submits a job which generates a run_id and gets deferred"""
if self.trigger_reason is None:
self.trigger_reason = (
f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
)
hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id)
trigger_job_response = hook.trigger_job_run(
account_id=self.account_id,
job_id=self.job_id,
cause=self.trigger_reason,
steps_override=self.steps_override,
schema_override=self.schema_override,
additional_run_config=self.additional_run_config,
)
run_id = trigger_job_response.json()["data"]["id"]
job_run_url = trigger_job_response.json()["data"]["href"]
context["ti"].xcom_push(key="job_run_url", value=job_run_url)
end_time = time.time() + self.timeout
self.defer(
timeout=self.execution_timeout,
trigger=DbtCloudRunJobTrigger(
conn_id=self.dbt_cloud_conn_id,
run_id=run_id,
end_time=end_time,
account_id=self.account_id,
poll_interval=self.check_interval,
),
method_name="execute_complete",
)
[docs] def execute_complete(self, context: "Context", event: Dict[str, Any]) -> int:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event["status"] == "error":
raise AirflowException(event["message"])
self.log.info(event["message"])
return int(event["run_id"])