Source code for astronomer.providers.amazon.aws.operators.redshift_sql
from typing import TYPE_CHECKING, Any, Dict, cast
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from astronomer.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from astronomer.providers.amazon.aws.triggers.redshift_sql import RedshiftSQLTrigger
if TYPE_CHECKING:
from airflow.utils.context import Context
[docs]class RedshiftSQLOperatorAsync(RedshiftSQLOperator):
"""
Executes SQL Statements against an Amazon Redshift cluster"
:param sql: the SQL code to be executed as a single string, or
a list of str (sql statements), or a reference to a template file.
Template references are recognized by str ending in '.sql'
:param redshift_conn_id: reference to Amazon Redshift connection id
:param parameters: (optional) the parameters to render the SQL query with.
:param autocommit: if True, each command is automatically committed.
(default value: False)
"""
def __init__(
self,
*,
poll_interval: float = 5,
**kwargs: Any,
) -> None:
self.poll_interval = poll_interval
super().__init__(**kwargs)
[docs] def execute(self, context: "Context") -> None:
"""
Makes a sync call to RedshiftDataHook and execute the query and gets back the query_ids list and
defers trigger to poll for the status for the query executed
"""
redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
query_ids, response = redshift_data_hook.execute_query(sql=cast(str, self.sql), params=self.params)
if response.get("status") == "error":
self.execute_complete({}, response)
return
self.defer(
timeout=self.execution_timeout,
trigger=RedshiftSQLTrigger(
task_id=self.task_id,
polling_period_seconds=self.poll_interval,
aws_conn_id=self.redshift_conn_id,
query_ids=query_ids,
),
method_name="execute_complete",
)
[docs] def execute_complete(self, context: Dict[str, Any], event: Any = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if "status" in event and event["status"] == "error":
msg = "{0}".format(event["message"])
raise AirflowException(msg)
elif "status" in event and event["status"] == "success":
self.log.info("%s completed successfully.", self.task_id)
return None
else:
self.log.info("%s completed successfully.", self.task_id)
return None