Source code for airflow.providers.apache.kafka.hooks.base
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from functools import cached_property
from typing import Any
from confluent_kafka.admin import AdminClient
from airflow.hooks.base import BaseHook
[docs]class KafkaBaseHook(BaseHook):
"""
A base hook for interacting with Apache Kafka.
:param kafka_config_id: The connection object to use, defaults to "kafka_default"
"""
[docs] conn_name_attr = "kafka_config_id"
[docs] default_conn_name = "kafka_default"
[docs] hook_name = "Apache Kafka"
def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
"""Initialize our Base."""
super().__init__()
self.kafka_config_id = kafka_config_id
@classmethod
[docs] def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": ["schema", "login", "password", "port", "host"],
"relabeling": {"extra": "Config Dict"},
"placeholders": {
"extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}',
},
}
def _get_client(self, config) -> Any:
return AdminClient(config)
@cached_property
[docs] def get_conn(self) -> Any:
"""Get the configuration object."""
config = self.get_connection(self.kafka_config_id).extra_dejson
if not (config.get("bootstrap.servers", None)):
raise ValueError("config['bootstrap.servers'] must be provided.")
return self._get_client(config)
[docs] def test_connection(self) -> tuple[bool, str]:
"""Test Connectivity from the UI."""
try:
config = self.get_connection(self.kafka_config_id).extra_dejson
t = AdminClient(config, timeout=10).list_topics()
if t:
return True, "Connection successful."
except Exception as e:
return False, str(e)
return False, "Failed to establish connection."