Aracılığıyla paylaş


Google Pub/Sub'a abone olma

Azure Databricks, Databricks Runtime 13.3 LTS ve üzerinde Google Pub/Sub'a abone olmak için yerleşik bir bağlayıcı sağlar. Bu bağlayıcı, aboneden gelen kayıtlar için tam olarak bir kez işleme semantiği sağlar.

Not

Pub/Sub yinelenen kayıtlar yayımlayabilir ve kayıtlar aboneye sırasız gelebilir. Yinelenen ve sıra dışı kayıtları işlemek için Azure Databricks kodu yazmanız gerekir.

Söz dizimi örneği

Aşağıdaki kod örneği, Pub/Sub'dan okunan yapılandırılmış akış yapılandırmaya yönelik temel söz dizimini gösterir:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Daha fazla yapılandırma seçeneği için bkz . Pub/Sub akış okuma seçeneklerini yapılandırma.

Pub/Sub erişimi yapılandırma

Databricks, yetkilendirme seçenekleri sağlarken gizli dizilerin kullanılmasını önerir. Bağlantıyı yetkilendirmek için aşağıdaki seçenekler gereklidir:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Aşağıdaki tabloda, yapılandırılan kimlik bilgileri için gereken roller açıklanmaktadır:

Roller Gerekli veya isteğe bağlı Nasıl kullanılır?
roles/pubsub.viewer veya roles/viewer Zorunlu Aboneliğin mevcut olup olmadığını denetleyin ve aboneliği alın
roles/pubsub.subscriber Zorunlu Abonelikten veri getirme
roles/pubsub.editor veya roles/editor İsteğe bağlı Yoksa aboneliğin oluşturulmasını ve akış sonlandırmada abonelikleri silmek için özelliğinin deleteSubscriptionOnStreamStop kullanılmasını etkinleştirir

Pub/Sub şeması

Akışın şeması, aşağıdaki tabloda açıklandığı gibi Pub/Sub dosyasından getirilen kayıtlara eşleşiyor:

Alan Tür
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Pub/Sub akış okuma seçeneklerini yapılandırma

Aşağıdaki tabloda Pub/Sub için desteklenen seçenekler açıklanmaktadır. Tüm seçenekler, söz dizimi kullanılarak .option("<optionName>", "<optionValue>") Yapılandırılmış Akış okumasının bir parçası olarak yapılandırılır.

Not

Bazı Pub/Sub yapılandırma seçenekleri, mikro toplu işlemler yerine getirme kavramını kullanır. Bu, iç uygulama ayrıntılarını yansıtır ve seçenekler, kayıtların getirilip işlenmeleri dışında diğer Yapılandırılmış Akış bağlayıcılarındaki corollaries'lere benzer şekilde çalışır.

Seçenek Varsayılan değer Açıklama
numFetchPartitions Akış başlatma sırasında mevcut yürütücü sayısının yarısına ayarlayın. Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı.
deleteSubscriptionOnStreamStop false ise true, akış işi sona erdiğinde akışa geçirilen abonelik silinir.
maxBytesPerTrigger yok Tetiklenen her mikro toplu işlem sırasında işlenecek toplu iş boyutu için geçici sınır.
maxRecordsPerFetch 1000 Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı.
maxFetchPeriod 10 saniye Kayıtları işlemeden önce her görevin getirilebilmesi için gereken süre. Databricks varsayılan değerin kullanılmasını önerir.

Pub/Sub için artımlı toplu işleme semantiği

Pub/Sub kaynaklarından kullanılabilir kayıtları artımlı bir toplu iş olarak kullanmak için kullanabilirsiniz Trigger.AvailableNow .

Azure Databricks, ayarıyla bir okumaya başladığınızda zaman damgasını Trigger.AvailableNow kaydeder. Toplu işlem tarafından işlenen kayıtlar, daha önce getirilen tüm verileri ve kaydedilen akış başlangıç zaman damgasından daha az zaman damgasına sahip yeni yayımlanan kayıtları içerir.

Bkz . Artımlı toplu işlemeyi yapılandırma.

Akış ölçümlerini izleme

Yapılandırılmış Akış ilerlemesi ölçümleri getirilen ve işlemeye hazır kayıt sayısını, getirilen ve işlemeye hazır kayıtların boyutunu ve akış başlangıcından bu yana görülen yinelenenlerin sayısını bildirir. Aşağıda bu ölçümlere bir örnek verilmiştir:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Sınırlamalar

Pub/Sub ile tahmini yürütme (spark.speculation) desteklenmez.