Source code for astronomer.providers.cncf.kubernetes.operators.kubernetes_pod
from typing import Any, Dict
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from airflow.utils.context import Context
from astronomer.providers.cncf.kubernetes.triggers.wait_container import (
PodLaunchTimeoutException,
WaitContainerTrigger,
)
class PodNotFoundException(AirflowException):
    """Raised when the pod the operator expects cannot be found in the Kubernetes API."""
class KubernetesPodOperatorAsync(KubernetesPodOperator):
    """
    Async (deferring) version of KubernetesPodOperator

    .. warning::
        The logs would not be available in the Airflow Webserver until the task completes. This is
        the main difference between this operator and the
        :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.

    :param poll_interval: interval in seconds to sleep between checking pod status
    """

    def __init__(self, *, poll_interval: int = 5, **kwargs: Any):
        # Stored before delegating to the parent constructor; it is forwarded to the
        # trigger in ``execute``.
        self.poll_interval = poll_interval
        super().__init__(**kwargs)

    @staticmethod
    def raise_for_trigger_status(event: Dict[str, Any]) -> None:
        """
        Raise an exception if the trigger event reports an error.

        Events whose ``status`` is anything other than ``"error"`` are ignored.

        :param event: payload of the event fired by the trigger; must contain ``status``
        :raises PodLaunchTimeoutException: if the event's ``error_type`` is
            ``"PodLaunchTimeoutException"``
        :raises AirflowException: for every other error event
        """
        if event["status"] != "error":
            return

        # Use ``.get`` so an incomplete error payload still surfaces as an Airflow
        # failure instead of an unrelated ``KeyError``.
        description = event.get("description", "Trigger reported an error without a description")
        if event.get("error_type") == "PodLaunchTimeoutException":
            raise PodLaunchTimeoutException(description)
        raise AirflowException(description)

    def execute(self, context: Context) -> None:
        """
        Build the pod request, get or create the pod, then defer to ``WaitContainerTrigger``.

        Execution resumes in :meth:`execute_complete` once the trigger fires.

        :param context: Airflow task context
        """
        self.pod_request_obj = self.build_pod_request_obj(context)
        self.pod = self.get_or_create_pod(self.pod_request_obj, context)
        self.defer(
            trigger=WaitContainerTrigger(
                kubernetes_conn_id=None,
                hook_params={
                    "cluster_context": self.cluster_context,
                    "config_file": self.config_file,
                    "in_cluster": self.in_cluster,
                },
                pod_name=self.pod.metadata.name,
                container_name=self.BASE_CONTAINER_NAME,
                pod_namespace=self.pod.metadata.namespace,
                pending_phase_timeout=self.startup_timeout_seconds,
                poll_interval=self.poll_interval,
            ),
            method_name=self.execute_complete.__name__,
        )

    def execute_complete(self, context: Context, event: Dict[str, Any]) -> Any:
        """
        Callback for when the trigger fires - returns immediately.

        Relies on trigger to throw an exception, otherwise it assumes execution was
        successful.

        ``cleanup`` is always invoked, with the pod that was found or, when none was
        found, the pod request object. On success the pod name and namespace are pushed
        to XCom under the keys ``pod_name`` and ``pod_namespace``.

        :param context: Airflow task context
        :param event: payload of the event fired by the trigger
        :return: the value from ``extract_xcom`` when ``do_xcom_push`` is set, else ``None``
        :raises PodNotFoundException: if the pod cannot be found after resuming
        """
        remote_pod = None
        # Bound up front so the final return never depends on a conditionally assigned name.
        result = None
        try:
            self.pod_request_obj = self.build_pod_request_obj(context)
            self.pod = self.find_pod(
                namespace=self.namespace or self.pod_request_obj.metadata.namespace,
                context=context,
            )

            # we try to find pod before possibly raising so that on_kill will have `pod` attr
            self.raise_for_trigger_status(event)

            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")

            if self.get_logs:
                self.pod_manager.follow_container_logs(
                    pod=self.pod,
                    container_name=self.BASE_CONTAINER_NAME,
                )

            if self.do_xcom_push:
                result = self.extract_xcom(pod=self.pod)
            remote_pod = self.pod_manager.await_pod_completion(self.pod)
        finally:
            self.cleanup(
                pod=self.pod or self.pod_request_obj,
                remote_pod=remote_pod,
            )

        ti = context["ti"]
        ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
        ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
        return result