Source code for airflow.providers.tableau.operators.tableau

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.tableau.hooks.tableau import (
    TableauHook,
    TableauJobFailedException,
    TableauJobFinishCode,
)

if TYPE_CHECKING:
    from airflow.utils.context import Context

[docs]RESOURCES_METHODS = { "datasources": ["delete", "refresh"], "groups": ["delete"], "projects": ["delete"], "schedule": ["delete"], "sites": ["delete"], "subscriptions": ["delete"], "tasks": ["delete", "run"], "users": ["remove"], "workbooks": ["delete", "refresh"], }
[docs]class TableauOperator(BaseOperator): """ Execute a Tableau API Resource. https://tableau.github.io/server-client-python/docs/api-ref .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:TableauOperator` :param resource: The name of the resource to use. :param method: The name of the resource's method to execute. :param find: The reference of resource that will receive the action. :param match_with: The resource field name to be matched with find parameter. :param site_id: The id of the site where the workbook belongs to. :param blocking_refresh: By default will be blocking means it will wait until it has finished. :param check_interval: time in seconds that the job should wait in between each instance state checks until operation is completed :param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>` containing the credentials to authenticate to the Tableau Server. """
[docs] template_fields: Sequence[str] = ( "find", "match_with", )
def __init__( self, *, resource: str, method: str, find: str, match_with: str = "id", site_id: str | None = None, blocking_refresh: bool = True, check_interval: float = 20, tableau_conn_id: str = "tableau_default", **kwargs, ) -> None: super().__init__(**kwargs) self.resource = resource self.method = method self.find = find self.match_with = match_with self.check_interval = check_interval self.site_id = site_id self.blocking_refresh = blocking_refresh self.tableau_conn_id = tableau_conn_id
[docs] def execute(self, context: Context) -> str: """ Execute the Tableau API resource and push the job id or downloaded file URI to xcom. :param context: The task context during execution. :return: the id of the job that executes the extract refresh or downloaded file URI. """ available_resources = RESOURCES_METHODS.keys() if self.resource not in available_resources: error_message = f"Resource not found! Available Resources: {available_resources}" raise AirflowException(error_message) available_methods = RESOURCES_METHODS[self.resource] if self.method not in available_methods: error_message = f"Method not found! Available methods for {self.resource}: {available_methods}" raise AirflowException(error_message) with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook: resource = getattr(tableau_hook.server, self.resource) method = getattr(resource, self.method) resource_id = self._get_resource_id(tableau_hook) response = method(resource_id) job_id = response.id if self.method == "refresh": if self.blocking_refresh: if not tableau_hook.wait_for_state( job_id=job_id, check_interval=self.check_interval, target_state=TableauJobFinishCode.SUCCESS, ): raise TableauJobFailedException(f"The Tableau Refresh {self.resource} Job failed!") return job_id
def _get_resource_id(self, tableau_hook: TableauHook) -> str: if self.match_with == "id": return self.find for resource in tableau_hook.get_all(resource_name=self.resource): if getattr(resource, self.match_with) == self.find: resource_id = resource.id self.log.info("Found matching with id %s", resource_id) return resource_id raise AirflowException(f"{self.resource} with {self.match_with} {self.find} not found!")

Was this entry helpful?