# Source code for airflow.providers.common.sql.datafusion.object_storage_provider

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datafusion.object_store import AmazonS3, LocalFileSystem

from airflow.providers.common.sql.config import ConnectionConfig, StorageType
from airflow.providers.common.sql.datafusion.base import ObjectStorageProvider
from airflow.providers.common.sql.datafusion.exceptions import ObjectStoreCreationException


class S3ObjectStorageProvider(ObjectStorageProvider):
    """S3 Object Storage Provider using DataFusion's AmazonS3."""

    @property
    def get_storage_type(self) -> StorageType:
        """Return the storage type handled by this provider (S3)."""
        return StorageType.S3

    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
        """Create an S3 object store using DataFusion's AmazonS3.

        :param path: Object path from which the bucket name is derived.
        :param connection_config: Connection credentials and extra options;
            required for S3 — a ``ValueError`` is raised if missing.
        :raises ValueError: If ``connection_config`` is not provided.
        :raises ObjectStoreCreationException: If the AmazonS3 store cannot be built.
        """
        if connection_config is None:
            # Bug fix: the original passed logging-style %-args to ValueError,
            # which does not format them — use an f-string instead.
            raise ValueError(f"connection_config must be provided for {self.get_storage_type}")
        try:
            credentials = connection_config.credentials
            # get_bucket / log are inherited from ObjectStorageProvider —
            # presumably bucket extraction from the path; defined outside this view.
            bucket = self.get_bucket(path)
            s3_store = AmazonS3(**credentials, **connection_config.extra_config, bucket_name=bucket)
            self.log.info("Created S3 object store for bucket %s", bucket)
            return s3_store
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ObjectStoreCreationException(f"Failed to create S3 object store: {e}") from e

    def get_scheme(self) -> str:
        """Return the scheme for S3."""
        return "s3://"
class LocalObjectStorageProvider(ObjectStorageProvider):
    """Local Object Storage Provider using DataFusion's LocalFileSystem."""

    @property
    def get_storage_type(self) -> StorageType:
        """Return the storage type."""
        return StorageType.LOCAL

    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
        """Create a Local object store.

        ``path`` and ``connection_config`` are accepted for interface
        compatibility but are not needed for the local filesystem.
        """
        return LocalFileSystem()

    def get_scheme(self) -> str:
        """Return the scheme to a Local file system."""
        return "file://"
def get_object_storage_provider(storage_type: StorageType) -> ObjectStorageProvider:
    """Get an object storage provider based on the storage type.

    :param storage_type: The kind of backing store requested.
    :raises ValueError: If no provider is registered for ``storage_type``.
    """
    # TODO: Add support for GCS, Azure, HTTP: https://datafusion.apache.org/python/autoapi/datafusion/object_store/index.html
    registry: dict[StorageType, type] = {
        StorageType.S3: S3ObjectStorageProvider,
        StorageType.LOCAL: LocalObjectStorageProvider,
    }
    if storage_type in registry:
        return registry[storage_type]()
    raise ValueError(
        f"Unsupported storage type: {storage_type}. Supported types: {list(registry.keys())}"
    )

# Was this entry helpful?