read_pubsub
akış tablosu değerli işlevi
Şunlar için geçerlidir: Databricks SQL Databricks Runtime 13.3 LTS ve üzeri
Bir konudan Pub/Sub dosyasından okunan kayıtları içeren bir tablo döndürür. Yalnızca akış sorgularını destekler.
Söz dizimi
read_pubsub( { parameter => value } [, ...])
Bağımsız değişkenler
read_pubsub
adlandırılmış parametre çağırma gerektirir.
Tek gerekli bağımsız değişkenler , projectId
ve topicId
'tirsubscriptionId
. Diğer tüm bağımsız değişkenler isteğe bağlıdır.
Tam bağımsız değişken açıklamaları için bkz . Pub/Sub akış okuma seçeneklerini yapılandırma.
Databricks, yetkilendirme seçenekleri sağlarken gizli dizilerin kullanılmasını önerir. Bkz. gizli dizi işlevi.
Pub/Sub erişimi yapılandırma hakkında ayrıntılı bilgi için bkz . Pub/Sub erişimini yapılandırma.
Parametre | Tür | Açıklama |
---|---|---|
subscriptionId |
STRING |
Gerekli, Pub/Sub aboneliğine atanan benzersiz tanımlayıcı. |
projectId |
STRING |
Gerekli, Pub/Sub konusuyla ilişkilendirilmiş Google Cloud proje kimliği. |
topicId |
STRING |
Gerekli, abone olunacak Pub/Sub konusunun kimliği veya adı. |
clientEmail |
STRING |
Kimlik doğrulaması için bir hizmet hesabıyla ilişkilendirilmiş e-posta adresi. |
clientId |
STRING |
Kimlik doğrulaması için hizmet hesabıyla ilişkili istemci kimliği. |
privateKeyId |
STRING |
Hizmet hesabıyla ilişkili özel anahtarın kimliği. |
privateKey |
STRING |
Kimlik doğrulaması için hizmet hesabıyla ilişkili özel anahtar. |
Bu bağımsız değişkenler, Pub/Sub'dan okurken daha fazla ince ayar yapmak için kullanılır:
Parametre | Tür | Açıklama |
---|---|---|
numFetchPartitions |
STRING |
Varsayılan yürütücü sayısıyla isteğe bağlı. Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Varsayılan false ile isteğe bağlı. true olarak ayarlanırsa akış işi sona erdiğinde akışa geçirilen abonelik silinir. |
maxBytesPerTrigger |
STRING |
Tetiklenen her mikro toplu işlem sırasında işlenecek toplu iş boyutu için geçici sınır. Varsayılan değer :'none'. |
maxRecordsPerFetch |
STRING |
Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı. Varsayılan değer '1000'dir. |
maxFetchPeriod |
STRING |
Kayıtları işlemeden önce her görevin getirilebilmesi için gereken süre. Varsayılan değer :'10s'. |
Döndürülenler
Aşağıdaki şemaya sahip Pub/Sub kayıtları tablosu. Öznitelikler sütunu null olabilir, ancak diğer tüm sütunlar null değildir.
Veri Akışı Adı | Veri türü | Null Atanabilir | Standart | Açıklama |
---|---|---|---|---|
messageId |
STRING |
Hayır | Pub/Sub iletisi için benzersiz tanımlayıcı. | |
payload |
BINARY |
Hayır | Pub/Sub iletisinin içeriği. | |
attributes |
STRING |
Yes | Pub/Sub iletisinin özniteliklerini temsil eden anahtar-değer çiftleri. Bu json ile kodlanmış bir dizedir. | |
publishTimestampInMillis |
BIGINT |
Hayır | İletinin yayımlandığı zaman damgası (milisaniye cinsinden). | |
sequenceNumber |
BIGINT |
Hayır | Kaydın parçası içindeki benzersiz tanımlayıcısı. |
Örnekler
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Verilerin artık daha fazla analiz için'den testing.streaming_table
sorgulanması gerekir.
Hatalı sorgular:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);
İlgili makaleler
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin