read_kafka fungsi table-valued

Berlaku untuk:centang ditandai ya Databricks SQL centang ditandai ya Databricks Runtime 13.3 LTS ke atas

Membaca data dari kluster Apache Kafka dan mengembalikan data dalam bentuk tabular.

Dapat membaca data dari satu atau beberapa topik Kafka. Ini mendukung kueri batch dan penyerapan streaming.

Sintaks

read_kafka([option_key => option_value ] [, ...])

Argumen

Fungsi ini memerlukan pemanggilan parameter bernama.

  • option_key: Nama opsi untuk dikonfigurasi. Anda harus menggunakan backtick (') untuk opsi yang berisi titik-titik (.).
  • option_value: Ekspresi konstanta untuk mengatur opsi. Menerima literal dan fungsi skalar.

Mengembalikan

Rekaman yang dibaca dari kluster Apache Kafka dengan skema berikut:

  • key BINARY: Kunci rekaman Kafka.
  • value BINARY NOT NULL: Nilai rekaman Kafka.
  • topic STRING NOT NULL: Nama topik Kafka tempat rekaman dibaca.
  • partition INT NOT NULL: ID partisi Kafka tempat rekaman dibaca.
  • offset BIGINT NOT NULL: Jumlah offset rekaman di Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Nilai tanda waktu untuk rekaman. Kolom timestampType menentukan apa yang terkait dengan tanda waktu ini.
  • timestampType INTEGER NOT NULL: Jenis tanda waktu yang ditentukan dalam timestamp kolom.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Nilai header disediakan sebagai bagian dari rekaman (jika diaktifkan).

Contoh

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opsi

Anda dapat menemukan daftar opsi terperinci dalam dokumentasi Apache Spark.

Opsi yang diperlukan

Berikan opsi di bawah ini untuk menyambungkan ke kluster Kafka Anda.

Opsi
bootstrapServers

Jenis: String

Daftar pasangan host/port yang dipisahkan koma yang menunjuk ke kluster Kafka.

Nilai default: None

Berikan hanya salah satu opsi di bawah ini untuk mengonfigurasi topik Kafka mana yang akan diambil datanya.

Opsi
assign

Jenis: String

String JSON yang berisi partisi topik tertentu untuk digunakan. Misalnya, untuk '{"topicA":[0,1],"topicB":[2,4]}'partisi 0'th dan 1st topicA akan digunakan.

Nilai default: None
subscribe

Jenis: String

Daftar topik Kafka yang dipisahkan koma untuk dibaca.

Nilai default: None
subscribePattern

Jenis: String

Topik yang cocok dengan ekspresi reguler untuk berlangganan.

Nilai default: None

Opsi lain-lain

read_kafka dapat digunakan dalam kueri batch dan dalam kueri streaming. Opsi di bawah ini menentukan jenis kueri mana yang mereka terapkan.

Opsi
endingOffsets

Jenis: String Jenis Kueri: hanya batch

Offset yang akan dibaca hingga kueri batch, baik "latest" untuk menentukan rekaman terbaru, atau string JSON yang menentukan offset akhir untuk setiap TopicPartition. Di JSON, -1 sebagai offset dapat digunakan untuk merujuk ke yang terbaru. -2 (paling awal) karena offset tidak diperbolehkan.

Nilai default: "latest"
endingOffsetsByTimestamp

Jenis: String Jenis Kueri: hanya batch

String JSON yang menentukan tanda waktu akhir untuk dibaca hingga untuk setiap TopicPartition. Tanda waktu perlu disediakan sebagai nilai panjang tanda waktu dalam milidetik sejak 1970-01-01 00:00:00 UTC, misalnya
1686444353000. Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu.
endingOffsetsByTimestamp lebih diutamakan daripada endingOffsets.

Nilai default: None
endingTimestamp

Jenis: String Jenis Kueri: hanya batch

Nilai string tanda waktu dalam milidetik sejak
1970-01-01 00:00:00 UTC, misalnya "1686444353000". Jika Kafka tidak mengembalikan offset yang cocok, offset akan diatur ke terbaru. Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Catatan: endingTimestamp lebih diutamakan dan endingOffsetsByTimestamp
endingOffsets.

Nilai default: None
includeHeaders

Jenis: Boolean Jenis Kueri: streaming dan batch

Apakah akan menyertakan header Kafka dalam baris.

Nilai default: false
kafka.<consumer_option>

Jenis: String Jenis Kueri: streaming dan batch

Setiap opsi khusus konsumen Kafka dapat diteruskan dengan awalan kafka. . Opsi ini perlu dikelilingi oleh backtick ketika disediakan, jika tidak, Anda akan mendapatkan kesalahan pengurai. Anda dapat menemukan opsi dalam dokumentasi Kafka.

Catatan: Anda tidak boleh mengatur opsi berikut dengan fungsi ini:
key.deserializer, , value.deserializerbootstrap.servers,group.id

Nilai default: None
maxOffsetsPerTrigger

Jenis: Long Jenis Kueri: streaming saja

Batas laju pada jumlah maksimum offset atau baris yang diproses per interval pemicu. Jumlah total offset yang ditentukan akan dibagi secara proporsional di seluruh TopicPartitions.

Nilai default: None
startingOffsets

Jenis: String Jenis Kueri: streaming dan batch

Titik awal ketika kueri dimulai, baik "earliest" yang berasal dari offset paling awal, "latest" yang hanya dari offset terbaru, atau string JSON yang menentukan offset awal untuk setiap TopicPartition. Di JSON, -2 sebagai offset dapat digunakan untuk merujuk ke paling awal, -1 ke terbaru.

Catatan: Untuk kueri batch, terbaru (baik secara implisit atau dengan menggunakan -1 di JSON) tidak diizinkan. Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal.

Nilai default: "latest" untuk streaming, "earliest" untuk batch
startingOffsetsByTimestamp

Jenis: String Jenis Kueri: streaming dan batch

String JSON yang menentukan tanda waktu awal untuk setiap TopicPartition. Tanda waktu perlu disediakan sebagai nilai panjang tanda waktu dalam milidetik karena 1970-01-01 00:00:00 UTC, misalnya 1686444353000. Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Jika Kafka tidak mengembalikan offset yang cocok, perilaku akan mengikuti nilai opsi startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp lebih diutamakan daripada startingOffsets.

Catatan: Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal.

Nilai default: None
startingOffsetsByTimestampStrategy

Jenis: String Jenis Kueri: streaming dan batch

Strategi ini digunakan ketika offset awal yang ditentukan oleh tanda waktu (baik global atau per partisi) tidak cocok dengan offset yang dikembalikan Kafka. Strategi yang tersedia adalah:

* "error": gagal kueri
* "latest": menetapkan offset terbaru untuk partisi ini sehingga Spark dapat membaca rekaman yang lebih baru dari partisi ini di batch mikro nanti.

Nilai default: "error"
startingTimestamp

Jenis: String Jenis Kueri: streaming dan batch

Nilai string tanda waktu dalam milidetik sejak
1970-01-01 00:00:00 UTC, misalnya "1686444353000". Lihat catatan di bawah ini tentang detail perilaku dengan tanda waktu. Jika Kafka tidak mengembalikan offset yang cocok, perilaku akan mengikuti nilai opsi startingOffsetsByTimestampStrategy.
startingTimestamp lebih diutamakan dan startingOffsetsByTimestampstartingOffsets.

Catatan: Untuk kueri streaming, ini hanya berlaku saat kueri baru dimulai. Kueri streaming yang dimulai ulang akan berlanjut dari offset yang ditentukan dalam titik pemeriksaan kueri. Partisi yang baru ditemukan selama kueri akan dimulai paling awal.

Nilai default: None

Catatan

Offset yang dikembalikan untuk setiap partisi adalah offset paling awal yang tanda waktunya lebih besar dari atau sama dengan tanda waktu yang diberikan dalam partisi yang sesuai. Perilaku bervariasi di seluruh opsi jika Kafka tidak mengembalikan offset yang cocok - periksa deskripsi setiap opsi.

Spark hanya meneruskan informasi tanda waktu ke KafkaConsumer.offsetsForTimes, dan tidak menafsirkan atau alasan tentang nilai tersebut. Untuk detail selengkapnya tentang KafkaConsumer.offsetsForTimes, silakan lihat dokumentasi. Selain itu, arti tanda waktu di sini dapat bervariasi sesuai dengan konfigurasi Kafka (log.message.timestamp.type). Untuk detailnya, lihat dokumentasi Apache Kafka.