Source code for astronomer.providers.cncf.kubernetes.operators.kubernetes_pod
from __future__ import annotations
import warnings
from typing import Any
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
[docs]
class PodNotFoundException(AirflowException):
"""Expected pod does not exist in kube-api."""
[docs]
class KubernetesPodOperatorAsync(KubernetesPodOperator):
"""
This class is deprecated.
Please use :class: `~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`
and set `deferrable` param to `True` instead.
"""
is_deprecated = True
post_deprecation_replacement = (
"from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator"
)
def __init__(self, **kwargs: Any):
warnings.warn(
(
"This module is deprecated."
"Please use `airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`"
"and set `deferrable` param to `True` instead."
),
DeprecationWarning,
stacklevel=2,
)
super().__init__(deferrable=True, **kwargs)