Bagikan melalui


Berlangganan Google Pub/Sub

Azure Databricks menyediakan konektor bawaan untuk berlangganan Google Pub/Sub di Databricks Runtime 13.3 LTS ke atas. Konektor ini menyediakan semantik pemrosesan tepat sekali untuk rekaman dari pelanggan.

Catatan

Pub/Sub mungkin menerbitkan rekaman duplikat, dan rekaman mungkin tiba ke pelanggan yang tidak berurutan. Anda harus menulis kode Azure Databricks untuk menangani rekaman duplikat dan tidak berurutan.

Contoh sintaks

Contoh kode berikut menunjukkan sintaks dasar untuk mengonfigurasi Bacaan Streaming Terstruktur dari Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Untuk opsi konfigurasi lainnya, lihat Mengonfigurasi opsi untuk baca streaming Pub/Sub.

Mengonfigurasi akses ke Pub/Sub

Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Opsi berikut diperlukan untuk mengotorisasi koneksi:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Tabel berikut menjelaskan peran yang diperlukan untuk kredensial yang dikonfigurasi:

Peran Diperlukan atau opsional Cara penggunaan
roles/pubsub.viewer atau roles/viewer Wajib Periksa apakah langganan ada dan dapatkan langganan
roles/pubsub.subscriber Wajib Mengambil data dari langganan
roles/pubsub.editor atau roles/editor Opsional Mengaktifkan pembuatan langganan jika langganan tidak ada dan juga memungkinkan penggunaan deleteSubscriptionOnStreamStop untuk menghapus langganan pada penghentian streaming

Skema Pub/Sub

Skema untuk aliran cocok dengan rekaman yang diambil dari Pub/Sub, seperti yang dijelaskan dalam tabel berikut:

Bidang Jenis
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Mengonfigurasi opsi untuk baca streaming Pub/Sub

Tabel berikut ini menjelaskan opsi yang didukung untuk Pub/Sub. Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>") sintaksis.

Catatan

Beberapa opsi konfigurasi Pub/Sub menggunakan konsep pengambilan alih-alih batch mikro. Ini mencerminkan detail implementasi internal, dan opsi berfungsi mirip dengan koroler di konektor Streaming Terstruktur lainnya, kecuali bahwa rekaman diambil dan kemudian diproses.

Opsi Nilai default Deskripsi
numFetchPartitions Atur ke setengah dari jumlah pelaksana yang ada saat inisialisasi streaming. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan.
deleteSubscriptionOnStreamStop false Jika true, langganan yang diteruskan ke aliran dihapus saat pekerjaan streaming berakhir.
maxBytesPerTrigger tidak ada Batas lunak untuk ukuran batch yang akan diproses selama setiap batch mikro yang dipicu.
maxRecordsPerFetch 1000 Jumlah rekaman yang diambil per tugas sebelum memproses rekaman.
maxFetchPeriod 10 detik Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Databricks merekomendasikan penggunaan nilai default.

Semantik pemrosesan batch inkremental untuk Pub/Sub

Anda dapat menggunakan Trigger.AvailableNow untuk mengonsumsi rekaman yang tersedia dari sumber Pub/Sub batch inkremental.

Azure Databricks merekam tanda waktu saat Anda memulai baca dengan Trigger.AvailableNow pengaturan . Rekaman yang diproses oleh batch mencakup semua data yang diambil sebelumnya dan rekaman yang baru diterbitkan dengan tanda waktu yang kurang dari tanda waktu mulai aliran yang direkam.

Lihat Mengonfigurasi pemrosesan batch inkremental.

Memantau metrik streaming

Metrik kemajuan Streaming Terstruktur melaporkan jumlah rekaman yang diambil dan siap diproses, ukuran rekaman yang diambil dan siap diproses, dan jumlah duplikat yang terlihat sejak streaming dimulai. Berikut ini adalah contoh metrik ini:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Batasan

Eksekusi spekulatif (spark.speculation) tidak didukung dengan Pub/Sub.