Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Berlaku untuk:
Databricks SQL
Databricks Runtime 13.3 LTS ke atas
Menghasilkan tabel dengan rekaman yang dibaca dari topik Pub/Sub. Hanya mendukung kueri streaming.
Sintaks
read_pubsub( { parameter => value } [, ...])
Argumen
read_pubsub memerlukan pemanggilan parameter bernama.
Satu-satunya argumen yang diperlukan adalah subscriptionId, projectId, dan topicId. Semua argumen lainnya bersifat opsional.
Untuk deskripsi argumen lengkap, lihat opsi konfigurasi untuk pembacaan streaming Pub/Sub.
Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Silakan lihat fungsi secret.
Untuk detail tentang mengonfigurasi akses ke Pub/Sub, lihat Mengonfigurasi akses ke Pub/Sub.
| Pengaturan | Jenis | Deskripsi |
|---|---|---|
subscriptionId |
STRING |
Diperlukan, pengidentifikasi unik yang diberikan kepada langganan Pub/Sub. |
projectId |
STRING |
Diperlukan, ID proyek Google Cloud yang terkait dengan topik Pub/Sub. |
topicId |
STRING |
Diperlukan, ID atau nama topik Pub/Sub untuk berlangganan. |
clientEmail |
STRING |
Alamat email yang terkait dengan akun layanan untuk autentikasi. |
clientId |
STRING |
ID klien yang terkait dengan akun layanan untuk autentikasi. |
privateKeyId |
STRING |
ID kunci privat yang terkait dengan akun layanan. |
privateKey |
STRING |
Kunci privat yang terkait dengan akun layanan untuk autentikasi. |
Argumen ini digunakan untuk penyempurnaan lebih lanjut saat membaca dari Pub/Sub:
| Pengaturan | Jenis | Deskripsi |
|---|---|---|
numFetchPartitions |
STRING |
Opsional dengan jumlah pelaksana default. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Pilihan dengan nilai standar false. Jika diatur ke true, langganan yang diteruskan ke aliran akan dihapus ketika tugas streaming berakhir. |
maxBytesPerTrigger |
STRING |
Batas lunak untuk ukuran batch yang akan diproses pada setiap micro-batch yang dimulai. Defaultnya adalah 'none'. |
maxRecordsPerFetch |
STRING |
Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. Defaultnya adalah '1000'. |
maxFetchPeriod |
STRING |
Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Defaultnya adalah '10s'. |
Pengembalian
Tabel catatan Pub/Sub dengan skema berikut. Kolom atribut bisa null tetapi semua kolom lainnya tidak null.
| Nama | Jenis data | Dapat bernilai null | Standard | Deskripsi |
|---|---|---|---|---|
messageId |
STRING |
Tidak | Pengidentifikasi unik untuk pesan Pub/Sub. | |
payload |
BINARY |
Tidak | Konten dari pesan Pub/Sub. | |
attributes |
STRING |
Ya | Pasangan kunci-nilai yang mewakili atribut pesan Pub/Sub. Ini adalah string yang dikodekan json. | |
publishTimestampInMillis |
BIGINT |
Tidak | Tanda waktu saat pesan diterbitkan, dalam milidetik. | |
sequenceNumber |
BIGINT |
Tidak | Pengidentifikasi unik rekaman dalam pecahannya. |
Contoh
-- Streaming Ingestion from Pubsub using Google Service Account secrets
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
clientEmail => secret('app-events', 'clientEmail'),
clientId => secret('app-events', 'clientId'),
privateKeyId => secret('app-events', 'privateKeyId'),
privateKey => secret('app-events', 'privateKey')
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic'
);
Data sekarang perlu dikueri dari testing.streaming_table untuk analisis lebih lanjut.
Kueri yang salah:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project'
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
maxRecordsPerFetchLimit => '1000001'
);