# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Backwards compatibility for Pod generation.
This module bridges the previous Pod API and the Kubernetes client:
it accepts the old-style arguments and outputs a kubernetes.client.models.V1Pod.
The advantage is that the full Kubernetes API
is supported and no serialization code needs to be written.
"""
from __future__ import annotations
import copy
import uuid
import re2
from kubernetes.client import models as k8s
class PodDefaults:
    """Static defaults for Pods: the pieces needed to attach the XCom sidecar."""

    # Path at which the shared "xcom" volume is mounted.
    XCOM_MOUNT_PATH = "/airflow/xcom"
    # Name given to the sidecar container.
    SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
    # Shell loop run by the sidecar: sleeps in 30s steps and exits 0 on SIGINT.
    XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
    # Mount of the "xcom" volume at XCOM_MOUNT_PATH.
    VOLUME_MOUNT = k8s.V1VolumeMount(mount_path=XCOM_MOUNT_PATH, name="xcom")
    # The "xcom" volume itself, backed by an emptyDir.
    VOLUME = k8s.V1Volume(empty_dir=k8s.V1EmptyDirVolumeSource(), name="xcom")
    # The sidecar container: an alpine image running XCOM_CMD with the xcom
    # volume mounted and a 1m CPU request.
    SIDECAR_CONTAINER = k8s.V1Container(
        image="alpine",
        name=SIDECAR_CONTAINER_NAME,
        command=["sh", "-c", XCOM_CMD],
        resources=k8s.V1ResourceRequirements(requests={"cpu": "1m"}),
        volume_mounts=[VOLUME_MOUNT],
    )
def make_safe_label_value(string):
    """
    Normalize a provided label to be of valid length and characters.

    Valid label values must be 63 characters or less and must be empty or begin and
    end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),
    dots (.), and alphanumerics between.

    If the sanitized value is longer than ``MAX_LABEL_LEN``, or differs in any way
    from the original value sent to this function, it is truncated and suffixed with
    ``-`` plus the first 9 hex characters of the MD5 of the original value, so that
    the result fits in ``MAX_LABEL_LEN`` characters.

    If sanitizing removes every character (for example ``"!!!"``), only the hash is
    returned; prepending ``-`` to an empty prefix would produce a value that starts
    with a dash, which is not a valid label value.

    :param string: the raw label value
    :return: a label value made of the permitted characters only
    """
    from airflow.utils.hashlib_wrapper import md5

    # Strip leading/trailing non-alphanumerics and any disallowed character inside.
    safe_label = re2.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string)
    if len(safe_label) <= MAX_LABEL_LEN and string == safe_label:
        # Already valid and unchanged: return as-is (this includes the empty string).
        return safe_label

    safe_hash = md5(string.encode()).hexdigest()[:9]
    # Leave room for the "-" separator and the hash.
    prefix = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1]
    if not prefix:
        # Nothing survived sanitizing; the hash alone is a valid label value.
        return safe_hash
    return prefix + "-" + safe_hash
class PodGenerator:
    """
    Contains Kubernetes Airflow Worker configuration logic.

    Represents a kubernetes pod and manages execution of a single pod.
    Any configuration that is container specific gets applied to
    the first container in the list of containers.

    :param image: The docker image
    :param name: name in the metadata section (not the container name)
    :param namespace: pod namespace
    :param volume_mounts: list of kubernetes volumes mounts
    :param envs: A dict containing the environment variables (a list of
        ready-made env var objects is also accepted and used as-is)
    :param cmds: The command to be run on the first container
    :param args: The arguments to be run on the pod
    :param labels: labels for the pod metadata
    :param node_selectors: node selectors for the pod
    :param ports: list of ports. Applies to the first container.
    :param volumes: Volumes to be attached to the first container
    :param image_pull_policy: Specify a policy to cache or always pull an image
    :param restart_policy: The restart policy of the pod
    :param image_pull_secrets: Any image pull secrets to be given to the pod.
        If more than one secret is required, provide a comma separated list:
        secret_a,secret_b
    :param init_containers: A list of init containers
    :param service_account_name: Identity for processes that run in a Pod
    :param resources: Resource requirements for the first containers
    :param annotations: annotations for the pod
    :param affinity: A dict containing a group of affinity scheduling rules
    :param hostnetwork: If True enable host networking on the pod
    :param tolerations: A list of kubernetes tolerations
    :param security_context: A dict containing the security context for the pod
    :param configmaps: A list of configmap names to read environment variables from
    :param dnspolicy: Specify a dnspolicy for the pod
    :param schedulername: Specify a schedulername for the pod
    :param extract_xcom: Whether to bring up a container for xcom
    :param priority_class_name: priority class name for the launched Pod
    """

    def __init__(
        self,
        image: str | None = None,
        name: str | None = None,
        namespace: str | None = None,
        volume_mounts: list[k8s.V1VolumeMount | dict] | None = None,
        envs: dict[str, str] | list[k8s.V1EnvVar] | None = None,
        cmds: list[str] | None = None,
        args: list[str] | None = None,
        labels: dict[str, str] | None = None,
        node_selectors: dict[str, str] | None = None,
        ports: list[k8s.V1ContainerPort | dict] | None = None,
        volumes: list[k8s.V1Volume | dict] | None = None,
        image_pull_policy: str | None = None,
        restart_policy: str | None = None,
        image_pull_secrets: str | None = None,
        init_containers: list[k8s.V1Container] | None = None,
        service_account_name: str | None = None,
        resources: k8s.V1ResourceRequirements | dict | None = None,
        annotations: dict[str, str] | None = None,
        affinity: dict | None = None,
        hostnetwork: bool = False,
        tolerations: list | None = None,
        security_context: k8s.V1PodSecurityContext | dict | None = None,
        configmaps: list[str] | None = None,
        dnspolicy: str | None = None,
        schedulername: str | None = None,
        extract_xcom: bool = False,
        priority_class_name: str | None = None,
    ):
        self.pod = k8s.V1Pod()
        self.pod.api_version = "v1"
        self.pod.kind = "Pod"

        # Pod Metadata
        self.metadata = k8s.V1ObjectMeta()
        self.metadata.labels = labels
        self.metadata.name = name
        self.metadata.namespace = namespace
        self.metadata.annotations = annotations

        # Pod Container
        self.container = k8s.V1Container(name="base")
        self.container.image = image
        self.container.env = []

        if envs:
            if isinstance(envs, dict):
                # Plain name -> value mapping: wrap each pair in a V1EnvVar.
                self.container.env.extend(
                    k8s.V1EnvVar(name=key, value=val) for key, val in envs.items()
                )
            elif isinstance(envs, list):
                # Already a list of env var objects: use them unchanged.
                self.container.env.extend(envs)

        # One env_from source per configmap name.
        self.container.env_from = [
            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))
            for configmap in (configmaps or [])
        ]

        self.container.command = cmds or []
        self.container.args = args or []
        if image_pull_policy:
            self.container.image_pull_policy = image_pull_policy
        self.container.ports = ports or []
        self.container.resources = resources
        self.container.volume_mounts = volume_mounts or []

        # Pod Spec (containers are attached later, in gen_pod)
        self.spec = k8s.V1PodSpec(containers=[])
        self.spec.security_context = security_context
        self.spec.tolerations = tolerations
        if dnspolicy:
            self.spec.dns_policy = dnspolicy
        self.spec.scheduler_name = schedulername
        self.spec.host_network = hostnetwork
        self.spec.affinity = affinity
        self.spec.service_account_name = service_account_name
        self.spec.init_containers = init_containers
        self.spec.volumes = volumes or []
        self.spec.node_selector = node_selectors
        if restart_policy:
            self.spec.restart_policy = restart_policy
        self.spec.priority_class_name = priority_class_name

        # image_pull_secrets is a comma separated string of secret names.
        self.spec.image_pull_secrets = []
        if image_pull_secrets:
            self.spec.image_pull_secrets.extend(
                k8s.V1LocalObjectReference(name=image_pull_secret)
                for image_pull_secret in image_pull_secrets.split(",")
            )

        # Attach sidecar
        self.extract_xcom = extract_xcom

    def gen_pod(self) -> k8s.V1Pod:
        """
        Generate pod.

        Assembles ``self.pod`` from the spec, metadata and container built in
        ``__init__`` and gives it a unique name. When ``extract_xcom`` is set, the
        returned pod is a copy with the XCom sidecar added.

        NOTE: this populates ``self.pod`` in place and rewrites ``self.metadata.name``
        with the uniquified name, so calling it again appends a further suffix.

        :return: the generated pod
        """
        result = self.pod
        result.spec = self.spec
        result.metadata = self.metadata
        result.spec.containers = [self.container]
        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
        if self.extract_xcom:
            result = self.add_sidecar(result)
        return result

    @staticmethod
    def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
        """
        Add sidecar.

        Returns a deep copy of ``pod`` with the XCom volume, the matching volume
        mount on the first container, and the sidecar container added. The pod
        passed in is left untouched.

        :param pod: the pod to extend; must already have at least one container
        :return: a new pod with the XCom sidecar attached
        """
        pod_cp = copy.deepcopy(pod)
        # Work on the copy's own list: inserting into ``pod.spec.volumes`` would
        # mutate the caller's pod and pile up duplicate "xcom" volumes across calls.
        pod_cp.spec.volumes = pod_cp.spec.volumes or []
        # The PodDefaults objects are class-level singletons; copy them so that
        # later edits to one generated pod cannot leak into others.
        pod_cp.spec.volumes.insert(0, copy.deepcopy(PodDefaults.VOLUME))
        base_container = pod_cp.spec.containers[0]
        base_container.volume_mounts = base_container.volume_mounts or []
        base_container.volume_mounts.insert(0, copy.deepcopy(PodDefaults.VOLUME_MOUNT))
        pod_cp.spec.containers.append(copy.deepcopy(PodDefaults.SIDECAR_CONTAINER))
        return pod_cp

    @staticmethod
    def from_obj(obj) -> k8s.V1Pod | None:
        """
        Convert to pod from obj.

        ``obj`` may be ``None``, a ``PodGenerator``, or a dict whose
        ``"KubernetesExecutor"`` entry holds keyword arguments for ``PodGenerator``.
        That entry may also carry the legacy resource keys ``request_cpu``,
        ``request_memory``, ``limit_cpu``, ``limit_memory`` and
        ``ephemeral-storage``; when no ``resources`` value is given they are folded
        into a ``V1ResourceRequirements``. The legacy keys are never forwarded to
        ``PodGenerator`` (it does not accept them), and the caller's dict is not
        modified.

        :param obj: the object to convert
        :return: the generated pod, or ``None`` when ``obj`` is ``None`` or has no
            (or an empty) ``"KubernetesExecutor"`` entry
        :raises TypeError: if ``obj`` is neither ``None``, a ``PodGenerator`` nor a dict
        """
        if obj is None:
            return None

        if isinstance(obj, PodGenerator):
            return obj.gen_pod()

        if not isinstance(obj, dict):
            raise TypeError(
                "Cannot convert a non-dictionary or non-PodGenerator "
                "object into a KubernetesExecutorConfig"
            )

        # We do not want to extract constant here from ExecutorLoader because it is just
        # A name in dictionary rather than executor selection mechanism and it causes cyclic import
        namespaced = obj.get("KubernetesExecutor", {})

        if not namespaced:
            return None

        # Shallow copy so the caller's executor config is left as it was.
        kwargs = dict(namespaced)
        # Pull the legacy keys out: they are not PodGenerator parameters, so
        # forwarding them would raise "unexpected keyword argument".
        legacy_keys = ("request_cpu", "request_memory", "limit_cpu", "limit_memory", "ephemeral-storage")
        legacy = {key: kwargs.pop(key, None) for key in legacy_keys}

        if kwargs.get("resources") is None:
            requests = {
                "cpu": legacy["request_cpu"],
                "memory": legacy["request_memory"],
                "ephemeral-storage": legacy["ephemeral-storage"],
            }
            limits = {
                "cpu": legacy["limit_cpu"],
                "memory": legacy["limit_memory"],
                "ephemeral-storage": legacy["ephemeral-storage"],
            }
            if all(value is None for value in (*requests.values(), *limits.values())):
                kwargs["resources"] = None
            else:
                kwargs["resources"] = k8s.V1ResourceRequirements(requests=requests, limits=limits)

        return PodGenerator(**kwargs).gen_pod()

    @staticmethod
    def make_unique_pod_id(dag_id):
        r"""
        Generate a unique Pod name.

        Kubernetes pod names must be <= 253 chars and must pass the following regex for
        validation
        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``

        The name is ``dag_id`` truncated to leave room for ``-`` plus a random
        UUID4 hex string, so the result is at most ``MAX_POD_ID_LEN`` characters.
        No character validation is performed here.

        :param dag_id: a dag_id with only alphanumeric characters
        :return: ``str`` valid Pod name of appropriate length, or ``None`` when
            ``dag_id`` is empty or ``None``
        """
        if not dag_id:
            return None

        safe_uuid = uuid.uuid4().hex
        # Reserve space for the separator and the uuid suffix.
        return dag_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid