read_kafka-Tabellenwertfunktion

Gilt für:Häkchen gesetzt ja Databricks SQL Häkchen gesetzt ja Databricks Runtime ab Version 13.3 LTS

Liest Daten aus einem Apache Kafka-Cluster und gibt die Daten in tabellarischer Form zurück.

Kann Daten aus einem oder mehreren Kafka-Themen lesen. Es unterstützt sowohl Batchabfragen als auch Streamingerfassung.

Syntax

read_kafka([option_key => option_value ] [, ...])

Argumente

Diese Funktion erfordert einen Aufruf benannter Parameter für die Optionsschlüssel.

  • option_key: Der Name der zu konfigurierenden Option. Sie müssen Graviszeichen (') für Optionen verwenden, die Punkte (.) enthalten.
  • option_value: Ein konstanter Ausdruck, auf den die Option festgelegt werden soll. Akzeptiert Literale und Skalarfunktionen.

Gibt zurück

Datensätze, die aus einem Apache Kafka-Cluster gelesen werden, mit dem folgenden Schema:

  • key BINARY: Der Schlüssel des Kafka-Datensatzes.
  • value BINARY NOT NULL: Der Wert des Kafka-Datensatzes.
  • topic STRING NOT NULL: Der Name des Kafka-Themas, aus dem der Datensatz gelesen wird.
  • partition INT NOT NULL: Die ID der Kafka-Partition, aus der der Datensatz gelesen wird.
  • offset BIGINT NOT NULL: Die Offsetnummer des Datensatzes im Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Ein Zeitstempelwert für den Datensatz. Die Spalte timestampType definiert, was dieser Zeitstempel entspricht.
  • timestampType INTEGER NOT NULL: Der Typ des in der Spalte timestamp angegebenen Zeitstempels.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Headerwerte, die als Teil des Datensatzes bereitgestellt werden (sofern aktiviert).

Beispiele

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Optionen

Eine ausführliche Liste der Optionen finden Sie in der Apache Spark-Dokumentation.

Erforderliche Optionen

Geben Sie die folgende Option an, um eine Verbindung mit Ihrem Kafka-Cluster herzustellen.

Option
bootstrapServers

Typ: String

Eine durch Trennzeichen getrennte Liste von Host/Port-Paaren, die auf den Kafka-Cluster verweisen.

Standardwert: None

Stellen Sie nur eine der folgenden Optionen bereit, um zu konfigurieren, aus welchen Kafka-Themen Daten abgerufen werden sollen.

Option
assign

Typ: String

Eine JSON-Zeichenfolge, die die spezifischen Themenpartitionen enthält, von denen konsumiert werden soll. Beispielsweise werden für '{"topicA":[0,1],"topicB":[2,4]}' die 0'th- und die 1. Partition von topicA verwendet.

Standardwert: None
subscribe

Typ: String

Eine durch Kommas getrennte Liste von Kafka-Themen, aus der gelesen werden soll.

Standardwert: None
subscribePattern

Typ: String

Ein regulärer Ausdruck, der themengleich ist, die abonniert werden sollen.

Standardwert: None

Sonstige Optionen

read_kafka kann in Batchabfragen und in Streamingabfragen verwendet werden. Die folgenden Optionen geben an, auf welchen Abfragetyp sie angewendet werden.

Option
endingOffsets

Typ: String-Abfragetyp: nur Batch

Die Offsets, die für eine Batchabfrage gelesen werden sollen, entweder "latest", um die neuesten Datensätze anzugeben, oder eine JSON-Zeichenfolge, die einen endenden Offset für jede TopicPartition angibt. In der JSON-Zeichenfolge kann mit dem Offset -1 auf den spätestmöglichen Offset verwiesen werden. -2 (frühestens), da ein Offset nicht zulässig ist.

Standardwert: "latest"
endingOffsetsByTimestamp

Typ: String-Abfragetyp: nur Batch

Eine JSON-Zeichenfolge, die einen Endzeitstempel angibt, der für jede TopicPartition gelesen werden soll. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden angegeben werden, z. B. 1970-01-01 00:00:00 UTC
1686444353000. Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten.
endingOffsetsByTimestamp hat Vorrang vor endingOffsets.

Standardwert: None
endingTimestamp

Typ: String-Abfragetyp: nur Batch

Ein Zeichenfolgenwert des Zeitstempels in Millisekunden seit
1970-01-01 00:00:00 UTC, z. B. "1686444353000". Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, wird der Offset auf neueste festgelegt. Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Hinweis: endingTimestamp hat Vorrang vor endingOffsetsByTimestamp und
endingOffsets.

Standardwert: None
includeHeaders

Typ: Boolean-Abfragetyp: Streaming und Batch

Gibt an, ob die Kafka-Header in die Zeile eingeschlossen werden sollen.

Standardwert: false
kafka.<consumer_option>

Typ: String-Abfragetyp: Streaming und Batch

Alle verbraucherspezifischen Kafka-Optionen können mit dem Präfix kafka. übergeben werden. Diese Optionen müssen bei der Bereitstellung von Backticks umgeben sein, andernfalls erhalten Sie einen Parserfehler. Die Optionen finden Sie in der Kafka-Dokumentation.

Hinweis: Sie sollten die folgenden Optionen nicht mit dieser Funktion festlegen:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Standardwert: None
maxOffsetsPerTrigger

Typ: Long-Abfragetyp: nur Streaming

Ratenbegrenzung für die maximale Anzahl von Offsets oder Zeilen, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtanzahl von Offests wird proportional auf TopicPartitions aufgeteilt.

Standardwert: None
startingOffsets

Typ: String-Abfragetyp: Streaming und Batch

Der Startpunkt einer Abfrage – entweder "earliest" (also ab den frühestmöglichen Offsets), "latest" (also ob den spätestmöglichen Offsets) oder eine JSON-Zeichenfolge, die einen Startversatz für jede Themenpartition (TopicPartition) angibt. In der JSON-Zeichenfolge kann mit dem Offset -2 auf den frühestmöglichen und mit -1 auf den spätestmöglichen Offset verwiesen werden.

Hinweis: Bei Batchabfragen ist „latest“ nicht zulässig (weder implizit noch in Form von „-1“ in JSON). Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt.

Standardwert: "latest" für Streaming, "earliest" für Batch
startingOffsetsByTimestamp

Typ: String-Abfragetyp: Streaming und Batch

Eine JSON-Zeichenfolge, die einen Startzeitstempel für jede TopicPartition angibt. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden seit 1970-01-01 00:00:00 UTC angegeben werden, z. B. 1686444353000 Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp hat Vorrang vor startingOffsets.

Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt.

Standardwert: None
startingOffsetsByTimestampStrategy

Typ: String-Abfragetyp: Streaming und Batch

Diese Strategie wird verwendet, wenn der angegebene Startoffset nach Zeitstempel (entweder global oder pro Partition) nicht mit dem von Kafka zurückgegebenen Offset übereinstimmt. Die verfügbaren Strategien sind:

* "error": Fehler bei der Abfrage
* "latest": weist den neuesten Offset für diese Partitionen zu, damit Spark neuere Datensätze aus diesen Partitionen in späteren Mikrobatches lesen kann.

Standardwert: "error"
startingTimestamp

Typ: String-Abfragetyp: Streaming und Batch

Ein Zeichenfolgenwert des Zeitstempels in Millisekunden seit
1970-01-01 00:00:00 UTC, z. B. "1686444353000". Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy.
startingTimestamp hat Vorrang vor startingOffsetsByTimestamp und startingOffsets.

Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen am frühestmöglichen Startpunkt.

Standardwert: None

Hinweis

Der zurückgegebene Offset für jede Partition ist der früheste Offset, dessen Zeitstempel größer oder gleich dem angegebenen Zeitstempel in der entsprechenden Partition ist. Das Verhalten variiert je nach Option, wenn Kafka den übereinstimmenden Offset nicht zurückgibt. Überprüfen Sie die Beschreibung der einzelnen Optionen.

Spark übergibt einfach die Zeitstempelinformationen an KafkaConsumer.offsetsForTimes und interpretiert oder begründet den Wert nicht. Weitere Informationen zu KafkaConsumer.offsetsForTimes finden Sie in der Dokumentation. Auch die Bedeutung von Zeitstempel kann hier je nach Kafka-Konfiguration (log.message.timestamp.type) variieren. Weitere Informationen finden Sie in der Apache Kafka-Dokumentation.