# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any
from airflow.exceptions import AirflowException
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
if TYPE_CHECKING:
from msgraph_core import APIVersion
[docs]class PowerBIDatasetRefreshFields(Enum):
"""Power BI refresh dataset details."""
[docs] REQUEST_ID = "request_id"
[docs]class PowerBIDatasetRefreshStatus:
"""Power BI refresh dataset statuses."""
[docs] IN_PROGRESS = "In Progress"
[docs] COMPLETED = "Completed"
[docs] TERMINAL_STATUSES = {FAILED, COMPLETED}
[docs] FAILURE_STATUSES = {FAILED, DISABLED}
[docs]class PowerBIDatasetRefreshException(AirflowException):
"""An exception that indicates a dataset refresh failed to complete."""
[docs]class PowerBIHook(KiotaRequestAdapterHook):
"""
A async hook to interact with Power BI.
:param conn_id: The Power BI connection id.
"""
[docs] conn_type: str = "powerbi"
[docs] conn_name_attr: str = "conn_id"
[docs] default_conn_name: str = "powerbi_default"
[docs] hook_name: str = "Power BI"
def __init__(
self,
conn_id: str = default_conn_name,
proxies: dict | None = None,
timeout: float = 60 * 60 * 24 * 7,
api_version: APIVersion | str | None = None,
):
super().__init__(
conn_id=conn_id,
proxies=proxies,
timeout=timeout,
host="https://api.powerbi.com",
scopes=["https://analysis.windows.net/powerbi/api/.default"],
api_version=api_version,
)
@classmethod
@classmethod
[docs] def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": ["schema", "port", "host", "extra"],
"relabeling": {
"login": "Client ID",
"password": "Client Secret",
},
}
[docs] async def get_refresh_history(
self,
dataset_id: str,
group_id: str,
) -> list[dict[str, str]]:
"""
Retrieve the refresh history of the specified dataset from the given group ID.
:param dataset_id: The dataset ID.
:param group_id: The workspace ID.
:return: Dictionary containing all the refresh histories of the dataset.
"""
try:
response = await self.run(
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
path_parameters={
"group_id": group_id,
"dataset_id": dataset_id,
},
)
refresh_histories = response.get("value")
return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]
except AirflowException:
raise PowerBIDatasetRefreshException("Failed to retrieve refresh history")
@classmethod
[docs] def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]:
"""
Convert raw refresh details into a dictionary containing required fields.
:param refresh_details: Raw object of refresh details.
"""
return {
PowerBIDatasetRefreshFields.REQUEST_ID.value: str(refresh_details.get("requestId")),
PowerBIDatasetRefreshFields.STATUS.value: (
"In Progress"
if str(refresh_details.get("status")) == "Unknown"
else str(refresh_details.get("status"))
),
PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")),
}
[docs] async def get_refresh_details_by_refresh_id(
self, dataset_id: str, group_id: str, refresh_id: str
) -> dict[str, str]:
"""
Get the refresh details of the given request Id.
:param refresh_id: Request Id of the Dataset refresh.
"""
refresh_histories = await self.get_refresh_history(dataset_id=dataset_id, group_id=group_id)
if len(refresh_histories) == 0:
raise PowerBIDatasetRefreshException(
f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}"
)
refresh_ids = [
refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value)
for refresh_history in refresh_histories
]
if refresh_id not in refresh_ids:
raise PowerBIDatasetRefreshException(
f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}"
)
refresh_details = refresh_histories[refresh_ids.index(refresh_id)]
return refresh_details
[docs] async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str) -> str:
"""
Triggers a refresh for the specified dataset from the given group id.
:param dataset_id: The dataset id.
:param group_id: The workspace id.
:return: Request id of the dataset refresh request.
"""
try:
response = await self.run(
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
method="POST",
path_parameters={
"group_id": group_id,
"dataset_id": dataset_id,
},
)
request_id = response.get("requestid")
return request_id
except AirflowException:
raise PowerBIDatasetRefreshException("Failed to trigger dataset refresh.")
[docs] async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None:
"""
Cancel the dataset refresh.
:param dataset_id: The dataset Id.
:param group_id: The workspace Id.
:param dataset_refresh_id: The dataset refresh Id.
"""
await self.run(
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
response_type=None,
path_parameters={
"group_id": group_id,
"dataset_id": dataset_id,
"dataset_refresh_id": dataset_refresh_id,
},
method="DELETE",
)