Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em uma versão prévia.
No Databricks Runtime 14.1 e superior, você pode usar o Streaming Estruturado para transmitir dados do Apache Pulsar no Azure Databricks.
O Structured Streaming fornece semântica de processamento única para dados lidos a partir de fontes do Pulsar.
Exemplo de sintaxe
Veja a seguir um exemplo básico de como usar o Structured Streaming para ler do Pulsar:
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Para ler tópicos do Pulsar, você deve fornecer service.url e uma das seguintes opções:
topictopicstopicsPattern
Para obter uma lista completa de opções, consulte Configurar opções para leitura de streaming do Pulsar.
Autentique-se no Pulsar
O Azure Databricks suporta autenticação com truststore e keystore no Pulsar. O Databricks recomenda que você use segredos para armazenar detalhes de configuração.
As opções de configuração de fluxo disponíveis incluem o seguinte:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
Se o fluxo usar um PulsarAdmin, você deverá definir as seguintes opções:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Example
O exemplo a seguir demonstra a configuração das opções de autenticação:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Esquema do Pulsar
Quando você lê dados do Pulsar, o esquema das linhas depende dos esquemas dos tópicos da fonte.
- Para os tópicos com esquema Avro ou JSON, os nomes de campo e os tipos de campo são preservados no DataFrame do Spark resultante.
- Para os tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna
value. - Se você configurar o fluxo para ler vários tópicos com esquemas diferentes, defina
allowDifferentTopicSchemaspara carregar o conteúdo bruto em umavaluecoluna.
Os registros Pulsar têm os seguintes campos de metadados:
| Coluna | Tipo |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configurar opções para leitura de streaming do Pulsar
Configure todas as opções a seguir usando a sintaxe .option("<optionName>", "<optionValue>") para fluxos de leitura. Você também pode configurar a autenticação usando .options(). Consulte Autenticar no Pulsar.
A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topic, topics ou topicsPattern.
| Opção | Valor padrão | Descrição |
|---|---|---|
service.url |
nenhum | A configuração do Pulsar serviceUrl para o serviço Pulsar. |
topic |
nenhum | Uma cadeia de caracteres de nome de tópico para o tópico a ser consumido. |
topics |
nenhum | Uma lista de tópicos separada por vírgulas a serem consumidos. |
topicsPattern |
nenhum | Uma string de regex Java para corresponder aos tópicos a serem consumidos. |
A tabela a seguir descreve outras opções com suporte do Pulsar:
| Opção | Valor padrão | Descrição |
|---|---|---|
predefinedSubscription |
nenhum | O nome de assinatura predefinido usado pelo conector para monitorar o progresso do aplicativo Spark. |
subscriptionPrefix |
nenhum | Um prefixo utilizado pelo conector para gerar uma assinatura aleatória a fim de acompanhar o progresso da aplicação Spark. |
pollTimeoutMs |
120000 | O tempo limite para ler mensagens do Pulsar em milissegundos. |
waitingForNonExistedTopic |
false |
Se o conector deve aguardar até que os tópicos desejados sejam criados. |
failOnDataLoss |
true |
Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção). |
allowDifferentTopicSchemas |
false |
Se vários tópicos com esquemas diferentes forem lidos, use essa opção para desativar a desserialização automática do valor do tópico baseado em esquema. Somente os valores brutos são retornados quando isso for true. |
startingOffsets |
latest |
Se latest, o leitor lê os registros mais recentes após iniciar a execução. Se earliest, o leitor faz a leitura a partir do deslocamento mais antigo. Você também pode especificar uma cadeia de caracteres JSON para um deslocamento específico. |
maxBytesPerTrigger |
nenhum | Um limite flexível para o número máximo de bytes a serem processados por microlote. Se você especificar essa opção, também deverá especificar admin.url. |
admin.url |
nenhum | A configuração serviceHttpUrl do Pulsar. Necessário quando maxBytesPerTrigger está especificado. |
Você também pode especificar as configurações de cliente, administrador e leitor do Pulsar usando os seguintes padrões:
| Padrão | Opções de configuração |
|---|---|
pulsar.client.* |
Configuração do cliente do Pulsar |
pulsar.admin.* |
Configuração do administrador do Pulsar |
pulsar.reader.* |
Configuração do leitor do Pulsar |
Construir deslocamentos iniciais em JSON
Para usar uma ID de mensagem personalizada que especifica um deslocamento, como JSON, com a opção startingOffsets , consulte o exemplo a seguir:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()