Source code for airflow.providers.vertica.hooks.vertica

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Iterable, Mapping
from typing import Any, Callable, overload

from vertica_python import connect

from airflow.providers.common.sql.hooks.sql import DbApiHook, fetch_all_handler


[docs]def vertica_fetch_all_handler(cursor) -> list[tuple] | None: """ Replace the default DbApiHook fetch_all_handler in order to fix this issue https://github.com/apache/airflow/issues/32993. Returned value will not change after the initial call of fetch_all_handler, all the remaining code is here only to make vertica client throws error. With Vertica, if you run the following sql (with split_statements set to false): INSERT INTO MyTable (Key, Label) values (1, 'test 1'); INSERT INTO MyTable (Key, Label) values (1, 'test 2'); INSERT INTO MyTable (Key, Label) values (3, 'test 3'); each insert will have its own result set and if you don't try to fetch data of those result sets you won't detect error on the second insert. """ result = fetch_all_handler(cursor) # loop on all statement result sets to get errors if cursor.description is not None: while cursor.nextset(): if cursor.description is not None: row = cursor.fetchone() while row: row = cursor.fetchone() return result
[docs]class VerticaHook(DbApiHook): """ Interact with Vertica. This hook use a customized version of default fetch_all_handler named vertica_fetch_all_handler. """
[docs] conn_name_attr = "vertica_conn_id"
[docs] default_conn_name = "vertica_default"
[docs] conn_type = "vertica"
[docs] hook_name = "Vertica"
[docs] supports_autocommit = True
[docs] def get_conn(self) -> connect: """Return vertica connection object.""" conn = self.get_connection(self.vertica_conn_id) # type: ignore conn_config = { "user": conn.login, "password": conn.password or "", "database": conn.schema, "host": conn.host or "localhost", } if not conn.port: conn_config["port"] = 5433 else: conn_config["port"] = int(conn.port) bool_options = [ "connection_load_balance", "binary_transfer", "disable_copy_local", "request_complex_types", "use_prepared_statements", ] std_options = [ "session_label", "backup_server_node", "kerberos_host_name", "kerberos_service_name", "unicode_error", "workload", "ssl", ] conn_extra = conn.extra_dejson for bo in bool_options: if bo in conn_extra: conn_config[bo] = str(conn_extra[bo]).lower() in ["true", "on"] for so in std_options: if so in conn_extra: conn_config[so] = conn_extra[so] if "connection_timeout" in conn_extra: conn_config["connection_timeout"] = float(conn_extra["connection_timeout"]) if "log_level" in conn_extra: import logging log_lvl = conn_extra["log_level"] conn_config["log_path"] = None if isinstance(log_lvl, str): log_lvl = log_lvl.lower() if log_lvl == "critical": conn_config["log_level"] = logging.CRITICAL elif log_lvl == "error": conn_config["log_level"] = logging.ERROR elif log_lvl == "warning": conn_config["log_level"] = logging.WARNING elif log_lvl == "info": conn_config["log_level"] = logging.INFO elif log_lvl == "debug": conn_config["log_level"] = logging.DEBUG elif log_lvl == "notset": conn_config["log_level"] = logging.NOTSET else: conn_config["log_level"] = int(conn_extra["log_level"]) conn = connect(**conn_config) return conn
@overload # type: ignore[override]
[docs] def run( self, sql: str | Iterable[str], autocommit: bool = ..., parameters: Iterable | Mapping[str, Any] | None = ..., handler: None = ..., split_statements: bool = ..., return_last: bool = ..., ) -> None: ...
@overload def run( self, sql: str | Iterable[str], autocommit: bool = ..., parameters: Iterable | Mapping[str, Any] | None = ..., handler: Callable[[Any], Any] = ..., split_statements: bool = ..., return_last: bool = ..., ) -> Any | list[Any]: ... def run( self, sql: str | Iterable[str], autocommit: bool = False, parameters: Iterable | Mapping | None = None, handler: Callable[[Any], Any] | None = None, split_statements: bool = False, return_last: bool = True, ) -> Any | list[Any] | None: """ Overwrite the common sql run. Will automatically replace fetch_all_handler by vertica_fetch_all_handler. """ if handler == fetch_all_handler: handler = vertica_fetch_all_handler return DbApiHook.run(self, sql, autocommit, parameters, handler, split_statements, return_last)

Was this entry helpful?