Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Gunakan konektor bawaan untuk berlangganan Google Pub/Sub. Konektor ini menyediakan semantik pemrosesan tepat sekali untuk rekaman dari pelanggan.
Catatan
Pub/Sub mungkin menerbitkan rekaman duplikat, atau rekaman mungkin tiba di pelanggan dalam urutan yang tidak sesuai. Tulis kode untuk menangani rekaman duplikat dan di luar urutan.
Mengonfigurasi aliran Pub/Sub
Contoh kode berikut menunjukkan sintaks dasar untuk mengonfigurasi Bacaan Streaming Terstruktur dari Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Untuk opsi konfigurasi lainnya, lihat Mengonfigurasi opsi untuk pembacaan streaming Pub/Sub.
Mengonfigurasi akses ke Pub/Sub
Kredensial yang Anda konfigurasi harus memiliki peran berikut.
| Peran | Diperlukan atau opsional | Bagaimana peran digunakan |
|---|---|---|
roles/pubsub.viewer atau roles/viewer |
Wajib | Memeriksa apakah langganan ada dan mendapatkan langganan. |
roles/pubsub.subscriber |
Wajib | Mengambil data dari langganan. |
roles/pubsub.editor atau roles/editor |
Opsional | Mengaktifkan pembuatan langganan jika tidak ada dan memungkinkan penggunaan deleteSubscriptionOnStreamStop untuk menghapus langganan pada penghentian streaming. |
Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Opsi berikut diperlukan untuk mengotorisasi koneksi:
clientEmailclientIdprivateKeyprivateKeyId
Memahami skema Pub/Sub
Skema untuk stream sesuai dengan rekaman yang diperoleh dari Pub/Sub, seperti yang dijelaskan dalam tabel di bawah ini.
| Bidang | Jenis |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Mengonfigurasi opsi untuk baca streaming Pub/Sub
Tabel berikut ini menjelaskan opsi yang didukung untuk Pub/Sub. Semua opsi dikonfigurasi sebagai bagian dari bacaan Streaming Terstruktur menggunakan .option("<optionName>", "<optionValue>") sintaksis.
Catatan
Beberapa opsi konfigurasi Pub/Sub menggunakan konsep pengambilan data alih-alih mikro-batch. Ini mencerminkan detail implementasi internal, dan opsi berfungsi mirip dengan kaidah di konektor streaming terstruktur lainnya, kecuali bahwa rekaman diambil dan kemudian diproses.
| Opsi | Nilai default | Deskripsi |
|---|---|---|
numFetchPartitions |
Atur ke setengah dari jumlah pelaksana yang ada saat inisialisasi streaming. | Jumlah tugas Spark paralel yang mengambil rekaman dari langganan. |
deleteSubscriptionOnStreamStop |
false |
Jika true, langganan yang diteruskan ke aliran dihapus saat tugas streaming berakhir. |
maxBytesPerTrigger |
none |
Batas lunak untuk ukuran batch yang akan diproses pada setiap micro-batch yang dimulai. |
maxRecordsPerFetch |
1000 |
Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. |
maxFetchPeriod |
10s |
Durasi waktu untuk persiapan setiap tugas sebelum memproses catatan. Menerima string durasi, misalnya, 1s selama 1 detik atau 1m selama 1 menit. Databricks merekomendasikan penggunaan nilai default. |
Menggunakan pemrosesan batch inkremental dengan Pub/Sub
Anda dapat menggunakan Trigger.AvailableNow untuk mengonsumsi rekaman yang tersedia dari sumber Pub/Sub sebagai batch inkremental.
Azure Databricks mencatat tanda waktu ketika Anda memulai pembacaan dengan pengaturan Trigger.AvailableNow. Rekaman yang diproses oleh batch mencakup semua data yang diambil sebelumnya dan rekaman yang baru diterbitkan dengan tanda waktu yang lebih awal dari tanda waktu mulai streaming yang dicatat. Untuk informasi selengkapnya, lihat AvailableNow: Pemrosesan batch inkremental.
Memantau metrik streaming Pub/Sub
Metrik kemajuan Streaming Terstruktur melaporkan jumlah rekaman yang diambil dan siap diproses, ukuran rekaman yang diambil dan siap diproses, dan jumlah duplikat yang terlihat sejak streaming dimulai. Berikut ini adalah contoh metrik ini:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Batasan
Pub/Sub tidak mendukung eksekusi spekulatif (spark.speculation).