次の方法で共有


read_kafka テーブル値関数

適用対象: check marked yes Databricks SQL Databricks Runtime 13.3 LTS 以上

Apache Kafka クラスターからデータを読み取り、表形式でデータを返します。

1 つ以上の Kafka トピックからデータを読み取ることができます。 バッチ クエリとストリーミング インジェストの両方がサポートされています。

構文

read_kafka([option_key => option_value ] [, ...])

引数

この関数には、名前付きパラメーター呼び出しが必要です。

  • option_key: 構成するオプションの名前。 ドット(.) を含むオプションには、バッククウォート (`) を使用する必要があります。
  • option_value: オプションを設定する定数式。 リテラルとスカラー関数を受け入れます。

返品

次のスキーマを使用して Apache Kafka クラスターから読み取られたレコード。

  • key BINARY: Kafka レコードのキー。
  • value BINARY NOT NULL: Kafka レコードの値。
  • topic STRING NOT NULL: レコードが読み取られた Kafka トピックの名前。
  • partition INT NOT NULL: レコードが読み取られた Kafka パーティションの ID。
  • offset BIGINT NOT NULL: Kafka TopicPartition 内のレコードのオフセット番号。
  • timestamp TIMESTAMP NOT NULL: レコードのタイムスタンプ値。 timestampType 列は、このタイムスタンプが対応する内容を定義します。
  • timestampType INTEGER NOT NULL: timestamp 列に指定されたタイムスタンプの型。
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: レコードの一部として提供されるヘッダー値 (有効な場合)。

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

[オプション]

オプションの詳細な一覧については、Apache Spark のドキュメントを参照してください。

必須のオプション

Kafka クラスターに接続するためのオプションを次に示します。

オプション
bootstrapServers

型: String

Kafka クラスターを指すホストとポートのペアのコンマ区切りのリスト。

既定値: なし

データをプルする Kafka トピックを構成するには、次のいずれかのオプションのみを指定します。

オプション
assign

型: String

使用する特定のトピック パーティションを含む JSON 文字列。 たとえば、'{"topicA":[0,1],"topicB":[2,4]}' の場合、topicA の 0 番目と 1 番目のパーティションは次で使用されます。

既定値: なし
subscribe

型: String

読み取る Kafka トピックのコンマ区切りのリスト。

既定値: なし
subscribePattern

型: String

サブスクライブする正規表現の一致するトピック。

既定値: なし

その他のオプション

read_kafka は、バッチ クエリとストリーミング クエリで使用できます。 以下のオプションでは、適用するクエリの種類を指定します。

オプション
endingOffsets

種類: String クエリの種類: バッチのみ

バッチ クエリまで読み取るオフセット。最新のレコードを指定する場合は "latest"、または各 TopicPartition の終了オフセットを指定する場合は JSON 文字列。 JSON では、オフセットとして -1 を使用して latest を示すことができます。 オフセットとしての -2 (最も早い) は許可されません。

既定値:"latest"
endingOffsetsByTimestamp

種類: String クエリの種類: バッチのみ

各 TopicPartition まで読み取る終了タイムスタンプを指定する JSON 文字列。 タイムスタンプは、1970-01-01 00:00:00 UTC 以降のミリ秒単位の長い値で指定する必要があります (例
1686444353000 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。
endingOffsetsByTimestamp は、endingOffsets よりも優先されます。

既定値: なし
endingTimestamp

種類: String クエリの種類: バッチのみ

以下の時刻以降のタイムスタンプの文字列値 (ミリ秒単位):
1970-01-01 00:00:00 UTC (例: "1686444353000")。 Kafka が一致したオフセットを返さない場合、オフセットは最新の状態に設定されます。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 注: endingTimestampendingOffsetsByTimestamp よりも優先されます
endingOffsets

既定値: なし
includeHeaders

種類: Boolean クエリの種類: ストリーミングとバッチ

行に Kafka ヘッダーを含めるかどうか。

既定値:false
kafka.<consumer_option>

種類: String クエリの種類: ストリーミングとバッチ

Kafka コンシューマー固有のオプションは、kafka. プレフィックスと共に渡すことができます。 指定した場合、これらのオプションはバッククォータで囲む必要があります。そうしないと、パーサー エラーが発生します。 Kafka のオプションについては、ドキュメントを参照してください。

注: この関数では、次のオプションを設定しないでください。
key.deserializervalue.deserializerbootstrap.servers, group.id

既定値: なし
maxOffsetsPerTrigger

種類: Long クエリの種類: ストリーミングのみ

トリガー間隔ごとに処理される行のオフセットの最大数に対するレート制限。 指定されたオフセットの総数は、TopicPartitions 間で比例的に分割されます。

既定値: なし
startingOffsets

種類: String クエリの種類: ストリーミングとバッチ

クエリが開始されたときの開始点。最も古いオフセットからの "earliest"、最新のオフセットからの "latest"、または各 TopicPartition の開始オフセットを指定する JSON 文字列。 JSON では、オフセットとして -2 を使用して earliest を、-1 で latest を示すことができます。

注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。

既定値: ストリーミング用は "latest"、バッチ用は "earliest"
startingOffsetsByTimestamp

種類: String クエリの種類: ストリーミングとバッチ

各 TopicPartition の開始タイムスタンプを指定する JSON 文字列。 タイムスタンプは、1970-01-01 00:00:00 UTC 以降のミリ秒単位の長い値で指定する必要があります (例: 1686444353000)。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 Kafka が一致したオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。
startingOffsetsByTimestamp は、startingOffsets よりも優先されます。

注: ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。

既定値: なし
startingOffsetsByTimestampStrategy

種類: String クエリの種類: ストリーミングとバッチ

この方法は、タイムスタンプ (グローバルまたはパーティションごと) で開始オフセットが指定された場合に、Kafka が返すオフセットと一致しない場合に使用されます。 使用可能な方法は次のとおりです。

* "error": クエリが失敗します
* "latest": Spark が後のマイクロバッチでこれらのパーティションから新しいレコードを読み取ることができるように、これらのパーティションの最新のオフセットを割り当てます。

既定値:"error"
startingTimestamp

種類: String クエリの種類: ストリーミングとバッチ

以下の時刻以降のタイムスタンプの文字列値 (ミリ秒単位):
1970-01-01 00:00:00 UTC (例: "1686444353000")。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 Kafka が一致したオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。
startingTimestamp は、startingOffsetsByTimestamp および startingOffsets よりも優先されます。

注: ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションが最も早く開始されます。

既定値: なし

Note

各パーティションに返されるオフセットは、タイムスタンプが対応するパーティション内の指定されたタイムスタンプと同等かそれ以上の最も古いオフセットです。 Kafka が一致したオフセットを返さない場合、各オプションの説明を確認する場合、動作はオプションによって異なります。

Spark は、タイムスタンプ情報を KafkaConsumer.offsetsForTimes に渡すだけで、値の解釈や理由付けは行われません。 KafkaConsumer.offsetsForTimes の詳細については、ドキュメントを参照してください。 また、ここでのタイムスタンプの意味は、Kafka 構成 (log.message.timestamp.type) によって異なる場合があります。 詳細については、Apache Kafka のドキュメントをご覧ください。