read_pubsub fungsi streaming bernilai tabel

Berlaku untuk:centang ditandai ya Databricks SQL centang ditandai ya Databricks Runtime 13.3 LTS ke atas

Menghasilkan tabel dengan rekaman yang dibaca dari topik Pub/Sub. Hanya mendukung kueri streaming.

Sintaks

read_pubsub( { parameter => value } [, ...])

Argumen

read_pubsub memerlukan pemanggilan parameter bernama.

Satu-satunya argumen yang diperlukan adalah subscriptionId, projectId, dan topicId. Semua argumen lainnya bersifat opsional.

Untuk deskripsi argumen lengkap, lihat opsi konfigurasi untuk pembacaan streaming Pub/Sub.

Databricks merekomendasikan penggunaan rahasia saat memberikan opsi otorisasi. Silakan lihat fungsi secret.

Untuk detail tentang mengonfigurasi akses ke Pub/Sub, lihat Mengonfigurasi akses ke Pub/Sub.

Pengaturan Jenis Deskripsi
subscriptionId STRING Diperlukan, pengidentifikasi unik yang diberikan kepada langganan Pub/Sub.
projectId STRING Diperlukan, ID proyek Google Cloud yang terkait dengan topik Pub/Sub.
topicId STRING Diperlukan, ID atau nama topik Pub/Sub untuk berlangganan.
clientEmail STRING Alamat email yang terkait dengan akun layanan untuk autentikasi.
clientId STRING ID klien yang terkait dengan akun layanan untuk autentikasi.
privateKeyId STRING ID kunci privat yang terkait dengan akun layanan.
privateKey STRING Kunci privat yang terkait dengan akun layanan untuk autentikasi.

Argumen ini digunakan untuk penyempurnaan lebih lanjut saat membaca dari Pub/Sub:

Pengaturan Jenis Deskripsi
numFetchPartitions STRING Opsional dengan jumlah pelaksana default. Jumlah tugas Spark paralel yang mengambil rekaman dari langganan.
deleteSubscriptionOnStreamStop BOOLEAN Pilihan dengan nilai standar false. Jika diatur ke true, langganan yang diteruskan ke aliran akan dihapus ketika tugas streaming berakhir.
maxBytesPerTrigger STRING Batas lunak untuk ukuran batch yang akan diproses pada setiap micro-batch yang dimulai. Defaultnya adalah 'none'.
maxRecordsPerFetch STRING Jumlah rekaman yang diambil per tugas sebelum memproses rekaman. Defaultnya adalah '1000'.
maxFetchPeriod STRING Durasi waktu untuk setiap tugas diambil sebelum memproses rekaman. Defaultnya adalah '10s'.

Pengembalian

Tabel catatan Pub/Sub dengan skema berikut. Kolom atribut bisa null tetapi semua kolom lainnya tidak null.

Nama Jenis data Dapat bernilai null Standard Deskripsi
messageId STRING Tidak Pengidentifikasi unik untuk pesan Pub/Sub.
payload BINARY Tidak Konten dari pesan Pub/Sub.
attributes STRING Ya Pasangan kunci-nilai yang mewakili atribut pesan Pub/Sub. Ini adalah string yang dikodekan json.
publishTimestampInMillis BIGINT Tidak Tanda waktu saat pesan diterbitkan, dalam milidetik.
sequenceNumber BIGINT Tidak Pengidentifikasi unik rekaman dalam pecahannya.

Contoh

-- Streaming Ingestion from Pubsub using Google Service Account secrets
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
        privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

Data sekarang perlu dikueri dari testing.streaming_table untuk analisis lebih lanjut.

Kueri yang salah:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetchLimit => '1000001'
);