Source code for astronomer.providers.amazon.aws.hooks.base_aws_async

import logging

from aiobotocore.client import AioBaseClient
from aiobotocore.session import get_session
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook, _parse_s3_config
from asgiref.sync import sync_to_async

log = logging.getLogger(__name__)


class AwsBaseHookAsync(AwsBaseHook):
    """Interacts with AWS using aiobotocore asynchronously.

    Adds an awaitable client factory on top of ``AwsBaseHook`` that resolves
    credentials and region from the Airflow connection named by
    ``self.aws_conn_id``.
    """

    async def get_client_async(self) -> AioBaseClient:
        """Create an async client object to communicate with AWS services.

        The key pair is resolved from the Airflow connection, first match wins:

        1. the connection's ``login`` / ``password``;
        2. ``aws_access_key_id`` / ``aws_secret_access_key`` in the extras;
        3. the file named by ``s3_config_file`` in the extras (parsed with
           ``_parse_s3_config``).

        When none of these is present the key pair is passed as ``None``.

        ``aws_session_token`` is taken from the extras regardless of where the
        key pair came from, so temporary credentials also work when the key
        pair is stored in ``login`` / ``password``.

        The region is ``self.region_name`` when set, otherwise the extras'
        ``region_name`` (if any).

        :return: the object produced by aiobotocore's ``create_client`` for
            ``self.client_type``.
            NOTE(review): aiobotocore clients are normally entered with
            ``async with`` -- confirm how callers consume this value.
        """
        # Connection lookup and config-file parsing are blocking calls, so
        # they are pushed through sync_to_async to keep the event loop free.
        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
        extra_config = connection_object.extra_dejson

        aws_access_key_id = None
        aws_secret_access_key = None
        # The session token only ever lives in the extras; read it up front so
        # it is not lost when the key pair is supplied via login/password.
        aws_session_token = extra_config.get("aws_session_token")

        if connection_object.login:
            aws_access_key_id = connection_object.login
            aws_secret_access_key = connection_object.password
            self.log.info("Credentials retrieved from login")
        elif "aws_access_key_id" in extra_config and "aws_secret_access_key" in extra_config:
            aws_access_key_id = extra_config["aws_access_key_id"]
            aws_secret_access_key = extra_config["aws_secret_access_key"]
            self.log.info("Credentials retrieved from extra_config")
        elif "s3_config_file" in extra_config:
            aws_access_key_id, aws_secret_access_key = await sync_to_async(_parse_s3_config)(
                extra_config["s3_config_file"],
                extra_config.get("s3_config_format"),
                extra_config.get("profile"),
            )
            self.log.info("Credentials retrieved from extra_config['s3_config_file']")
        else:
            self.log.info("No credentials retrieved from Connection")

        # An explicitly configured hook region takes precedence over the
        # connection's extras.
        region_name = self.region_name
        if self.region_name is None and "region_name" in extra_config:
            self.log.info("Retrieving region_name from Connection.extra_config['region_name']")
            region_name = extra_config["region_name"]

        async_connection = get_session()
        return async_connection.create_client(
            service_name=self.client_type,
            region_name=region_name,
            aws_secret_access_key=aws_secret_access_key,
            aws_access_key_id=aws_access_key_id,
            aws_session_token=aws_session_token,
            verify=self.verify,
            config=self.config,
        )