read_kafka
-Tabellenwertfunktion
Gilt für: Databricks SQL Databricks Runtime 13.3 LTS und höher
Liest Daten aus einem Apache Kafka-Cluster und gibt die Daten in tabellarischer Form zurück.
Kann Daten aus einem oder mehreren Kafka-Themen lesen. Es unterstützt sowohl Batchabfragen als auch Streamingerfassung.
Syntax
read_kafka([option_key => option_value ] [, ...])
Argumente
Diese Funktion erfordert einen Aufruf benannter Parameter für die Optionsschlüssel.
option_key
: Der Name der zu konfigurierenden Option. Sie müssen Graviszeichen (') für Optionen verwenden, die Punkte (.
) enthalten.option_value
: Ein konstanter Ausdruck, auf den die Option festgelegt werden soll. Akzeptiert Literale und Skalarfunktionen.
Gibt zurück
Datensätze, die aus einem Apache Kafka-Cluster gelesen werden, mit dem folgenden Schema:
key BINARY
: Der Schlüssel des Kafka-Datensatzes.value BINARY NOT NULL
: Der Wert des Kafka-Datensatzes.topic STRING NOT NULL
: Der Name des Kafka-Themas, aus dem der Datensatz gelesen wird.partition INT NOT NULL
: Die ID der Kafka-Partition, aus der der Datensatz gelesen wird.offset BIGINT NOT NULL
: Die Offsetnummer des Datensatzes im KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: Ein Zeitstempelwert für den Datensatz. Die SpaltetimestampType
definiert, was dieser Zeitstempel entspricht.timestampType INTEGER NOT NULL
: Der Typ des in der Spaltetimestamp
angegebenen Zeitstempels.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Headerwerte, die als Teil des Datensatzes bereitgestellt werden (sofern aktiviert).
Beispiele
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Optionen
Eine ausführliche Liste der Optionen finden Sie in der Apache Spark-Dokumentation.
Erforderliche Optionen
Geben Sie die folgende Option an, um eine Verbindung mit Ihrem Kafka-Cluster herzustellen.
Option |
---|
bootstrapServers Typ: String Eine durch Trennzeichen getrennte Liste von Host/Port-Paaren, die auf den Kafka-Cluster verweisen. Standardwert: None |
Stellen Sie nur eine der folgenden Optionen bereit, um zu konfigurieren, aus welchen Kafka-Themen Daten abgerufen werden sollen.
Option |
---|
assign Typ: String Eine JSON-Zeichenfolge, die die spezifischen Themenpartitionen enthält, von denen konsumiert werden soll. Beispielsweise werden für '{"topicA":[0,1],"topicB":[2,4]}' die 0'th- und die 1. Partition von topicA verwendet.Standardwert: None |
subscribe Typ: String Eine durch Kommas getrennte Liste von Kafka-Themen, aus der gelesen werden soll. Standardwert: None |
subscribePattern Typ: String Ein regulärer Ausdruck, der themengleich ist, die abonniert werden sollen. Standardwert: None |
Sonstige Optionen
read_kafka
kann in Batchabfragen und in Streamingabfragen verwendet werden. Die folgenden Optionen geben an, auf welchen Abfragetyp sie angewendet werden.
Option |
---|
endingOffsets Typ: String -Abfragetyp: nur BatchDie Offsets, die für eine Batchabfrage gelesen werden sollen, entweder "latest" , um die neuesten Datensätze anzugeben, oder eine JSON-Zeichenfolge, die einen endenden Offset für jede TopicPartition angibt. In der JSON-Zeichenfolge kann mit dem Offset -1 auf den spätestmöglichen Offset verwiesen werden. -2 (frühestens), da ein Offset nicht zulässig ist.Standardwert: "latest" |
endingOffsetsByTimestamp Typ: String -Abfragetyp: nur BatchEine JSON-Zeichenfolge, die einen Endzeitstempel angibt, der für jede TopicPartition gelesen werden soll. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden angegeben werden, z. B. 1970-01-01 00:00:00 UTC 1686444353000 . Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten.endingOffsetsByTimestamp hat Vorrang vor endingOffsets .Standardwert: None |
endingTimestamp Typ: String -Abfragetyp: nur BatchEin Zeichenfolgenwert des Zeitstempels in Millisekunden seit 1970-01-01 00:00:00 UTC , z. B. "1686444353000" . Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, wird der Offset auf neueste festgelegt. Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Hinweis: endingTimestamp hat Vorrang vor endingOffsetsByTimestamp undendingOffsets .Standardwert: None |
includeHeaders Typ: Boolean -Abfragetyp: Streaming und BatchGibt an, ob die Kafka-Header in die Zeile eingeschlossen werden sollen. Standardwert: false |
kafka.<consumer_option> Typ: String -Abfragetyp: Streaming und BatchAlle verbraucherspezifischen Kafka-Optionen können mit dem Präfix kafka. übergeben werden. Diese Optionen müssen bei der Bereitstellung von Backticks umgeben sein, andernfalls erhalten Sie einen Parserfehler. Die Optionen finden Sie in der Kafka-Dokumentation.Hinweis: Sie sollten die folgenden Optionen nicht mit dieser Funktion festlegen: key.deserializer , value.deserializer , bootstrap.servers , group.id Standardwert: None |
maxOffsetsPerTrigger Typ: Long -Abfragetyp: nur StreamingRatenbegrenzung für die maximale Anzahl von Offsets oder Zeilen, die pro Triggerintervall verarbeitet werden. Die angegebene Gesamtanzahl von Offests wird proportional auf TopicPartitions aufgeteilt. Standardwert: None |
startingOffsets Typ: String -Abfragetyp: Streaming und BatchDer Startpunkt einer Abfrage – entweder "earliest" (also ab den frühestmöglichen Offsets), "latest" (also ob den spätestmöglichen Offsets) oder eine JSON-Zeichenfolge, die einen Startversatz für jede Themenpartition (TopicPartition) angibt. In der JSON-Zeichenfolge kann mit dem Offset -2 auf den frühestmöglichen und mit -1 auf den spätestmöglichen Offset verwiesen werden.Hinweis: Bei Batchabfragen ist „latest“ nicht zulässig (weder implizit noch in Form von „-1“ in JSON). Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt. Standardwert: "latest" für Streaming, "earliest" für Batch |
startingOffsetsByTimestamp Typ: String -Abfragetyp: Streaming und BatchEine JSON-Zeichenfolge, die einen Startzeitstempel für jede TopicPartition angibt. Die Zeitstempel müssen als langer Wert des Zeitstempels in Millisekunden seit 1970-01-01 00:00:00 UTC angegeben werden, z. B. 1686444353000 Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp hat Vorrang vor startingOffsets .Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen beim frühestmöglichen Startpunkt. Standardwert: None |
startingOffsetsByTimestampStrategy Typ: String -Abfragetyp: Streaming und BatchDiese Strategie wird verwendet, wenn der angegebene Startoffset nach Zeitstempel (entweder global oder pro Partition) nicht mit dem von Kafka zurückgegebenen Offset übereinstimmt. Die verfügbaren Strategien sind: - "error" : Fehler bei der Abfrage- "latest" : weist den neuesten Offset für diese Partitionen zu, damit Spark neuere Datensätze aus diesen Partitionen in späteren Mikrobatches lesen kann.Standardwert: "error" |
startingTimestamp Typ: String -Abfragetyp: Streaming und BatchEin Zeichenfolgenwert des Zeitstempels in Millisekunden seit 1970-01-01 00:00:00 UTC , z. B. "1686444353000" . Weitere Informationen zum Verhalten mit Zeitstempeln finden Sie weiter unten. Wenn Kafka den übereinstimmenden Offset nicht zurückgibt, folgt das Verhalten dem Wert der Option startingOffsetsByTimestampStrategy .startingTimestamp hat Vorrang vor startingOffsetsByTimestamp und startingOffsets .Hinweis: Für Streamingabfragen gilt dies nur, wenn eine neue Abfrage gestartet wird. Neu gestartete Streamingabfragen werden von den im Abfrageprüfpunkt definierten Offsets fortgesetzt. Bei einer Abfrage neu entdeckte Partitionen beginnen am frühestmöglichen Startpunkt. Standardwert: None |
Hinweis
Der zurückgegebene Offset für jede Partition ist der früheste Offset, dessen Zeitstempel größer oder gleich dem angegebenen Zeitstempel in der entsprechenden Partition ist. Das Verhalten variiert je nach Option, wenn Kafka den übereinstimmenden Offset nicht zurückgibt. Überprüfen Sie die Beschreibung der einzelnen Optionen.
Spark übergibt einfach die Zeitstempelinformationen an KafkaConsumer.offsetsForTimes
und interpretiert oder begründet den Wert nicht. Weitere Informationen zu KafkaConsumer.offsetsForTimes
finden Sie in der Dokumentation. Auch die Bedeutung von Zeitstempel kann hier je nach Kafka-Konfiguration (log.message.timestamp.type
) variieren. Weitere Informationen finden Sie in der Apache Kafka-Dokumentation.