Source code for astronomer.providers.amazon.aws.hooks.base_aws
from typing import Dict, Optional
from aiobotocore.client import AioBaseClient
from aiobotocore.session import AioSession, get_session
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
from asgiref.sync import sync_to_async
class AwsBaseHookAsync(AwsBaseHook):
    """
    Interacts with AWS using aiobotocore asynchronously.

    .. note::
        AwsBaseHookAsync uses aiobotocore to create asynchronous AWS hooks. Hence, AwsBaseHookAsync
        only supports the authentication mechanisms that aiobotocore supports. Currently,
        AwsBaseHookAsync supports only the AWS STS client method ``assume_role`` provided in the
        Airflow connection extra args via aiobotocore.

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param verify: Whether or not to verify SSL certificates.
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
    :param config: Configuration for botocore client.

    .. seealso::
        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
    """

    async def get_client_async(self) -> AioBaseClient:
        """
        Create an Async Client object to communicate with AWS services.

        The credentials come from the Airflow connection ``self.aws_conn_id``. When the
        connection defines a ``role_arn``, the role is assumed through STS first and the
        resulting temporary credentials replace the ones stored on the connection. If no
        role credentials are obtained (for example because the configured
        ``assume_role_method`` is not supported), the connection's own credentials are used.

        :return: the object produced by ``AioSession.create_client`` for ``self.client_type``.
            It is intended to be entered with ``async with`` (as ``get_role_credentials``
            does for its STS client) so the underlying client is closed properly.
        """
        # ``get_connection`` is a synchronous call; wrap it so it can be awaited here.
        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)

        conn_config = AwsConnectionWrapper(
            conn=connection_object,
            region_name=self.region_name,
            botocore_config=self.config,
            verify=self.verify,
        )

        async_connection = get_session()

        # Start from the credentials stored on the connection ...
        session_token = conn_config.aws_session_token
        aws_secret = conn_config.aws_secret_access_key
        aws_access = conn_config.aws_access_key_id

        # ... and swap in the assumed-role credentials when a role is configured.
        if conn_config.role_arn:
            credentials = await self.get_role_credentials(
                async_session=async_connection, conn_config=conn_config
            )
            if credentials:
                session_token = credentials["SessionToken"]
                aws_access = credentials["AccessKeyId"]
                aws_secret = credentials["SecretAccessKey"]

        return async_connection.create_client(
            service_name=self.client_type,
            region_name=conn_config.region_name,
            aws_secret_access_key=aws_secret,
            aws_access_key_id=aws_access,
            aws_session_token=session_token,
            verify=self.verify,
            config=self.config,
            endpoint_url=conn_config.endpoint_url,
        )

    @staticmethod
    async def get_role_credentials(
        async_session: AioSession, conn_config: AwsConnectionWrapper
    ) -> Optional[Dict[str, str]]:
        """
        Assume the role configured on the connection and return its temporary credentials.

        Only the ``assume_role`` method (or an unset ``assume_role_method``) is supported.

        :param async_session: aiobotocore session used to create the STS client.
        :param conn_config: wrapped Airflow connection providing the base credentials,
            ``role_arn``, ``assume_role_method`` and ``assume_role_kwargs``.
        :return: the ``Credentials`` mapping of the STS ``assume_role`` response (read by the
            caller for ``AccessKeyId``, ``SecretAccessKey`` and ``SessionToken``), or ``None``
            when the configured ``assume_role_method`` is not supported.
        """
        # Unsupported methods yield no credentials; bail out before creating an STS client.
        if conn_config.assume_role_method not in (None, "assume_role"):
            return None

        # Build the call arguments as a single dict so that a ``RoleSessionName`` (or
        # ``RoleArn``) present in ``assume_role_kwargs`` cannot collide with an explicit
        # keyword and raise ``TypeError``. User kwargs override the default session name;
        # the connection's ``role_arn`` always takes precedence.
        assume_role_kwargs = {
            "RoleSessionName": "RoleSession",
            **(conn_config.assume_role_kwargs or {}),
            "RoleArn": conn_config.role_arn,
        }

        async with async_session.create_client(
            "sts",
            aws_access_key_id=conn_config.aws_access_key_id,
            aws_secret_access_key=conn_config.aws_secret_access_key,
            # Forward the session token so temporary base credentials are accepted by STS.
            aws_session_token=conn_config.aws_session_token,
        ) as client:
            response: Dict[str, Dict[str, str]] = await client.assume_role(**assume_role_kwargs)

        return response["Credentials"]