Streaming dari Apache Pulsar
Penting
Fitur ini ada di Pratinjau Publik.
Di Databricks Runtime 14.1 ke atas, Anda dapat menggunakan Streaming Terstruktur untuk mengalirkan data dari Apache Pulsar di Azure Databricks.
Streaming Terstruktur menyediakan semantik pemrosesan tepat sekali untuk data yang dibaca dari sumber Pulsar.
Contoh sintaks
Berikut ini adalah contoh dasar menggunakan Streaming Terstruktur untuk membaca dari Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Anda harus selalu menyediakan service.url
dan salah satu opsi berikut untuk menentukan topik:
topic
topics
topicsPattern
Untuk daftar lengkap opsi, lihat Mengonfigurasi opsi untuk baca streaming Pulsar.
Mengautentikasi ke Pulsar
Azure Databricks mendukung truststore dan autentikasi keystore ke Pulsar. Databricks merekomendasikan penggunaan rahasia saat menyimpan detail konfigurasi.
Anda dapat mengatur opsi berikut selama konfigurasi aliran:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Jika aliran menggunakan PulsarAdmin
, atur juga yang berikut:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
Contoh berikut menunjukkan konfigurasi opsi autentikasi:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Skema Pulsar
Skema rekaman yang dibaca dari Pulsar tergantung pada bagaimana topik memiliki skema yang dikodekan.
- Untuk topik dengan skema Avro atau JSON, nama bidang dan jenis bidang dipertahankan dalam Spark DataFrame yang dihasilkan.
- Untuk topik tanpa skema atau dengan jenis data sederhana di Pulsar, payload dimuat ke
value
kolom. - Jika pembaca dikonfigurasi untuk membaca beberapa topik dengan skema yang berbeda, atur
allowDifferentTopicSchemas
untuk memuat konten mentah kevalue
kolom.
Rekaman Pulsar memiliki bidang metadata berikut:
Column | Jenis |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Mengonfigurasi opsi untuk baca streaming Pulsar
Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>")
sintaksis. Anda juga dapat mengonfigurasi autentikasi menggunakan opsi. Lihat Mengautentikasi ke Pulsar.
Tabel berikut menjelaskan konfigurasi yang diperlukan untuk Pulsar. Anda hanya harus menentukan salah satu opsi topic
, topics
atau topicsPattern
.
Opsi | Nilai default | Deskripsi |
---|---|---|
service.url |
tidak ada | Konfigurasi Pulsar serviceUrl untuk layanan Pulsar. |
topic |
tidak ada | String nama topik untuk dikonsumsi topik. |
topics |
tidak ada | Daftar topik yang dipisahkan koma untuk dikonsumsi. |
topicsPattern |
tidak ada | String regex Java yang cocok dengan topik yang akan dikonsumsi. |
Tabel berikut ini menjelaskan opsi lain yang didukung untuk Pulsar:
Opsi | Nilai default | Deskripsi |
---|---|---|
predefinedSubscription |
tidak ada | Nama langganan yang telah ditentukan sebelumnya yang digunakan oleh konektor untuk melacak kemajuan aplikasi spark. |
subscriptionPrefix |
tidak ada | Awalan yang digunakan oleh konektor untuk menghasilkan langganan acak untuk melacak kemajuan aplikasi spark. |
pollTimeoutMs |
120000 | Batas waktu untuk membaca pesan dari Pulsar dalam milidetik. |
waitingForNonExistedTopic |
false |
Apakah konektor harus menunggu hingga topik yang diinginkan dibuat. |
failOnDataLoss |
true |
Mengontrol apakah akan gagal kueri saat data hilang (misalnya, topik dihapus, atau pesan dihapus karena kebijakan penyimpanan). |
allowDifferentTopicSchemas |
false |
Jika beberapa topik dengan skema yang berbeda dibaca, gunakan parameter ini untuk menonaktifkan deserialisasi nilai topik berbasis skema otomatis. Hanya nilai mentah yang dikembalikan ketika ini adalah true . |
startingOffsets |
latest |
Jika latest , pembaca membaca rekaman terbaru setelah mulai berjalan. Jika earliest , pembaca membaca dari offset paling awal. Pengguna juga dapat menentukan string JSON yang menentukan offset tertentu. |
maxBytesPerTrigger |
tidak ada | Batas lunak jumlah maksimum byte yang ingin kita proses per microbatch. Jika ini ditentukan, admin.url juga perlu ditentukan. |
admin.url |
tidak ada | Konfigurasi Pulsar serviceHttpUrl . Hanya diperlukan ketika maxBytesPerTrigger ditentukan. |
Anda juga dapat menentukan konfigurasi klien, admin, dan pembaca Pulsar apa pun menggunakan pola berikut:
Pola | Menautkan ke opsi konifigurasi |
---|---|
pulsar.client.* |
Konfigurasi klien Pulsar |
pulsar.admin.* |
Konfigurasi admin Pulsar |
pulsar.reader.* |
Konfigurasi pembaca Pulsar |
Membangun memulai offset JSON
Anda dapat membuat ID pesan secara manual untuk menentukan offset tertentu dan meneruskan ini sebagai JSON ke startingOffsets
opsi . Contoh kode berikut menunjukkan sintaks ini:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()