Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Use o conector interno para assinar o Google Pub/Sub. Este conector tem semântica de processamento de exatamente uma vez para as linhas provenientes do assinante.
Observação
Pub/Sub pode publicar linhas duplicadas ou as linhas podem chegar ao assinante fora de ordem. Você deve escrever código para lidar com linhas duplicadas e fora de ordem.
Configurar um fluxo pub/sub
O exemplo de código a seguir mostra como configurar uma leitura de Streaming Estruturado do Pub/Sub e autenticar com chaves privadas.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Para obter mais opções de configuração, consulte Configurar opções para leitura de streaming do Pub/Sub.
Configurar o acesso ao Pub/Sub
Suas credenciais devem ter as seguintes funções:
| Funções | Obrigatória ou opcional | Como a função é usada |
|---|---|---|
roles/pubsub.viewer ou roles/viewer |
Obrigatório | Verifica se a assinatura existe e obtém a assinatura. |
roles/pubsub.subscriber |
Obrigatório | Busca dados de uma assinatura. |
roles/pubsub.editor ou roles/editor |
Opcional | Habilita a criação de uma assinatura se não existir uma e possibilita o uso do deleteSubscriptionOnStreamStop para excluir assinaturas ao finalizar o fluxo. |
O Databricks recomenda que você use segredos ao usar chaves. As seguintes opções são necessárias para autorizar uma conexão:
clientEmailclientIdprivateKeyprivateKeyId
Entender o esquema Pub/Sub
O esquema do streaming corresponde às linhas recuperadas do Pub/Sub, conforme descrito na tabela a seguir:
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming do Pub/Sub
A tabela a seguir descreve as opções com suporte do Pub/Sub. Todas as opções são configuradas no seu leitor de fluxo com .option("<optionName>", "<optionValue>").
Observação
Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de microlotes. Este é um detalhe interno de implementação, e as opções funcionam de forma semelhante à de outros conectores de Streaming Estruturado, exceto pelo fato de que as linhas são recuperadas e depois processadas.
| Chave | Valor padrão | Descrição |
|---|---|---|
numFetchPartitions |
Defina como metade do número de executores presentes na inicialização do fluxo. | O número de tarefas paralelas do Spark que buscam linhas de uma assinatura. |
deleteSubscriptionOnStreamStop |
false |
Se true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
none |
Um limite ajustável para o tamanho do lote a ser processado durante cada micro-lote acionado. |
maxRecordsPerFetch |
1000 |
O número de linhas a serem buscadas por tarefa antes de processar linhas. |
maxFetchPeriod |
10s |
O intervalo de tempo para buscar cada tarefa antes de processar as linhas. Aceita uma cadeia de caracteres de duração, por exemplo, 1s por 1 segundo ou 1m por 1 minuto. O Databricks recomenda o uso do valor padrão. |
Usar o processamento em lote incremental com Pub/Sub
Você pode usar Trigger.AvailableNow para consumir linhas disponíveis das fontes Pub/Sub como um lote incremental.
O Azure Databricks registra o timestamp quando você inicia uma leitura com a configuração de Trigger.AvailableNow. As linhas processadas pelo lote incluem todos os dados obtidos anteriormente e quaisquer linhas recém-publicadas com timestamp anterior ao timestamp de início registrado. Para obter mais informações, consulte AvailableNow: Processamento em lote incremental.
Monitorar métricas de streaming Pub/Sub
As métricas de progresso do Streaming Estruturado relatam o número de linhas buscadas e prontas para processar, o tamanho das linhas buscadas e prontas para processar e o número de duplicatas vistas desde o início do fluxo.
Veja a seguir um exemplo de métricas pub/sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Pub/Sub não dá suporte à execução especulativa com spark.speculation.