Berlangganan Google Pub/Sub

Gunakan konektor bawaan untuk berlangganan Google Pub/Sub. Konektor ini menyediakan semantik pemrosesan tepat sekali untuk rekaman dari pelanggan.

Catatan

Pub/Sub mungkin menerbitkan rekaman duplikat, atau rekaman mungkin tiba di pelanggan dalam urutan yang tidak sesuai. Tulis kode untuk menangani rekaman duplikat dan di luar urutan.

Mengonfigurasi aliran Pub/Sub

Contoh kode berikut menunjukkan sintaks dasar untuk mengonfigurasi Bacaan Streaming Terstruktur dari Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Untuk opsi konfigurasi lainnya, lihat Mengonfigurasi opsi untuk pembacaan streaming Pub/Sub.

Mengonfigurasi akses ke Pub/Sub

Kredensial yang Anda konfigurasi harus memiliki peran berikut.

Peran Diperlukan atau opsional Bagaimana peran digunakan
roles/pubsub.viewer atau roles/viewer Wajib Memeriksa apakah langganan ada dan mendapatkan langganan.
roles/pubsub.subscriber Wajib Mengambil data dari langganan.
roles/pubsub.editor atau roles/editor Opsional Mengaktifkan pembuatan langganan jika tidak ada dan memungkinkan penggunaan deleteSubscriptionOnStreamStop untuk menghapus langganan pada penghentian streaming.

Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Opsi berikut diperlukan untuk mengotorisasi koneksi:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Memahami skema Pub/Sub

Skema untuk stream sesuai dengan rekaman yang diperoleh dari Pub/Sub, seperti yang dijelaskan dalam tabel di bawah ini.

Bidang Jenis
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Mengonfigurasi opsi untuk baca streaming Pub/Sub

Tabel berikut ini menjelaskan opsi yang didukung untuk Pub/Sub. Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>") sintaksis.

Catatan

Beberapa opsi konfigurasi Pub/Sub menggunakan konsep pengambilan data alih-alih mikro-batch. Ini mencerminkan detail implementasi internal, dan opsi berfungsi mirip dengan kaidah di konektor streaming terstruktur lainnya, kecuali bahwa rekaman diambil dan kemudian diproses.

Opsi Nilai default Deskripsi
numFetchPartitions Atur ke setengah dari jumlah pelaksana yang ada saat inisialisasi streaming. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan.
deleteSubscriptionOnStreamStop false Jika true, langganan yang diteruskan ke aliran dihapus saat tugas streaming berakhir.
maxBytesPerTrigger none Batas lunak untuk ukuran batch yang akan diproses pada setiap micro-batch yang dimulai.
maxRecordsPerFetch 1000 Jumlah rekaman yang diambil per tugas sebelum memproses rekaman.
maxFetchPeriod 10s Durasi waktu untuk persiapan setiap tugas sebelum memproses catatan. Menerima string durasi, misalnya, 1s selama 1 detik atau 1m selama 1 menit. Databricks merekomendasikan penggunaan nilai default.

Menggunakan pemrosesan batch inkremental dengan Pub/Sub

Anda dapat menggunakan Trigger.AvailableNow untuk mengonsumsi rekaman yang tersedia dari sumber Pub/Sub sebagai batch inkremental.

Azure Databricks mencatat tanda waktu ketika Anda memulai pembacaan dengan pengaturan Trigger.AvailableNow. Rekaman yang diproses oleh batch mencakup semua data yang diambil sebelumnya dan rekaman yang baru diterbitkan dengan tanda waktu yang lebih awal dari tanda waktu mulai streaming yang dicatat. Untuk informasi selengkapnya, lihat AvailableNow: Pemrosesan batch inkremental.

Memantau metrik streaming Pub/Sub

Metrik kemajuan Streaming Terstruktur melaporkan jumlah rekaman yang diambil dan siap diproses, ukuran rekaman yang diambil dan siap diproses, dan jumlah duplikat yang terlihat sejak streaming dimulai. Berikut ini adalah contoh metrik ini:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Batasan

Pub/Sub tidak mendukung eksekusi spekulatif (spark.speculation).