Apache Kafka'ya bağlanma

Bu makalede, Azure Databricks üzerinde Yapılandırılmış Akış iş yüklerini çalıştırırken Apache Kafka'nın kaynak veya havuz olarak nasıl kullanılabileceği açıklanmaktadır.

Kafka hakkında daha fazla bilgi için Apache Kafka belgelerine bakın.

Kafka'dan veri okuma

Azure Databricks, Kafka bağlantılarını yapılandırmak için veri biçimi olarak kafka anahtar sözcüğünü sağlar. Aşağıda bir akış okuma örneği verilmiştir:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks, aşağıdaki örnekte gösterildiği gibi toplu okuma semantiğini de destekler:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Artımlı toplu yükleme için Databricks, Trigger.AvailableNow ile Kafka'nın kullanılmasını önerir. Bkz AvailableNow. Artımlı toplu işlem.

Databricks Runtime 13.3 LTS ve üzerinde, Azure Databricks Kafka verilerini okumak için bir SQL işlevi de sağlar. SQL ile akış yalnızca Lakeflow Spark Bildirimli İşlem Hatlarında veya Databricks SQL'deki akış tablolarında desteklenir. Bakınız read_kafka tablo değerli fonksiyon.

Kafka Yapılandırılmış Akış okuyucusu yapılandırma

Hem toplu hem de akış sorguları için Kafka kaynağı için aşağıdaki seçenek ayarlanmalıdır:

Seçenek Değer Açıklama
kafka.bootstrap.servers Virgülle ayrılmış host:port listesi Kafka kümesi önyükleme sunucuları

Ayrıca, abone olunacak konuları belirtmek için aşağıdaki seçeneklerden biri gereklidir:

Seçenek Değer Açıklama
subscribe Virgülle ayrılmış konu listesi. Abone olunacak konu listesi.
subscribePattern Java regex dizesi. Konulara abone olmak için kullanılan desen.
assign JSON dizesi {"topicA":[0,1],"topic":[2,4]}. Tüketime özel topicPartitions.

Kullanılabilir seçeneklerin tam listesi için Seçenekler sayfasına bakın.

Kafka kayıtları şeması

Kafka Yapılandırılmış Akış okuyucusu tarafından döndürülen kayıtlar aşağıdaki şemaya sahip olacaktır:

Köşe yazısı Türü
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key ve value, her zaman ByteArrayDeserializer ile bayt dizisi olarak seri durumdan çıkarılır. Anahtarları ve değerleri açıkça seri durumdan çıkarmak için DataFrame işlemlerini (veya cast("string")gibifrom_avro) kullanın.

Kafka'ya veri yazma

Aşağıda Kafka'ya akış yazma örneği verilmiştir:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks, aşağıdaki örnekte gösterildiği gibi Kafka veri havuzlarına toplu yazma semantiğini de destekler:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Kafka Yapılandırılmış Akış yazıcısını yapılandırma

Önemli

Databricks Runtime 13.3 LTS ve üstü, kafka-clients kütüphanesinin varsayılan olarak idempotent yazmaları etkinleştiren daha yeni bir sürümünü içerir. Kafka havuzu, yapılandırılmış ACL'lerle birlikte 2.8.0 veya daha düşük bir sürüm kullanıyorsa ancak IDEMPOTENT_WRITE etkinleştirilmediyse, yazma işlemi org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state hata iletisiyle başarısız olur.

Kafka sürümünü 2.8.0 veya daha üstüne yükselterek veya Yapılandırılmış Akış yazıcınızı yapılandırırken .option(“kafka.enable.idempotence”, “false”) ayarı yaparak bu hatayı düzeltin.

Kafka'ya yazarken ayarlanan yaygın seçenekler şunlardır:

Seçenek Değer Varsayılan değer Açıklama
kafka.boostrap.servers Virgülle ayrılmış <host:port> listesi yok [Gerekli] Kafka bootstrap.servers yapılandırması.
topic STRING ayarlanmadı [İsteğe bağlı] Tüm satırların yazılacağı konuyu ayarlar. Bu seçenek, verilerde bulunan tüm konu sütunlarını geçersiz kılar.
includeHeaders BOOLEAN false [İsteğe bağlı] Kafka üst bilgilerinin satıra eklenip eklenmeyeceği.

Kullanılabilir seçeneklerin tam listesi için Seçenekler sayfasına bakın.

Kafka yazıcı şeması

Kafka'ya veri yazarken, sağlanan DataFrame aşağıdaki alanları içerebilir:

Sütun adı Gerekli veya isteğe bağlı Türü
key optional STRING veya BINARY
value required STRING veya BINARY
headers optional ARRAY
topic isteğe bağlı (eğer topic yazar seçeneği olarak ayarlanmışsa yoksayılır) STRING
partition optional INT

Kimlik doğrulama

Azure Databricks; Unity Catalog hizmeti kimlik bilgileri, SASL/SSL ve AWS MSK, Azure Event Hubs ve Google Cloud Managed Kafka için buluta özgü seçenekler de dahil olmak üzere Kafka için birden çok kimlik doğrulama yöntemini destekler. Bkz. Kimlik doğrulaması.

Kafka ölçümlerini alma

, avgOffsetsBehindLatestve maxOffsetsBehindLatest ölçümlerini kullanarak bir akış sorgusunun Kafka'nın minOffsetsBehindLatestarkasında ne kadar gecikmeli olduğunu izleyebilirsiniz. Bunlar, Kafka'daki en son uzaklıklara göre abone olunan tüm konu bölümleri arasında ortalama, en yüksek ve en düşük uzaklık gecikmesini bildirir. Bkz Etkileşimli Ölçümleri Okuma.

Uyarı

Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.

Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .

Sorgunun henüz ne kadar veri kullanmadığını tahmin etmek için ölçümü kullanın estimatedTotalBytesBehindLatest . Bu ölçüm, son 300 saniye içinde işlenen toplu işlemlere göre abone olunan tüm bölümlerde kalan toplam bayt sayısını tahmin eder. Seçeneğini ayarlayarak bytesEstimateWindowLength bu tahmin için kullanılan zaman penceresini değiştirebilirsiniz. Örneğin, 10 dakikaya ayarlamak için:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Akışı bir not defterinde çalıştırıyorsanız, akış sorgusu ilerleme durumu panosundaki Ham Veri sekmesinin altında şu ölçümleri görebilirsiniz:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Daha fazla bilgi için bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.

Kod örneği: Kafka'dan Delta'ya

Aşağıdaki örnekte, Kafka'dan Delta tablosuna sürekli veri akışı için eksiksiz bir iş akışı gösterilmektedir. Bu düzen neredeyse gerçek zamanlı veri alımı iş yükleri için idealdir.

Bu örnekte sabit bir JSON şeması kullanılır. Avro veya Protobuf gibi diğer biçimler için from_avro veya from_protobuf kullanın. Ayrıca bir şema kayıt defteriyle tümleştirebilirsiniz. Bkz. Schema Registry ile örnek.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);