Partilhar via


Inscrever-se no Google Pub/Sub

O Azure Databricks fornece um conector embutido para subscrever ao Google Pub/Sub no Databricks Runtime 13.3 LTS e versões posteriores. Este conector fornece processamento com semântica de exatamente uma vez para dados do assinante.

Nota

Pub/Sub pode publicar registros duplicados e os registros podem chegar ao assinante fora de ordem. Você deve escrever o código do Azure Databricks para lidar com registros duplicados e fora de ordem.

Exemplo de sintaxe

O exemplo de código a seguir demonstra a sintaxe básica para configurar um Structured Streaming lido de Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções de leitura de streaming do Pub/Sub.

Configurar o acesso a Pub/Sub

A tabela a seguir descreve as funções necessárias para as credenciais configuradas:

Funções Obrigatório ou opcional Como é utilizado
roles/pubsub.viewer ou roles/viewer Necessário Verifique se existe uma subscrição e obtenha uma subscrição
roles/pubsub.subscriber Necessário Buscar dados de uma assinatura
roles/pubsub.editor ou roles/editor Opcional Permite a criação de uma assinatura, caso não exista, e também permite o uso do deleteSubscriptionOnStreamStop para excluir assinaturas no encerramento de fluxo.

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Esquema Pub/Sub

O esquema para o fluxo corresponde aos registos que são extraídos do Pub/Sub, conforme descrito na tabela a seguir:

Campo Tipo
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming Pub/Sub

A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Structured Streaming usando a sintaxe .option("<optionName>", "<optionValue>").

Nota

Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes internos da implementação, e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e, em seguida, processados.

Opção Valor predefinido Descrição
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que obtêm registos de uma subscrição.
deleteSubscriptionOnStreamStop false Se true, a assinatura passada para o fluxo é excluída quando o trabalho de streaming termina.
maxBytesPerTrigger nenhum Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado.
maxRecordsPerFetch 1000 O número de registros a serem buscados por tarefa antes de processar registros.
maxFetchPeriod 10 segundos A duração de cada tarefa para buscar antes de processar os registos. O Databricks recomenda o uso do valor padrão.

Semântica incremental de processamento em lote para Pub/Sub

Você pode usar Trigger.AvailableNow para consumir registos disponíveis das fontes Pub/Sub em lotes incrementais.

O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. Os registos processados pelo lote de processamento incluem todos os dados obtidos anteriormente e quaisquer registos recém-publicados com um timestamp inferior ao timestamp de início do fluxo registado.

Ver AvailableNow: Processamento em lote incremental.

Monitoramento de métricas de streaming

As métricas de progresso do Streaming estruturado relatam o número de registros buscados e prontos para processar, o tamanho dos registros buscados e prontos para processar e o número de duplicatas vistas desde o início do fluxo. Segue-se um exemplo destas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

A execução especulativa (spark.speculation) não é suportada com Pub/Sub.