Sdílet prostřednictvím


read_kafka table-valued – funkce

Platí pro: zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 13.3 LTS a vyšší

Čte data z clusteru Apache Kafka a vrací data v tabulkové podobě.

Může číst data z jednoho nebo více témat Kafka. Podporuje dávkové dotazy i příjem dat streamování.

Syntaxe

read_kafka([option_key => option_value ] [, ...])

Argumenty

Tato funkce vyžaduje vyvolání pojmenovaného parametru.

  • option_key: Název možnosti konfigurace. U možností, které obsahují tečky (), je nutné použít backticks (.').
  • option_value: Konstantní výraz, který nastaví možnost. Přijímá literály a skalární funkce.

Návraty

Záznamy načtené z clusteru Apache Kafka s následujícím schématem:

  • key BINARY: Klíč záznamu Kafka.
  • value BINARY NOT NULL: Hodnota záznamu Kafka.
  • topic STRING NOT NULL: Název tématu Kafka, ze které se záznam načte.
  • partition INT NOT NULL: ID oddílu Kafka, ze které se záznam načte.
  • offset BIGINT NOT NULL: Číslo posunu záznamu v systému Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Hodnota časového razítka záznamu. Sloupec timestampType definuje, jaké časové razítko odpovídá.
  • timestampType INTEGER NOT NULL: Typ časového razítka zadaného ve sloupci timestamp .
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Hodnoty záhlaví zadané jako součást záznamu (pokud jsou povolené).

Příklady

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Možnosti

Podrobný seznam možností najdete v dokumentaci k Apache Sparku.

Požadované možnosti

Zadejte níže možnost připojení ke clusteru Kafka.

Možnost
bootstrapServers

Typ: String

Čárkami oddělený seznam dvojic hostitelů a portů odkazující na cluster Kafka.

Výchozí hodnota: None

Zadejte pouze jednu z níže uvedených možností ke konfiguraci témat Kafka, ze kterých se mají načíst data.

Možnost
assign

Typ: String

Řetězec JSON, který obsahuje konkrétní oddíly témat, ze kterého se mají využívat. Například '{"topicA":[0,1],"topicB":[2,4]}'0 a 1. oddíly tématu A budou spotřebovány z.

Výchozí hodnota: None
subscribe

Typ: String

Čárkami oddělený seznam témat Kafka, ze které se mají číst.

Výchozí hodnota: None
subscribePattern

Typ: String

Regulární výraz odpovídající tématům pro přihlášení k odběru.

Výchozí hodnota: None

Různé možnosti

read_kafka lze použít v dávkových dotazech a v dotazech streamování. Níže uvedené možnosti určují, na jaký typ dotazu se vztahují.

Možnost
endingOffsets

Typ: String Typ dotazu: pouze dávka

Posuny, které se mají přečíst do dávkového dotazu, buď "latest" k zadání nejnovějších záznamů, nebo řetězce JSON určující koncový posun pro každou část TopicPartition. Ve formátu JSON -1 se jako posun dá použít k odkazu na nejnovější verzi. -2 (nejstarší) jako posun není povolen.

Výchozí hodnota: "latest"
endingOffsetsByTimestamp

Typ: String Typ dotazu: pouze dávka

Řetězec JSON určující koncové časové razítko, které se má přečíst, dokud se pro každou část tématu nečte. Časové razítka je potřeba zadat jako dlouhou hodnotu časového razítka v milisekundách, protože 1970-01-01 00:00:00 UTCnapříklad
1686444353000. Podrobnosti o chování s časovými razítky najdete níže .
endingOffsetsByTimestamp má přednost před endingOffsets.

Výchozí hodnota: None
endingTimestamp

Typ: String Typ dotazu: pouze dávka

Řetězcová hodnota časového razítka v milisekundách od
1970-01-01 00:00:00 UTCnapříklad "1686444353000". Pokud Kafka nevrátí odpovídající posun, posun se nastaví na nejnovější. Podrobnosti o chování s časovými razítky najdete níže . Poznámka: endingTimestamp má přednost před endingOffsetsByTimestamp a
endingOffsets.

Výchozí hodnota: None
includeHeaders

Typ: Boolean Typ dotazu: streamování a dávka

Zda se mají do řádku zahrnout záhlaví Kafka.

Výchozí hodnota: false
kafka.<consumer_option>

Typ: String Typ dotazu: streamování a dávka

Všechny možnosti specifické pro uživatele Kafka je možné předat s předponou kafka. . Tyto možnosti musí být při zadání obklopené backtickami, jinak se zobrazí chyba analyzátoru. Možnosti najdete v dokumentaci kafka.

Poznámka: U této funkce byste neměli nastavit následující možnosti:
key.deserializer, value.deserializer, , bootstrap.serversgroup.id

Výchozí hodnota: None
maxOffsetsPerTrigger

Typ: Long Typ dotazu: Pouze streamování

Omezení rychlosti maximálního počtu posunů nebo řádků zpracovaných v intervalu aktivační události Zadaný celkový počet posunů bude proporcionálně rozdělen mezi položky TopicPartitions.

Výchozí hodnota: None
startingOffsets

Typ: String Typ dotazu: streamování a dávka

Počáteční bod při spuštění dotazu, který je buď "earliest" od nejstarších posunů, "latest" což je pouze z nejnovějších posunů, nebo řetězec JSON určující počáteční posun pro každou topicPartition. Ve formátu JSON -2 je možné jako posun odkazovat na nejstarší -1 na nejnovější.

Poznámka: U dávkových dotazů není povoleno nejnovější (implicitně nebo pomocí -1 ve formátu JSON). U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve.

Výchozí hodnota: "latest" pro streamování "earliest" pro dávku
startingOffsetsByTimestamp

Typ: String Typ dotazu: streamování a dávka

Řetězec JSON určující počáteční časové razítko pro každou část tématu. Časové razítka je třeba zadat jako dlouhou hodnotu časového razítka v milisekundách, protože 1970-01-01 00:00:00 UTCnapříklad 1686444353000. Podrobnosti o chování s časovými razítky najdete níže . Pokud Kafka nevrací odpovídající posun, chování se použije k hodnotě možnosti startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp má přednost před startingOffsets.

Poznámka: U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve.

Výchozí hodnota: None
startingOffsetsByTimestampStrategy

Typ: String Typ dotazu: streamování a dávka

Tato strategie se používá, když zadané počáteční posuny podle časového razítka (globální nebo na oddíl) neodpovídá vráceným posunem Kafka. Dostupné strategie:

- "error": Selhání dotazu
- "latest": Přiřadí nejnovější posun těchto oddílů, aby Spark mohl číst novější záznamy z těchto oddílů v pozdějších mikrodávkách.

Výchozí hodnota: "error"
startingTimestamp

Typ: String Typ dotazu: streamování a dávka

Řetězcová hodnota časového razítka v milisekundách od
1970-01-01 00:00:00 UTCnapříklad "1686444353000". Podrobnosti o chování s časovými razítky najdete níže . Pokud Kafka nevrací odpovídající posun, chování se použije k hodnotě možnosti startingOffsetsByTimestampStrategy.
startingTimestamp má přednost před startingOffsetsByTimestamp a startingOffsets.

Poznámka: U streamovaných dotazů to platí jenom v případě, že se spustí nový dotaz. Restartování streamovaných dotazů bude pokračovat od posunů definovaných v kontrolním bodu dotazu. Nově zjištěné oddíly během dotazu se spustí nejdříve.

Výchozí hodnota: None

Poznámka:

Vrácený posun pro každý oddíl je nejstarší posun, jehož časové razítko je větší nebo rovno danému časovému razítku v odpovídajícím oddílu. Chování se liší v různých možnostech, pokud Kafka nevrací odpovídající posun – zkontrolujte popis jednotlivých možností.

Spark jednoduše předá informace o KafkaConsumer.offsetsForTimesčasovém razítku a neinterpretuje hodnotu ani důvod. Další podrobnosti najdete KafkaConsumer.offsetsForTimesv dokumentaci. Význam časového razítka se zde také může lišit podle konfigurace Kafka (log.message.timestamp.type). Podrobnosti najdete v dokumentaci k Apache Kafka.