Dela via


Strömma från Apache Pulsar

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

I Databricks Runtime 14.1 och senare kan du använda Structured Streaming för att strömma data från Apache Pulsar på Azure Databricks.

Strukturerad direktuppspelning ger exakt en gång bearbetningssemantik för data som lästs från Pulsar-källor.

Syntaxexempel

Följande är ett grundläggande exempel på hur du använder Structured Streaming för att läsa från Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Du måste alltid ange ett service.url och något av följande alternativ för att ange ämnen:

  • topic
  • topics
  • topicsPattern

En fullständig lista över alternativ finns i Konfigurera alternativ för Pulsar-strömning.

Autentisera till Pulsar

Azure Databricks stöder autentisering med förtroendearkiv och nyckelarkiv till Pulsar. Databricks rekommenderar att du använder hemligheter när du lagrar konfigurationsinformation.

Du kan ange följande alternativ under strömkonfigurationen:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Om strömmen använder en PulsarAdminanger du även följande:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

I följande exempel visas hur du konfigurerar autentiseringsalternativ:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar-schema

Schemat för poster som läss från Pulsar beror på hur ämnen har sina scheman kodade.

  • För ämnen med Avro- eller JSON-schema bevaras fältnamn och fälttyper i den resulterande Spark DataFrame.
  • För ämnen utan schema eller med en enkel datatyp i Pulsar läses nyttolasten in i en value kolumn.
  • Om läsaren är konfigurerad för att läsa flera ämnen med olika scheman anger du allowDifferentTopicSchemas för att läsa in råinnehållet i en value kolumn.

Pulsar-poster har följande metadatafält:

Column Typ
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Konfigurera alternativ för läsläsning av Pulsar-strömning

Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>") syntax. Du kan också konfigurera autentisering med hjälp av alternativ. Se Autentisera till Pulsar.

I följande tabell beskrivs nödvändiga konfigurationer för Pulsar. Du måste bara ange ett av alternativen topic, topics eller topicsPattern.

Alternativ Standardvärde Description
service.url inget Pulsar-konfigurationen serviceUrl för Pulsar-tjänsten.
topic inget En ämnesnamnsträng som ämnet ska använda.
topics inget En kommaavgränsad lista över de ämnen som ska användas.
topicsPattern inget En Java regex-sträng som matchar ämnen som ska användas.

I följande tabell beskrivs andra alternativ som stöds för Pulsar:

Alternativ Standardvärde Description
predefinedSubscription inget Det fördefinierade prenumerationsnamnet som används av anslutningsappen för att spåra spark-programmets förlopp.
subscriptionPrefix inget Ett prefix som används av anslutningsappen för att generera en slumpmässig prenumeration för att spåra spark-programförloppet.
pollTimeoutMs 120000 Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder.
waitingForNonExistedTopic false Om anslutningsappen ska vänta tills de önskade ämnena har skapats.
failOnDataLoss true Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip).
allowDifferentTopicSchemas false Om du läser flera ämnen med olika scheman kan du använda den här parametern för att inaktivera automatisk schemabaserad deserialisering av ämnesvärden. Endast råvärdena returneras när detta är true.
startingOffsets latest Om latestläser läsaren de senaste posterna när den börjar köras. Om earliestläser läsaren från den tidigaste förskjutningen. Användaren kan också ange en JSON-sträng som anger en specifik förskjutning.
maxBytesPerTrigger inget En mjuk gräns för det maximala antalet byte som vi vill bearbeta per mikrobatch. Om detta anges admin.url måste du också anges.
admin.url inget Pulsar-konfigurationen serviceHttpUrl . Behövs endast när maxBytesPerTrigger anges.

Du kan också ange pulsar-klient-, administratörs- och läsarkonfigurationer med hjälp av följande mönster:

Mönster Länka till konfigurationsalternativ
pulsar.client.* Pulsar-klientkonfiguration
pulsar.admin.* Pulsar-administratörskonfiguration
pulsar.reader.* Pulsar-läsarkonfiguration

Konstruktionsstarten förskjuter JSON

Du kan manuellt konstruera ett meddelande-ID för att ange en specifik förskjutning och skicka detta som en JSON till alternativet startingOffsets . Följande kodexempel visar den här syntaxen:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()