# Source code for airflow.providers.clickhousedb.hooks.clickhouse

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any
from urllib.parse import quote_plus

from airflow.providers.common.sql.hooks.sql import DbApiHook

if TYPE_CHECKING:
    import clickhouse_connect.driver.client
    from clickhouse_connect.dbapi.cursor import Cursor as ClickHouseDbApiCursor

# Optional tuning parameters copied unchanged into the clickhouse_connect.get_client()
# kwargs (and rendered as URI query parameters) only when present in the connection extra.
# client_name is deliberately absent: _build_client_name() always builds it, so the
# Airflow/provider version info always reaches the HTTP User-Agent / system.query_log.
_OPTIONAL_CLIENT_KWARGS: tuple[str, ...] = ("connect_timeout", "send_receive_timeout", "compress")

# Keys the hook always fills in itself when building get_client() kwargs. Any of these
# found in user-supplied client_kwargs are discarded (and reported at DEBUG level), so the
# hook's own values can never be overridden.
_HOOK_MANAGED_KWARGS: frozenset[str] = frozenset(
    (
        # endpoint and credentials
        "host",
        "port",
        "username",
        "password",
        "database",
        # TLS behaviour
        "secure",
        "verify",
        # values the hook derives itself
        "client_name",
        "settings",
    )
)


def _format_client_name(airflow_version: str, provider_version: str, custom: object = None) -> str:
    """
    Format the ``client_name`` string from explicit version strings and an optional label.

    Pure helper behind :func:`_build_client_name`; it performs no imports, so it can be
    exercised without Airflow installed.

    :param airflow_version: Airflow core version string.
    :param provider_version: ``apache-airflow-providers-clickhousedb`` version string.
    :param custom: Optional human-readable label. It is converted with ``str()`` (the
        connection ``extra`` JSON may hold a non-string). Surrounding whitespace is
        stripped. A falsy or whitespace-only label is omitted entirely, so no empty
        ``" ()"`` suffix is produced.
    :return: ``"apache-airflow/<v> apache-airflow-providers-clickhousedb/<v>"``, optionally
        followed by ``" (<label>)"``.
    """
    name = f"apache-airflow/{airflow_version} apache-airflow-providers-clickhousedb/{provider_version}"
    # Strip BEFORE testing for emptiness so that "   " does not yield "name ()".
    label = str(custom).strip() if custom else ""
    if label:
        name = f"{name} ({label})"
    return name


def _build_client_name(custom: str | None = None) -> str:
    """
    Return the ``client_name`` string passed to ``clickhouse_connect.get_client()``.

    The value is prepended to the HTTP ``User-Agent`` header that ClickHouse records
    in ``system.query_log``, so every query is traceable back to its Airflow source.

    Format (no custom label)::

        apache-airflow/<airflow_version> apache-airflow-providers-clickhousedb/<provider_version>

    Format (with the ``client_name`` extra field set to ``"my-pipeline"``)::

        apache-airflow/<airflow_version> apache-airflow-providers-clickhousedb/<provider_version> (my-pipeline)

    A label that is empty or whitespace-only is ignored. A non-string label (e.g. a
    number stored in the ``extra`` JSON) is converted with ``str()``.

    The ``clickhouse_connect`` library appends its own token and OS information, so
    the full ``User-Agent`` looks like::

        apache-airflow/X.Y.Z apache-airflow-providers-clickhousedb/X.Y.Z
        clickhouse-connect/X.Y.Z (lv:py/X.Y.Z; mode:sync; os:linux)

    :param custom: Optional label, typically the ``client_name`` connection extra.
    """
    # Imported lazily, as in the rest of this module, to keep module import cheap.
    import airflow
    from airflow.providers.clickhousedb import __version__ as provider_version

    return _format_client_name(airflow.__version__, provider_version, custom)


class ClickHouseConnection:
    """
    Thin DB-API 2.0 connection facade over a ``clickhouse_connect`` Client.

    Every cursor handed out is a ``clickhouse_connect.dbapi.cursor.Cursor``. That cursor
    decides per statement whether to call ``client.query()`` or ``client.command()``,
    looking at the leading SQL keyword once comments are removed. The
    ``clickhouse-connect`` SQLAlchemy dialect uses the same routing.

    ClickHouse does not offer multi-statement transactions, so each statement is
    effectively committed on its own. For that reason ``commit()`` and ``rollback()``
    deliberately do nothing, and ``autocommit`` is fixed at ``True``. ``DbApiHook.run()``
    skips its ``conn.commit()`` call when autocommit is reported as enabled.
    """

    # Tells DbApiHook-style callers that no explicit commit is ever required.
    autocommit: bool = True

    def __init__(self, client: clickhouse_connect.driver.client.Client) -> None:
        # The wrapped client; shared by every cursor created from this connection.
        self._client = client

    def cursor(self) -> ClickHouseDbApiCursor:
        """Create a fresh ``clickhouse_connect`` DB-API cursor bound to the wrapped client."""
        from clickhouse_connect.dbapi.cursor import Cursor as _DbApiCursor

        return _DbApiCursor(self._client)

    def close(self) -> None:
        """Close the underlying ``clickhouse_connect`` client."""
        client = self._client
        client.close()

    def commit(self) -> None:
        """Do nothing: ClickHouse has no multi-statement transactions to commit."""

    def rollback(self) -> None:
        """Do nothing: ClickHouse has no multi-statement transactions to roll back."""
class ClickHouseHook(DbApiHook):
    """
    Interact with ClickHouse via the HTTP interface (``clickhouse-connect``).

    This hook wraps ``clickhouse_connect.get_client()`` behind a thin DB-API 2.0
    adapter (:class:`ClickHouseConnection`), so all standard
    :class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
    features work out of the box (templating, ``handler``, ``split_statements``, etc.).

    :param database: Optional database name. Overrides the ``schema`` field of the
        Airflow connection. Useful when one connection points to a ClickHouse cluster
        and individual tasks need to target different databases.
    :param session_settings: Optional dict of ClickHouse session-level settings
        (e.g. ``{"max_execution_time": 60, "max_threads": 4}``). Values supplied here
        are **merged on top of** any ``session_settings`` dict already present in the
        connection's ``extra`` JSON field, with the constructor argument taking
        precedence. For a full list of available session settings visit
        https://clickhouse.com/docs/operations/settings/settings
    :param client_kwargs: Optional dict of additional keyword arguments forwarded
        verbatim to ``clickhouse_connect.get_client()``. Use this to pass any parameter
        not otherwise exposed by the hook (e.g. ``http_proxy``, ``pool_mgr_params``).
        Values supplied here are **merged on top of** any ``client_kwargs`` dict already
        present in the connection's ``extra`` JSON field, with the constructor argument
        taking precedence on conflicting keys. Keys that the hook manages itself
        (``host``, ``port``, ``username``, ``password``, ``database``, ``secure``,
        ``verify``, ``client_name``, ``settings``) are ignored (and logged at DEBUG
        level) so that hook-managed values always take precedence.

    Connection ``extra`` fields ``secure`` and ``verify`` accept JSON booleans or
    boolean-like strings (``"true"`` / ``"false"``, ``"1"`` / ``"0"``, ...). A missing or
    ``null`` value falls back to the default (``secure=False``, ``verify=True``), so a
    ``null`` never silently disables TLS certificate verification. When the connection
    has no port, the ClickHouse HTTP default for the chosen scheme is used: ``8443``
    with ``secure`` enabled, ``8123`` otherwise.

    .. seealso::
        - `clickhouse-connect documentation <https://clickhouse.com/docs/en/integrations/python>`_
        - `clickhouse session settings documentation <https://clickhouse.com/docs/operations/settings/settings>`_
        - :ref:`howto/connection:clickhouse`
    """

    conn_name_attr = "clickhouse_conn_id"
    default_conn_name = "clickhouse_default"
    conn_type = "clickhouse"
    hook_name = "ClickHouse"
    # ClickHouse has no multi-statement transactions; every statement is auto-committed.
    supports_autocommit = True
    _test_connection_sql = "SELECT 1"

    # Default ClickHouse HTTP interface ports, matching clickhouse_connect's own defaults.
    _DEFAULT_HTTP_PORT = 8123
    _DEFAULT_HTTPS_PORT = 8443

    def __init__(
        self,
        *args,
        database: str | None = None,
        session_settings: dict[str, Any] | None = None,
        client_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.database = database
        self.session_settings: dict[str, Any] = session_settings or {}
        self.client_kwargs: dict[str, Any] = client_kwargs or {}

    def set_autocommit(self, conn: ClickHouseConnection, autocommit: bool) -> None:
        """No-op: ClickHouse has no transaction support."""

    def get_autocommit(self, conn: ClickHouseConnection) -> bool:
        """Return ``True``: ClickHouse auto-commits every statement."""
        return True

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _extra_bool(value: Any, default: bool) -> bool:
        """
        Interpret a boolean-ish value read from the connection ``extra`` JSON.

        ``None`` (key missing or JSON ``null``) returns *default*. Common textual booleans
        are parsed rather than treated as truthy non-empty strings. Any other value
        falls back to ``bool(value)``.
        """
        if value is None:
            return default
        if isinstance(value, str):
            normalized = value.strip().lower()
            if normalized in ("true", "t", "yes", "y", "on", "1"):
                return True
            if normalized in ("false", "f", "no", "n", "off", "0", ""):
                return False
        return bool(value)

    @staticmethod
    def _extra_dict(extra: dict[str, Any], key: str) -> dict[str, Any]:
        """
        Return ``extra[key]`` as a dict, accepting a dict or a JSON-encoded string.

        Falsy values (missing key, ``null``, ``{}``) yield an empty dict.

        :raises ValueError: if the value is a string that is not valid JSON, or if it
            does not decode to a JSON object.
        """
        raw = extra.get(key)
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON in extra.{key}: {e}") from e
        if not raw:
            return {}
        if not isinstance(raw, dict):
            raise ValueError(f"extra.{key} must be a JSON object, got {type(raw).__name__}")
        return raw

    @classmethod
    def _resolve_port(cls, port: Any, secure: bool) -> int:
        """Return the explicit connection port, or the HTTP(S) default for the scheme."""
        if port:
            return int(port)
        return cls._DEFAULT_HTTPS_PORT if secure else cls._DEFAULT_HTTP_PORT

    # ------------------------------------------------------------------ client

    def _get_client_kwargs(self) -> dict[str, Any]:
        """
        Build the keyword-argument dict passed to ``clickhouse_connect.get_client()``.

        Construction order (last write wins):

        1. ``client_kwargs`` from the connection ``extra`` JSON: connection-level
           passthrough kwargs.
        2. ``client_kwargs`` constructor argument: task-level overrides merged on top.
        3. Hook-managed keys (``host``, ``port``, ``username``, ``password``,
           ``database``, ``secure``, ``verify``, ``client_name``, ``settings``) always
           override anything in ``client_kwargs``.

        Optional tuning parameters (``connect_timeout``, ``send_receive_timeout``,
        ``compress``) are forwarded only when set to a non-null value in the ``extra``
        JSON field, so that the driver's own defaults apply otherwise.

        ``secure`` and ``verify`` are parsed as booleans, including ``"true"`` /
        ``"false"`` strings. A missing or ``null`` value uses the default (``False`` /
        ``True``). Without an explicit port, ``8443`` is used when ``secure`` is on and
        ``8123`` otherwise.

        ``client_name`` is always set to a string that identifies the Airflow
        installation (see :func:`_build_client_name`). Any ``client_name`` value in
        ``extra`` is treated as a human-readable label and appended in parentheses, so
        it appears alongside the Airflow version info in ``system.query_log``.

        ``session_settings`` from ``extra`` and from the constructor
        ``session_settings`` argument are **merged**, with the constructor argument
        taking precedence on conflicting keys.

        :raises ValueError: if ``extra.client_kwargs`` or ``extra.session_settings`` is
            invalid JSON or not a JSON object.
        """
        conn = self.get_connection(self.get_conn_id())
        extra: dict[str, Any] = conn.extra_dejson

        # Merge client_kwargs: extra values are the base, constructor values override.
        merged_client_kwargs: dict[str, Any] = {
            **self._extra_dict(extra, "client_kwargs"),
            **self.client_kwargs,
        }

        # Strip hook-managed keys so they cannot overwrite connection credentials
        # or versioning info. Log dropped keys at DEBUG to aid troubleshooting.
        dropped = _HOOK_MANAGED_KWARGS.intersection(merged_client_kwargs)
        if dropped:
            self.log.debug(
                "Ignoring hook-managed keys in client_kwargs: %s",
                sorted(dropped),
            )
        kwargs: dict[str, Any] = {
            k: v for k, v in merged_client_kwargs.items() if k not in _HOOK_MANAGED_KWARGS
        }

        # Hook-managed connection parameters always take precedence.
        secure = self._extra_bool(extra.get("secure"), False)
        kwargs.update(
            {
                "host": conn.host or "localhost",
                "port": self._resolve_port(conn.port, secure),
                "username": conn.login or "default",
                "password": conn.password or "",
                "database": self.database or conn.schema or "default",
                "secure": secure,
                # A JSON null must not disable certificate verification.
                "verify": self._extra_bool(extra.get("verify"), True),
            }
        )

        for key in _OPTIONAL_CLIENT_KWARGS:
            if extra.get(key) is not None:
                kwargs[key] = extra[key]

        # Always embed Airflow + provider version; user's 'client_name' extra is appended to the User-Agent header.
        kwargs["client_name"] = _build_client_name(extra.get("client_name"))

        # Merge session_settings: extra values are the base, constructor values override.
        merged_settings: dict[str, Any] = {
            **self._extra_dict(extra, "session_settings"),
            **self.session_settings,
        }
        if merged_settings:
            kwargs["settings"] = merged_settings
        return kwargs

    def get_conn(self) -> ClickHouseConnection:
        """Return a DB-API 2.0 compatible connection backed by ``clickhouse_connect``."""
        return ClickHouseConnection(self.get_client())

    def get_client(self) -> clickhouse_connect.driver.client.Client:
        """
        Return the raw ``clickhouse_connect`` Client for ClickHouse-specific operations.

        Use this for bulk inserts, streaming queries, or any operation that benefits
        from the native ``clickhouse_connect`` API rather than DB-API cursors.
        The caller is responsible for closing the client.
        """
        import clickhouse_connect

        return clickhouse_connect.get_client(**self._get_client_kwargs())

    def bulk_insert_rows(
        self,
        table: str,
        rows: list[tuple],
        column_names: list[str],
        batch_size: int | None = None,
    ) -> None:
        """
        Insert rows into a ClickHouse table using the native columnar insert.

        Uses ``clickhouse_connect``'s optimized insert path, which is significantly
        faster than row-by-row cursor inserts for large datasets.

        By default (``batch_size=None``) all rows are sent in a single call. Set
        ``batch_size`` to bound peak memory usage on very large inputs. An insert
        context is created once and reused across chunks to avoid a repeated
        ``DESCRIBE TABLE`` round-trip per batch.

        :param table: Target table name.
        :param rows: List of row tuples to insert.
        :param column_names: Column names matching each position in the row tuples.
        :param batch_size: Number of rows per insert chunk (must be positive).
            ``None`` (default) sends all rows in one request.
        :raises ValueError: if ``batch_size`` is not ``None`` and is less than 1.
        """
        # Validate up front: 0 would make range() raise a cryptic error, and a negative
        # step would silently insert nothing while still logging success.
        if batch_size is not None and batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer or None, got {batch_size!r}")
        if not rows:
            self.log.warning(
                "bulk_insert_rows called with an empty rows list — nothing was inserted into %s.", table
            )
            return
        client = self.get_client()
        try:
            if batch_size is None:
                client.insert(table, rows, column_names=column_names)
            else:
                ctx = client.create_insert_context(table, column_names=column_names)
                for i in range(0, len(rows), batch_size):
                    client.insert(data=rows[i : i + batch_size], context=ctx)
            self.log.info("Inserted %d rows into %s", len(rows), table)
        finally:
            client.close()

    def get_uri(self) -> str:
        """
        Return a SQLAlchemy-compatible URI for ``get_sqlalchemy_engine()``.

        Always uses the ``clickhousedb://`` scheme registered by ``clickhouse-connect``.
        Username and password are URL-encoded with ``quote_plus``. TLS and tuning
        parameters are forwarded as query-string arguments, so that SQLAlchemy-path
        users get the same settings as DB-API-path users:

        * ``secure``: enables HTTPS/TLS (``?secure=true``)
        * ``verify``: TLS certificate verification (``?verify=false`` to disable).
          A missing or ``null`` value keeps verification on.
        * ``connect_timeout``, ``send_receive_timeout``, ``compress``: forwarded when
          set to a non-null value in the connection's ``extra`` JSON

        Without an explicit port, ``8443`` is used when ``secure`` is on and ``8123``
        otherwise, matching :meth:`get_client`.
        """
        from urllib.parse import urlencode

        conn = self.get_connection(self.get_conn_id())
        extra: dict[str, Any] = conn.extra_dejson

        secure = self._extra_bool(extra.get("secure"), False)
        # Encode credentials so characters such as '@', ':' or '/' cannot break the URI.
        username = quote_plus(conn.login) if conn.login else "default"
        password = quote_plus(conn.password) if conn.password else ""
        host = conn.host or "localhost"
        port = self._resolve_port(conn.port, secure)
        database = self.database or conn.schema or "default"

        params: dict[str, str] = {}
        if secure:
            params["secure"] = "true"
        if not self._extra_bool(extra.get("verify"), True):
            params["verify"] = "false"
        for key in _OPTIONAL_CLIENT_KWARGS:
            value = extra.get(key)
            if value is None:
                # Consistent with _get_client_kwargs: null means "use driver default".
                continue
            params[key] = str(value).lower() if isinstance(value, bool) else str(value)

        credentials = f"{username}:{password}" if password else username
        base = f"clickhousedb://{credentials}@{host}:{port}/{database}"
        return f"{base}?{urlencode(params)}" if params else base

# Was this entry helpful?