共用方式為


read_kafka table-valued 函式

適用於: 檢查標示為是 Databricks SQL 檢查標示為是 Databricks Runtime 13.3 LTS 和更新版本

從 Apache Kafka 叢集讀取數據,並以表格式傳回數據。

可以從一或多個 Kafka 主題讀取數據。 它同時支援批次查詢和串流擷取。

語法

read_kafka([option_key => option_value ] [, ...])

引數

此函式 需要具名參數調用

  • option_key:要設定的選項名稱。 您必須針對包含點 () 的選項使用反引號 (.')。
  • option_value:要設定選項的常數表達式。 接受常值和純量函式。

傳回

使用下列架構從 Apache Kafka 叢集讀取的記錄:

  • key BINARY:Kafka 記錄的索引鍵。
  • value BINARY NOT NULL:Kafka 記錄的值。
  • topic STRING NOT NULL:讀取記錄的 Kafka 主題名稱。
  • partition INT NOT NULL:讀取記錄的 Kafka 分割區識別碼。
  • offset BIGINT NOT NULL:Kafka TopicPartition中記錄的位移編號。
  • timestamp TIMESTAMP NOT NULL:記錄的時間戳值。 數據 timestampType 行會定義此時間戳對應至的專案。
  • timestampType INTEGER NOT NULL:資料列中指定的 timestamp 時間戳類型。
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>:提供做為記錄一部分的標頭值(如果已啟用)。

範例

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

選項。

您可以在 Apache Spark 檔案中找到詳細的選項清單。

必要選項

提供下列選項以連線到 Kafka 叢集。

選項
bootstrapServers

類型:String (英文)

指向 Kafka 叢集的主機/埠組逗號分隔清單。

預設值:無

只提供下列其中一個選項,以設定要從中提取數據的 Kafka 主題。

選項
assign

類型:String (英文)

JSON 字串,其中包含要取用的特定主題分割區。 例如,針對 '{"topicA":[0,1],"topicB":[2,4]}',topicA 的 0 和第 1 個分割區將會取用自 。

預設值:無
subscribe

類型:String (英文)

要讀取的 Kafka 主題逗號分隔清單。

預設值:無
subscribePattern

類型:String (英文)

正則表達式,符合要訂閱的主題。

預設值:無

其他選項

read_kafka 可用於批次查詢和串流查詢。 下列選項會指定其套用的查詢類型。

選項
endingOffsets

類型:查詢類型: String 僅限批次

要讀取直到批次查詢的位移、 "latest" 指定最新記錄,或是指定每個 TopicPartition 結束位移的 JSON 字串。 在 JSON 中, -1 作為位移可用來參考最新。 -2 (最早)不允許位移。

預設值:"latest"
endingOffsetsByTimestamp

類型:查詢類型: String 僅限批次

JSON 字串,指定要讀取的結束時間戳,直到每個 TopicPartition 為止。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC為單位提供時間戳的長值,例如 ,
1686444353000. 如需時間戳的行為詳細數據,請參閱 下方 的附註。
endingOffsetsByTimestamp 優先於 endingOffsets

預設值:無
endingTimestamp

類型:查詢類型: String 僅限批次

時間戳的字串值,以毫秒為單位,因為
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 如果 Kafka 未傳回相符的位移,則會將位移設定為最新位移。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 注意: endingTimestamp 優先於 endingOffsetsByTimestamp
endingOffsets.

預設值:無
includeHeaders

類型:查詢類型: Boolean 串流和批次

是否要在數據列中包含 Kafka 標頭。

預設值:false
kafka.<consumer_option>

類型:查詢類型: String 串流和批次

任何 Kafka 取用者特定選項都可以使用 kafka. 前置詞傳入。 提供時,這些選項必須以反引弧括住,否則您將會收到剖析器錯誤。 您可以在 Kafka 檔中找到選項。

注意:您不應該使用此函式來設定下列選項:
key.deserializer、 、 value.deserializerbootstrap.serversgroup.id

預設值:無
maxOffsetsPerTrigger

類型:查詢類型: Long 僅串流

每個觸發程式間隔所處理之位移或數據列數目上限的速率限制。 指定的位移總數將會按比例分割到 TopicPartitions。

預設值:無
startingOffsets

類型:查詢類型: String 串流和批次

查詢啟動時的起點,無論是 "earliest" 從最早位移, "latest" 也就是來自最新的位移,或是指定每個 TopicPartition 起始位移的 JSON 字串。 在 JSON 中, -2 作為位移可用來參考最早的 -1 最新的位移。

注意:對於批次查詢,不允許使用 -1 進行批次查詢,否則不允許使用 -1。 對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。

默認值: "latest" 用於串流處理, "earliest" 用於批次
startingOffsetsByTimestamp

類型:查詢類型: String 串流和批次

JSON 字串,指定每個 TopicPartition 的起始時間戳。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC為單位提供時間戳的長值,例如 1686444353000。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的位移,則行為會遵循選項 startingOffsetsByTimestampStrategy的值。
startingOffsetsByTimestamp 優先於 startingOffsets

注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。

預設值:無
startingOffsetsByTimestampStrategy

類型:查詢類型: String 串流和批次

當指定的起始時間戳(全域或每個分割區)與傳回的位移 Kafka 不相符時,會使用此策略。 可用的策略如下:

* "error":查詢失敗
* "latest":指派這些分割區的最新位移,讓Spark可以在稍後的微批次中從這些分割區讀取較新的記錄。

預設值:"error"
startingTimestamp

類型:查詢類型: String 串流和批次

時間戳的字串值,以毫秒為單位,因為
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的位移,則行為會遵循選項 startingOffsetsByTimestampStrategy的值。
startingTimestamp 優先於 startingOffsetsByTimestampstartingOffsets

注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。

預設值:無

注意

每個分割區傳回的位移是最早的位移,其時間戳大於或等於對應數據分割中的指定時間戳。 如果 Kafka 未傳回相符位移,則行為會因選項而異 - 檢查每個選項的描述。

Spark 只會將時間戳信息傳遞至 KafkaConsumer.offsetsForTimes,而且不會解譯或解釋值的原因。 如需 的詳細資訊 KafkaConsumer.offsetsForTimes,請參閱 。 此外,這裡的時間戳意義可能會根據 Kafka 設定 (log.message.timestamp.type) 而有所不同。 如需詳細資訊,請參閱 Apache Kafka 檔