Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şunlar için geçerlidir:
Databricks SQL
Databricks Runtime 13.3 LTS ve üzeri
Pub/Sub'daki bir konudan okunan kayıtları içeren bir tablo döndürür. Yalnızca akış sorgularını destekler.
Söz dizimi
read_pubsub( { parameter => value } [, ...])
Tartışmalar
read_pubsub
adlandırılmış parametre çağırma gerektirir.
Tek gerekli bağımsız değişkenler , subscriptionIdve projectId'tirtopicId. Diğer tüm bağımsız değişkenler isteğe bağlıdır.
Tam bağımsız değişken açıklamaları için bakınız Pub/Sub akış okuma seçeneklerini yapılandırma.
Databricks, yetkilendirme seçenekleri sunarken gizli bilgilerin kullanılmasını önerir. bkz. secret işlevi.
Pub/Sub erişimi yapılandırma hakkında ayrıntılı bilgi için bkz . Pub/Sub erişimini yapılandırma.
| Parametre | Tür | Açıklama |
|---|---|---|
subscriptionId |
STRING |
Gerekli, Pub/Sub aboneliğine atanan benzersiz tanımlayıcı. |
projectId |
STRING |
Gerekli, Pub/Sub konusuyla ilişkilendirilmiş Google Cloud proje kimliği. |
topicId |
STRING |
Gerekli, abone olunacak Pub/Sub konusunun kimliği veya adı. |
clientEmail |
STRING |
Kimlik doğrulaması için bir hizmet hesabıyla ilişkilendirilmiş e-posta adresi. |
clientId |
STRING |
Kimlik doğrulaması için hizmet hesabıyla ilişkili istemci kimliği. |
privateKeyId |
STRING |
Hizmet hesabıyla ilişkili özel anahtarın kimliği. |
privateKey |
STRING |
Kimlik doğrulaması için hizmet hesabıyla ilişkili özel anahtar. |
Bu parametreler, Pub/Sub'dan okuma sırasında daha fazla hassas ayar yapmak için kullanılır.
| Parametre | Tür | Açıklama |
|---|---|---|
numFetchPartitions |
STRING |
Varsayılan yürütücü sayısıyla isteğe bağlı. Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Varsayılan falseile isteğe bağlı. true olarak ayarlanırsa akış işi sona erdiğinde akışa geçirilen abonelik silinir. |
maxBytesPerTrigger |
STRING |
Tetiklenen her mikro toplu işlem sırasında işlenecek toplu iş boyutu için geçici sınır. Varsayılan değer :'none'. |
maxRecordsPerFetch |
STRING |
İşlenmeden önce görev başına alınacak kayıt sayısı. Varsayılan değer '1000'dir. |
maxFetchPeriod |
STRING |
Kayıtları işlemeden önce her görevin getirilebilmesi için gereken süre. Varsayılan değer :'10s'. |
İadeler
Aşağıdaki şemaya sahip Pub/Sub kayıtları tablosu. Öznitelikler sütunu null olabilir, ancak diğer tüm sütunlar null değildir.
| Veri Akışı Adı | Veri türü | Null Atanabilir | Standart | Açıklama |
|---|---|---|---|---|
messageId |
STRING |
Hayır | Pub/Sub iletisi için benzersiz tanımlayıcı. | |
payload |
BINARY |
Hayır | Pub/Sub iletisinin içeriği. | |
attributes |
STRING |
Evet | Pub/Sub iletisinin özniteliklerini temsil eden anahtar-değer çiftleri. Bu json ile kodlanmış bir dizedir. | |
publishTimestampInMillis |
BIGINT |
Hayır | İletinin yayımlandığı zaman damgası (milisaniye cinsinden). | |
sequenceNumber |
BIGINT |
Hayır | Kaydın parçası içindeki benzersiz tanımlayıcısı. |
Örnekler
-- Streaming Ingestion from Pubsub using Google Service Account secrets
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
clientEmail => secret('app-events', 'clientEmail'),
clientId => secret('app-events', 'clientId'),
privateKeyId => secret('app-events', 'privateKeyId'),
privateKey => secret('app-events', 'privateKey')
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic'
);
Artık verilerin daha fazla analiz için testing.streaming_table üzerinden sorgulanması gerekiyor.
Hatalı sorgular:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project'
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
maxRecordsPerFetchLimit => '1000001'
);