Compartilhar via


Assinar o Google Pub/Sub

O Azure Databricks fornece um conector integrado para assinar o Google Pub/Sub no Databricks Runtime 13.3 LTS e superior. Esse conector fornece semântica de processamento exatamente uma vez para registros do assinante.

Observação

Pub/Sub pode publicar registros duplicados e os registros podem chegar ao assinante fora de ordem. Você deve escrever o código do Azure Databricks para lidar com registros duplicados e fora de ordem.

Sintaxe/exemplo

O exemplo de código a seguir demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado do Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções para de leitura de streaming do Pub/Sub.

Configurar o acesso ao Pub/Sub

O Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

A tabela a seguir descreve as funções necessárias para as credenciais configuradas:

Funções Obrigatória ou opcional Como ele é usado
roles/pubsub.viewer ou roles/viewer Obrigatório Verificar se a assinatura existe e obter assinatura
roles/pubsub.subscriber Obrigatório Buscar dados de uma assinatura
roles/pubsub.editor ou roles/editor Opcional Habilita a criação de uma assinatura se uma não existir e também habilita o uso do deleteSubscriptionOnStreamStop para excluir assinaturas na terminação de fluxo

Esquema Pub/Sub

O esquema do fluxo corresponde aos registros que são buscados do Pub/Sub, conforme descrito na tabela a seguir:

Campo Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming do Pub/Sub

A tabela a seguir descreve as opções com suporte do Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando a sintaxe .option("<optionName>", "<optionValue>").

Observação

Algumas opções de configuração do Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes da implementação interna e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e processados.

Opção Valor padrão Descrição
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que buscam registros de uma assinatura.
deleteSubscriptionOnStreamStop false Se true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar.
maxBytesPerTrigger nenhum Um limite flexível para o tamanho do lote a ser processado durante cada microlote disparado.
maxRecordsPerFetch 1000 O número de registros a serem buscados por tarefa antes do processamento de registros.
maxFetchPeriod 10 segundos A duração do tempo para cada tarefa a ser buscada antes do processamento de registros. O Databricks recomenda o uso do valor padrão.

Semântica de processamento de lote incremental para Pub/Sub

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub em um lote incremental.

O Azure Databricks registra o carimbo de data/hora quando você começa uma leitura com a configuração de Trigger.AvailableNow. Os registros processados pelo lote incluem todos os dados buscados anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora de início do fluxo registrado.

Confira Configuração do processamento em lote incremental.

Monitorar métricas de streaming

As métricas de progresso do Streaming Estruturado relatam o número de registros buscados e prontos para serem processados, o tamanho dos registros buscados e prontos para serem processados e o número de duplicatas vistas desde o início do fluxo. O item a seguir é um exemplo destas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Não há suporte para execução especulativa (spark.speculation) com Pub/Sub.