Source code for astronomer.providers.amazon.aws.hooks.redshift_cluster

import asyncio
import logging
from typing import Any, Dict

import botocore.exceptions

from astronomer.providers.amazon.aws.hooks.base_aws_async import AwsBaseHookAsync

log = logging.getLogger(__name__)


[docs]class RedshiftHookAsync(AwsBaseHookAsync): """Interact with AWS Redshift using aiobotocore python library""" def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["client_type"] = "redshift" kwargs["resource_type"] = "redshift" super().__init__(*args, **kwargs)
[docs] async def cluster_status(self, cluster_identifier: str) -> Dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and get the status and returns the status of the cluster based on the cluster_identifier passed :param cluster_identifier: unique identifier of a cluster """ async with await self.get_client_async() as client: try: response = await client.describe_clusters(ClusterIdentifier=cluster_identifier) cluster_state = ( response["Clusters"][0]["ClusterStatus"] if response and response["Clusters"] else None ) return {"status": "success", "cluster_state": cluster_state} except botocore.exceptions.ClientError as error: return {"status": "error", "message": str(error)}
[docs] async def pause_cluster(self, cluster_identifier: str) -> Dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and pause the cluster based on the cluster_identifier passed :param cluster_identifier: unique identifier of a cluster """ try: async with await self.get_client_async() as client: response = await client.pause_cluster(ClusterIdentifier=cluster_identifier) status = response["Cluster"]["ClusterStatus"] if response and response["Cluster"] else None if status == "pausing": flag = asyncio.Event() while True: expected_response = await asyncio.create_task( self.get_cluster_status(cluster_identifier, "paused", flag) ) await asyncio.sleep(10) if flag.is_set(): return expected_response return {"status": "error", "cluster_state": status} except botocore.exceptions.ClientError as error: return {"status": "error", "message": str(error)}
[docs] async def resume_cluster(self, cluster_identifier: str) -> Dict[str, Any]: """ Connects to the AWS redshift cluster via aiobotocore and resume the cluster for the cluster_identifier passed :param cluster_identifier: unique identifier of a cluster """ async with await self.get_client_async() as client: try: response = await client.resume_cluster(ClusterIdentifier=cluster_identifier) status = response["Cluster"]["ClusterStatus"] if response and response["Cluster"] else None if status == "resuming": flag = asyncio.Event() while True: expected_response = await asyncio.create_task( self.get_cluster_status(cluster_identifier, "available", flag) ) await asyncio.sleep(10) if flag.is_set(): return expected_response return {"status": "error", "cluster_state": status} except botocore.exceptions.ClientError as error: return {"status": "error", "message": str(error)}
[docs] async def get_cluster_status( self, cluster_identifier: str, expected_state: str, flag: asyncio.Event ) -> Dict[str, Any]: """ Make call self.cluster_status to know the status and run till the expected_state is met and set the flag :param cluster_identifier: unique identifier of a cluster :param expected_state: expected_state example("available", "pausing", "paused"") :param flag: asyncio even flag set true if success and if any error """ try: response = await self.cluster_status(cluster_identifier) if ("cluster_state" in response and response["cluster_state"] == expected_state) or response[ "status" ] == "error": flag.set() return response except botocore.exceptions.ClientError as error: flag.set() return {"status": "error", "message": str(error)}