Berlangganan Google Pub/Sub
Azure Databricks menyediakan konektor bawaan untuk berlangganan Google Pub/Sub di Databricks Runtime 13.3 LTS ke atas. Konektor ini menyediakan semantik pemrosesan tepat sekali untuk rekaman dari pelanggan.
Catatan
Pub/Sub mungkin menerbitkan rekaman duplikat, dan rekaman mungkin tiba ke pelanggan yang tidak berurutan. Anda harus menulis kode Azure Databricks untuk menangani rekaman duplikat dan tidak berurutan.
Contoh sintaks
Contoh kode berikut menunjukkan sintaks dasar untuk mengonfigurasi Bacaan Streaming Terstruktur dari Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Untuk opsi konfigurasi lainnya, lihat Mengonfigurasi opsi untuk baca streaming Pub/Sub.
Mengonfigurasi akses ke Pub/Sub
Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Opsi berikut diperlukan untuk mengotorisasi koneksi:
clientEmail
clientId
privateKey
privateKeyId
Tabel berikut menjelaskan peran yang diperlukan untuk kredensial yang dikonfigurasi:
Peran | Diperlukan atau opsional | Cara penggunaan |
---|---|---|
roles/pubsub.viewer atau roles/viewer |
Wajib | Periksa apakah langganan ada dan dapatkan langganan |
roles/pubsub.subscriber |
Wajib | Mengambil data dari langganan |
roles/pubsub.editor atau roles/editor |
Opsional | Mengaktifkan pembuatan langganan jika langganan tidak ada dan juga memungkinkan penggunaan deleteSubscriptionOnStreamStop untuk menghapus langganan pada penghentian streaming |
Skema Pub/Sub
Skema untuk aliran cocok dengan rekaman yang diambil dari Pub/Sub, seperti yang dijelaskan dalam tabel berikut:
Bidang | Jenis |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Mengonfigurasi opsi untuk baca streaming Pub/Sub
Tabel berikut ini menjelaskan opsi yang didukung untuk Pub/Sub. Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>")
sintaksis.
Catatan
Beberapa opsi konfigurasi Pub/Sub menggunakan konsep pengambilan alih-alih batch mikro. Ini mencerminkan detail implementasi internal, dan opsi berfungsi mirip dengan koroler di konektor Streaming Terstruktur lainnya, kecuali bahwa rekaman diambil dan kemudian diproses.
Opsi | Nilai default | Deskripsi |
---|---|---|
numFetchPartitions |
Atur ke setengah dari jumlah pelaksana yang ada saat inisialisasi streaming. | Jumlah tugas Spark paralel yang mengambil rekaman dari langganan. |
deleteSubscriptionOnStreamStop |
false |
Jika true , langganan yang diteruskan ke aliran dihapus saat pekerjaan streaming berakhir. |
maxBytesPerTrigger |
tidak ada | Batas lunak untuk ukuran batch yang akan diproses selama setiap batch mikro yang dipicu. |
maxRecordsPerFetch |
1000 | Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. |
maxFetchPeriod |
10 detik | Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Databricks merekomendasikan penggunaan nilai default. |
Semantik pemrosesan batch inkremental untuk Pub/Sub
Anda dapat menggunakan Trigger.AvailableNow
untuk mengonsumsi rekaman yang tersedia dari sumber Pub/Sub batch inkremental.
Azure Databricks merekam tanda waktu saat Anda memulai baca dengan Trigger.AvailableNow
pengaturan . Rekaman yang diproses oleh batch mencakup semua data yang diambil sebelumnya dan rekaman yang baru diterbitkan dengan tanda waktu yang kurang dari tanda waktu mulai aliran yang direkam.
Lihat Mengonfigurasi pemrosesan batch inkremental.
Memantau metrik streaming
Metrik kemajuan Streaming Terstruktur melaporkan jumlah rekaman yang diambil dan siap diproses, ukuran rekaman yang diambil dan siap diproses, dan jumlah duplikat yang terlihat sejak streaming dimulai. Berikut ini adalah contoh metrik ini:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Batasan
Eksekusi spekulatif (spark.speculation
) tidak didukung dengan Pub/Sub.