Condividi tramite


Flusso da Apache Pulsar

Importante

Questa funzionalità è disponibile in anteprima pubblica.

In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.

Structured Streaming offre una semantica di elaborazione esattamente una volta per i dati letti dalle origini Pulsar.

Esempio di sintassi

Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Per specificare gli argomenti, è necessario specificare sempre un service.url e una delle opzioni seguenti:

  • topic
  • topics
  • topicsPattern

Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.

Eseguire l'autenticazione a Pulsar

Azure Databricks supporta l'autenticazione dell'archivio attendibilità e dell'archivio chiavi in Pulsar. Databricks consiglia di usare i segreti durante l'archiviazione dei dettagli di configurazione.

È possibile impostare le opzioni seguenti durante la configurazione del flusso:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se il flusso usa , PulsarAdminimpostare anche quanto segue:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

L'esempio seguente illustra la configurazione delle opzioni di autenticazione:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Schema Pulsar

Lo schema dei record letti da Pulsar dipende dal modo in cui gli argomenti hanno i relativi schemi codificati.

  • Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
  • Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una value colonna.
  • Se il lettore è configurato per leggere più argomenti con schemi diversi, impostare allowDifferentTopicSchemas per caricare il contenuto non elaborato in una value colonna.

I record Pulsar hanno i campi di metadati seguenti:

Column Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurare le opzioni per la lettura in streaming Pulsar

Tutte le opzioni vengono configurate come parte di una lettura structured streaming usando .option("<optionName>", "<optionValue>") la sintassi. È anche possibile configurare l'autenticazione usando le opzioni. Vedere Eseguire l'autenticazione a Pulsar.

Nella tabella seguente vengono descritte le configurazioni necessarie per Pulsar. È necessario specificare solo una delle opzioni topico topicstopicsPattern.

Opzione Valore predefinito Descrizione
service.url Nessuno Configurazione pulsar per il servizio Pulsar serviceUrl .
topic Nessuno Stringa del nome dell'argomento da utilizzare.
topics Nessuno Elenco delimitato da virgole degli argomenti da utilizzare.
topicsPattern Nessuno Stringa regex Java da trovare in base agli argomenti da utilizzare.

La tabella seguente descrive altre opzioni supportate per Pulsar:

Opzione Valore predefinito Descrizione
predefinedSubscription Nessuno Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark.
subscriptionPrefix Nessuno Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark.
pollTimeoutMs 120000 Timeout per la lettura dei messaggi da Pulsar in millisecondi.
waitingForNonExistedTopic false Indica se il connettore deve attendere fino a quando non vengono creati gli argomenti desiderati.
failOnDataLoss true Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione).
allowDifferentTopicSchemas false Se vengono letti più argomenti con schemi diversi, usare questo parametro per disattivare la deserializzazione automatica del valore dell'argomento basato su schema. Quando si tratta truedi , vengono restituiti solo i valori non elaborati.
startingOffsets latest Se latest, il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se earliest, il lettore legge dall'offset meno recente. L'utente può anche specificare una stringa JSON che specifica un offset specifico.
maxBytesPerTrigger Nessuno Limite flessibile del numero massimo di byte da elaborare per microbatch. Se questa opzione è specificata, admin.url è necessario specificare anche .
admin.url Nessuno Configurazione di Pulsar serviceHttpUrl . È necessario solo quando maxBytesPerTrigger viene specificato .

È anche possibile specificare qualsiasi configurazione client, amministratore e lettore Pulsar usando i modelli seguenti:

Modello Collegamento alle opzioni di conifigurazione
pulsar.client.* Configurazione del client Pulsar
pulsar.admin.* Configurazione dell'amministratore pulsar
pulsar.reader.* Configurazione del lettore Pulsar

Costruire offset iniziali JSON

È possibile costruire manualmente un ID messaggio per specificare un offset specifico e passarlo come JSON all'opzione startingOffsets . L'esempio di codice seguente illustra questa sintassi:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()