Source code for airflow.providers.google.cloud.hooks.bigtable

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Bigtable Hook."""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING

from google.cloud.bigtable import Client, enums
from google.cloud.bigtable.cluster import Cluster
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import ClusterState, Table

from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

if TYPE_CHECKING:
    import enum

    from google.cloud.bigtable.column_family import ColumnFamily, GarbageCollectionRule


[docs]class BigtableHook(GoogleBaseHook): """ Hook for Google Cloud Bigtable APIs. All the methods in the hook where project_id is used must be called with keyword arguments rather than positional. """ def __init__( self, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: super().__init__( gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs, ) self._client: Client | None = None def _get_client(self, project_id: str) -> Client: if not self._client: self._client = Client( project=project_id, credentials=self.get_credentials(), client_info=CLIENT_INFO, admin=True, ) return self._client @GoogleBaseHook.fallback_to_default_project_id
[docs] def get_instance(self, instance_id: str, project_id: str) -> Instance | None: """ Retrieve and returns the specified Cloud Bigtable instance if it exists, otherwise returns None. :param instance_id: The ID of the Cloud Bigtable instance. :param project_id: Optional, Google Cloud project ID where the BigTable exists. If set to None or missing, the default project_id from the Google Cloud connection is used. """ instance = self._get_client(project_id=project_id).instance(instance_id) if not instance.exists(): return None return instance
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_instance(self, instance_id: str, project_id: str) -> None: """ Delete the specified Cloud Bigtable instance. Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does not exist. :param project_id: Optional, Google Cloud project ID where the BigTable exists. If set to None or missing, the default project_id from the Google Cloud connection is used. :param instance_id: The ID of the Cloud Bigtable instance. """ instance = self.get_instance(instance_id=instance_id, project_id=project_id) if instance: instance.delete() else: self.log.warning( "The instance '%s' does not exist in project '%s'. Exiting", instance_id, project_id )
@GoogleBaseHook.fallback_to_default_project_id
[docs] def create_instance( self, instance_id: str, main_cluster_id: str, main_cluster_zone: str, project_id: str, replica_clusters: list[dict[str, str]] | None = None, instance_display_name: str | None = None, instance_type: enums.Instance.Type = enums.Instance.Type.UNSPECIFIED, # type: ignore[assignment] instance_labels: dict | None = None, cluster_nodes: int | None = None, cluster_storage_type: enums.StorageType = enums.StorageType.UNSPECIFIED, # type: ignore[assignment] timeout: float | None = None, ) -> Instance: """ Create new instance. :param instance_id: The ID for the new instance. :param main_cluster_id: The ID for main cluster for the new instance. :param main_cluster_zone: The zone for main cluster. See https://cloud.google.com/bigtable/docs/locations for more details. :param project_id: Optional, Google Cloud project ID where the BigTable exists. If set to None or missing, the default project_id from the Google Cloud connection is used. :param replica_clusters: (optional) A list of replica clusters for the new instance. Each cluster dictionary contains an id and a zone. Example: [{"id": "replica-1", "zone": "us-west1-a"}] :param instance_type: (optional) The type of the instance. :param instance_display_name: (optional) Human-readable name of the instance. Defaults to ``instance_id``. :param instance_labels: (optional) Dictionary of labels to associate with the instance. :param cluster_nodes: (optional) Number of nodes for cluster. :param cluster_storage_type: (optional) The type of storage. :param timeout: (optional) timeout (in seconds) for instance creation. If None is not specified, Operator will wait indefinitely. """ instance = Instance( instance_id, self._get_client(project_id=project_id), instance_display_name, instance_type, instance_labels, ) cluster_kwargs = { "cluster_id": main_cluster_id, "location_id": main_cluster_zone, "default_storage_type": cluster_storage_type, } if instance_type != enums.Instance.Type.DEVELOPMENT and cluster_nodes: cluster_kwargs["serve_nodes"] = cluster_nodes clusters = [instance.cluster(**cluster_kwargs)] if replica_clusters: for replica_cluster in replica_clusters: if "id" in replica_cluster and "zone" in replica_cluster: clusters.append( instance.cluster( replica_cluster["id"], replica_cluster["zone"], cluster_nodes, cluster_storage_type, ) ) operation = instance.create(clusters=clusters) operation.result(timeout) return instance
@GoogleBaseHook.fallback_to_default_project_id
[docs] def update_instance( self, instance_id: str, project_id: str, instance_display_name: str | None = None, instance_type: enums.Instance.Type | enum.IntEnum | None = None, instance_labels: dict | None = None, timeout: float | None = None, ) -> Instance: """ Update an existing instance. :param instance_id: The ID for the existing instance. :param project_id: Optional, Google Cloud project ID where the BigTable exists. If set to None or missing, the default project_id from the Google Cloud connection is used. :param instance_display_name: (optional) Human-readable name of the instance. :param instance_type: (optional) The type of the instance. :param instance_labels: (optional) Dictionary of labels to associate with the instance. :param timeout: (optional) timeout (in seconds) for instance update. If None is not specified, Operator will wait indefinitely. """ instance = Instance( instance_id=instance_id, client=self._get_client(project_id=project_id), display_name=instance_display_name, instance_type=instance_type, labels=instance_labels, ) operation = instance.update() operation.result(timeout) return instance
@staticmethod
[docs] def create_table( instance: Instance, table_id: str, initial_split_keys: list | None = None, column_families: dict[str, GarbageCollectionRule] | None = None, ) -> None: """ Create the specified Cloud Bigtable table. Raises ``google.api_core.exceptions.AlreadyExists`` if the table exists. :param instance: The Cloud Bigtable instance that owns the table. :param table_id: The ID of the table to create in Cloud Bigtable. :param initial_split_keys: (Optional) A list of row keys in bytes to use to initially split the table. :param column_families: (Optional) A map of columns to create. The key is the column_id str, and the value is a :class:`google.cloud.bigtable.column_family.GarbageCollectionRule`. """ if column_families is None: column_families = {} if initial_split_keys is None: initial_split_keys = [] table = Table(table_id, instance) table.create(initial_split_keys, column_families)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None: """ Delete the specified table in Cloud Bigtable. Raises google.api_core.exceptions.NotFound if the table does not exist. :param instance_id: The ID of the Cloud Bigtable instance. :param table_id: The ID of the table in Cloud Bigtable. :param project_id: Optional, Google Cloud project ID where the BigTable exists. If set to None or missing, the default project_id from the Google Cloud connection is used. """ instance = self.get_instance(instance_id=instance_id, project_id=project_id) if instance is None: raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}") table = instance.table(table_id=table_id) table.delete()
@staticmethod
[docs] def update_cluster(instance: Instance, cluster_id: str, nodes: int) -> None: """ Update number of nodes in the specified Cloud Bigtable cluster. Raises google.api_core.exceptions.NotFound if the cluster does not exist. :param instance: The Cloud Bigtable instance that owns the cluster. :param cluster_id: The ID of the cluster. :param nodes: The desired number of nodes. """ cluster = Cluster(cluster_id, instance) # "reload" is required to set location_id attribute on cluster. cluster.reload() cluster.serve_nodes = nodes cluster.update()
@staticmethod
[docs] def get_column_families_for_table(instance: Instance, table_id: str) -> dict[str, ColumnFamily]: """ Fetch Column Families for the specified table in Cloud Bigtable. :param instance: The Cloud Bigtable instance that owns the table. :param table_id: The ID of the table in Cloud Bigtable to fetch Column Families from. """ table = Table(table_id, instance) return table.list_column_families()
@staticmethod
[docs] def get_cluster_states_for_table(instance: Instance, table_id: str) -> dict[str, ClusterState]: """ Fetch Cluster States for the specified table in Cloud Bigtable. Raises google.api_core.exceptions.NotFound if the table does not exist. :param instance: The Cloud Bigtable instance that owns the table. :param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States from. """ table = Table(table_id, instance) return table.get_cluster_states()

Was this entry helpful?