Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Google Pub/Sub'a abone olmak için yerleşik bağlayıcıyı kullanın. Bu bağlayıcı, aboneden gelen kayıtlar için tam olarak bir kez işleme semantiği sağlar.
Not
Pub/Sub yinelenen kayıtlar yayımlayabilir veya kayıtlar aboneye yanlış sırayla ulaşabilir. Yinelenen ve sıra dışı kayıtları işlemek için kod yazın.
Pub/Sub akışı yapılandırma
Aşağıdaki kod örneği, Pub/Sub'tan Structured Streaming okuma işlemi için konfigürasyon yapmaya yönelik temel söz dizimini gösterir.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Daha fazla yapılandırma seçeneği için Pub/Sub akış okuma seçeneklerini yapılandırma bölümüne bakın.
Pub/Sub erişimi yapılandırma
Yapılandırdığınız kimlik bilgileri aşağıdaki rollere sahip olmalıdır.
| Roller | Gerekli veya isteğe bağlı | Rol nasıl kullanılır? |
|---|---|---|
roles/pubsub.viewer veya roles/viewer |
Zorunlu | Aboneliğin mevcut olup olmadığını denetler ve aboneliği alır. |
roles/pubsub.subscriber |
Zorunlu | Abonelikten veri getirir. |
roles/pubsub.editor veya roles/editor |
İsteğe bağlı | Abonelik yoksa oluşturulmasını ve akış sonlandırmada abonelikleri silmek için deleteSubscriptionOnStreamStop'i kullanmayı etkinleştirir. |
Databricks, yetkilendirme seçenekleri sunulurken gizli bilgilerin kullanılmasını önerir. Bağlantıyı yetkilendirmek için aşağıdaki seçenekler gereklidir:
clientEmailclientIdprivateKeyprivateKeyId
Pub/Sub şemasını anlama
Akışın şeması, aşağıdaki tabloda açıklandığı gibi Pub/Sub'dan getirilen kayıtlara uygundur.
| Alan | Tür |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Pub/Sub akış okuma seçeneklerini yapılandırma
Aşağıdaki tabloda Pub/Sub için desteklenen seçenekler açıklanmaktadır. Tüm seçenekler, .option("<optionName>", "<optionValue>") söz dizimi kullanılarak Yapılandırılmış Akış okumasının bir parçası olarak yapılandırılır.
Not
Bazı Pub/Sub yapılandırma seçenekleri, getirme kavramını mikro toplu işlemler yerine kullanır. Bu, iç uygulama ayrıntılarını yansıtır ve seçenekler, kayıtların getirilip işlenmeleri dışında diğer Yapılandırılmış Akış bağlayıcılarındaki corollaries'lere benzer şekilde çalışır.
| Seçenek | Varsayılan değer | Açıklama |
|---|---|---|
numFetchPartitions |
Akış başlatma sırasında mevcut yürütücü sayısının yarısına ayarlayın. | Bir abonelikten kayıt getiren paralel Spark görevlerinin sayısı. |
deleteSubscriptionOnStreamStop |
false |
Eğer true ise, akış işi sona erdiğinde akışa geçirilen abonelik silinir. |
maxBytesPerTrigger |
none |
Tetiklenen her mikro partide işlenecek parti boyutu için esnek sınır. |
maxRecordsPerFetch |
1000 |
Kayıtları işlemeden önce görev başına getirilmeye çalışacak kayıt sayısı. |
maxFetchPeriod |
10s |
Her bir görevin kayıtları işlemeye başlamadan önce alınması için gereken zaman süresi. Örneğin 1s , 1 saniye veya 1m 1 dakika boyunca bir süre dizesi kabul eder. Databricks varsayılan değerin kullanılmasını önerir. |
Pub/Sub ile artımlı toplu işlem kullanma
Trigger.AvailableNow kullanarak Pub/Sub kaynaklarından kullanılabilir kayıtları artımlı toplu iş olarak tüketebilirsiniz.
Azure Databricks, Trigger.AvailableNow ayarıyla bir okumaya başladığınızda zaman damgasını kaydeder. Toplu işlem tarafından işlenen kayıtlar, daha önce alınan tüm verileri ve kayıtlı akış başlangıç zaman damgasından daha küçük bir zaman damgasına sahip yeni yayımlanan kayıtları içerir. Daha fazla bilgi için bkz AvailableNow. Artımlı toplu işlem.
Pub/Sub akış ölçümlerini izleme
Yapılandırılmış Akış ilerlemesi ölçümleri getirilen ve işlemeye hazır kayıt sayısını, getirilen ve işlemeye hazır kayıtların boyutunu ve akış başlangıcından bu yana görülen yinelenenlerin sayısını bildirir. Aşağıda bu ölçümlere bir örnek verilmiştir:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Sınırlamalar
Pub/Sub tahmini yürütmeyi (spark.speculation) desteklemez.