# Source code for astronomer.providers.cncf.kubernetes.hooks.kubernetes
import aiofiles
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes_asyncio import client, config
class KubernetesHookAsync(KubernetesHook):
    """Asynchronous variant of ``KubernetesHook`` that builds ``kubernetes_asyncio`` API clients.

    The methods below read ``conn_id``, ``in_cluster``, ``cluster_context``,
    ``config_file`` and ``client_configuration`` from the instance; none of
    them is assigned in this class (presumably they are set by the parent
    hook's constructor -- confirm against ``KubernetesHook``).
    """

    async def _load_config(self) -> client.ApiClient:
        """Load the Kubernetes client configuration and return an API client.

        Each setting is resolved by passing the hook attribute and the matching
        connection extra to the inherited ``_coalesce_param`` helper. The
        configuration source is then picked in this order:

        1. in-cluster configuration,
        2. a kube config file path,
        3. kube config content stored in the connection extras (written to a
           temporary file so it can be loaded by path),
        4. the library default (no config file is passed).

        :return: a ``kubernetes_asyncio`` ``ApiClient``
        :raises AirflowException: if more than one of ``in_cluster``,
            ``kube_config_path`` and ``kube_config`` is set
        """
        extras = self.get_connection(self.conn_id).extra_dejson if self.conn_id else {}

        # ``or None`` turns empty extras (e.g. "") into None before coalescing.
        in_cluster = self._coalesce_param(
            self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
        )
        cluster_context = self._coalesce_param(
            self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
        )
        kubeconfig_path = self._coalesce_param(
            self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
        )
        kubeconfig = extras.get("extra__kubernetes__kube_config") or None

        num_selected_configuration = sum(
            1 for option in (in_cluster, kubeconfig, kubeconfig_path) if option
        )
        if num_selected_configuration > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        # NOTE(review): every branch returns ``client.ApiClient()`` without
        # passing ``self.client_configuration``; confirm that a caller-supplied
        # configuration object is actually picked up by the returned client.
        if in_cluster:
            self.log.debug("loading kube_config from: in_cluster configuration")
            config.load_incluster_config()
            return client.ApiClient()

        # Select by truthiness, exactly like the exclusivity check above, so an
        # empty-string path is treated as "not set" instead of shadowing a
        # valid ``kube_config`` from the connection.
        if kubeconfig_path:
            self.log.debug("loading kube_config from: %s", kubeconfig_path)
            await config.load_kube_config(
                config_file=kubeconfig_path,
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
            return client.ApiClient()

        if kubeconfig:
            async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug("loading kube_config from: connection kube_config")
                await temp_config.write(kubeconfig.encode())
                # Flush so the content is on disk before it is read back by path.
                await temp_config.flush()
                await config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return client.ApiClient()

        self.log.debug("loading kube_config from: default file")
        await config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
        # Return a client here too, so every path honours the annotated
        # return type instead of implicitly returning None.
        return client.ApiClient()

    async def get_api_client_async(self) -> client.ApiClient:
        """Create an API Client object to interact with Kubernetes"""
        kube_client = await self._load_config()
        if kube_client is not None:
            return kube_client
        # Defensive fallback for ``_load_config`` overrides that return nothing.
        return client.ApiClient()