Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Penting
Fitur ini ada di Pratinjau Publik. Anda dapat mengonfirmasi pendaftaran pratinjau di halaman Pratinjau . Lihat Kelola Pratinjau Azure Databricks.
Di Databricks Runtime 14.1 ke atas, Anda dapat menggunakan Streaming Terstruktur untuk mengalirkan data dari Apache Pulsar di Azure Databricks.
Streaming Terstruktur menyediakan semantik pemrosesan tepat sekali untuk data yang dibaca dari sumber Pulsar.
Contoh sintaksis
Berikut ini adalah contoh dasar menggunakan Streaming Terstruktur untuk membaca dari Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Anda harus selalu menyediakan service.url dan salah satu opsi berikut untuk menentukan topik:
topictopicstopicsPattern
Untuk daftar lengkap opsi, lihat Mengonfigurasi opsi untuk streaming Pulsar membaca.
Mengautentikasi ke Pulsar
Azure Databricks mendukung truststore dan autentikasi keystore ke Pulsar. Databricks merekomendasikan penggunaan rahasia saat menyimpan detail konfigurasi.
Anda dapat mengatur opsi berikut selama konfigurasi aliran:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
Jika aliran menggunakan PulsarAdmin, atur juga yang berikut:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Contoh berikut menunjukkan konfigurasi opsi autentikasi:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Skema Pulsar
Skema rekaman yang dibaca dari Pulsar tergantung pada bagaimana topik memiliki skema yang dikodekan.
- Untuk topik dengan skema Avro atau JSON, nama bidang dan jenis bidang dipertahankan dalam Spark DataFrame yang dihasilkan.
- Untuk topik tanpa skema atau dengan jenis data sederhana di Pulsar, payload dimuat ke kolom
value. - Jika pembaca dikonfigurasi untuk membaca beberapa topik dengan skema yang berbeda, atur
allowDifferentTopicSchemasuntuk memuat konten mentah ke kolomvalue.
Rekaman Pulsar memiliki bidang metadata berikut:
| Kolom | Tipe |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Mengonfigurasi opsi untuk baca streaming Pulsar
Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>") sintaksis. Anda juga dapat mengonfigurasi autentikasi menggunakan opsi. Lihat Mengautentikasi ke Pulsar.
Tabel berikut menjelaskan konfigurasi yang diperlukan untuk Pulsar. Anda hanya harus menentukan salah satu opsi topic, topics atau topicsPattern.
| Opsi | Nilai default | Deskripsi |
|---|---|---|
service.url |
tidak ada | Konfigurasi Pulsar serviceUrl untuk layanan Pulsar. |
topic |
tidak ada | String nama topik untuk dikonsumsi topik. |
topics |
tidak ada | Daftar topik yang dipisahkan koma untuk dikonsumsi. |
topicsPattern |
tidak ada | String regex Java yang cocok dengan topik yang akan dikonsumsi. |
Tabel berikut ini menjelaskan opsi lain yang didukung untuk Pulsar:
| Opsi | Nilai default | Deskripsi |
|---|---|---|
predefinedSubscription |
tidak ada | Nama langganan yang telah ditentukan sebelumnya yang digunakan oleh konektor untuk melacak kemajuan aplikasi spark. |
subscriptionPrefix |
tidak ada | Awalan yang digunakan oleh konektor untuk menghasilkan langganan acak guna melacak progres aplikasi Spark. |
pollTimeoutMs |
120000 | Batas waktu untuk membaca pesan dari Pulsar dalam milidetik. |
waitingForNonExistedTopic |
false |
Apakah konektor harus menunggu hingga topik yang diinginkan dibuat. |
failOnDataLoss |
true |
Mengontrol apakah akan gagal kueri saat data hilang (misalnya, topik dihapus, atau pesan dihapus karena kebijakan penyimpanan). |
allowDifferentTopicSchemas |
false |
Jika beberapa topik dengan skema yang berbeda dibaca, gunakan parameter ini untuk menonaktifkan deserialisasi nilai topik berbasis skema otomatis. Hanya nilai mentah yang dikembalikan ketika ini adalah true. |
startingOffsets |
latest |
Jika latest, pembaca membaca rekaman terbaru setelah mulai berjalan. Jika earliest, pembaca membaca mulai dari offset paling awal. Pengguna juga dapat menentukan string JSON yang menentukan offset tertentu. |
maxBytesPerTrigger |
tidak ada | Batas lunak jumlah maksimum byte yang ingin kita proses per microbatch. Jika ini ditentukan, admin.url juga perlu ditentukan. |
admin.url |
tidak ada | Konfigurasi Pulsar serviceHttpUrl . Hanya diperlukan ketika maxBytesPerTrigger ditentukan. |
Anda juga dapat menentukan konfigurasi klien, admin, dan pembaca Pulsar apa pun menggunakan pola berikut:
| Pola | Menautkan ke opsi konifigurasi |
|---|---|
pulsar.client.* |
Konfigurasi klien Pulsar |
pulsar.admin.* |
Konfigurasi admin Pulsar |
pulsar.reader.* |
Konfigurasi pembaca Pulsar |
Membangun memulai offset JSON
Anda dapat membuat ID pesan secara manual untuk menentukan offset tertentu dan meneruskan ini sebagai JSON ke opsi startingOffsets. Contoh kode berikut menunjukkan sintaks ini:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()