read_kafka
fungsi table-valued
Berlaku untuk: Databricks SQL Databricks Runtime 13.3 LTS ke atas
Membaca data dari kluster Apache Kafka dan mengembalikan data dalam bentuk tabular.
Dapat membaca data dari satu atau beberapa topik Kafka. Ini mendukung kueri batch dan penyerapan streaming.
Sintaks
read_kafka([option_key => option_value ] [, ...])
Argumen
Fungsi ini memerlukan pemanggilan parameter bernama.
option_key
: Nama opsi untuk dikonfigurasi. Anda harus menggunakan backtick (') untuk opsi yang berisi titik-titik (.
).option_value
: Ekspresi konstanta untuk mengatur opsi. Menerima literal dan fungsi skalar.
Mengembalikan
Rekaman yang dibaca dari kluster Apache Kafka dengan skema berikut:
key BINARY
: Kunci rekaman Kafka.value BINARY NOT NULL
: Nilai rekaman Kafka.topic STRING NOT NULL
: Nama topik Kafka tempat rekaman dibaca.partition INT NOT NULL
: ID partisi Kafka tempat rekaman dibaca.offset BIGINT NOT NULL
: Jumlah offset rekaman di KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: Nilai tanda waktu untuk rekaman. KolomtimestampType
menentukan apa yang terkait dengan tanda waktu ini.timestampType INTEGER NOT NULL
: Jenis tanda waktu yang ditentukan dalamtimestamp
kolom.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Nilai header disediakan sebagai bagian dari rekaman (jika diaktifkan).
Contoh
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events',
startingOffsets => 'earliest',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
);
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opsi
Anda dapat menemukan daftar opsi terperinci dalam dokumentasi Apache Spark.
Opsi yang diperlukan
Berikan opsi di bawah ini untuk menyambungkan ke kluster Kafka Anda.
Opsi |
---|
bootstrapServers Jenis: String Daftar pasangan host/port yang dipisahkan koma yang menunjuk ke kluster Kafka. Nilai default: None |
Berikan hanya salah satu opsi di bawah ini untuk mengonfigurasi topik Kafka mana yang akan diambil datanya.
Opsi |
---|
assign Jenis: String String JSON yang berisi partisi topik tertentu untuk digunakan. Misalnya, untuk '{"topicA":[0,1],"topicB":[2,4]}' partisi 0'th dan 1st topicA akan digunakan.Nilai default: None |
subscribe Jenis: String Daftar topik Kafka yang dipisahkan koma untuk dibaca. Nilai default: None |
subscribePattern Jenis: String Topik yang cocok dengan ekspresi reguler untuk berlangganan. Nilai default: None |
Opsi lain-lain
read_kafka
dapat digunakan dalam kueri batch dan dalam kueri streaming. Opsi di bawah ini menentukan jenis kueri mana yang mereka terapkan.
Opsi |
---|
endingOffsets Jenis: String Jenis Kueri: hanya batchOffset yang akan dibaca hingga kueri batch, baik "latest" untuk menentukan rekaman terbaru, atau string JSON yang menentukan offset akhir untuk setiap TopicPartition. Di JSON, -1 sebagai offset dapat digunakan untuk merujuk ke yang terbaru. -2 (paling awal) karena offset tidak diperbolehkan.Nilai default: "latest" |
endingOffsetsByTimestamp Jenis: String Jenis Kueri: hanya batchString JSON yang menentukan tanda waktu akhir untuk dibaca hingga untuk setiap TopicPartition. Tanda waktu perlu disediakan sebagai nilai panjang tanda waktu dalam milidetik sejak 1970-01-01 00:00:00 UTC , misalnya1686444353000 . Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu.endingOffsetsByTimestamp lebih diutamakan daripada endingOffsets .Nilai default: None |
endingTimestamp Jenis: String Jenis Kueri: hanya batchNilai string tanda waktu dalam milidetik sejak 1970-01-01 00:00:00 UTC , misalnya "1686444353000" . Jika Kafka tidak mengembalikan offset yang cocok, offset akan diatur ke terbaru. Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Catatan: endingTimestamp lebih diutamakan dan endingOffsetsByTimestamp endingOffsets .Nilai default: None |
includeHeaders Jenis: Boolean Jenis Kueri: streaming dan batchApakah akan menyertakan header Kafka dalam baris. Nilai default: false |
kafka.<consumer_option> Jenis: String Jenis Kueri: streaming dan batchSetiap opsi khusus konsumen Kafka dapat diteruskan dengan awalan kafka. . Opsi ini perlu dikelilingi oleh backtick ketika disediakan, jika tidak, Anda akan mendapatkan kesalahan pengurai. Anda dapat menemukan opsi dalam dokumentasi Kafka.Catatan: Anda tidak boleh mengatur opsi berikut dengan fungsi ini: key.deserializer , , value.deserializer bootstrap.servers ,group.id Nilai default: None |
maxOffsetsPerTrigger Jenis: Long Jenis Kueri: streaming sajaBatas laju pada jumlah maksimum offset atau baris yang diproses per interval pemicu. Jumlah total offset yang ditentukan akan dibagi secara proporsional di seluruh TopicPartitions. Nilai default: None |
startingOffsets Jenis: String Jenis Kueri: streaming dan batchTitik awal ketika kueri dimulai, baik "earliest" yang berasal dari offset paling awal, "latest" yang hanya dari offset terbaru, atau string JSON yang menentukan offset awal untuk setiap TopicPartition. Di JSON, -2 sebagai offset dapat digunakan untuk merujuk ke paling awal, -1 ke terbaru.Catatan: Untuk kueri batch, terbaru (baik secara implisit atau dengan menggunakan -1 di JSON) tidak diizinkan. Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal. Nilai default: "latest" untuk streaming, "earliest" untuk batch |
startingOffsetsByTimestamp Jenis: String Jenis Kueri: streaming dan batchString JSON yang menentukan tanda waktu awal untuk setiap TopicPartition. Tanda waktu perlu disediakan sebagai nilai panjang tanda waktu dalam milidetik karena 1970-01-01 00:00:00 UTC , misalnya 1686444353000 . Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Jika Kafka tidak mengembalikan offset yang cocok, perilaku akan mengikuti nilai opsi startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp lebih diutamakan daripada startingOffsets .Catatan: Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal. Nilai default: None |
startingOffsetsByTimestampStrategy Jenis: String Jenis Kueri: streaming dan batchStrategi ini digunakan ketika offset awal yang ditentukan oleh tanda waktu (baik global atau per partisi) tidak cocok dengan offset yang dikembalikan Kafka. Strategi yang tersedia adalah: * "error" : gagal kueri* "latest" : menetapkan offset terbaru untuk partisi ini sehingga Spark dapat membaca rekaman yang lebih baru dari partisi ini di batch mikro nanti.Nilai default: "error" |
startingTimestamp Jenis: String Jenis Kueri: streaming dan batchNilai string tanda waktu dalam milidetik sejak 1970-01-01 00:00:00 UTC , misalnya "1686444353000" . Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Jika Kafka tidak mengembalikan offset yang cocok, perilaku akan mengikuti nilai opsi startingOffsetsByTimestampStrategy .startingTimestamp lebih diutamakan dan startingOffsetsByTimestamp startingOffsets .Catatan: Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal. Nilai default: None |
Catatan
Offset yang dikembalikan untuk setiap partisi adalah offset paling awal yang tanda waktunya lebih besar dari atau sama dengan tanda waktu yang diberikan dalam partisi yang sesuai. Perilaku bervariasi di seluruh opsi jika Kafka tidak mengembalikan offset yang cocok - periksa deskripsi setiap opsi.
Spark hanya meneruskan informasi tanda waktu ke KafkaConsumer.offsetsForTimes
, dan tidak menafsirkan atau alasan tentang nilai tersebut. Untuk detail selengkapnya tentang KafkaConsumer.offsetsForTimes
, silakan lihat dokumentasi. Selain itu, arti tanda waktu di sini dapat bervariasi sesuai dengan konfigurasi Kafka (log.message.timestamp.type
). Untuk detailnya, lihat dokumentasi Apache Kafka.