Assinar o Google Pub/Sub

Use o conector interno para assinar o Google Pub/Sub. Esse conector fornece semântica de processamento exatamente uma vez para registros do assinante.

Observação

Pub/Sub pode publicar registros duplicados ou os registros podem chegar ao assinante fora de ordem. Escreva código para lidar com registros duplicados e fora de ordem.

Configurar um fluxo pub/sub

O exemplo de código a seguir demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado do Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções para leitura de streaming do Pub/Sub.

Configurar o acesso ao Pub/Sub

As credenciais configuradas devem ter as seguintes funções.

Funções Obrigatória ou opcional Como a função é usada
roles/pubsub.viewer ou roles/viewer Obrigatório Verifica se a assinatura existe e obtém a assinatura.
roles/pubsub.subscriber Obrigatório Busca dados de uma assinatura.
roles/pubsub.editor ou roles/editor Opcional Habilita a criação de uma assinatura se não existir uma e possibilita o uso do deleteSubscriptionOnStreamStop para excluir assinaturas ao finalizar o fluxo.

O Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Entender o esquema Pub/Sub

O esquema do streaming corresponde aos registros que são buscados de Pub/Sub, como descrito na tabela a seguir.

Campo Tipo
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opções para leitura de streaming do Pub/Sub

A tabela a seguir descreve as opções com suporte do Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando a sintaxe .option("<optionName>", "<optionValue>").

Observação

Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de microlotes. Isso reflete os detalhes da implementação interna e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e processados.

Opção Valor padrão Descrição
numFetchPartitions Defina como metade do número de executores presentes na inicialização do fluxo. O número de tarefas paralelas do Spark que buscam registros de uma assinatura.
deleteSubscriptionOnStreamStop false Se true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar.
maxBytesPerTrigger none Um limite ajustável para o tamanho do lote a ser processado durante cada micro-lote acionado.
maxRecordsPerFetch 1000 O número de registros a serem buscados por tarefa antes do processamento de registros.
maxFetchPeriod 10s A duração de tempo necessária para cada tarefa antes do processamento dos registros. Aceita uma cadeia de caracteres de duração, por exemplo, 1s por 1 segundo ou 1m por 1 minuto. O Databricks recomenda o uso do valor padrão.

Usar o processamento em lote incremental com Pub/Sub

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub como um lote incremental.

O Azure Databricks registra o timestamp quando você inicia uma leitura com a configuração de Trigger.AvailableNow. Os registros processados pelo lote incluem todos os dados buscados anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora de início do fluxo registrado. Para obter mais informações, consulte AvailableNow: Processamento em lote incremental.

Monitorar métricas de streaming Pub/Sub

As métricas de progresso do Streaming Estruturado relatam o número de registros buscados e prontos para serem processados, o tamanho dos registros buscados e prontos para serem processados e o número de duplicatas vistas desde o início do fluxo. O item a seguir é um exemplo destas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Pub/Sub não dá suporte à execução especulativa (spark.speculation).