Aracılığıyla paylaş


Apache Pulsar'dan akış

Önemli

Bu özellik Genel Önizlemededir.

Databricks Runtime 14.1 ve üzerinde Yapılandırılmış Akış'ı kullanarak Azure Databricks üzerinde Apache Pulsar'dan veri akışı yapabilirsiniz.

Yapılandırılmış Akış, Pulsar kaynaklarından okunan veriler için tam olarak bir kez işleme semantiği sağlar.

Söz dizimi örneği

Aşağıda, Pulsar'dan okumak için Yapılandırılmış Akış kullanmanın temel bir örneği verilmiştir:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Konuları belirtmek için her zaman aşağıdaki seçeneklerden birini sağlamanız service.url gerekir:

  • topic
  • topics
  • topicsPattern

Seçeneklerin tam listesi için bkz . Pulsar akış okuma seçeneklerini yapılandırma.

Pulsar'da kimlik doğrulaması

Azure Databricks, Pulsar'da truststore ve keystore kimlik doğrulamasını destekler. Databricks, yapılandırma ayrıntılarını depolarken gizli dizilerin kullanılmasını önerir.

Akış yapılandırması sırasında aşağıdaki seçenekleri ayarlayabilirsiniz:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Akış bir PulsarAdminkullanıyorsa, aşağıdakileri de ayarlayın:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Aşağıdaki örnekte kimlik doğrulama seçeneklerinin yapılandırılması gösterilmektedir:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar şeması

Pulsar'dan okunan kayıtların şeması, konuların şemalarının nasıl kodlanmış olduğuna bağlıdır.

  • Avro veya JSON şemasıyla ilgili konular için, sonuçta elde edilen Spark DataFrame'de alan adları ve alan türleri korunur.
  • Pulsar'da şemasız veya basit bir veri türüne sahip konular için yük bir value sütuna yüklenir.
  • Okuyucu farklı şemalara sahip birden çok konuyu okuyacak şekilde yapılandırılmışsa ham içeriği bir value sütuna yüklenecek şekilde ayarlayınallowDifferentTopicSchemas.

Pulsar kayıtları aşağıdaki meta veri alanlarına sahiptir:

Sütun Tür
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Pulsar akış okuma seçeneklerini yapılandırma

Tüm seçenekler, söz dizimi kullanılarak .option("<optionName>", "<optionValue>") Yapılandırılmış Akış okumasının bir parçası olarak yapılandırılır. Ayrıca, seçenekleri kullanarak kimlik doğrulamayı yapılandırabilirsiniz. Bkz . Pulsar'da kimlik doğrulaması.

Aşağıdaki tabloda Pulsar için gerekli yapılandırmalar açıklanmaktadır. Seçeneklerden topicyalnızca birini belirtmeniz gerekir veya topicstopicsPattern.

Seçenek Varsayılan değer Tanım
service.url yok Pulsar serviceUrl servisi için Pulsar yapılandırması.
topic yok Kullanılacak konu için konu adı dizesi.
topics yok Kullanılacak konuların virgülle ayrılmış listesi.
topicsPattern yok Kullanılacak konu başlıklarıyla eşleşecek bir Java regex dizesi.

Aşağıdaki tabloda Pulsar için desteklenen diğer seçenekler açıklanmaktadır:

Seçenek Varsayılan değer Tanım
predefinedSubscription yok Spark uygulamasının ilerleme durumunu izlemek için bağlayıcı tarafından kullanılan önceden tanımlanmış abonelik adı.
subscriptionPrefix yok Spark uygulamasının ilerleme durumunu izlemek üzere rastgele bir abonelik oluşturmak için bağlayıcı tarafından kullanılan ön ek.
pollTimeoutMs 120000 Pulsar'dan gelen iletileri milisaniye cinsinden okumak için zaman aşımı.
waitingForNonExistedTopic false Bağlayıcının istenen konular oluşturulana kadar beklemesi gerekip gerekmediği.
failOnDataLoss true Veriler kaybolduğunda sorgunun başarısız olup olmayacağını denetler (örneğin, konular silinir veya bekletme ilkesi nedeniyle iletiler silinir).
allowDifferentTopicSchemas false Farklı şemalara sahip birden çok konu okunursa, otomatik şema tabanlı konu değerini seri durumdan çıkarma özelliğini kapatmak için bu parametreyi kullanın. Bu olduğunda trueyalnızca ham değerler döndürülür.
startingOffsets latest ise latest, okuyucu çalışmaya başladıktan sonra en yeni kayıtları okur. ise earliest, okuyucu en erken uzaklığı okur. Kullanıcı ayrıca belirli bir uzaklığı belirten bir JSON dizesi de belirtebilir.
maxBytesPerTrigger yok Mikrobatch başına işlemek istediğimiz bayt sayısı üst sınırının geçici sınırı. Bu belirtilirse, admin.url ayrıca belirtilmesi gerekir.
admin.url yok Pulsar serviceHttpUrl yapılandırması. Yalnızca belirtildiğinde maxBytesPerTrigger gereklidir.

Aşağıdaki desenleri kullanarak pulsar istemci, yönetici ve okuyucu yapılandırmalarını da belirtebilirsiniz:

Desen Birleştirme seçeneklerine bağlantı
pulsar.client.* Pulsar istemci yapılandırması
pulsar.admin.* Pulsar yönetici yapılandırması
pulsar.reader.* Pulsar okuyucu yapılandırması

Başlangıç uzaklıklarını oluşturma JSON

Belirli bir uzaklığı belirtmek için el ile bir ileti kimliği oluşturabilir ve bunu seçeneğe startingOffsets JSON olarak geçirebilirsiniz. Aşağıdaki kod örneği bu söz dizimini gösterir:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()