Bagikan melalui


read_pubsub streaming fungsi bernilai tabel

Berlaku untuk: centang ditandai ya Databricks SQL centang ditandai ya Databricks Runtime 13.3 LTS ke atas

Mengembalikan tabel dengan rekaman yang dibaca dari Pub/Sub dari topik. Hanya mendukung kueri streaming.

Sintaks

read_pubsub( { parameter => value } [, ...])

Argumen

read_pubsubmemerlukan pemanggilan parameter bernama.

Satu-satunya argumen yang diperlukan adalah subscriptionId, projectId, dan topicId. Semua argumen lainnya bersifat opsional.

Untuk deskripsi argumen lengkap, lihat Mengonfigurasi opsi untuk baca streaming Pub/Sub.

Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Lihat fungsi rahasia.

Untuk detail tentang mengonfigurasi akses ke Pub/Sub, lihat Mengonfigurasi akses ke Pub/Sub.

Parameter Jenis Deskripsi
subscriptionId STRING Diperlukan, pengidentifikasi unik yang ditetapkan ke langganan Pub/Sub.
projectId STRING Diperlukan, ID proyek Google Cloud yang terkait dengan topik Pub/Sub.
topicId STRING Diperlukan, ID atau nama topik Pub/Sub untuk berlangganan.
clientEmail STRING Alamat email yang terkait dengan akun layanan untuk autentikasi.
clientId STRING ID klien yang terkait dengan akun layanan untuk autentikasi.
privateKeyId STRING ID kunci privat yang terkait dengan akun layanan.
privateKey STRING Kunci privat yang terkait dengan akun layanan untuk autentikasi.

Argumen ini digunakan untuk penyempurnaan lebih lanjut saat membaca dari Pub/Sub:

Parameter Jenis Deskripsi
numFetchPartitions STRING Opsional dengan jumlah default pelaksana. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan.
deleteSubscriptionOnStreamStop BOOLEAN Opsional dengan default false. Jika diatur ke true, langganan yang diteruskan ke aliran akan dihapus saat pekerjaan streaming berakhir.
maxBytesPerTrigger STRING Batas lunak untuk ukuran batch yang akan diproses selama setiap batch mikro yang dipicu. Defaultnya adalah 'none'.
maxRecordsPerFetch STRING Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. Defaultnya adalah '1000'.
maxFetchPeriod STRING Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Defaultnya adalah '10s'.

Mengembalikan

Tabel rekaman Pub/Sub dengan skema berikut. Kolom atribut bisa null tetapi semua kolom lainnya tidak null.

Nama Jenis data Dapat diubah ke null Standard Deskripsi
messageId STRING No Pengidentifikasi unik untuk pesan Pub/Sub.
payload BINARY No Konten pesan Pub/Sub.
attributes STRING Ya Pasangan kunci-nilai yang mewakili atribut pesan Pub/Sub. Ini adalah string yang dikodekan json.
publishTimestampInMillis BIGINT No Tanda waktu saat pesan diterbitkan, dalam milidetik.
sequenceNumber BIGINT No Pengidentifikasi unik rekaman dalam pecahannya.

Contoh

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Data sekarang perlu dikueri dari testing.streaming_table untuk analisis lebih lanjut.

Kueri yang salah:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);