Google Pub/Sub'a abone olma

Google Pub/Sub'a abone olmak için yerleşik bağlayıcıyı kullanın. Bu bağlayıcı, aboneden gelen kayıtlar için tam olarak bir kez işleme semantiği sağlar.

Not

Pub/Sub yinelenen kayıtlar yayımlayabilir veya kayıtlar aboneye yanlış sırayla ulaşabilir. Yinelenen ve sıra dışı kayıtları işlemek için kod yazın.

Pub/Sub akışı yapılandırma

Aşağıdaki kod örneği, Pub/Sub'tan Structured Streaming okuma işlemi için konfigürasyon yapmaya yönelik temel söz dizimini gösterir.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Daha fazla yapılandırma seçeneği için Pub/Sub akış okuma seçeneklerini yapılandırma bölümüne bakın.

Pub/Sub erişimi yapılandırma

Yapılandırdığınız kimlik bilgileri aşağıdaki rollere sahip olmalıdır.

Roller Gerekli veya isteğe bağlı Rol nasıl kullanılır?
roles/pubsub.viewer veya roles/viewer Zorunlu Aboneliğin mevcut olup olmadığını denetler ve aboneliği alır.
roles/pubsub.subscriber Zorunlu Abonelikten veri getirir.
roles/pubsub.editor veya roles/editor İsteğe bağlı Abonelik yoksa oluşturulmasını ve akış sonlandırmada abonelikleri silmek için deleteSubscriptionOnStreamStop'i kullanmayı etkinleştirir.

Databricks, yetkilendirme seçenekleri sunulurken gizli bilgilerin kullanılmasını önerir. Bağlantıyı yetkilendirmek için aşağıdaki seçenekler gereklidir:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Sub şemasını anlama

Akışın şeması, aşağıdaki tabloda açıklandığı gibi Pub/Sub'dan getirilen kayıtlara uygundur.

Alan Tür
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Pub/Sub akış okuma seçeneklerini yapılandırma

Aşağıdaki tabloda Pub/Sub için desteklenen seçenekler açıklanmaktadır. Tüm seçenekler, .option("<optionName>", "<optionValue>") söz dizimi kullanılarak Yapılandırılmış Akış okumasının bir parçası olarak yapılandırılır.

Not

Bazı Pub/Sub yapılandırma seçenekleri, getirme kavramını mikro toplu işlemler yerine kullanır. Bu, iç uygulama ayrıntılarını yansıtır ve seçenekler, kayıtların getirilip işlenmeleri dışında diğer Yapılandırılmış Akış bağlayıcılarındaki corollaries'lere benzer şekilde çalışır.

Seçenek Varsayılan değer Açıklama
numFetchPartitions Akış başlatma sırasında mevcut yürütücü sayısının yarısına ayarlayın. Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı.
deleteSubscriptionOnStreamStop false Eğer true ise, akış işi sona erdiğinde akışa geçirilen abonelik silinir.
maxBytesPerTrigger none Tetiklenen her mikro partide işlenecek parti boyutu için esnek sınır.
maxRecordsPerFetch 1000 Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı.
maxFetchPeriod 10s Her bir görevin kayıtları işlemeye başlamadan önce alınması için gereken zaman süresi. Örneğin 1s , 1 saniye veya 1m 1 dakika boyunca bir süre dizesi kabul eder. Databricks varsayılan değerin kullanılmasını önerir.

Pub/Sub ile artımlı toplu işlem kullanma

Trigger.AvailableNow kullanarak Pub/Sub kaynaklarından kullanılabilir kayıtları artımlı toplu iş olarak tüketebilirsiniz.

Azure Databricks, Trigger.AvailableNow ayarıyla bir okumaya başladığınızda zaman damgasını kaydeder. Toplu işlem tarafından işlenen kayıtlar, daha önce alınan tüm verileri ve kayıtlı akış başlangıç zaman damgasından daha küçük bir zaman damgasına sahip yeni yayımlanan kayıtları içerir. Daha fazla bilgi için bkz AvailableNow. Artımlı toplu işlem.

Pub/Sub akış ölçümlerini izleme

Yapılandırılmış Akış ilerlemesi ölçümleri getirilen ve işlemeye hazır kayıt sayısını, getirilen ve işlemeye hazır kayıtların boyutunu ve akış başlangıcından bu yana görülen yinelenenlerin sayısını bildirir. Aşağıda bu ölçümlere bir örnek verilmiştir:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Sınırlamalar

Pub/Sub tahmini yürütmeyi (spark.speculation) desteklemez.