Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Use o conector interno para assinar o Google Pub/Sub. Esse conector fornece semântica de processamento exatamente uma vez para registros do assinante.
Observação
Pub/Sub pode publicar registros duplicados ou os registros podem chegar ao assinante fora de ordem. Escreva código para lidar com registros duplicados e fora de ordem.
Configurar um fluxo pub/sub
O exemplo de código a seguir demonstra a sintaxe básica para configurar uma leitura de Streaming Estruturado do Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para leitura de streaming do Pub/Sub.
Configurar o acesso ao Pub/Sub
As credenciais configuradas devem ter as seguintes funções.
| Funções | Obrigatória ou opcional | Como a função é usada |
|---|---|---|
roles/pubsub.viewer ou roles/viewer |
Obrigatório | Verifica se a assinatura existe e obtém a assinatura. |
roles/pubsub.subscriber |
Obrigatório | Busca dados de uma assinatura. |
roles/pubsub.editor ou roles/editor |
Opcional | Habilita a criação de uma assinatura se não existir uma e possibilita o uso do deleteSubscriptionOnStreamStop para excluir assinaturas ao finalizar o fluxo. |
O Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmailclientIdprivateKeyprivateKeyId
Entender o esquema Pub/Sub
O esquema do streaming corresponde aos registros que são buscados de Pub/Sub, como descrito na tabela a seguir.
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming do Pub/Sub
A tabela a seguir descreve as opções com suporte do Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando a sintaxe .option("<optionName>", "<optionValue>").
Observação
Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de microlotes. Isso reflete os detalhes da implementação interna e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e processados.
| Opção | Valor padrão | Descrição |
|---|---|---|
numFetchPartitions |
Defina como metade do número de executores presentes na inicialização do fluxo. | O número de tarefas paralelas do Spark que buscam registros de uma assinatura. |
deleteSubscriptionOnStreamStop |
false |
Se true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
none |
Um limite ajustável para o tamanho do lote a ser processado durante cada micro-lote acionado. |
maxRecordsPerFetch |
1000 |
O número de registros a serem buscados por tarefa antes do processamento de registros. |
maxFetchPeriod |
10s |
A duração de tempo necessária para cada tarefa antes do processamento dos registros. Aceita uma cadeia de caracteres de duração, por exemplo, 1s por 1 segundo ou 1m por 1 minuto. O Databricks recomenda o uso do valor padrão. |
Usar o processamento em lote incremental com Pub/Sub
Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub como um lote incremental.
O Azure Databricks registra o timestamp quando você inicia uma leitura com a configuração de Trigger.AvailableNow. Os registros processados pelo lote incluem todos os dados buscados anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora de início do fluxo registrado. Para obter mais informações, consulte AvailableNow: Processamento em lote incremental.
Monitorar métricas de streaming Pub/Sub
As métricas de progresso do Streaming Estruturado relatam o número de registros buscados e prontos para serem processados, o tamanho dos registros buscados e prontos para serem processados e o número de duplicatas vistas desde o início do fluxo. O item a seguir é um exemplo destas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Pub/Sub não dá suporte à execução especulativa (spark.speculation).