共用方式為


連接 Apache Kafka

本文說明如何在 Azure Databricks 上執行結構化串流工作負載時,使用 Apache Kafka 作為來源或接收器。

欲了解更多關於 Kafka 的資訊,請參閱 Apache Kafka 文件

從 Kafka 讀取資料

Azure Databricks 提供關鍵字 kafka 作為資料格式,用以配置連接至 Kafka。 以下是串流閱讀的範例:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks 也支援批次讀取語意,如下範例所示:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

對於累加式批次載入,Databricks 建議將 Kafka 與 Trigger.AvailableNow 搭配使用。 參見 AvailableNow:增量批次處理

在 Databricks Runtime 13.3 LTS 及以上版本中,Azure Databricks 也提供 SQL 函式來讀取 Kafka 資料。 只有在 Lakeflow Spark 宣告式管線或 Databricks SQL 的串流資料表中,才支援使用 SQL 進行串流。 請參閱 read_kafka 資料表值函式

設定 Kafka 結構化串流讀取器

對於批次查詢與串流查詢,Kafka 來源必須設定以下選項:

Option 價值 說明
kafka.bootstrap.servers 一個逗號分隔的 host:port 列表 Kafka 叢集引導伺服器

此外,以下選項之一需指定訂閱主題:

Option 價值 說明
subscribe 以逗號分隔的主題清單。 要訂閱的主題清單。
subscribePattern Java regex 字串。 用於訂閱主題的範式。
assign JSON 字串 {"topicA":[0,1],"topic":[2,4]} 要使用的特定主題分區。

完整選項清單請參閱 選項 頁面。

Kafka 記錄的架構

Kafka 結構化串流讀取器回傳的紀錄將包含以下架構:

資料行 類型
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

keyvalue 一律以ByteArrayDeserializer 還原序列化為位元組陣列。 使用 DataFrame 操作(例如 cast("string")from_avro)來明確反序列化鍵與值。

將資料寫入 Kafka

以下是串流寫入 Kafka 的範例:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks 也支援 Kafka 資料接收器的批次寫入語意,如下列範例所示:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

設定 Kafka 結構化串流寫入器

這很重要

Databricks Runtime 13.3 LTS 和更新版本包含了一個更新的 kafka-clients 程式庫,該程式庫預設啟用等冪寫入功能。 如果 Kafka 接收器使用 2.8.0 或更低版本,並且已設定 ACL 但未啟用 IDEMPOTENT_WRITE,則寫入將會失敗並出現錯誤訊息 org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state

透過升級至 Kafka 2.8.0 或更新版本或者在設定結構化串流寫入器時設定 .option(“kafka.enable.idempotence”, “false”),來解決此錯誤。

以下是寫入 Kafka 時所設定的常見選項:

Option 價值 預設值 說明
kafka.boostrap.servers 以逗號分隔的 <host:port> 清單 沒有 [必要] Kafka bootstrap.servers 組態。
topic STRING 未設定 [選用] 設定要寫入的所有資料列的主題。 此選項會覆寫資料中已存在的任何主題欄位。
includeHeaders BOOLEAN false [選用] 是否要在資料列中包含 Kafka 標頭。

完整選項清單請參閱 選項 頁面。

卡夫卡作家的架構

在寫入 Kafka 資料時,所提供的 DataFrame 可能包含以下欄位:

欄位名稱 必要或選擇性 類型
key optional STRINGBINARY
value required STRINGBINARY
headers optional ARRAY
topic 選擇性(如果 topic 設為 writer 選項,則會忽略) STRING
partition optional INT

驗證

Azure Databricks 支援多種 Kafka 認證方式,包括 Unity 目錄服務憑證、SASL/SSL,以及針對 AWS MSK、Azure Event Hubs 和 Google Cloud Managed Kafka 的雲端專用選項。 請參閱驗證

擷取 Kafka 指標

你可以用 avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest 指標來監控串流查詢落後 Kafka 的程度。 這些分類報告了所有訂閱主題分割的平均、最大及最小偏移延遲,相較於 Kafka 中最新的偏移量。 請參閱以互動方式讀取計量

要估計查詢尚未消耗多少資料,請使用指標。estimatedTotalBytesBehindLatest 此指標根據過去 300 秒內處理的批次,估計所有訂閱分割區剩餘的位元組總數。 你可以透過設定選項 bytesEstimateWindowLength 來修改這個估算的時間範圍。 例如,若要將它設定為10分鐘:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

如果在筆記本中執行串流,您可以在串流查詢進度儀表板中的 [原始資料] 索引標籤下查看這些計量:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

更多資訊請參閱 Azure Databricks 上的「監控結構化串流查詢 」。

程式碼範例:卡夫卡到三角洲

以下範例展示了從 Kafka 持續串流資料到 Delta 表格的完整工作流程。 此模式非常適合近即時的資料擷取工作負載。

這個範例使用固定的 JSON 架構。 對於像 Avro 或 Protobuf 這類格式,可以使用 from_avrofrom_protobuf。 你也可以整合 schema 登錄檔。 請參考 結構登錄器的範例

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);