Partilhar via


Inscrever-se no Google Pub/Sub

O Azure Databricks fornece um conector interno para assinar o Google Pub/Sub no Databricks Runtime 13.3 LTS e superior. Este conector fornece semântica de processamento exatamente uma vez para registros do assinante.

Nota

Pub/Sub pode publicar registros duplicados e os registros podem chegar ao assinante fora de ordem. Você deve escrever o código do Azure Databricks para lidar com registros duplicados e fora de ordem.

Exemplo de sintaxe

O exemplo de código a seguir demonstra a sintaxe básica para configurar um Structured Streaming lido de Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções para leitura de streaming Pub/Sub.

Configurar o acesso a Pub/Sub

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

A tabela a seguir descreve as funções necessárias para as credenciais configuradas:

Funções Obrigatório ou opcional Como é utilizado
roles/pubsub.viewer ou roles/viewer Necessário Verifique se existe uma subscrição e obtenha uma subscrição
roles/pubsub.subscriber Necessário Buscar dados de uma assinatura
roles/pubsub.editor ou roles/editor Opcional Permite a criação de uma assinatura, caso não exista, e também permite o deleteSubscriptionOnStreamStop uso da para excluir assinaturas na terminação de fluxo

Esquema Pub/Sub

O esquema para o fluxo corresponde aos registros que são buscados de Pub/Sub, conforme descrito na tabela a seguir:

Campo Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming Pub/Sub

A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando .option("<optionName>", "<optionValue>") sintaxe.

Nota

Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes internos da implementação, e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e, em seguida, processados.

Opção Valor predefinido Description
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que buscam registros de uma assinatura.
deleteSubscriptionOnStreamStop false Se trueo , a assinatura passada para o fluxo for excluída quando o trabalho de streaming terminar.
maxBytesPerTrigger nenhum Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado.
maxRecordsPerFetch 1000 O número de registros a serem buscados por tarefa antes de processar registros.
maxFetchPeriod 10 segundos A duração do tempo para cada tarefa a ser buscada antes de processar registros. O Databricks recomenda o uso do valor padrão.

Semântica incremental de processamento em lote para Pub/Sub

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub um lote incremental.

O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow configuração. Os registros processados pelo lote incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora menor do que o carimbo de data/hora de início do fluxo gravado.

Consulte Configurando o processamento incremental em lote.

Monitoramento de métricas de streaming

As métricas de progresso do Streaming estruturado relatam o número de registros buscados e prontos para processar, o tamanho dos registros buscados e prontos para processar e o número de duplicatas vistas desde o início do fluxo. Segue-se um exemplo destas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

A execução especulativa (spark.speculation) não é suportada com Pub/Sub.