# Source code for airflow.providers.amazon.aws.executors.aws_lambda.utils
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
from collections.abc import Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from airflow.providers.amazon.aws.executors.utils.base_config_keys import BaseConfigKeys
if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
# Configuration group identifier for the AWS Lambda executor
# (presumably the Airflow config section name — confirm against the config loader).
CONFIG_GROUP_NAME = "aws_lambda_executor"
# Error identifiers that are classified as invalid-credential failures.
# NOTE(review): these look like AWS client error codes; how they are matched
# (e.g. against a botocore ClientError code) is decided by the caller — confirm there.
INVALID_CREDENTIALS_EXCEPTIONS = [
    "ExpiredTokenException",
    "InvalidClientTokenId",
    "UnrecognizedClientException",
]
@dataclass
class LambdaQueuedTask:
    """
    Represents a Lambda task that is queued.

    The task will be run in the next heartbeat.
    """

    # Executor configuration mapping carried along with the queued task.
    executor_config: ExecutorConfigType
    # Point in time associated with the next attempt of this task
    # (naive vs. timezone-aware is not fixed here — confirm with the executor).
    next_attempt_time: datetime.datetime
class InvokeLambdaKwargsConfigKeys(BaseConfigKeys):
    """Config keys whose loaded values are valid Lambda invoke arguments."""

    # Option holding the name of the Lambda function (presumably the one the executor invokes).
    FUNCTION_NAME = "function_name"
    # Option holding the function qualifier; note the option string is
    # "function_qualifier", not "qualifier".
    QUALIFIER = "function_qualifier"
class AllLambdaConfigKeys(InvokeLambdaKwargsConfigKeys):
    """
    All config keys which are related to the Lambda Executor.

    Extends the invoke-argument keys inherited from ``InvokeLambdaKwargsConfigKeys``
    with the remaining executor options. Each attribute value is the option
    string as it is spelled in configuration; several attribute names differ
    from their option strings, so always go through the attribute.
    """

    # Option string is "conn_id" (no "aws_" prefix).
    AWS_CONN_ID = "conn_id"
    CHECK_HEALTH_ON_STARTUP = "check_health_on_startup"
    # Option string is "max_run_task_attempts", which differs from the attribute name.
    MAX_INVOKE_ATTEMPTS = "max_run_task_attempts"
    REGION_NAME = "region_name"
    QUEUE_URL = "queue_url"
    # Option string spells out "dead_letter_queue_url" in full.
    DLQ_URL = "dead_letter_queue_url"
    END_WAIT_TIMEOUT = "end_wait_timeout"
# Type alias: a command expressed as a sequence of string arguments.
CommandType = Sequence[str]
# Type alias: executor configuration as a mapping with string keys and arbitrary values.
ExecutorConfigType = dict[str, Any]