Source code for airflow.providers.slack.transfers.base_sql_to_slack

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator

if TYPE_CHECKING:
    import pandas as pd
    from slack_sdk.http_retry import RetryHandler

    from airflow.providers.common.sql.hooks.sql import DbApiHook


[docs]class BaseSqlToSlackOperator(BaseOperator): """ Operator implements base sql methods for SQL to Slack Transfer operators. :param sql: The SQL query to be executed :param sql_conn_id: reference to a specific DB-API Connection. :param sql_hook_params: Extra config params to be passed to the underlying hook. Should match the desired hook constructor params. :param parameters: The parameters to pass to the SQL query. :param slack_proxy: Proxy to make the Slack Incoming Webhook / API calls. Optional :param slack_timeout: The maximum number of seconds the client will wait to connect and receive a response from Slack. Optional :param slack_retry_handlers: List of handlers to customize retry logic. Optional """ def __init__( self, *, sql: str, sql_conn_id: str, sql_hook_params: dict | None = None, parameters: list | tuple | Mapping[str, Any] | None = None, slack_proxy: str | None = None, slack_timeout: int | None = None, slack_retry_handlers: list[RetryHandler] | None = None, **kwargs, ): super().__init__(**kwargs) self.sql_conn_id = sql_conn_id self.sql_hook_params = sql_hook_params self.sql = sql self.parameters = parameters self.slack_proxy = slack_proxy self.slack_timeout = slack_timeout self.slack_retry_handlers = slack_retry_handlers def _get_hook(self) -> DbApiHook: self.log.debug("Get connection for %s", self.sql_conn_id) conn = BaseHook.get_connection(self.sql_conn_id) hook = conn.get_hook(hook_params=self.sql_hook_params) if not callable(getattr(hook, "get_pandas_df", None)): raise AirflowException( "This hook is not supported. The hook class must have get_pandas_df method." ) return hook def _get_query_results(self) -> pd.DataFrame: sql_hook = self._get_hook() self.log.info("Running SQL query: %s", self.sql) df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters) return df

Was this entry helpful?