Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şunlar için geçerlidir: Databricks SQL
Databricks Runtime 14.1 ve üzeri
Önemli
Bu özellik Genel Önizlemededir.
Pulsar'dan okunan kayıtları içeren bir tablo döndürür.
Bu tablo değerli işlev, yalnızca akışlı işlemi destekler ve toplu iş sorgusunu desteklemez.
Söz dizimi
read_pulsar ( { option_key => option_value } [, ...] )
Argümanlar
Bu işlev, seçenek anahtarları için adlandırılmış parametre çağırması gerektirir.
serviceUrl
ve topic
seçenekleri zorunludur.
Bağımsız değişkenlerin açıklamaları burada kısadır. Genişletilmiş açıklamalar için yapılandırılmış akış Pulsar belgelerine bakın.
Seçenek | Tür | Varsayılan | Açıklama |
---|---|---|---|
serviceUrl | DİZGİ | Zorunlu | Pulsar hizmetinin URI'si. |
konu | DİZGİ | Zorunlu | Okunacak konu. |
önceden tanımlı abonelik | DİZGİ | Hiçbiri | Spark uygulamasının ilerleme durumunu izlemek için bağlayıcı tarafından kullanılan önceden tanımlanmış abonelik adı. |
subscriptionPrefix | String | Hiçbiri | Spark uygulamasının ilerleme durumunu izlemek üzere rastgele bir abonelik oluşturmak için bağlayıcı tarafından kullanılan ön ek. |
pollTimeoutMs | UZUN | 120000 | Pulsar'dan gelen iletileri milisaniye cinsinden okumak için zaman aşımı. |
VeriKaybındaBaşarısızOl | BOOLEAN | doğru | Veriler kaybolduğunda sorgunun başarısız olup olmayacağını denetler (örneğin, konular silinir veya bekletme ilkesi nedeniyle iletiler silinir). |
startingOffsets | DİZGİ | en yeni | Bir sorgunun başlatıldığında başlangıç noktası, en erken, en geç veya belirli bir uzaklığı belirten bir JSON dizesi olabilir. "Eğer en yeni ise, okuyucu çalışmaya başladıktan sonra en yeni kayıtları okur." En erken durumda, okuyucu en erken başlangıç noktasından okur. Kullanıcı ayrıca belirli bir uzaklığı belirten bir JSON dizesi de belirtebilir. |
başlangıç zamanı | DİZGİ | Hiçbiri | Belirtildiğinde, Pulsar kaynağı belirtilen startingTime konumundan başlayarak iletileri okur. |
Pulsar istemcisinin kimlik doğrulaması için aşağıdaki bağımsız değişkenler kullanılır:
Seçenek | Tür | Varsayılan | Açıklama |
---|---|---|---|
pulsarClientAuthPluginClassName | KARAKTER DİZİSİ | Hiçbiri | Kimlik doğrulama eklentisinin adı. |
pulsarClientAuthParams | KARAKTER DİZİSİ | Hiçbiri | Kimlik doğrulama eklentisinin parametreleri. |
pulsarClientUseKeyStoreTls | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için KeyStore'un kullanılıp kullanılmaymayacağı. |
pulsarClientTlsTrustStoreType | KARAKTER DİZİSİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore dosya türü. |
pulsarClientTlsTrustStorePath | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore dosya yolu. |
pulsarClientTlsTrustStorePassword | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore parolası. |
Bu bağımsız değişkenler pulsar erişim denetiminin yapılandırması ve kimlik doğrulaması için kullanılır, pulsar yönetici yapılandırması yalnızca erişim denetimi etkinleştirildiğinde gereklidir (maxBytesPerTrigger ayarlandığında)
Seçenek | Tür | Varsayılan | Açıklama |
---|---|---|---|
maxBytesPerTrigger (tetikleyici başına maksimum bayt) | BIGINT | Hiçbiri | Mikrobatch başına işlemeyi hedeflediğimiz maksimum bayt sayısı için bir yumuşak sınır. Bu belirtilirse admin.url'nin de belirtilmesi gerekir. |
adminUrl | DİZGİ | Hiçbiri | Pulsar hizmeti HttpUrl yapılandırması. Yalnızca maxBytesPerTrigger belirtildiğinde gereklidir. |
pulsarAdminAuthPlugin | DİZGİ | Hiçbiri | Kimlik doğrulama eklentisinin adı. |
pulsarAdminAuthParams | DİZGİ | Hiçbiri | Kimlik doğrulama eklentisinin parametreleri. |
pulsarClientUseKeyStoreTls | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için KeyStore'un kullanılıp kullanılmaymayacağı. |
pulsarAdminTlsTrustStoreType | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore dosya türü. |
pulsarAdminTlsTrustStorePath | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore dosya yolu. |
pulsarAdminTlsTrustStorePassword | DİZGİ | Hiçbiri | Tls kimlik doğrulaması için TrustStore parolası. |
İadeler
Aşağıdaki şemaya sahip pulsar kayıtları tablosu.
__key STRING NOT NULL
: Pulsar mesaj anahtarı.value BINARY NOT NULL
: Pulsar mesaj değeri.Not: Avro veya JSON şemasına sahip konular için, içeriği ikili değer alanına yüklemek yerine, pulsar konusunun alan adlarını ve alan türlerini korumak için içerik genişletilecektir.
__topic STRING NOT NULL
: Pulsar başlık adı.__messageId BINARY NOT NULL
: Pulsar mesaj kimliği.__publishTime TIMESTAMP NOT NULL
: Pulsar mesajın yayımlanma zamanı.__eventTime TIMESTAMP NOT NULL
: Mesaj olay zamanı (Pulsar).__messageProperties MAP<STRING, STRING>
: Pulsar ileti özellikleri.
Örnekler
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.