#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Bigtable Hook."""
from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING
from google.cloud.bigtable import Client, enums
from google.cloud.bigtable.cluster import Cluster
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import ClusterState, Table
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
if TYPE_CHECKING:
import enum
from google.cloud.bigtable.column_family import ColumnFamily, GarbageCollectionRule
[docs]class BigtableHook(GoogleBaseHook):
"""
Hook for Google Cloud Bigtable APIs.
All the methods in the hook where project_id is used must be called with
keyword arguments rather than positional.
"""
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
**kwargs,
)
self._client: Client | None = None
def _get_client(self, project_id: str) -> Client:
if not self._client:
self._client = Client(
project=project_id,
credentials=self.get_credentials(),
client_info=CLIENT_INFO,
admin=True,
)
return self._client
@GoogleBaseHook.fallback_to_default_project_id
[docs] def get_instance(self, instance_id: str, project_id: str) -> Instance | None:
"""
Retrieve and returns the specified Cloud Bigtable instance if it exists, otherwise returns None.
:param instance_id: The ID of the Cloud Bigtable instance.
:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
the default project_id from the Google Cloud connection is used.
"""
instance = self._get_client(project_id=project_id).instance(instance_id)
if not instance.exists():
return None
return instance
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_instance(self, instance_id: str, project_id: str) -> None:
"""
Delete the specified Cloud Bigtable instance.
Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does
not exist.
:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
the default project_id from the Google Cloud connection is used.
:param instance_id: The ID of the Cloud Bigtable instance.
"""
instance = self.get_instance(instance_id=instance_id, project_id=project_id)
if instance:
instance.delete()
else:
self.log.warning(
"The instance '%s' does not exist in project '%s'. Exiting", instance_id, project_id
)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def create_instance(
self,
instance_id: str,
main_cluster_id: str,
main_cluster_zone: str,
project_id: str,
replica_clusters: list[dict[str, str]] | None = None,
instance_display_name: str | None = None,
instance_type: enums.Instance.Type = enums.Instance.Type.UNSPECIFIED, # type: ignore[assignment]
instance_labels: dict | None = None,
cluster_nodes: int | None = None,
cluster_storage_type: enums.StorageType = enums.StorageType.UNSPECIFIED, # type: ignore[assignment]
timeout: float | None = None,
) -> Instance:
"""
Create new instance.
:param instance_id: The ID for the new instance.
:param main_cluster_id: The ID for main cluster for the new instance.
:param main_cluster_zone: The zone for main cluster.
See https://cloud.google.com/bigtable/docs/locations for more details.
:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
the default project_id from the Google Cloud connection is used.
:param replica_clusters: (optional) A list of replica clusters for the new
instance. Each cluster dictionary contains an id and a zone.
Example: [{"id": "replica-1", "zone": "us-west1-a"}]
:param instance_type: (optional) The type of the instance.
:param instance_display_name: (optional) Human-readable name of the instance.
Defaults to ``instance_id``.
:param instance_labels: (optional) Dictionary of labels to associate with the
instance.
:param cluster_nodes: (optional) Number of nodes for cluster.
:param cluster_storage_type: (optional) The type of storage.
:param timeout: (optional) timeout (in seconds) for instance creation.
If None is not specified, Operator will wait indefinitely.
"""
instance = Instance(
instance_id,
self._get_client(project_id=project_id),
instance_display_name,
instance_type,
instance_labels,
)
cluster_kwargs = {
"cluster_id": main_cluster_id,
"location_id": main_cluster_zone,
"default_storage_type": cluster_storage_type,
}
if instance_type != enums.Instance.Type.DEVELOPMENT and cluster_nodes:
cluster_kwargs["serve_nodes"] = cluster_nodes
clusters = [instance.cluster(**cluster_kwargs)]
if replica_clusters:
for replica_cluster in replica_clusters:
if "id" in replica_cluster and "zone" in replica_cluster:
clusters.append(
instance.cluster(
replica_cluster["id"],
replica_cluster["zone"],
cluster_nodes,
cluster_storage_type,
)
)
operation = instance.create(clusters=clusters)
operation.result(timeout)
return instance
@GoogleBaseHook.fallback_to_default_project_id
[docs] def update_instance(
self,
instance_id: str,
project_id: str,
instance_display_name: str | None = None,
instance_type: enums.Instance.Type | enum.IntEnum | None = None,
instance_labels: dict | None = None,
timeout: float | None = None,
) -> Instance:
"""
Update an existing instance.
:param instance_id: The ID for the existing instance.
:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
the default project_id from the Google Cloud connection is used.
:param instance_display_name: (optional) Human-readable name of the instance.
:param instance_type: (optional) The type of the instance.
:param instance_labels: (optional) Dictionary of labels to associate with the
instance.
:param timeout: (optional) timeout (in seconds) for instance update.
If None is not specified, Operator will wait indefinitely.
"""
instance = Instance(
instance_id=instance_id,
client=self._get_client(project_id=project_id),
display_name=instance_display_name,
instance_type=instance_type,
labels=instance_labels,
)
operation = instance.update()
operation.result(timeout)
return instance
@staticmethod
[docs] def create_table(
instance: Instance,
table_id: str,
initial_split_keys: list | None = None,
column_families: dict[str, GarbageCollectionRule] | None = None,
) -> None:
"""
Create the specified Cloud Bigtable table.
Raises ``google.api_core.exceptions.AlreadyExists`` if the table exists.
:param instance: The Cloud Bigtable instance that owns the table.
:param table_id: The ID of the table to create in Cloud Bigtable.
:param initial_split_keys: (Optional) A list of row keys in bytes to use to
initially split the table.
:param column_families: (Optional) A map of columns to create. The key is the
column_id str, and the value is a
:class:`google.cloud.bigtable.column_family.GarbageCollectionRule`.
"""
if column_families is None:
column_families = {}
if initial_split_keys is None:
initial_split_keys = []
table = Table(table_id, instance)
table.create(initial_split_keys, column_families)
@GoogleBaseHook.fallback_to_default_project_id
[docs] def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None:
"""
Delete the specified table in Cloud Bigtable.
Raises google.api_core.exceptions.NotFound if the table does not exist.
:param instance_id: The ID of the Cloud Bigtable instance.
:param table_id: The ID of the table in Cloud Bigtable.
:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
the default project_id from the Google Cloud connection is used.
"""
instance = self.get_instance(instance_id=instance_id, project_id=project_id)
if instance is None:
raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")
table = instance.table(table_id=table_id)
table.delete()
@staticmethod
[docs] def update_cluster(instance: Instance, cluster_id: str, nodes: int) -> None:
"""
Update number of nodes in the specified Cloud Bigtable cluster.
Raises google.api_core.exceptions.NotFound if the cluster does not exist.
:param instance: The Cloud Bigtable instance that owns the cluster.
:param cluster_id: The ID of the cluster.
:param nodes: The desired number of nodes.
"""
cluster = Cluster(cluster_id, instance)
# "reload" is required to set location_id attribute on cluster.
cluster.reload()
cluster.serve_nodes = nodes
cluster.update()
@staticmethod
[docs] def get_column_families_for_table(instance: Instance, table_id: str) -> dict[str, ColumnFamily]:
"""
Fetch Column Families for the specified table in Cloud Bigtable.
:param instance: The Cloud Bigtable instance that owns the table.
:param table_id: The ID of the table in Cloud Bigtable to fetch Column Families
from.
"""
table = Table(table_id, instance)
return table.list_column_families()
@staticmethod
[docs] def get_cluster_states_for_table(instance: Instance, table_id: str) -> dict[str, ClusterState]:
"""
Fetch Cluster States for the specified table in Cloud Bigtable.
Raises google.api_core.exceptions.NotFound if the table does not exist.
:param instance: The Cloud Bigtable instance that owns the table.
:param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States
from.
"""
table = Table(table_id, instance)
return table.get_cluster_states()