連接 Apache Kafka

本文說明如何在 Azure Databricks 上執行結構化串流工作負載時,使用 Apache Kafka 作為來源或匯入工具。

欲了解更多關於 Kafka 的資訊,請參閱 Apache Kafka 文件

從 Kafka 讀取資料

Azure Databricks 提供 kafka 關鍵字作為資料格式,用於配置連接至 Kafka。 以下是串流閱讀的範例:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks 也支援批次讀取語意,如下範例所示:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

對於累加式批次載入,Databricks 建議將 Kafka 與 Trigger.AvailableNow 搭配使用。 參見 AvailableNow:增量批次處理

在 Databricks Runtime 13.3 LTS 及以上版本中,Azure Databricks 也提供一個 SQL 函式來讀取 Kafka 資料。 只有在 Lakeflow Spark 宣告式管線或 Databricks SQL 的串流資料表中,才支援使用 SQL 進行串流。 請參閱 read_kafka 資料表值函式

設定 Kafka 結構化串流讀取器

對於批次查詢與串流查詢,Kafka 來源必須設定以下選項:

Option 價值 說明
kafka.bootstrap.servers 一個逗號分隔的 host:port 列表 Kafka 叢集引導伺服器

此外,以下選項之一需指定訂閱主題:

Option 價值 說明
subscribe 以逗號分隔的主題清單。 要訂閱的主題清單。
subscribePattern Java 正則表達式字串。 用於訂閱主題的範式。
assign JSON 字串 {"topicA":[0,1],"topic":[2,4]} 針對 topicPartitions 特定消費量。

完整選項清單請參閱 選項 頁面。

Kafka 記錄的架構

Kafka 結構化串流讀取器回傳的紀錄將包含以下架構:

資料行 類型
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

keyvalue 一律以ByteArrayDeserializer 還原序列化為位元組陣列。 使用 DataFrame 操作(例如 cast("string")from_avro)來明確反序列化鍵與值。

將資料寫入 Kafka

以下是串流寫入 Kafka 的範例:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks 也支援批次寫入 Kafka 資料匯的語意,如下範例所示:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

設定 Kafka 結構化串流寫入器

這很重要

Databricks Runtime 13.3 LTS 和更新版本包含了一個更新的 kafka-clients 程式庫,該程式庫預設啟用等冪寫入功能。 如果 Kafka 接收器使用 2.8.0 或更低版本,並且已設定 ACL 但未啟用 IDEMPOTENT_WRITE,則寫入將會失敗並出現錯誤訊息 org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state

透過升級至 Kafka 2.8.0 或更新版本或者在設定結構化串流寫入器時設定 .option(“kafka.enable.idempotence”, “false”),來解決此錯誤。

以下是寫入 Kafka 時所設定的常見選項:

Option 價值 預設值 說明
kafka.boostrap.servers 以逗號分隔的 <host:port> 清單 沒有 [必要] Kafka bootstrap.servers 組態。
topic STRING 未設定 [選用] 設定要寫入的所有資料列的主題。 此選項會覆寫資料中已存在的任何主題欄位。
includeHeaders BOOLEAN false [選用] 是否要在資料列中包含 Kafka 標頭。

完整選項清單請參閱 選項 頁面。

卡夫卡作家的架構

在寫入 Kafka 資料時,所提供的 DataFrame 可能包含以下欄位:

欄位名稱 必要或選擇性 類型
key optional STRINGBINARY
value required STRINGBINARY
headers optional ARRAY
topic 選擇性(如果 topic 設為 writer 選項,則會忽略) STRING
partition optional INT

驗證

Azure Databricks 支援多種 Kafka 認證方法,包括 Unity Catalog 服務憑證、SASL/SSL,以及針對 AWS MSK、Azure Event Hubs 和 Google Cloud Managed Kafka 的雲端專用選項。 請參閱驗證

擷取 Kafka 指標

你可以用 avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest 指標來監控串流查詢落後 Kafka 的程度。 這些分類報告了所有訂閱主題分割的平均、最大及最小偏移延遲,相較於 Kafka 中最新的偏移量。 請參閱以互動方式讀取計量

備註

在 Databricks Runtime 17.1 及以上版本中,每個微批次完成後會擷取最新的 Kafka 偏移量。 在持續接收資料的主題中,積壓指標可能顯示小且持續的非零值。 這是預期中的行為,並不代表溪流正在落後。

在 Databricks Runtime 17.0 及以下版本中,最新的 Kafka 偏移量會在微批次開始時擷取。 當串流查詢持續消耗微批次開始時所有可用紀錄時,積壓指標可能會回傳 0

要估計查詢尚未消耗多少資料,請使用指標。estimatedTotalBytesBehindLatest 此指標根據過去 300 秒內處理的批次,估計所有訂閱分割區剩餘的位元組總數。 你可以透過設定選項 bytesEstimateWindowLength 來修改這個估算的時間範圍。 例如,若要將它設定為10分鐘:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

如果在筆記本中執行串流,您可以在串流查詢進度儀表板中的 [原始資料] 索引標籤下查看這些計量:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

更多資訊,請參閱 在 Azure Databricks 上監控結構化串流查詢

程式碼範例:卡夫卡到三角洲

以下範例展示了從 Kafka 持續串流資料到 Delta 表格的完整工作流程。 此模式非常適合近即時的資料擷取工作負載。

這個範例使用固定的 JSON 架構。 對於像 Avro 或 Protobuf 這類格式,可以使用 from_avrofrom_protobuf。 你也可以整合 schema 登錄檔。 請參考 結構登錄器的範例

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);