read_kafka
table-valued – funkce
Platí pro: Databricks SQL Databricks Runtime 13.3 LTS a vyšší
Čte data z clusteru Apache Kafka a vrací data v tabulkové podobě.
Může číst data z jednoho nebo více témat Kafka. Podporuje dávkové dotazy i příjem dat streamování.
Syntaxe
read_kafka([option_key => option_value ] [, ...])
Argumenty
Tato funkce vyžaduje vyvolání pojmenovaného parametru.
option_key
: Název možnosti konfigurace. U možností, které obsahují tečky (), je nutné použít backticks (.
').option_value
: Konstantní výraz, který nastaví možnost. Přijímá literály a skalární funkce.
Návraty
Záznamy načtené z clusteru Apache Kafka s následujícím schématem:
key BINARY
: Klíč záznamu Kafka.value BINARY NOT NULL
: Hodnota záznamu Kafka.topic STRING NOT NULL
: Název tématu Kafka, ze které se záznam načte.partition INT NOT NULL
: ID oddílu Kafka, ze které se záznam načte.offset BIGINT NOT NULL
: Číslo posunu záznamu v systému KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: Hodnota časového razítka záznamu. SloupectimestampType
definuje, jaké časové razítko odpovídá.timestampType INTEGER NOT NULL
: Typ časového razítka zadaného ve sloupcitimestamp
.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Hodnoty záhlaví zadané jako součást záznamu (pokud jsou povolené).
Příklady
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Možnosti
Podrobný seznam možností najdete v dokumentaci k Apache Sparku.
Požadované možnosti
Zadejte níže možnost připojení ke clusteru Kafka.
Možnost |
---|
bootstrapServers Typ: String Čárkami oddělený seznam dvojic hostitelů a portů odkazující na cluster Kafka. Výchozí hodnota: None |
Zadejte pouze jednu z níže uvedených možností ke konfiguraci témat Kafka, ze kterých se mají načíst data.
Možnost |
---|
assign Typ: String Řetězec JSON, který obsahuje konkrétní oddíly témat, ze kterého se mají využívat. Například '{"topicA":[0,1],"topicB":[2,4]}' 0 a 1. oddíly tématu A budou spotřebovány z.Výchozí hodnota: None |
subscribe Typ: String Čárkami oddělený seznam témat Kafka, ze které se mají číst. Výchozí hodnota: None |
subscribePattern Typ: String Regulární výraz odpovídající tématům pro přihlášení k odběru. Výchozí hodnota: None |
Různé možnosti
read_kafka
lze použít v dávkových dotazech a v dotazech streamování. Níže uvedené možnosti určují, na jaký typ dotazu se vztahují.
Možnost |
---|
endingOffsets Typ: String Typ dotazu: pouze dávkaPosuny, které se mají přečíst do dávkového dotazu, buď "latest" k zadání nejnovějších záznamů, nebo řetězce JSON určující koncový posun pro každou část TopicPartition. Ve formátu JSON -1 se jako posun dá použít k odkazu na nejnovější verzi. -2 (nejstarší) jako posun není povolen.Výchozí hodnota: "latest" |
endingOffsetsByTimestamp Typ: String Typ dotazu: pouze dávkaŘetězec JSON určující koncové časové razítko, které se má přečíst, dokud se pro každou část tématu nečte. Časové razítka je potřeba zadat jako dlouhou hodnotu časového razítka v milisekundách, protože 1970-01-01 00:00:00 UTC například1686444353000 . Podrobnosti o chování s časovými razítky najdete níže .endingOffsetsByTimestamp má přednost před endingOffsets .Výchozí hodnota: None |
endingTimestamp Typ: String Typ dotazu: pouze dávkaŘetězcová hodnota časového razítka v milisekundách od 1970-01-01 00:00:00 UTC například "1686444353000" . Pokud Kafka nevrátí odpovídající posun, posun se nastaví na nejnovější. Podrobnosti o chování s časovými razítky najdete níže . Poznámka: endingTimestamp má přednost před endingOffsetsByTimestamp aendingOffsets .Výchozí hodnota: None |
includeHeaders Typ: Boolean Typ dotazu: streamování a dávkaZda se mají do řádku zahrnout záhlaví Kafka. Výchozí hodnota: false |
kafka.<consumer_option> Typ: String Typ dotazu: streamování a dávkaVšechny možnosti specifické pro uživatele Kafka je možné předat s předponou kafka. . Tyto možnosti musí být při zadání obklopené backtickami, jinak se zobrazí chyba analyzátoru. Možnosti najdete v dokumentaci kafka.Poznámka: U této funkce byste neměli nastavit následující možnosti: key.deserializer , value.deserializer , , bootstrap.servers group.id Výchozí hodnota: None |
maxOffsetsPerTrigger Typ: Long Typ dotazu: Pouze streamováníOmezení rychlosti maximálního počtu posunů nebo řádků zpracovaných v intervalu aktivační události Zadaný celkový počet posunů bude proporcionálně rozdělen mezi položky TopicPartitions. Výchozí hodnota: None |
startingOffsets Typ: String Typ dotazu: streamování a dávkaPočáteční bod při spuštění dotazu, který je buď "earliest" od nejstarších posunů, "latest" což je pouze z nejnovějších posunů, nebo řetězec JSON určující počáteční posun pro každou topicPartition. Ve formátu JSON -2 je možné jako posun odkazovat na nejstarší -1 na nejnovější.Poznámka: U dávkových dotazů není povoleno nejnovější (implicitně nebo pomocí -1 ve formátu JSON). U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve. Výchozí hodnota: "latest" pro streamování "earliest" pro dávku |
startingOffsetsByTimestamp Typ: String Typ dotazu: streamování a dávkaŘetězec JSON určující počáteční časové razítko pro každou část tématu. Časové razítka je třeba zadat jako dlouhou hodnotu časového razítka v milisekundách, protože 1970-01-01 00:00:00 UTC například 1686444353000 . Podrobnosti o chování s časovými razítky najdete níže . Pokud Kafka nevrací odpovídající posun, chování se použije k hodnotě možnosti startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp má přednost před startingOffsets .Poznámka: U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve. Výchozí hodnota: None |
startingOffsetsByTimestampStrategy Typ: String Typ dotazu: streamování a dávkaTato strategie se používá, když zadané počáteční posuny podle časového razítka (globální nebo na oddíl) neodpovídá vráceným posunem Kafka. Dostupné strategie: - "error" : Selhání dotazu- "latest" : Přiřadí nejnovější posun těchto oddílů, aby Spark mohl číst novější záznamy z těchto oddílů v pozdějších mikrodávkách.Výchozí hodnota: "error" |
startingTimestamp Typ: String Typ dotazu: streamování a dávkaŘetězcová hodnota časového razítka v milisekundách od 1970-01-01 00:00:00 UTC například "1686444353000" . Podrobnosti o chování s časovými razítky najdete níže . Pokud Kafka nevrací odpovídající posun, chování se použije k hodnotě možnosti startingOffsetsByTimestampStrategy .startingTimestamp má přednost před startingOffsetsByTimestamp a startingOffsets .Poznámka: U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve. Výchozí hodnota: None |
Poznámka:
Vrácený posun pro každý oddíl je nejstarší posun, jehož časové razítko je větší nebo rovno danému časovému razítku v odpovídajícím oddílu. Chování se liší v různých možnostech, pokud Kafka nevrací odpovídající posun – zkontrolujte popis jednotlivých možností.
Spark jednoduše předá informace o KafkaConsumer.offsetsForTimes
časovém razítku a neinterpretuje hodnotu ani důvod. Další podrobnosti najdete KafkaConsumer.offsetsForTimes
v dokumentaci. Význam časového razítka se zde také může lišit podle konfigurace Kafka (log.message.timestamp.type
). Podrobnosti najdete v dokumentaci k Apache Kafka.