Bagikan melalui


Streaming dari Apache Pulsar

Penting

Fitur ini ada di Pratinjau Publik.

Di Databricks Runtime 14.1 ke atas, Anda dapat menggunakan Streaming Terstruktur untuk mengalirkan data dari Apache Pulsar di Azure Databricks.

Streaming Terstruktur menyediakan semantik pemrosesan tepat sekali untuk data yang dibaca dari sumber Pulsar.

Contoh sintaks

Berikut ini adalah contoh dasar menggunakan Streaming Terstruktur untuk membaca dari Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Anda harus selalu menyediakan service.url dan salah satu opsi berikut untuk menentukan topik:

  • topic
  • topics
  • topicsPattern

Untuk daftar lengkap opsi, lihat Mengonfigurasi opsi untuk baca streaming Pulsar.

Mengautentikasi ke Pulsar

Azure Databricks mendukung truststore dan autentikasi keystore ke Pulsar. Databricks merekomendasikan penggunaan rahasia saat menyimpan detail konfigurasi.

Anda dapat mengatur opsi berikut selama konfigurasi aliran:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Jika aliran menggunakan PulsarAdmin, atur juga yang berikut:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Contoh berikut menunjukkan konfigurasi opsi autentikasi:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Skema Pulsar

Skema rekaman yang dibaca dari Pulsar tergantung pada bagaimana topik memiliki skema yang dikodekan.

  • Untuk topik dengan skema Avro atau JSON, nama bidang dan jenis bidang dipertahankan dalam Spark DataFrame yang dihasilkan.
  • Untuk topik tanpa skema atau dengan jenis data sederhana di Pulsar, payload dimuat ke value kolom.
  • Jika pembaca dikonfigurasi untuk membaca beberapa topik dengan skema yang berbeda, atur allowDifferentTopicSchemas untuk memuat konten mentah ke value kolom.

Rekaman Pulsar memiliki bidang metadata berikut:

Column Jenis
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Mengonfigurasi opsi untuk baca streaming Pulsar

Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>") sintaksis. Anda juga dapat mengonfigurasi autentikasi menggunakan opsi. Lihat Mengautentikasi ke Pulsar.

Tabel berikut menjelaskan konfigurasi yang diperlukan untuk Pulsar. Anda hanya harus menentukan salah satu opsi topic, topics atau topicsPattern.

Opsi Nilai default Deskripsi
service.url tidak ada Konfigurasi Pulsar serviceUrl untuk layanan Pulsar.
topic tidak ada String nama topik untuk dikonsumsi topik.
topics tidak ada Daftar topik yang dipisahkan koma untuk dikonsumsi.
topicsPattern tidak ada String regex Java yang cocok dengan topik yang akan dikonsumsi.

Tabel berikut ini menjelaskan opsi lain yang didukung untuk Pulsar:

Opsi Nilai default Deskripsi
predefinedSubscription tidak ada Nama langganan yang telah ditentukan sebelumnya yang digunakan oleh konektor untuk melacak kemajuan aplikasi spark.
subscriptionPrefix tidak ada Awalan yang digunakan oleh konektor untuk menghasilkan langganan acak untuk melacak kemajuan aplikasi spark.
pollTimeoutMs 120000 Batas waktu untuk membaca pesan dari Pulsar dalam milidetik.
waitingForNonExistedTopic false Apakah konektor harus menunggu hingga topik yang diinginkan dibuat.
failOnDataLoss true Mengontrol apakah akan gagal kueri saat data hilang (misalnya, topik dihapus, atau pesan dihapus karena kebijakan penyimpanan).
allowDifferentTopicSchemas false Jika beberapa topik dengan skema yang berbeda dibaca, gunakan parameter ini untuk menonaktifkan deserialisasi nilai topik berbasis skema otomatis. Hanya nilai mentah yang dikembalikan ketika ini adalah true.
startingOffsets latest Jika latest, pembaca membaca rekaman terbaru setelah mulai berjalan. Jika earliest, pembaca membaca dari offset paling awal. Pengguna juga dapat menentukan string JSON yang menentukan offset tertentu.
maxBytesPerTrigger tidak ada Batas lunak jumlah maksimum byte yang ingin kita proses per microbatch. Jika ini ditentukan, admin.url juga perlu ditentukan.
admin.url tidak ada Konfigurasi Pulsar serviceHttpUrl . Hanya diperlukan ketika maxBytesPerTrigger ditentukan.

Anda juga dapat menentukan konfigurasi klien, admin, dan pembaca Pulsar apa pun menggunakan pola berikut:

Pola Menautkan ke opsi konifigurasi
pulsar.client.* Konfigurasi klien Pulsar
pulsar.admin.* Konfigurasi admin Pulsar
pulsar.reader.* Konfigurasi pembaca Pulsar

Membangun memulai offset JSON

Anda dapat membuat ID pesan secara manual untuk menentukan offset tertentu dan meneruskan ini sebagai JSON ke startingOffsets opsi . Contoh kode berikut menunjukkan sintaks ini:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()