read_pubsub
streaming fungsi bernilai tabel
Berlaku untuk: Databricks SQL Databricks Runtime 13.3 LTS ke atas
Mengembalikan tabel dengan rekaman yang dibaca dari Pub/Sub dari topik. Hanya mendukung kueri streaming.
Sintaks
read_pubsub( { parameter => value } [, ...])
Argumen
read_pubsub
memerlukan pemanggilan parameter bernama.
Satu-satunya argumen yang diperlukan adalah subscriptionId
, projectId
, dan topicId
. Semua argumen lainnya bersifat opsional.
Untuk deskripsi argumen lengkap, lihat Mengonfigurasi opsi untuk baca streaming Pub/Sub.
Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Lihat fungsi rahasia.
Untuk detail tentang mengonfigurasi akses ke Pub/Sub, lihat Mengonfigurasi akses ke Pub/Sub.
Parameter | Jenis | Deskripsi |
---|---|---|
subscriptionId |
STRING |
Diperlukan, pengidentifikasi unik yang ditetapkan ke langganan Pub/Sub. |
projectId |
STRING |
Diperlukan, ID proyek Google Cloud yang terkait dengan topik Pub/Sub. |
topicId |
STRING |
Diperlukan, ID atau nama topik Pub/Sub untuk berlangganan. |
clientEmail |
STRING |
Alamat email yang terkait dengan akun layanan untuk autentikasi. |
clientId |
STRING |
ID klien yang terkait dengan akun layanan untuk autentikasi. |
privateKeyId |
STRING |
ID kunci privat yang terkait dengan akun layanan. |
privateKey |
STRING |
Kunci privat yang terkait dengan akun layanan untuk autentikasi. |
Argumen ini digunakan untuk penyempurnaan lebih lanjut saat membaca dari Pub/Sub:
Parameter | Jenis | Deskripsi |
---|---|---|
numFetchPartitions |
STRING |
Opsional dengan jumlah default pelaksana. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opsional dengan default false . Jika diatur ke true, langganan yang diteruskan ke aliran akan dihapus saat pekerjaan streaming berakhir. |
maxBytesPerTrigger |
STRING |
Batas lunak untuk ukuran batch yang akan diproses selama setiap batch mikro yang dipicu. Defaultnya adalah 'none'. |
maxRecordsPerFetch |
STRING |
Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. Defaultnya adalah '1000'. |
maxFetchPeriod |
STRING |
Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Defaultnya adalah '10s'. |
Mengembalikan
Tabel rekaman Pub/Sub dengan skema berikut. Kolom atribut bisa null tetapi semua kolom lainnya tidak null.
Nama | Jenis data | Dapat diubah ke null | Standard | Deskripsi |
---|---|---|---|---|
messageId |
STRING |
No | Pengidentifikasi unik untuk pesan Pub/Sub. | |
payload |
BINARY |
No | Konten pesan Pub/Sub. | |
attributes |
STRING |
Ya | Pasangan kunci-nilai yang mewakili atribut pesan Pub/Sub. Ini adalah string yang dikodekan json. | |
publishTimestampInMillis |
BIGINT |
No | Tanda waktu saat pesan diterbitkan, dalam milidetik. | |
sequenceNumber |
BIGINT |
No | Pengidentifikasi unik rekaman dalam pecahannya. |
Contoh
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Data sekarang perlu dikueri dari testing.streaming_table
untuk analisis lebih lanjut.
Kueri yang salah:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);