airflow.providers.cncf.kubernetes.utils.pod_manager

Launches PODs.

Attributes

EMPTY_XCOM_RESULT

Sentinel for no xcom result.

Exceptions

PodLaunchFailedException

When pod launching fails in KubernetesPodOperator.

PodLaunchTimeoutException

When the pod does not leave the Pending phase within the specified timeout.

PodNotFoundException

Expected pod does not exist in kube-api.

Classes

PodPhase

Possible pod phases.

PodLogsConsumer

Pulls pod logs from a stream, checking the container status before reading data.

PodLoggingStatus

Return the status of the pod and last log time when exiting from fetch_container_logs.

PodManager

Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator.

OnFinishAction

Action to take when the pod finishes.

Functions

should_retry_start_pod(exception)

Check if an Exception indicates a transient error and warrants retrying.

check_exception_is_kubernetes_api_unauthorized(exc)

is_log_group_marker(line)

Check if the line is a log group marker like ::group:: or ::endgroup::.

Module Contents

airflow.providers.cncf.kubernetes.utils.pod_manager.EMPTY_XCOM_RESULT = '__airflow_xcom_result_empty__'[source]

Sentinel for no xcom result.

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException[source]

Bases: airflow.exceptions.AirflowException

When pod launching fails in KubernetesPodOperator.

airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)[source]

Check if an Exception indicates a transient error and warrants retrying.

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase[source]

Possible pod phases.

See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.

PENDING = 'Pending'[source]
RUNNING = 'Running'[source]
FAILED = 'Failed'[source]
SUCCEEDED = 'Succeeded'[source]
terminal_states[source]
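
As a rough illustration of how these constants might be used, the sketch below mirrors them in a stand-alone class so that it runs without Airflow installed. The contents of `terminal_states` (assumed here to be `Failed` and `Succeeded`) and the helper `is_terminal` are assumptions made for illustration; this page does not list them.

```python
class PodPhaseSketch:
    """Stand-in that mirrors the PodPhase constants listed above."""

    PENDING = "Pending"
    RUNNING = "Running"
    FAILED = "Failed"
    SUCCEEDED = "Succeeded"

    # Assumption: the terminal phases are the two a pod cannot leave again.
    terminal_states = {FAILED, SUCCEEDED}


def is_terminal(phase: str) -> bool:
    """Hypothetical helper: True once the phase is one of the terminal states."""
    return phase in PodPhaseSketch.terminal_states
```

A caller could poll a pod's phase and stop as soon as `is_terminal(phase)` is true.
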
airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)[source]
exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException[source]

Bases: airflow.exceptions.AirflowException

When the pod does not leave the Pending phase within the specified timeout.

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException[source]

Bases: airflow.exceptions.AirflowException

Expected pod does not exist in kube-api.

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer(response, pod, pod_manager, container_name, post_termination_timeout=120, read_pod_cache_timeout=120)[source]

Pulls pod logs from a stream, checking the container status before reading data.

This class is a workaround for the issue https://github.com/apache/airflow/issues/23497.

Parameters:
  • response (urllib3.response.HTTPResponse) – HTTP response with logs

  • pod (kubernetes.client.models.v1_pod.V1Pod) – Pod instance from Kubernetes client

  • pod_manager (PodManager) – Pod manager instance

  • container_name (str) – Name of the container that we’re reading logs from

  • post_termination_timeout (int) – (Optional) How long, in seconds, logs remain available after the container terminates.

  • read_pod_cache_timeout (int) – (Optional) The container’s status cache lifetime. The container status is cached to reduce API calls.

response[source]
pod[source]
pod_manager[source]
container_name[source]
post_termination_timeout = 120[source]
last_read_pod_at = None[source]
read_pod_cache = None[source]
read_pod_cache_timeout = 120[source]
__iter__()[source]

Yield log items divided by the ‘\n’ symbol.
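
The idea of yielding items separated by the newline character can be sketched with a small stand-alone generator. This is not the class's actual implementation: the function name `iter_log_lines` and the choice to keep the trailing newline on each item are assumptions for illustration.

```python
from typing import Iterable, Iterator


def iter_log_lines(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Yield newline-terminated items from an iterable of byte chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line + b"\n"
    # Emit whatever is left when the stream ends without a trailing newline.
    if buffer:
        yield buffer
```

Buffering matters because a streamed response can split a single log line across two chunks.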

logs_available()[source]
read_pod()[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus[source]

Return the status of the pod and last log time when exiting from fetch_container_logs.

running: bool[source]
last_log_time: pendulum.DateTime | None[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator.

stop_watching_events = False[source]
run_pod_async(pod, **kwargs)[source]

Run POD asynchronously.

delete_pod(pod)[source]

Delete POD.

create_pod(pod)[source]

Launch the pod asynchronously.

async watch_pod_events(pod, check_interval=1)[source]

Read pod events and write them to the log.

async await_pod_start(pod, schedule_timeout=120, startup_timeout=120, check_interval=1)[source]

Wait for the pod to reach a phase other than Pending.

Parameters:
  • pod (kubernetes.client.models.v1_pod.V1Pod)

  • schedule_timeout (int) – Timeout (in seconds) for the pod to be scheduled (if the pod stays unscheduled for too long, the task fails)

  • startup_timeout (int) – Timeout (in seconds) for the pod to start (if the pod stays pending for too long after being scheduled, the task fails)

  • check_interval (int) – Interval (in seconds) between checks

Return type:

None

fetch_container_logs(pod, container_name, *, follow=False, since_time=None, post_termination_timeout=120, container_name_log_prefix_enabled=True, log_formatter=None)[source]

Follow the logs of the container and stream them to Airflow logging.

Returns when container exits.

Logs may not be available immediately after the pod starts, because the CSR (certificate signing request) may not have been approved and signed yet. In that situation an ApiException is raised, which is why this method retries on that specific exception.
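
Retrying on one specific exception type can be pictured roughly as follows. This is a minimal sketch, not the method's actual retry policy: `FakeApiException` stands in for the Kubernetes client's `ApiException`, and the helper name, attempt count, and delay are all assumptions.

```python
import time


class FakeApiException(Exception):
    """Hypothetical stand-in for the Kubernetes client's ApiException."""


def call_with_retry(func, attempts=3, delay=0.0):
    """Call func, retrying only when it raises FakeApiException."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except FakeApiException:
            # Give up and re-raise once the last attempt has failed.
            if attempt == attempts:
                raise
            time.sleep(delay)
```

Any other exception type propagates immediately, so only the known transient failure is retried.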

fetch_requested_init_container_logs(pod, init_containers, follow_logs=False, container_name_log_prefix_enabled=True, log_formatter=None)[source]

Follow the logs of the init containers in the specified pod and publish them to Airflow logging.

Returns when all the containers exit.

fetch_requested_container_logs(pod, containers, follow_logs=False, container_name_log_prefix_enabled=True, log_formatter=None)[source]

Follow the logs of the containers in the specified pod and publish them to Airflow logging.

Returns when all the containers exit.

await_container_completion(pod, container_name, polling_time=1)[source]

Wait for the given container in the given pod to be completed.

Parameters:
  • pod (kubernetes.client.models.v1_pod.V1Pod) – pod spec that will be monitored

  • container_name (str) – name of the container within the pod to monitor

  • polling_time (float) – polling time between two container status checks. Defaults to 1s.

await_pod_completion(pod, istio_enabled=False, container_name='base')[source]

Monitor a pod and return the final state.

Parameters:
  • istio_enabled (bool) – whether istio is enabled in the namespace

  • pod (kubernetes.client.models.v1_pod.V1Pod) – pod spec that will be monitored

  • container_name (str) – name of the container within the pod

Returns:

The pod in its final state

Return type:

kubernetes.client.models.v1_pod.V1Pod

parse_log_line(line)[source]

Parse a K8s log line and return the timestamp and log message.

Parameters:

line (str) – k8s log line

Returns:

timestamp and log message

Return type:

tuple[pendulum.DateTime | None, str]
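
To make the (timestamp, message) return shape concrete, here is a stand-alone sketch that uses only the standard library. It assumes each line starts with an RFC 3339 UTC timestamp followed by a single space, and that lines without such a prefix yield `None` as the timestamp. The function name `split_log_line` is hypothetical, and it returns a `datetime` rather than a `pendulum.DateTime`.

```python
import re
from datetime import datetime, timezone
from typing import Optional, Tuple

# RFC 3339 UTC timestamp with an optional fractional part,
# e.g. 2024-01-02T03:04:05.123456789Z
_TIMESTAMP = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z$")


def split_log_line(line: str) -> Tuple[Optional[datetime], str]:
    """Split 'TIMESTAMP MESSAGE' into (timestamp, message); (None, line) otherwise."""
    timestamp_text, _, message = line.partition(" ")
    match = _TIMESTAMP.match(timestamp_text)
    if match is None:
        return None, line
    seconds = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    # datetime stores microseconds only, so truncate the fraction to 6 digits.
    fraction = (match.group(2) or "")[:6].ljust(6, "0")
    return seconds.replace(microsecond=int(fraction), tzinfo=timezone.utc), message
```

Returning the untouched line when no timestamp is found lets a caller still log it, for example as a continuation of the previous message.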

container_is_running(pod, container_name)[source]

Read the pod and check if the container is running.

container_is_terminated(pod, container_name)[source]

Read the pod and check if the container is terminated.

read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)[source]

Read log from the POD.

get_init_container_names(pod)[source]

Return the names of the init containers in the POD.

get_container_names(pod)[source]

Return container names from the POD except for the airflow-xcom-sidecar container.

read_pod_events(pod)[source]

Read events from the POD.

read_pod(pod)[source]

Read POD information.

await_xcom_sidecar_container_start(pod, timeout=900, log_interval=30)[source]

Check if the sidecar container has reached the ‘Running’ state before performing do_xcom_push.

extract_xcom(pod)[source]

Retrieve XCom value and kill xcom sidecar container.

extract_xcom_json(pod)[source]

Retrieve XCom value and also check if xcom json is valid.
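
The combination of an empty-result sentinel and a JSON validity check might look like the sketch below. The sentinel value is the one documented on this page. The helper `validate_xcom_json` is hypothetical and handles only an already-retrieved string; it does not talk to a pod.

```python
import json

# Value of EMPTY_XCOM_RESULT as documented on this page.
EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"


def validate_xcom_json(raw: str) -> str:
    """Hypothetical helper: return raw unchanged if it is the sentinel or valid JSON."""
    if raw.rstrip() == EMPTY_XCOM_RESULT:
        # Nothing was written by the task, so there is nothing to validate.
        return raw
    # json.loads raises json.JSONDecodeError (a ValueError) on invalid input.
    json.loads(raw)
    return raw
```

Returning the original string rather than the parsed object leaves deserialization to the caller, which is one possible design choice.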

extract_xcom_kill(pod)[source]

Kill xcom sidecar container.

class airflow.providers.cncf.kubernetes.utils.pod_manager.OnFinishAction[source]

Bases: str, enum.Enum

Action to take when the pod finishes.

KEEP_POD = 'keep_pod'[source]
DELETE_POD = 'delete_pod'[source]
DELETE_SUCCEEDED_POD = 'delete_succeeded_pod'[source]
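
Because the enum derives from both `str` and `enum.Enum`, its members compare equal to their plain string values. The stand-alone sketch below mirrors the members listed above. The helper `should_delete` is hypothetical, and the behaviour it encodes is inferred from the member names rather than stated on this page.

```python
import enum


class OnFinishActionSketch(str, enum.Enum):
    """Stand-in that mirrors the OnFinishAction members listed above."""

    KEEP_POD = "keep_pod"
    DELETE_POD = "delete_pod"
    DELETE_SUCCEEDED_POD = "delete_succeeded_pod"


def should_delete(action: OnFinishActionSketch, pod_succeeded: bool) -> bool:
    """Hypothetical helper: decide whether a finished pod should be deleted."""
    if action is OnFinishActionSketch.DELETE_POD:
        return True
    if action is OnFinishActionSketch.DELETE_SUCCEEDED_POD:
        # Assumption: only pods that finished successfully are removed.
        return pod_succeeded
    return False
```

Constructing a member from a configuration string, as in `OnFinishActionSketch("delete_pod")`, works because enum lookup by value is supported.
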
airflow.providers.cncf.kubernetes.utils.pod_manager.is_log_group_marker(line)[source]

Check if the line is a log group marker like ::group:: or ::endgroup::.
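
A check of this kind can be sketched as a prefix test. The function name `looks_like_group_marker` is hypothetical, and matching on the start of the line is an assumption about how markers appear in the log.

```python
def looks_like_group_marker(line: str) -> bool:
    """Return True if the line starts with ::group:: or ::endgroup::."""
    # str.startswith accepts a tuple of prefixes and matches any of them.
    return line.startswith(("::group::", "::endgroup::"))
```

For example, `looks_like_group_marker("::group::Install dependencies")` is true, while an ordinary log line is not matched.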
