read_pulsar streaming fungsi bernilai tabel

Berlaku untuk:centang ditandai ya Databricks SQL centang ditandai ya Databricks Runtime 14.1 ke atas

Penting

Fitur ini ada di Pratinjau Publik.

Menampilkan tabel dengan rekaman yang diambil dari Pulsar.

Fungsi nilai tabel ini hanya mendukung streaming dan bukan kueri batch.

Sintaks

read_pulsar ( { option_key => option_value } [, ...] )

Argumen

Fungsi ini memerlukan pemanggilan parameter bernama untuk kunci opsi.

Opsi serviceUrl dan topic wajib.

Deskripsi argumen singkat di sini. Lihat dokumentasi Pulsar streaming terstruktur untuk deskripsi yang diperluas.

Opsi Jenis Default Deskripsi
serviceUrl STRING Wajib URI layanan Pulsar.
topik STRING Wajib Topik yang akan dibaca.
predefinedSubscription STRING Tidak Nama langganan yang telah ditentukan sebelumnya yang digunakan oleh konektor untuk melacak kemajuan aplikasi spark.
subscriptionPrefix STRING Tidak Awalan yang digunakan oleh konektor untuk menghasilkan langganan acak guna melacak kemajuan aplikasi Spark.
pollTimeoutMs LONG 120000 Batas waktu untuk membaca pesan dari Pulsar dalam milidetik.
failOnDataLoss BOOLEAN benar Mengontrol apakah akan gagal kueri saat data hilang (misalnya, topik dihapus, atau pesan dihapus karena kebijakan penyimpanan).
startingOffsets STRING terbaru Titik awal ketika kueri dimulai, bisa berupa paling awal, terakhir, atau sebuah string JSON yang menentukan offset tertentu. Jika terbaru, pembaca membaca rekaman terbaru setelah mulai berjalan. Jika paling awal, pembaca membaca dari offset paling awal. Pengguna juga dapat menentukan string JSON yang menentukan offset tertentu.
waktu mulai STRING Tidak Ketika ditentukan, sumber Pulsar akan membaca pesan mulai dari posisi startingTime yang ditentukan.

Argumen berikut digunakan untuk autentikasi klien pulsar:

Opsi Jenis Default Deskripsi
pulsarClientAuthPluginClassName STRING Tidak Nama plugin autentikasi.
pulsarClientAuthParams STRING Tidak Parameter untuk plugin autentikasi.
pulsarClientUseKeyStoreTls STRING Tidak Apakah akan menggunakan KeyStore untuk autentikasi tls.
pulsarClientTlsTrustoreType STRING Tidak Jenis file TrustStore untuk autentikasi tls.
pulsarClientTlsTrustStorePath STRING Tidak Jalur file TrustStore untuk autentikasi tls.
pulsarClientTlsTruststorePassword STRING Tidak Kata sandi TrustStore untuk autentikasi tls.

Argumen ini digunakan untuk konfigurasi dan autentikasi kontrol penerimaan pulsar, konfigurasi admin pulsar hanya diperlukan ketika kontrol penerimaan diaktifkan (ketika maxBytesPerTrigger diatur)

Opsi Jenis Default Deskripsi
maxBytesPerTrigger BIGINT Tidak Batas lunak jumlah maksimum byte yang ingin kita proses per microbatch. Jika ini ditentukan, admin.url juga perlu ditentukan.
adminUrl STRING Tidak Konfigurasi Layanan PulsarHttpUrl. Hanya diperlukan ketika maxBytesPerTrigger ditentukan.
pulsarAdminAuthPlugin STRING Tidak Nama plugin autentikasi.
pulsarAdminAuthParams STRING Tidak Parameter untuk plugin autentikasi.
pulsarClientUseKeyStoreTls STRING Tidak Apakah akan menggunakan KeyStore untuk autentikasi tls.
pulsarAdminTlsTrustStoreType STRING Tidak Jenis file TrustStore untuk autentikasi tls.
pulsarAdminTlsTrustStorePath STRING Tidak Jalur file TrustStore untuk autentikasi tls.
pulsarAdminTlsTruststorePassword STRING Tidak Kata sandi TrustStore untuk autentikasi tls.

Mengembalikan

Tabel rekaman pulsar dengan skema berikut.

  • __key STRING NOT NULL: Kunci pesan Pulsar.

  • value BINARY NOT NULL: Nilai pesan Pulsar.

    Catatan: Untuk topik dengan skema Avro atau JSON, alih-alih memuat konten ke bidang nilai biner, konten akan diperluas untuk mempertahankan nama bidang dan jenis bidang topik Pulsar.

  • __topic STRING NOT NULL: Nama topik Pulsar.

  • __messageId BINARY NOT NULL: Id pesan Pulsar.

  • __publishTime TIMESTAMP NOT NULL: Waktu penerbitan pesan Pulsar.

  • __eventTime TIMESTAMP NOT NULL: Waktu peristiwa pesan Pulsar.

  • __messageProperties MAP<STRING, STRING>: Properti pesan Pulsar.

Contoh

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.