Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Berlaku untuk:
Databricks SQL
Databricks Runtime 14.1 ke atas
Penting
Fitur ini ada di Pratinjau Publik.
Menampilkan tabel dengan rekaman yang diambil dari Pulsar.
Fungsi nilai tabel ini hanya mendukung streaming dan bukan kueri batch.
Sintaks
read_pulsar ( { option_key => option_value } [, ...] )
Argumen
Fungsi ini memerlukan pemanggilan parameter bernama untuk kunci opsi.
Opsi serviceUrl dan topic wajib.
Deskripsi argumen singkat di sini. Lihat dokumentasi Pulsar streaming terstruktur untuk deskripsi yang diperluas.
| Opsi | Jenis | Default | Deskripsi |
|---|---|---|---|
| serviceUrl | STRING | Wajib | URI layanan Pulsar. |
| topik | STRING | Wajib | Topik yang akan dibaca. |
| predefinedSubscription | STRING | Tidak | Nama langganan yang telah ditentukan sebelumnya yang digunakan oleh konektor untuk melacak kemajuan aplikasi spark. |
| subscriptionPrefix | STRING | Tidak | Awalan yang digunakan oleh konektor untuk menghasilkan langganan acak guna melacak kemajuan aplikasi Spark. |
| pollTimeoutMs | LONG | 120000 | Batas waktu untuk membaca pesan dari Pulsar dalam milidetik. |
| failOnDataLoss | BOOLEAN | benar | Mengontrol apakah akan gagal kueri saat data hilang (misalnya, topik dihapus, atau pesan dihapus karena kebijakan penyimpanan). |
| startingOffsets | STRING | terbaru | Titik awal ketika kueri dimulai, bisa berupa paling awal, terakhir, atau sebuah string JSON yang menentukan offset tertentu. Jika terbaru, pembaca membaca rekaman terbaru setelah mulai berjalan. Jika paling awal, pembaca membaca dari offset paling awal. Pengguna juga dapat menentukan string JSON yang menentukan offset tertentu. |
| waktu mulai | STRING | Tidak | Ketika ditentukan, sumber Pulsar akan membaca pesan mulai dari posisi startingTime yang ditentukan. |
Argumen berikut digunakan untuk autentikasi klien pulsar:
| Opsi | Jenis | Default | Deskripsi |
|---|---|---|---|
| pulsarClientAuthPluginClassName | STRING | Tidak | Nama plugin autentikasi. |
| pulsarClientAuthParams | STRING | Tidak | Parameter untuk plugin autentikasi. |
| pulsarClientUseKeyStoreTls | STRING | Tidak | Apakah akan menggunakan KeyStore untuk autentikasi tls. |
| pulsarClientTlsTrustoreType | STRING | Tidak | Jenis file TrustStore untuk autentikasi tls. |
| pulsarClientTlsTrustStorePath | STRING | Tidak | Jalur file TrustStore untuk autentikasi tls. |
| pulsarClientTlsTruststorePassword | STRING | Tidak | Kata sandi TrustStore untuk autentikasi tls. |
Argumen ini digunakan untuk konfigurasi dan autentikasi kontrol penerimaan pulsar, konfigurasi admin pulsar hanya diperlukan ketika kontrol penerimaan diaktifkan (ketika maxBytesPerTrigger diatur)
| Opsi | Jenis | Default | Deskripsi |
|---|---|---|---|
| maxBytesPerTrigger | BIGINT | Tidak | Batas lunak jumlah maksimum byte yang ingin kita proses per microbatch. Jika ini ditentukan, admin.url juga perlu ditentukan. |
| adminUrl | STRING | Tidak | Konfigurasi Layanan PulsarHttpUrl. Hanya diperlukan ketika maxBytesPerTrigger ditentukan. |
| pulsarAdminAuthPlugin | STRING | Tidak | Nama plugin autentikasi. |
| pulsarAdminAuthParams | STRING | Tidak | Parameter untuk plugin autentikasi. |
| pulsarClientUseKeyStoreTls | STRING | Tidak | Apakah akan menggunakan KeyStore untuk autentikasi tls. |
| pulsarAdminTlsTrustStoreType | STRING | Tidak | Jenis file TrustStore untuk autentikasi tls. |
| pulsarAdminTlsTrustStorePath | STRING | Tidak | Jalur file TrustStore untuk autentikasi tls. |
| pulsarAdminTlsTruststorePassword | STRING | Tidak | Kata sandi TrustStore untuk autentikasi tls. |
Mengembalikan
Tabel rekaman pulsar dengan skema berikut.
__key STRING NOT NULL: Kunci pesan Pulsar.value BINARY NOT NULL: Nilai pesan Pulsar.Catatan: Untuk topik dengan skema Avro atau JSON, alih-alih memuat konten ke bidang nilai biner, konten akan diperluas untuk mempertahankan nama bidang dan jenis bidang topik Pulsar.
__topic STRING NOT NULL: Nama topik Pulsar.__messageId BINARY NOT NULL: Id pesan Pulsar.__publishTime TIMESTAMP NOT NULL: Waktu penerbitan pesan Pulsar.__eventTime TIMESTAMP NOT NULL: Waktu peristiwa pesan Pulsar.__messageProperties MAP<STRING, STRING>: Properti pesan Pulsar.
Contoh
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.