Aracılığıyla paylaş


read_pubsub akış tablosu değerli işlevi

Şunlar için geçerlidir: onay işareti evet olarak işaretlenmiş Databricks SQL onay işareti evet olarak işaretlenmiş Databricks Runtime 13.3 LTS ve üzeri

Bir konudan Pub/Sub dosyasından okunan kayıtları içeren bir tablo döndürür. Yalnızca akış sorgularını destekler.

Söz dizimi

read_pubsub( { parameter => value } [, ...])

Bağımsız değişkenler

read_pubsubadlandırılmış parametre çağırma gerektirir.

Tek gerekli bağımsız değişkenler , projectIdve topicId'tirsubscriptionId. Diğer tüm bağımsız değişkenler isteğe bağlıdır.

Tam bağımsız değişken açıklamaları için bkz . Pub/Sub akış okuma seçeneklerini yapılandırma.

Databricks, yetkilendirme seçenekleri sağlarken gizli dizilerin kullanılmasını önerir. Bkz. gizli dizi işlevi.

Pub/Sub erişimi yapılandırma hakkında ayrıntılı bilgi için bkz . Pub/Sub erişimini yapılandırma.

Parametre Tür Açıklama
subscriptionId STRING Gerekli, Pub/Sub aboneliğine atanan benzersiz tanımlayıcı.
projectId STRING Gerekli, Pub/Sub konusuyla ilişkilendirilmiş Google Cloud proje kimliği.
topicId STRING Gerekli, abone olunacak Pub/Sub konusunun kimliği veya adı.
clientEmail STRING Kimlik doğrulaması için bir hizmet hesabıyla ilişkilendirilmiş e-posta adresi.
clientId STRING Kimlik doğrulaması için hizmet hesabıyla ilişkili istemci kimliği.
privateKeyId STRING Hizmet hesabıyla ilişkili özel anahtarın kimliği.
privateKey STRING Kimlik doğrulaması için hizmet hesabıyla ilişkili özel anahtar.

Bu bağımsız değişkenler, Pub/Sub'dan okurken daha fazla ince ayar yapmak için kullanılır:

Parametre Tür Açıklama
numFetchPartitions STRING Varsayılan yürütücü sayısıyla isteğe bağlı. Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı.
deleteSubscriptionOnStreamStop BOOLEAN Varsayılan falseile isteğe bağlı. true olarak ayarlanırsa akış işi sona erdiğinde akışa geçirilen abonelik silinir.
maxBytesPerTrigger STRING Tetiklenen her mikro toplu işlem sırasında işlenecek toplu iş boyutu için geçici sınır. Varsayılan değer :'none'.
maxRecordsPerFetch STRING Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı. Varsayılan değer '1000'dir.
maxFetchPeriod STRING Kayıtları işlemeden önce her görevin getirilebilmesi için gereken süre. Varsayılan değer :'10s'.

Döndürülenler

Aşağıdaki şemaya sahip Pub/Sub kayıtları tablosu. Öznitelikler sütunu null olabilir, ancak diğer tüm sütunlar null değildir.

Veri Akışı Adı Veri türü Null Atanabilir Standart Açıklama
messageId STRING Hayır Pub/Sub iletisi için benzersiz tanımlayıcı.
payload BINARY Hayır Pub/Sub iletisinin içeriği.
attributes STRING Yes Pub/Sub iletisinin özniteliklerini temsil eden anahtar-değer çiftleri. Bu json ile kodlanmış bir dizedir.
publishTimestampInMillis BIGINT Hayır İletinin yayımlandığı zaman damgası (milisaniye cinsinden).
sequenceNumber BIGINT Hayır Kaydın parçası içindeki benzersiz tanımlayıcısı.

Örnekler

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Verilerin artık daha fazla analiz için'den testing.streaming_table sorgulanması gerekir.

Hatalı sorgular:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);