Source code for astronomer.providers.cncf.kubernetes.hooks.kubernetes

from typing import Any, Dict

import aiofiles
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes_asyncio import client, config


def get_field(extras: Dict[str, Any], field_name: str, strict: bool = False) -> Any:
    """Look up a connection extra by its short name, then by its legacy prefixed name.

    The short name (``field_name``) is tried first; for backward compatibility
    the ``extra__kubernetes__``-prefixed name is tried second. A stored value
    that is falsy (empty string, ``False``, ``0``, ...) is reported as ``None``.

    :param extras: mapping of extra fields to search
    :param field_name: short (un-prefixed) field name
    :param strict: when true, raise ``KeyError`` if neither key is present
    :return: the stored value, or ``None`` when it is falsy or (non-strict) absent
    :raises ValueError: if ``field_name`` starts with ``extra__``
    :raises KeyError: if ``strict`` is true and neither key is present
    """
    legacy_prefix = "extra__kubernetes__"
    if field_name.startswith("extra__"):
        raise ValueError(
            f"Got prefixed name {field_name}; please remove the '{legacy_prefix}' prefix "
            "when using this method."
        )

    # The short name wins over the prefixed one; the first key that is present
    # decides the result, even when its value is falsy.
    for candidate in (field_name, f"{legacy_prefix}{field_name}"):
        if candidate in extras:
            return extras[candidate] or None

    if strict:
        raise KeyError(f"Field {field_name} not found in extras")
    return None
class KubernetesHookAsync(KubernetesHook):
    """Variant of ``KubernetesHook`` that builds a ``kubernetes_asyncio`` API client."""

    async def _load_config(self) -> client.ApiClient:
        """Load the Kubernetes client configuration and return an API client.

        Settings are read from the hook attributes ``in_cluster``,
        ``cluster_context`` and ``config_file``, each combined with the
        matching connection extra through ``_coalesce_param``; ``kube_config``
        is read from the connection extras only. Extras are empty when the
        hook has no ``conn_id``.

        The configuration source is chosen in this order:

        1. in-cluster configuration,
        2. the kubeconfig file at ``kube_config_path`` / ``config_file``,
        3. kubeconfig content from the ``kube_config`` extra, written to a
           temporary file for loading,
        4. ``load_kube_config`` without an explicit file (logged as
           "default file").

        :return: a new ``kubernetes_asyncio`` ``ApiClient``
        :raises AirflowException: if more than one of ``in_cluster``,
            ``kube_config`` and ``kube_config_path`` is set
        """
        extras = self.get_connection(self.conn_id).extra_dejson if self.conn_id else {}

        in_cluster = self._coalesce_param(self.in_cluster, get_field(extras, "in_cluster"))
        cluster_context = self._coalesce_param(self.cluster_context, get_field(extras, "cluster_context"))
        kubeconfig_path = self._coalesce_param(self.config_file, get_field(extras, "kube_config_path"))
        kubeconfig = get_field(extras, "kube_config") or None

        selected_options = sum(1 for option in (in_cluster, kubeconfig, kubeconfig_path) if option)
        if selected_options > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        # NOTE(review): every branch builds ApiClient() without passing
        # self.client_configuration; confirm that the loaded settings are
        # visible to the client when a custom configuration object is supplied.

        if in_cluster:
            self.log.debug("loading kube_config from: in_cluster configuration")
            config.load_incluster_config()
            return client.ApiClient()

        if kubeconfig_path is not None:
            self.log.debug("loading kube_config from: %s", kubeconfig_path)
            await config.load_kube_config(
                config_file=kubeconfig_path,
                client_configuration=self.client_configuration,
                context=cluster_context,
            )
            return client.ApiClient()

        if kubeconfig is not None:
            # load_kube_config reads from a path, so the inline kubeconfig is
            # written to a temporary file that only lives for this block.
            async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug("loading kube_config from: connection kube_config")
                await temp_config.write(kubeconfig.encode())
                await temp_config.flush()
                await config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return client.ApiClient()

        self.log.debug("loading kube_config from: default file")
        await config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
        # Return a client here as well so every branch honours the declared
        # return type instead of falling through with an implicit None.
        return client.ApiClient()

    async def get_api_client_async(self) -> client.ApiClient:
        """Create an API Client object to interact with Kubernetes.

        :return: the client produced by ``_load_config``, or a fresh
            ``ApiClient`` if that call yields ``None``
        """
        kube_client = await self._load_config()
        # Guard kept for subclasses whose _load_config override returns None.
        if kube_client is not None:
            return kube_client
        return client.ApiClient()