read_kafka
táblaértékű függvény
A következőkre vonatkozik: Databricks SQL Databricks Runtime 13.3 LTS és újabb
Adatokat olvas be egy Apache Kafka-fürtből, és táblázatos formában adja vissza az adatokat.
Adatokat olvashat egy vagy több Kafka-témakörből. Támogatja a kötegelt lekérdezéseket és a streambetöltést is.
Syntax
read_kafka([option_key => option_value ] [, ...])
Argumentumok
Ehhez a függvényhez elnevezett paraméterhívás szükséges.
option_key
: A konfigurálni kívánt beállítás neve. A pontokat (.
) tartalmazó beállításokhoz a backticks (') függvényt kell használnia.option_value
: Állandó kifejezés a beállításhoz. Konstansokat és skaláris függvényeket fogad el.
Válaszok
Apache Kafka-fürtből beolvasott rekordok a következő sémával:
key BINARY
: A Kafka rekord kulcsa.value BINARY NOT NULL
: A Kafka rekord értéke.topic STRING NOT NULL
: Annak a Kafka-témakörnek a neve, amelyből a rekord olvasható.partition INT NOT NULL
: Annak a Kafka-partíciónak az azonosítója, amelyből a rekord beolvasva van.offset BIGINT NOT NULL
: A KafkábanTopicPartition
lévő rekord eltolási száma.timestamp TIMESTAMP NOT NULL
: A rekord időbélyege. AztimestampType
oszlop határozza meg, hogy minek felel meg ez az időbélyeg.timestampType INTEGER NOT NULL
: Az oszlopbantimestamp
megadott időbélyeg típusa.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: A rekord részeként megadott fejlécértékek (ha engedélyezve van).
Példák
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events',
startingOffsets => 'earliest',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
);
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Beállítások
A lehetőségek részletes listáját az Apache Spark dokumentációjában találja.
Kötelező beállítások
Adja meg az alábbi lehetőséget a Kafka-fürthöz való csatlakozáshoz.
Lehetőség |
---|
bootstrapServers Típus: String A Kafka-fürtre mutató gazdagép-/portpárok vesszővel tagolt listája. Alapértelmezett érték: Nincs |
Az alábbi lehetőségek közül csak egyet adjon meg annak konfigurálásához, hogy mely Kafka-témakörökből szeretne adatokat lekérni.
Lehetőség |
---|
assign Típus: String Egy JSON-sztring, amely tartalmazza azokat a témakörpartíciókat, amelyekből felhasználandó. Például a '{"topicA":[0,1],"topicB":[2,4]}' topicA 0'0'0 és 1st partíciói a rendszerből lesznek felhasználva.Alapértelmezett érték: Nincs |
subscribe Típus: String A Kafka-témakörök vesszővel tagolt listája, amelyből olvasni szeretne. Alapértelmezett érték: Nincs |
subscribePattern Típus: String Rendszeres kifejezésmegfeleltetési témakörök, amelyekre előfizethet. Alapértelmezett érték: Nincs |
Egyéb beállítások
read_kafka
kötegelt lekérdezésekben és streamelési lekérdezésekben is használható. Az alábbi beállítások határozzák meg, hogy milyen típusú lekérdezésre vonatkoznak.
Lehetőség |
---|
endingOffsets Típus: String Lekérdezés típusa: csak kötegA köteglekérdezéshez beolvasandó eltolások a "latest" legújabb rekordok megadásához, vagy egy JSON-sztring, amely az egyes TopicPartition-elemek záró eltolását adja meg. A JSON-ban -1 eltolásként a legújabbakra lehet hivatkozni. -2 (legkorábbi) eltolás nem engedélyezett.Alapértelmezett érték: "latest" |
endingOffsetsByTimestamp Típus: String Lekérdezés típusa: csak kötegEgy JSON-sztring, amely egy befejezési időbélyeget ad meg az egyes TopicPartition-okhoz való olvasáshoz. Az időbélyegeket az időbélyeg hosszú értékeként kell megadni ezredmásodpercben, mivel 1970-01-01 00:00:00 UTC például1686444353000 . Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat.endingOffsetsByTimestamp elsőbbséget élvez a endingOffsets .Alapértelmezett érték: Nincs |
endingTimestamp Típus: String Lekérdezés típusa: csak kötegAz időbélyeg sztringértéke ezredmásodpercben 1970-01-01 00:00:00 UTC "1686444353000" például. Ha a Kafka nem adja vissza a kiegyenlített eltolást, az eltolás a legújabb értékre lesz állítva. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Megjegyzés: endingTimestamp elsőbbséget endingOffsetsByTimestamp élvez aendingOffsets .Alapértelmezett érték: Nincs |
includeHeaders Típus: Boolean Lekérdezés típusa: streamelés és kötegA Kafka-fejlécek belefoglalása a sorba. Alapértelmezett érték: false |
kafka.<consumer_option> Típus: String Lekérdezés típusa: streamelés és kötegAz előtaggal kafka. bármely Kafka-fogyasztóra vonatkozó lehetőség átadható. Ezeket a beállításokat szükség esetén háttérrendszerrel kell körülvenni, ellenkező esetben elemzési hiba jelenik meg. A lehetőségeket a Kafka dokumentációjában találja.Megjegyzés: Ezzel a függvénnyel nem szabad a következő beállításokat megadni: key.deserializer , value.deserializer , bootstrap.servers group.id Alapértelmezett érték: Nincs |
maxOffsetsPerTrigger Típus: Long Lekérdezés típusa: csak streamelésAz eseményindító-intervallumonként feldolgozott eltolások vagy sorok maximális számának sebességkorlátja. A megadott eltolások száma arányosan lesz felosztva a TopicPartitionsra. Alapértelmezett érték: Nincs |
startingOffsets Típus: String Lekérdezés típusa: streamelés és kötegA lekérdezés indításának kezdőpontja, amely vagy "earliest" a legkorábbi eltolásokból származik, "latest" amely csak a legújabb eltolásokból származik, vagy egy JSON-sztring, amely minden TopicPartition kezdő eltolását adja meg. A JSON-ban -2 eltolásként a legkorábbi és -1 a legkésőbbi értékre hivatkozhat.Megjegyzés: Kötegelt lekérdezések esetén a legújabb (implicit módon vagy a -1 JSON-ban történő használatával) nem engedélyezett. Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak. Alapértelmezett érték: "latest" streameléshez, "earliest" köteghez |
startingOffsetsByTimestamp Típus: String Lekérdezés típusa: streamelés és kötegEgy JSON-sztring, amely meghatározza az egyes TopicPartition-okhoz tartozó kezdő időbélyeget. Az időbélyegeket az időbélyeg hosszú értékeként kell megadni ezredmásodpercben, mivel 1970-01-01 00:00:00 UTC például 1686444353000 . Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Ha a Kafka nem adja vissza a kiegyenlített eltolást, a viselkedés a beállítás startingOffsetsByTimestampStrategy értékéhez igazodik.startingOffsetsByTimestamp elsőbbséget élvez a startingOffsets .Megjegyzés: Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak. Alapértelmezett érték: Nincs |
startingOffsetsByTimestampStrategy Típus: String Lekérdezés típusa: streamelés és kötegEz a stratégia akkor használatos, ha az időbélyeggel megadott kezdő eltolás (globális vagy partíciónkénti) nem egyezik a kafka visszaadott eltolásával. A rendelkezésre álló stratégiák a következők: * "error" : sikertelen a lekérdezés* "latest" : hozzárendeli a partíciók legújabb eltolását, hogy a Spark újabb rekordokat olvasson be ezekből a partíciókból a későbbi mikrokötegekben.Alapértelmezett érték: "error" |
startingTimestamp Típus: String Lekérdezés típusa: streamelés és kötegAz időbélyeg sztringértéke ezredmásodpercben 1970-01-01 00:00:00 UTC "1686444353000" például. Az időbélyegekkel kapcsolatos viselkedés részleteiről az alábbi megjegyzésben olvashat. Ha a Kafka nem adja vissza a kiegyenlített eltolást, a viselkedés a beállítás startingOffsetsByTimestampStrategy értékéhez igazodik.startingTimestamp elsőbbséget startingOffsetsByTimestamp élvez és startingOffsets .Megjegyzés: Streamelési lekérdezések esetén ez csak új lekérdezés indításakor érvényes. Az újraindult streamelési lekérdezések a lekérdezési ellenőrzőpontban meghatározott eltolásokból folytatódnak. A lekérdezés során újonnan felfedezett partíciók legkorábban elindulnak. Alapértelmezett érték: Nincs |
Feljegyzés
Az egyes partíciók visszaadott eltolása az a legkorábbi eltolás, amelynek időbélyege nagyobb vagy egyenlő a megfelelő partíció adott időbélyegénél. A viselkedés különböző lehetőségeket mutat, ha a Kafka nem adja vissza a megfelelő eltolást – ellenőrizze az egyes beállítások leírását.
A Spark egyszerűen átadja az időbélyeg adatait KafkaConsumer.offsetsForTimes
, és nem értelmezi vagy nem okolja az értéket. További részletekért KafkaConsumer.offsetsForTimes
tekintse meg a dokumentációt. Az időbélyeg jelentése a Kafka-konfigurációtól (log.message.timestamp.type
) függően változhat. További részletekért tekintse meg az Apache Kafka dokumentációját.