airflow.providers.cncf.kubernetes.operators.pod

Executes a task in a Kubernetes Pod.

Module Contents

Classes

PodEventType

Type of events emitted by a Kubernetes pod.

KubernetesPodOperator

Execute a task in a Kubernetes Pod.

Attributes

alphanum_lower

KUBE_CONFIG_ENV_VAR

airflow.providers.cncf.kubernetes.operators.pod.alphanum_lower
airflow.providers.cncf.kubernetes.operators.pod.KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'
class airflow.providers.cncf.kubernetes.operators.pod.PodEventType

Bases: enum.Enum

Type of events emitted by a Kubernetes pod.

WARNING = 'Warning'
NORMAL = 'Normal'
exception airflow.providers.cncf.kubernetes.operators.pod.PodReattachFailure

Bases: airflow.exceptions.AirflowException

Raised when a pod is expected to be found but cannot be.

exception airflow.providers.cncf.kubernetes.operators.pod.PodCredentialsExpiredFailure

Bases: airflow.exceptions.AirflowException

Raised when the pod fails to refresh credentials.

class airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator(*, kubernetes_conn_id=KubernetesHook.default_conn_name, namespace=None, image=None, name=None, random_name_suffix=True, cmds=None, arguments=None, ports=None, volume_mounts=None, volumes=None, env_vars=None, env_from=None, secrets=None, in_cluster=None, cluster_context=None, labels=None, reattach_on_restart=True, startup_timeout_seconds=120, startup_check_interval_seconds=5, get_logs=True, base_container_name=None, init_container_logs=None, container_logs=None, image_pull_policy=None, annotations=None, container_resources=None, affinity=None, config_file=None, node_selector=None, image_pull_secrets=None, service_account_name=None, hostnetwork=False, host_aliases=None, tolerations=None, security_context=None, container_security_context=None, dnspolicy=None, dns_config=None, hostname=None, subdomain=None, schedulername=None, full_pod_spec=None, init_containers=None, log_events_on_failure=False, do_xcom_push=False, pod_template_file=None, pod_template_dict=None, priority_class_name=None, pod_runtime_info_envs=None, termination_grace_period=None, configmaps=None, skip_on_exit_code=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=2, log_pod_spec_on_failure=True, on_finish_action='delete_pod', is_delete_operator_pod=None, termination_message_policy='File', active_deadline_seconds=None, callbacks=None, progress_callback=None, logging_interval=None, **kwargs)

Bases: airflow.models.BaseOperator

Execute a task in a Kubernetes Pod.

See also

For more information on how to use this operator, take a look at the guide: KubernetesPodOperator

Note

If you use Google Kubernetes Engine and Airflow is not running in the same cluster, consider using GKEStartPodOperator, which simplifies the authorization process.
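As a rough illustration of how the constructor might be called, the sketch below collects a handful of keyword arguments taken from the signature above. The task id, pod name, namespace, image and command are hypothetical placeholder values, and make_pod_task is a hypothetical helper; the provider import is attempted lazily so that the snippet also loads where the provider package is not installed.

```python
# Hypothetical values: only the keyword names come from the signature above.
pod_task_kwargs = {
    "task_id": "hello_pod",  # BaseOperator argument, forwarded through **kwargs
    "name": "hello-pod",  # pod name; a random suffix is appended by default
    "namespace": "default",
    "image": "python:3.12-slim",
    "cmds": ["python", "-c"],
    "arguments": ["print('hello from the pod')"],
    "get_logs": True,
    "on_finish_action": "delete_pod",
}


def make_pod_task(**overrides):
    """Build the operator if the provider is importable, otherwise return None.

    Hypothetical helper, not part of the provider package.
    """
    try:
        from airflow.providers.cncf.kubernetes.operators.pod import (
            KubernetesPodOperator,
        )
    except ImportError:
        return None
    return KubernetesPodOperator(**{**pod_task_kwargs, **overrides})
```

In a DAG file the helper would normally be called inside a DAG context, for example make_pod_task(task_id="hello_pod_eu", namespace="eu-batch"), where both override values are again placeholders.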

Parameters
  • kubernetes_conn_id (str | None) – The Kubernetes connection ID for the Kubernetes cluster.

  • namespace (str | None) – The namespace in which to run the pod.

  • image (str | None) – Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLs will point to custom repositories. (templated)

  • name (str | None) – name of the pod in which the task will run, will be used (plus a random suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).

  • random_name_suffix (bool) – if True, will generate a random suffix.

  • cmds (list[str] | None) – entrypoint of the container. (templated) The Docker image’s ENTRYPOINT is used if this is not provided.

  • arguments (list[str] | None) – arguments of the entrypoint. (templated) The docker image’s CMD is used if this is not provided.

  • ports (list[kubernetes.client.models.V1ContainerPort] | None) – ports for the launched pod.

  • volume_mounts (list[kubernetes.client.models.V1VolumeMount] | None) – volumeMounts for the launched pod.

  • volumes (list[kubernetes.client.models.V1Volume] | None) – volumes for the launched pod. Includes ConfigMaps and PersistentVolumes.

  • env_vars (list[kubernetes.client.models.V1EnvVar] | dict[str, str] | None) – Environment variables initialized in the container. (templated)

  • env_from (list[kubernetes.client.models.V1EnvFromSource] | None) – (Optional) List of sources to populate environment variables in the container.

  • secrets (list[airflow.providers.cncf.kubernetes.secret.Secret] | None) – Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume.

  • in_cluster (bool | None) – run kubernetes client with in_cluster configuration.

  • cluster_context (str | None) – context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. (templated)

  • reattach_on_restart (bool) – if the worker dies while the pod is running, reattach and monitor during the next try. If False, always create a new pod for each try.

  • labels (dict | None) – labels to apply to the Pod. (templated)

  • startup_timeout_seconds (int) – timeout in seconds for the pod to start.

  • startup_check_interval_seconds (int) – interval in seconds at which to check whether the pod has started.

  • get_logs (bool) – get the stdout of the base container as logs of the tasks.

  • init_container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – list of init containers whose logs will be published to stdout. Takes a sequence of containers, a single container name or True. If True, the logs of all init containers are published.

  • container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – list of containers whose logs will be published to stdout. Takes a sequence of containers, a single container name or True. If True, the logs of all containers are published. Works in conjunction with the get_logs param. The default value is the base container.

  • image_pull_policy (str | None) – Specify a policy to cache or always pull an image.

  • annotations (dict | None) – non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels. (templated)

  • container_resources (kubernetes.client.models.V1ResourceRequirements | None) – resources for the launched pod. (templated)

  • affinity (kubernetes.client.models.V1Affinity | None) – affinity scheduling rules for the launched pod.

  • config_file (str | None) – The path to the Kubernetes config file. (templated) If not specified, the default value is ~/.kube/config.

  • node_selector (dict | None) – A dict containing a group of scheduling rules. (templated)

  • image_pull_secrets (list[kubernetes.client.models.V1LocalObjectReference] | None) – Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma-separated list: secret_a,secret_b

  • service_account_name (str | None) – Name of the service account.

  • hostnetwork (bool) – If True, enable host networking on the pod.

  • host_aliases (list[kubernetes.client.models.V1HostAlias] | None) – A list of host aliases to apply to the containers in the pod.

  • tolerations (list[kubernetes.client.models.V1Toleration] | None) – A list of kubernetes tolerations.

  • security_context (kubernetes.client.models.V1PodSecurityContext | dict | None) – security options the pod should run with (PodSecurityContext).

  • container_security_context (kubernetes.client.models.V1SecurityContext | dict | None) – security options the container should run with.

  • dnspolicy (str | None) – dnspolicy for the pod.

  • dns_config (kubernetes.client.models.V1PodDNSConfig | None) – dns configuration (ip addresses, searches, options) for the pod.

  • hostname (str | None) – hostname for the pod.

  • subdomain (str | None) – subdomain for the pod.

  • schedulername (str | None) – Specify a schedulername for the pod.

  • full_pod_spec (kubernetes.client.models.V1Pod | None) – The complete podSpec.

  • init_containers (list[kubernetes.client.models.V1Container] | None) – init containers for the launched Pod.

  • log_events_on_failure (bool) – Log the pod’s events if a failure occurs.

  • do_xcom_push (bool) – If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.

  • pod_template_file (str | None) – path to pod template file (templated)

  • pod_template_dict (dict | None) – pod template dictionary (templated)

  • priority_class_name (str | None) – priority class name for the launched Pod

  • pod_runtime_info_envs (list[kubernetes.client.models.V1EnvVar] | None) – (Optional) A list of environment variables, to be set in the container.

  • termination_grace_period (int | None) – Termination grace period to apply if the task is killed in the UI; defaults to the Kubernetes default.

  • configmaps (list[str] | None) – (Optional) A list of names of ConfigMaps from which to populate the environment variables. The contents of each target ConfigMap’s Data field will be exposed as key-value pairs in environment variables. Extends env_from.

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – If task exits with this exit code, leave the task in skipped state (default: None). If set to None, any non-zero exit code will be treated as a failure.

  • base_container_name (str | None) – The name of the base container in the pod. This container’s logs will appear as part of this task’s logs if get_logs is True. Defaults to None. If None, will consult the class variable BASE_CONTAINER_NAME (which defaults to “base”) for the base container name to use.

  • deferrable (bool) – Run operator in the deferrable mode.

  • poll_interval (float) – Polling period in seconds to check for the status. Used only in deferrable mode.

  • log_pod_spec_on_failure (bool) – Log the pod’s specification if a failure occurs

  • on_finish_action (str) – What to do when the pod reaches its final state, or the execution is interrupted. If “delete_pod”, the pod will be deleted regardless of its state; if “delete_succeeded_pod”, the pod will be deleted only if it succeeded. Set to “keep_pod” to keep the pod.

  • termination_message_policy (str) – The termination message policy of the base container. Default value is “File”.

  • active_deadline_seconds (int | None) – The active deadline in seconds, which translates to active_deadline_seconds in V1PodSpec.

  • callbacks (type[airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback] | None) – KubernetesPodOperatorCallback containing the callback methods invoked at different steps of KubernetesPodOperator.

  • logging_interval (int | None) – max time in seconds that task should be in deferred state before resuming to fetch the latest logs. If None, then the task will remain in deferred state until pod is done, and no logs will be visible until that time.
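The semantics described for skip_on_exit_code and on_finish_action can be modelled in a few lines. This is a simplified sketch, not the provider's implementation: classify_exit_code and should_delete_pod are hypothetical helper names, and treating exit code 0 as success before consulting skip_on_exit_code is an assumption.

```python
def classify_exit_code(exit_code, skip_on_exit_code=None):
    """Map a container exit code to 'success', 'skipped' or 'failed'.

    Assumption: exit code 0 always means success.
    """
    if exit_code == 0:
        return "success"
    if skip_on_exit_code is None:
        # No skip codes configured: any non-zero exit code is a failure.
        return "failed"
    if isinstance(skip_on_exit_code, int):
        skip_codes = [skip_on_exit_code]
    else:
        skip_codes = skip_on_exit_code
    return "skipped" if exit_code in skip_codes else "failed"


def should_delete_pod(on_finish_action, pod_succeeded):
    """Decide whether the pod is deleted once it reaches its final state."""
    if on_finish_action == "delete_pod":
        return True
    if on_finish_action == "delete_succeeded_pod":
        return pod_succeeded
    if on_finish_action == "keep_pod":
        return False
    raise ValueError(f"unknown on_finish_action: {on_finish_action!r}")
```

For instance, classify_exit_code(99, skip_on_exit_code=99) yields "skipped", while should_delete_pod("delete_succeeded_pod", pod_succeeded=False) leaves a failed pod in place for inspection.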

BASE_CONTAINER_NAME = 'base'
ISTIO_CONTAINER_NAME = 'istio-proxy'
KILL_ISTIO_PROXY_SUCCESS_MSG = 'HTTP/1.1 200'
POD_CHECKED_KEY = 'already_checked'
POST_TERMINATION_TIMEOUT = 120
template_fields: collections.abc.Sequence[str] = ('image', 'cmds', 'annotations', 'arguments', 'env_vars', 'labels', 'config_file',...
template_fields_renderers
pod_manager()
hook()
client()
find_pod(namespace, context, *, exclude_checked=True)

Return an already-running pod for this task instance if one exists.

log_matching_pod(pod, context)
get_or_create_pod(pod_request_obj, context)
await_pod_start(pod)
extract_xcom(pod)

Retrieve xcom value and kill xcom sidecar container.
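For the value retrieved here to exist, the task's container has to write it to /airflow/xcom/return.json, the path named in the do_xcom_push description. The following is a minimal sketch of container-side code doing that; write_xcom_result is a hypothetical helper, and its path argument exists only so the function can be tried outside a pod.

```python
import json
from pathlib import Path

# Path taken from the do_xcom_push parameter description.
XCOM_RETURN_PATH = Path("/airflow/xcom/return.json")


def write_xcom_result(value, path=XCOM_RETURN_PATH):
    """Serialise a JSON-compatible value to the XCom return file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value))
    return path
```

Inside the container, the last step of the job would call something like write_xcom_result({"rows_loaded": 3}), where the payload is a made-up example.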

execute(context)

Run the pod asynchronously or synchronously, depending on the deferrable parameter.

execute_sync(context)
await_init_containers_completion(pod)
await_pod_completion(pod)
execute_async(context)
convert_config_file_to_dict()

Convert passed config_file to dict representation.

invoke_defer_method(last_log_time=None)

Redefine triggers which are being used in child classes.

trigger_reentry(context, event)

Point of re-entry from trigger.

If logging_interval is None, then at this point, the pod should be done, and we’ll just fetch the logs and exit.

If logging_interval is not None, it could be that the pod is still running, and we’ll just grab the latest logs and defer back to the trigger again.
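The two cases above amount to a small decision rule. The sketch below models it on an idealised clock, assuming the trigger hands control back exactly every logging_interval seconds; next_step_after_trigger and simulate_reentries are hypothetical names and do not exist in the provider.

```python
def next_step_after_trigger(pod_is_done, logging_interval):
    """Return the action taken when the task re-enters from the trigger."""
    if logging_interval is None or pod_is_done:
        # Without a logging interval the trigger only returns once the pod is done.
        return "fetch_logs_and_finish"
    return "fetch_logs_and_defer"


def simulate_reentries(pod_runtime_seconds, logging_interval):
    """List the actions taken at each re-entry for a pod of the given runtime."""
    if logging_interval is None:
        return [next_step_after_trigger(True, None)]
    if logging_interval <= 0:
        raise ValueError("logging_interval must be positive")
    actions = []
    elapsed = 0
    while True:
        elapsed += logging_interval
        done = elapsed >= pod_runtime_seconds
        actions.append(next_step_after_trigger(done, logging_interval))
        if done:
            return actions
```

With a 25-second pod and logging_interval=10, the model re-enters three times: twice to grab logs and defer again, and once to fetch the final logs and finish.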

post_complete_action(*, pod, remote_pod, **kwargs)

Actions that must be done after the operator finishes the deferrable execution logic.

cleanup(pod, remote_pod)
is_istio_enabled(pod)

Check if istio is enabled for the namespace of the pod by inspecting the namespace labels.

kill_istio_sidecar(pod)
process_pod_deletion(pod, *, reraise=True)
patch_already_checked(pod, *, reraise=True)

Add an “already checked” label to ensure we don’t reattach on retries.
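How such a label keeps a retry from reattaching can be pictured with plain dictionaries. The sketch below is an illustrative model only: the label key comes from POD_CHECKED_KEY above, but the label value "True", the selector layout, and the helper names build_label_selector, mark_checked and matches are assumptions rather than the provider's code.

```python
POD_CHECKED_KEY = "already_checked"  # class attribute documented above


def build_label_selector(task_labels, exclude_checked=True):
    """Build a Kubernetes equality-based label selector string."""
    parts = [f"{key}={value}" for key, value in sorted(task_labels.items())]
    if exclude_checked:
        # "!=" also matches pods that do not carry the label at all.
        parts.append(f"{POD_CHECKED_KEY}!=True")
    return ",".join(parts)


def mark_checked(pod_labels):
    """Return a copy of the pod labels with the 'already checked' marker set."""
    return {**pod_labels, POD_CHECKED_KEY: "True"}


def matches(pod_labels, task_labels, exclude_checked=True):
    """Evaluate the selector against a pod's labels, in memory."""
    for key, value in task_labels.items():
        if pod_labels.get(key) != value:
            return False
    if exclude_checked and pod_labels.get(POD_CHECKED_KEY) == "True":
        return False
    return True
```

A pod that has been marked with mark_checked no longer matches the default selector, which corresponds to find_pod being called with exclude_checked=True.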

on_kill()

Override this method to clean up subprocesses when a task instance gets killed.

Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.

build_pod_request_obj(context=None)

Return V1Pod object based on pod template file, full pod spec, and other operator parameters.

The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod template file.
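The precedence order can be illustrated with flat dictionaries standing in for the three sources. This is a deliberately simplified sketch: the provider works on V1Pod objects with nested fields, whereas merge_by_precedence is a hypothetical helper that only shows which source wins, and treating None as "not set" is an assumption.

```python
def merge_by_precedence(*sources):
    """Merge dicts listed from highest to lowest precedence.

    Values of None are treated as unset and never override anything.
    """
    merged = {}
    # Apply the lowest-precedence source first so later ones overwrite it.
    for source in reversed(sources):
        for key, value in (source or {}).items():
            if value is not None:
                merged[key] = value
    return merged


# Hypothetical stand-ins for the three inputs.
pod_template_file = {"image": "busybox", "namespace": "batch", "service_account_name": None}
full_pod_spec = {"image": "alpine"}
operator_params = {"namespace": "etl", "image": None}

result = merge_by_precedence(operator_params, full_pod_spec, pod_template_file)
```

Here result takes namespace from the operator parameters and image from the full pod spec, because the operator left image unset and the template file has the lowest precedence.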

dry_run()

Print out the pod definition that would be created by this operator.

Does not include labels specific to the task instance (since there isn’t one in a dry_run) and excludes all empty elements.
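Excluding empty elements from a nested pod definition can be done with a short recursive filter. The sketch below assumes that "empty" means None, empty strings, empty lists and empty dicts; prune_empty is a hypothetical helper and the sample definition is made up, so the exact rules used by dry_run may differ.

```python
def prune_empty(value):
    """Recursively drop None, empty strings, empty lists and empty dicts."""
    empties = (None, "", [], {})
    if isinstance(value, dict):
        pruned = {key: prune_empty(item) for key, item in value.items()}
        return {key: item for key, item in pruned.items() if item not in empties}
    if isinstance(value, list):
        pruned = [prune_empty(item) for item in value]
        return [item for item in pruned if item not in empties]
    return value


# Made-up pod definition with several empty elements.
pod_definition = {
    "metadata": {"name": "demo", "labels": {}, "annotations": None},
    "spec": {
        "containers": [{"name": "base", "image": "busybox", "env": [], "args": None}],
        "volumes": [],
    },
}

compact_definition = prune_empty(pod_definition)
```

Values such as 0 and False are kept, since they are meaningful settings rather than empty elements.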

process_duplicate_label_pods(pod_list)

Patch or delete the existing pod with duplicate labels.

This handles an edge case that can happen only if the reattach_on_restart flag is False and the previous run attempt failed because the task process was killed externally by the cluster or another process.

If the task process is killed externally, code execution is interrupted and the task exits immediately. As a result, the pod created in the previous attempt will not be properly deleted or patched by the cleanup() method.

Return the newly created pod to be used for the next run attempt.
