Assinar o Google Pub/Sub
O Azure Databricks fornece um conector integrado para assinar o Google Pub/Sub no Databricks Runtime 13.3 LTS e superior. Esse conector fornece semântica de processamento exatamente uma vez para registros do assinante.
Observação
Pub/Sub pode publicar registros duplicados e os registros podem chegar ao assinante fora de ordem. Você deve escrever o código do Azure Databricks para lidar com registros duplicados e fora de ordem.
Sintaxe/exemplo
O exemplo de código a seguir demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado do Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para de leitura de streaming do Pub/Sub.
Configurar o acesso ao Pub/Sub
O Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmail
clientId
privateKey
privateKeyId
A tabela a seguir descreve as funções necessárias para as credenciais configuradas:
Funções | Obrigatória ou opcional | Como ele é usado |
---|---|---|
roles/pubsub.viewer ou roles/viewer |
Obrigatório | Verificar se a assinatura existe e obter assinatura |
roles/pubsub.subscriber |
Obrigatório | Buscar dados de uma assinatura |
roles/pubsub.editor ou roles/editor |
Opcional | Habilita a criação de uma assinatura se uma não existir e também habilita o uso do deleteSubscriptionOnStreamStop para excluir assinaturas na terminação de fluxo |
Esquema Pub/Sub
O esquema do fluxo corresponde aos registros que são buscados do Pub/Sub, conforme descrito na tabela a seguir:
Campo | Type |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming do Pub/Sub
A tabela a seguir descreve as opções com suporte do Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando a sintaxe .option("<optionName>", "<optionValue>")
.
Observação
Algumas opções de configuração do Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes da implementação interna e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e processados.
Opção | Valor padrão | Descrição |
---|---|---|
numFetchPartitions |
Defina como metade do número de executores presentes na inicialização do fluxo. | O número de tarefas paralelas do Spark que buscam registros de uma assinatura. |
deleteSubscriptionOnStreamStop |
false |
Se true , a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
nenhum | Um limite flexível para o tamanho do lote a ser processado durante cada microlote disparado. |
maxRecordsPerFetch |
1000 | O número de registros a serem buscados por tarefa antes do processamento de registros. |
maxFetchPeriod |
10 segundos | A duração do tempo para cada tarefa a ser buscada antes do processamento de registros. O Databricks recomenda o uso do valor padrão. |
Semântica de processamento de lote incremental para Pub/Sub
Você pode usar Trigger.AvailableNow
para consumir registros disponíveis das fontes Pub/Sub em um lote incremental.
O Azure Databricks registra o carimbo de data/hora quando você começa uma leitura com a configuração de Trigger.AvailableNow
. Os registros processados pelo lote incluem todos os dados buscados anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora de início do fluxo registrado.
Confira Configuração do processamento em lote incremental.
Monitorar métricas de streaming
As métricas de progresso do Streaming Estruturado relatam o número de registros buscados e prontos para serem processados, o tamanho dos registros buscados e prontos para serem processados e o número de duplicatas vistas desde o início do fluxo. O item a seguir é um exemplo destas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Não há suporte para execução especulativa (spark.speculation
) com Pub/Sub.