Fluxo do Apache Pulsar

Importante

Esse recurso está em uma versão prévia.

No Databricks Runtime 14.1 e superior, você pode usar o Streaming Estruturado para transmitir dados do Apache Pulsar no Azure Databricks.

O Structured Streaming fornece semântica de processamento única para dados lidos a partir de fontes do Pulsar.

Exemplo de sintaxe

Veja a seguir um exemplo básico de como usar o Structured Streaming para ler do Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Para ler tópicos do Pulsar, você deve fornecer service.url e uma das seguintes opções:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura de streaming do Pulsar.

Autentique-se no Pulsar

O Azure Databricks suporta autenticação com truststore e keystore no Pulsar. O Databricks recomenda que você use segredos para armazenar detalhes de configuração.

As opções de configuração de fluxo disponíveis incluem o seguinte:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se o fluxo usar um PulsarAdmin, você deverá definir as seguintes opções:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

O exemplo a seguir demonstra a configuração das opções de autenticação:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema do Pulsar

Quando você lê dados do Pulsar, o esquema das linhas depende dos esquemas dos tópicos da fonte.

  • Para os tópicos com esquema Avro ou JSON, os nomes de campo e os tipos de campo são preservados no DataFrame do Spark resultante.
  • Para os tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna value.
  • Se você configurar o fluxo para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma value coluna.

Os registros Pulsar têm os seguintes campos de metadados:

Coluna Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurar opções para leitura de streaming do Pulsar

Configure todas as opções a seguir usando a sintaxe .option("<optionName>", "<optionValue>") para fluxos de leitura. Você também pode configurar a autenticação usando .options(). Consulte Autenticar no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topic, topics ou topicsPattern.

Opção Valor padrão Descrição
service.url nenhum A configuração do Pulsar serviceUrl para o serviço Pulsar.
topic nenhum Uma cadeia de caracteres de nome de tópico para o tópico a ser consumido.
topics nenhum Uma lista de tópicos separada por vírgulas a serem consumidos.
topicsPattern nenhum Uma string de regex Java para corresponder aos tópicos a serem consumidos.

A tabela a seguir descreve outras opções com suporte do Pulsar:

Opção Valor padrão Descrição
predefinedSubscription nenhum O nome de assinatura predefinido usado pelo conector para monitorar o progresso do aplicativo Spark.
subscriptionPrefix nenhum Um prefixo utilizado pelo conector para gerar uma assinatura aleatória a fim de acompanhar o progresso da aplicação Spark.
pollTimeoutMs 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
waitingForNonExistedTopic false Se o conector deve aguardar até que os tópicos desejados sejam criados.
failOnDataLoss true Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção).
allowDifferentTopicSchemas false Se vários tópicos com esquemas diferentes forem lidos, use essa opção para desativar a desserialização automática do valor do tópico baseado em esquema. Somente os valores brutos são retornados quando isso for true.
startingOffsets latest Se latest, o leitor lê os registros mais recentes após iniciar a execução. Se earliest, o leitor faz a leitura a partir do deslocamento mais antigo. Você também pode especificar uma cadeia de caracteres JSON para um deslocamento específico.
maxBytesPerTrigger nenhum Um limite flexível para o número máximo de bytes a serem processados por microlote. Se você especificar essa opção, também deverá especificar admin.url.
admin.url nenhum A configuração serviceHttpUrl do Pulsar. Necessário quando maxBytesPerTrigger está especificado.

Você também pode especificar as configurações de cliente, administrador e leitor do Pulsar usando os seguintes padrões:

Padrão Opções de configuração
pulsar.client.* Configuração do cliente do Pulsar
pulsar.admin.* Configuração do administrador do Pulsar
pulsar.reader.* Configuração do leitor do Pulsar

Construir deslocamentos iniciais em JSON

Para usar uma ID de mensagem personalizada que especifica um deslocamento, como JSON, com a opção startingOffsets , consulte o exemplo a seguir:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()