本文說明如何在 Azure Databricks 上執行結構化串流工作負載時,使用 Apache Kafka 作為來源或接收器。
欲了解更多關於 Kafka 的資訊,請參閱 Apache Kafka 文件。
從 Kafka 讀取資料
Azure Databricks 提供關鍵字 kafka 作為資料格式,用以配置連接至 Kafka。 以下是串流閱讀的範例:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks 也支援批次讀取語意,如下範例所示:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
對於累加式批次載入,Databricks 建議將 Kafka 與 Trigger.AvailableNow 搭配使用。 參見 AvailableNow:增量批次處理。
在 Databricks Runtime 13.3 LTS 及以上版本中,Azure Databricks 也提供 SQL 函式來讀取 Kafka 資料。 只有在 Lakeflow Spark 宣告式管線或 Databricks SQL 的串流資料表中,才支援使用 SQL 進行串流。 請參閱 read_kafka 資料表值函式。
設定 Kafka 結構化串流讀取器
對於批次查詢與串流查詢,Kafka 來源必須設定以下選項:
| Option | 價值 | 說明 |
|---|---|---|
kafka.bootstrap.servers |
一個逗號分隔的 host:port 列表 | Kafka 叢集引導伺服器 |
此外,以下選項之一需指定訂閱主題:
| Option | 價值 | 說明 |
|---|---|---|
subscribe |
以逗號分隔的主題清單。 | 要訂閱的主題清單。 |
subscribePattern |
Java regex 字串。 | 用於訂閱主題的範式。 |
assign |
JSON 字串 {"topicA":[0,1],"topic":[2,4]}。 |
要使用的特定主題分區。 |
完整選項清單請參閱 選項 頁面。
Kafka 記錄的架構
Kafka 結構化串流讀取器回傳的紀錄將包含以下架構:
| 資料行 | 類型 |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
key和value 一律以ByteArrayDeserializer 還原序列化為位元組陣列。 使用 DataFrame 操作(例如 cast("string") 或 from_avro)來明確反序列化鍵與值。
將資料寫入 Kafka
以下是串流寫入 Kafka 的範例:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks 也支援 Kafka 資料接收器的批次寫入語意,如下列範例所示:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
設定 Kafka 結構化串流寫入器
這很重要
Databricks Runtime 13.3 LTS 和更新版本包含了一個更新的 kafka-clients 程式庫,該程式庫預設啟用等冪寫入功能。 如果 Kafka 接收器使用 2.8.0 或更低版本,並且已設定 ACL 但未啟用 IDEMPOTENT_WRITE,則寫入將會失敗並出現錯誤訊息 org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state。
透過升級至 Kafka 2.8.0 或更新版本或者在設定結構化串流寫入器時設定 .option(“kafka.enable.idempotence”, “false”),來解決此錯誤。
以下是寫入 Kafka 時所設定的常見選項:
| Option | 價值 | 預設值 | 說明 |
|---|---|---|---|
kafka.boostrap.servers |
以逗號分隔的 <host:port> 清單 |
沒有 | [必要] Kafka bootstrap.servers 組態。 |
topic |
STRING |
未設定 | [選用] 設定要寫入的所有資料列的主題。 此選項會覆寫資料中已存在的任何主題欄位。 |
includeHeaders |
BOOLEAN |
false |
[選用] 是否要在資料列中包含 Kafka 標頭。 |
完整選項清單請參閱 選項 頁面。
卡夫卡作家的架構
在寫入 Kafka 資料時,所提供的 DataFrame 可能包含以下欄位:
| 欄位名稱 | 必要或選擇性 | 類型 |
|---|---|---|
key |
optional |
STRING 或 BINARY |
value |
required |
STRING 或 BINARY |
headers |
optional | ARRAY |
topic |
選擇性(如果 topic 設為 writer 選項,則會忽略) |
STRING |
partition |
optional | INT |
驗證
Azure Databricks 支援多種 Kafka 認證方式,包括 Unity 目錄服務憑證、SASL/SSL,以及針對 AWS MSK、Azure Event Hubs 和 Google Cloud Managed Kafka 的雲端專用選項。 請參閱驗證。
擷取 Kafka 指標
你可以用 avgOffsetsBehindLatest、 maxOffsetsBehindLatest和 minOffsetsBehindLatest 指標來監控串流查詢落後 Kafka 的程度。 這些分類報告了所有訂閱主題分割的平均、最大及最小偏移延遲,相較於 Kafka 中最新的偏移量。 請參閱以互動方式讀取計量。
要估計查詢尚未消耗多少資料,請使用指標。estimatedTotalBytesBehindLatest 此指標根據過去 300 秒內處理的批次,估計所有訂閱分割區剩餘的位元組總數。 你可以透過設定選項 bytesEstimateWindowLength 來修改這個估算的時間範圍。 例如,若要將它設定為10分鐘:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
如果在筆記本中執行串流,您可以在串流查詢進度儀表板中的 [原始資料] 索引標籤下查看這些計量:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
更多資訊請參閱 Azure Databricks 上的「監控結構化串流查詢 」。
程式碼範例:卡夫卡到三角洲
以下範例展示了從 Kafka 持續串流資料到 Delta 表格的完整工作流程。 此模式非常適合近即時的資料擷取工作負載。
這個範例使用固定的 JSON 架構。 對於像 Avro 或 Protobuf 這類格式,可以使用 from_avro 或 from_protobuf。 你也可以整合 schema 登錄檔。 請參考 結構登錄器的範例。
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);