read_kafka táblaértékű függvény

A következőkre vonatkozik:jelölje be az igennel jelölt jelölőnégyzetet Databricks SQL jelölje be az igennel jelölt jelölőnégyzetet Databricks Runtime 13.3 LTS és újabb

Adatokat olvas be egy Apache Kafka-fürtből, és táblázatos formában adja vissza az adatokat.

Adatokat olvashat egy vagy több Kafka-témakörből. Támogatja a kötegelt lekérdezéseket és a streambetöltést is.

Syntax

read_kafka([option_key => option_value ] [, ...])

Argumentumok

Ehhez a függvényhez elnevezett paraméterhívás szükséges.

  • option_key: A konfigurálni kívánt beállítás neve. A pontokat (.) tartalmazó beállításokhoz a backticks (') függvényt kell használnia.
  • option_value: Állandó kifejezés a beállításhoz. Konstansokat és skaláris függvényeket fogad el.

Válaszok

Apache Kafka-fürtből beolvasott rekordok a következő sémával:

  • key BINARY: A Kafka rekord kulcsa.
  • value BINARY NOT NULL: A Kafka rekord értéke.
  • topic STRING NOT NULL: Annak a Kafka-témakörnek a neve, amelyből a rekord olvasható.
  • partition INT NOT NULL: Annak a Kafka-partíciónak az azonosítója, amelyből a rekord beolvasva van.
  • offset BIGINT NOT NULL: A Kafkában TopicPartitionlévő rekord eltolási száma.
  • timestamp TIMESTAMP NOT NULL: A rekord időbélyege. Az timestampType oszlop határozza meg, hogy minek felel meg ez az időbélyeg.
  • timestampType INTEGER NOT NULL: Az oszlopban timestamp megadott időbélyeg típusa.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: A rekord részeként megadott fejlécértékek (ha engedélyezve van).

Példák

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Beállítások

A lehetőségek részletes listáját az Apache Spark dokumentációjában találja.

Kötelező beállítások

Adja meg az alábbi lehetőséget a Kafka-fürthöz való csatlakozáshoz.

Lehetőség
bootstrapServers

Típus: String

A Kafka-fürtre mutató gazdagép-/portpárok vesszővel tagolt listája.

Alapértelmezett érték: Nincs

Az alábbi lehetőségek közül csak egyet adjon meg annak konfigurálásához, hogy mely Kafka-témakörökből szeretne adatokat lekérni.

Lehetőség
assign

Típus: String

Egy JSON-sztring, amely tartalmazza azokat a témakörpartíciókat, amelyekből felhasználandó. Például a '{"topicA":[0,1],"topicB":[2,4]}'topicA 0'0'0 és 1st partíciói a rendszerből lesznek felhasználva.

Alapértelmezett érték: Nincs
subscribe

Típus: String

A Kafka-témakörök vesszővel tagolt listája, amelyből olvasni szeretne.

Alapértelmezett érték: Nincs
subscribePattern

Típus: String

Rendszeres kifejezésmegfeleltetési témakörök, amelyekre előfizethet.

Alapértelmezett érték: Nincs

Egyéb beállítások

read_kafka kötegelt lekérdezésekben és streamelési lekérdezésekben is használható. Az alábbi beállítások határozzák meg, hogy milyen típusú lekérdezésre vonatkoznak.

Lehetőség
endingOffsets

Típus: String Lekérdezés típusa: csak köteg

A köteglekérdezéshez beolvasandó eltolások a "latest" legújabb rekordok megadásához, vagy egy JSON-sztring, amely az egyes TopicPartition-elemek záró eltolását adja meg. A JSON-ban -1 eltolásként a legújabbakra lehet hivatkozni. -2 (legkorábbi) eltolás nem engedélyezett.

Alapértelmezett érték: "latest"
endingOffsetsByTimestamp

Típus: String Lekérdezés típusa: csak köteg

Egy JSON-sztring, amely egy befejezési időbélyeget ad meg az egyes TopicPartition-okhoz való olvasáshoz. Az időbélyegeket az időbélyeg hosszú értékeként kell megadni ezredmásodpercben, mivel 1970-01-01 00:00:00 UTCpéldául
1686444353000. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat.
endingOffsetsByTimestamp elsőbbséget élvez a endingOffsets.

Alapértelmezett érték: Nincs
endingTimestamp

Típus: String Lekérdezés típusa: csak köteg

Az időbélyeg sztringértéke ezredmásodpercben
1970-01-01 00:00:00 UTC"1686444353000"például. Ha a Kafka nem adja vissza a kiegyenlített eltolást, az eltolás a legújabb értékre lesz állítva. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Megjegyzés: endingTimestamp elsőbbséget endingOffsetsByTimestamp élvez a
endingOffsets.

Alapértelmezett érték: Nincs
includeHeaders

Típus: Boolean Lekérdezés típusa: streamelés és köteg

A Kafka-fejlécek belefoglalása a sorba.

Alapértelmezett érték: false
kafka.<consumer_option>

Típus: String Lekérdezés típusa: streamelés és köteg

Az előtaggal kafka. bármely Kafka-fogyasztóra vonatkozó lehetőség átadható. Ezeket a beállításokat szükség esetén háttérrendszerrel kell körülvenni, ellenkező esetben elemzési hiba jelenik meg. A lehetőségeket a Kafka dokumentációjában találja.

Megjegyzés: Ezzel a függvénnyel nem szabad a következő beállításokat megadni:
key.deserializer, value.deserializer, bootstrap.serversgroup.id

Alapértelmezett érték: Nincs
maxOffsetsPerTrigger

Típus: Long Lekérdezés típusa: csak streamelés

Az eseményindító-intervallumonként feldolgozott eltolások vagy sorok maximális számának sebességkorlátja. A megadott eltolások száma arányosan lesz felosztva a TopicPartitionsra.

Alapértelmezett érték: Nincs
startingOffsets

Típus: String Lekérdezés típusa: streamelés és köteg

A lekérdezés indításának kezdőpontja, amely vagy "earliest" a legkorábbi eltolásokból származik, "latest" amely csak a legújabb eltolásokból származik, vagy egy JSON-sztring, amely minden TopicPartition kezdő eltolását adja meg. A JSON-ban -2 eltolásként a legkorábbi és -1 a legkésőbbi értékre hivatkozhat.

Megjegyzés: Kötegelt lekérdezések esetén a legújabb (implicit módon vagy a -1 JSON-ban történő használatával) nem engedélyezett. Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak.

Alapértelmezett érték: "latest" streameléshez, "earliest" köteghez
startingOffsetsByTimestamp

Típus: String Lekérdezés típusa: streamelés és köteg

Egy JSON-sztring, amely meghatározza az egyes TopicPartition-okhoz tartozó kezdő időbélyeget. Az időbélyegeket az időbélyeg hosszú értékeként kell megadni ezredmásodpercben, mivel 1970-01-01 00:00:00 UTCpéldául 1686444353000. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Ha a Kafka nem adja vissza a kiegyenlített eltolást, a viselkedés a beállítás startingOffsetsByTimestampStrategyértékéhez igazodik.
startingOffsetsByTimestamp elsőbbséget élvez a startingOffsets.

Megjegyzés: Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak.

Alapértelmezett érték: Nincs
startingOffsetsByTimestampStrategy

Típus: String Lekérdezés típusa: streamelés és köteg

Ez a stratégia akkor használatos, ha az időbélyeggel megadott kezdő eltolás (globális vagy partíciónkénti) nem egyezik a kafka visszaadott eltolásával. A rendelkezésre álló stratégiák a következők:

* "error": sikertelen a lekérdezés
* "latest": hozzárendeli a partíciók legújabb eltolását, hogy a Spark újabb rekordokat olvasson be ezekből a partíciókból a későbbi mikrokötegekben.

Alapértelmezett érték: "error"
startingTimestamp

Típus: String Lekérdezés típusa: streamelés és köteg

Az időbélyeg sztringértéke ezredmásodpercben
1970-01-01 00:00:00 UTC"1686444353000"például. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Ha a Kafka nem adja vissza a kiegyenlített eltolást, a viselkedés a beállítás startingOffsetsByTimestampStrategyértékéhez igazodik.
startingTimestamp elsőbbséget startingOffsetsByTimestamp élvez és startingOffsets.

Megjegyzés: Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak.

Alapértelmezett érték: Nincs

Feljegyzés

Az egyes partíciók visszaadott eltolása az a legkorábbi eltolás, amelynek időbélyege nagyobb vagy egyenlő a megfelelő partíció adott időbélyegénél. A viselkedés különböző lehetőségeket mutat, ha a Kafka nem adja vissza a megfelelő eltolást – ellenőrizze az egyes beállítások leírását.

A Spark egyszerűen átadja az időbélyeg adatait KafkaConsumer.offsetsForTimes, és nem értelmezi vagy nem okolja az értéket. További részletekért KafkaConsumer.offsetsForTimestekintse meg a dokumentációt. Az időbélyeg jelentése a Kafka-konfigurációtól (log.message.timestamp.type) függően változhat. További részletekért tekintse meg az Apache Kafka dokumentációját.