Source code for airflow.providers.amazon.aws.operators.opensearch_serverless

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Amazon OpenSearch Serverless operators."""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Literal

from botocore.exceptions import ClientError

from airflow.providers.amazon.aws.hooks.opensearch_serverless import OpenSearchServerlessHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.utils.helpers import prune_dict

if TYPE_CHECKING:
    from airflow.sdk import Context


[docs] class OpenSearchServerlessCreateCollectionOperator(AwsBaseOperator[OpenSearchServerlessHook]): """ Create an Amazon OpenSearch Serverless collection. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:OpenSearchServerlessCreateCollectionOperator` :param collection_name: The name of the collection. (templated) :param collection_type: The type of collection (SEARCH, TIMESERIES, VECTORSEARCH). (templated) :param description: Optional description. (templated) :param standby_replicas: Whether to use standby replicas (ENABLED or DISABLED). :param tags: Optional list of tag dicts. :param if_exists: Behavior when the collection already exists. ``"fail"`` raises an error, ``"skip"`` logs and returns. """
[docs] aws_hook_class = OpenSearchServerlessHook
[docs] template_fields: Sequence[str] = aws_template_fields("collection_name", "collection_type", "description")
def __init__( self, *, collection_name: str, collection_type: str = "SEARCH", description: str | None = None, standby_replicas: str | None = None, tags: list[dict[str, str]] | None = None, if_exists: Literal["fail", "skip"] = "skip", **kwargs, ) -> None: super().__init__(**kwargs)
[docs] self.collection_name = collection_name
[docs] self.collection_type = collection_type
[docs] self.description = description
[docs] self.standby_replicas = standby_replicas
[docs] self.tags = tags
[docs] self.if_exists = if_exists
[docs] def execute(self, context: Context) -> str: self.log.info("Creating OpenSearch Serverless collection %s", self.collection_name) kwargs: dict[str, Any] = prune_dict( { "name": self.collection_name, "type": self.collection_type, "description": self.description, "standbyReplicas": self.standby_replicas, "tags": self.tags, } ) try: response = self.hook.conn.create_collection(**kwargs) collection_id = response["createCollectionDetail"]["id"] except ClientError as e: if e.response["Error"]["Code"] == "ConflictException" and self.if_exists == "skip": self.log.info("Collection %s already exists, skipping.", self.collection_name) collections = self.hook.conn.batch_get_collection(names=[self.collection_name]) details = collections.get("collectionDetails", []) if not details: raise RuntimeError( f"Collection {self.collection_name} reported as existing but not found" ) collection_id = details[0]["id"] else: raise self.log.info("Collection %s: %s", self.collection_name, collection_id) return collection_id

Was this entry helpful?