Share via


Stream vanuit Apache Pulsar

Belangrijk

Deze functie is beschikbaar als openbare preview.

In Databricks Runtime 14.1 en hoger kunt u Structured Streaming gebruiken om gegevens van Apache Pulsar op Azure Databricks te streamen.

Structured Streaming biedt exact eenmaal verwerkingssemantiek voor gegevens die uit Pulsar-bronnen worden gelezen.

Voorbeeld van syntaxis

Hier volgt een eenvoudig voorbeeld van het gebruik van Structured Streaming om te lezen uit Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

U moet altijd een service.url en een van de volgende opties opgeven om onderwerpen op te geven:

  • topic
  • topics
  • topicsPattern

Zie Opties configureren voor Pulsar-streaming voor een volledige lijst met opties.

Verifiëren bij Pulsar

Azure Databricks biedt ondersteuning voor truststore- en sleutelopslagverificatie voor Pulsar. Databricks raadt aan geheimen te gebruiken bij het opslaan van configuratiegegevens.

U kunt de volgende opties instellen tijdens de streamconfiguratie:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Als de stream gebruikmaakt van een PulsarAdmin, stelt u ook het volgende in:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

In het volgende voorbeeld ziet u hoe u verificatieopties configureert:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar-schema

Het schema van records dat uit Pulsar wordt gelezen, is afhankelijk van de wijze waarop de schema's van de onderwerpen zijn gecodeerd.

  • Voor onderwerpen met avro- of JSON-schema blijven veldnamen en veldtypen behouden in het resulterende Spark DataFrame.
  • Voor onderwerpen zonder schema of met een eenvoudig gegevenstype in Pulsar wordt de nettolading geladen in een value kolom.
  • Als de lezer is geconfigureerd voor het lezen van meerdere onderwerpen met verschillende schema's, stelt u allowDifferentTopicSchemas de onbewerkte inhoud in een value kolom in.

Pulsar-records hebben de volgende metagegevensvelden:

Column Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Opties configureren voor het lezen van Pulsar-streaming

Alle opties worden geconfigureerd als onderdeel van een Structured Streaming-leesbewerking met behulp van .option("<optionName>", "<optionValue>") syntaxis. U kunt verificatie ook configureren met behulp van opties. Zie Verifiëren bij Pulsar.

In de volgende tabel worden de vereiste configuraties voor Pulsar beschreven. U moet slechts één van de opties topicopgeven, topics of topicsPattern.

Optie Standaardwaarde Beschrijving
service.url Geen De Pulsar-configuratie voor de Pulsar-service serviceUrl .
topic Geen Een tekenreeks voor de onderwerpnaam die het onderwerp moet gebruiken.
topics Geen Een door komma's gescheiden lijst met onderwerpen die moeten worden gebruikt.
topicsPattern Geen Een Java regex-tekenreeks die overeenkomt met onderwerpen die moeten worden gebruikt.

In de volgende tabel worden andere opties beschreven die worden ondersteund voor Pulsar:

Optie Standaardwaarde Beschrijving
predefinedSubscription Geen De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden.
subscriptionPrefix Geen Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden.
pollTimeoutMs 120.000 De time-out voor het lezen van berichten van Pulsar in milliseconden.
waitingForNonExistedTopic false Of de connector moet wachten totdat de gewenste onderwerpen zijn gemaakt.
failOnDataLoss true Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid).
allowDifferentTopicSchemas false Als meerdere onderwerpen met verschillende schema's worden gelezen, gebruikt u deze parameter om automatische deserialisatie van onderwerpwaarden op basis van schema's uit te schakelen. Alleen de onbewerkte waarden worden geretourneerd wanneer dit is true.
startingOffsets latest Als latestde lezer de nieuwste records leest nadat deze is gestart. Als earliest, de lezer leest van de vroegste offset. De gebruiker kan ook een JSON-tekenreeks opgeven die een specifieke offset aangeeft.
maxBytesPerTrigger Geen Een zachte limiet van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, admin.url moet ook worden opgegeven.
admin.url Geen De Pulsar-configuratie serviceHttpUrl . Alleen nodig wanneer maxBytesPerTrigger is opgegeven.

U kunt ook configuraties voor Pulsar-clients, beheerders en lezers opgeven met behulp van de volgende patronen:

Patroon Koppeling naar opties voor conifiguratie
pulsar.client.* Pulsar-clientconfiguratie
pulsar.admin.* Configuratie van Pulsar-beheerder
pulsar.reader.* Configuratie van Pulsar-lezer

JSON maken met begin offsets

U kunt handmatig een bericht-id maken om een specifieke offset op te geven en deze als JSON door te geven aan de startingOffsets optie. In het volgende codevoorbeeld ziet u deze syntaxis:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()