Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Esta página descreve os conceitos básicos de marca de água e fornece recomendações para o uso de marcas de água em operações comuns de streaming com estado. Deve aplicar marcas de água às operações de streaming com estado para evitar expandir infinitamente a quantidade de dados mantidos no estado, o que pode introduzir problemas de memória ou aumentar as latências de processamento durante operações de streaming de longa duração.
O que é uma marca d'água?
O Streaming Estruturado usa marcas d'água para controlar o limite por quanto tempo continuar processando atualizações para uma determinada entidade de estado. Exemplos comuns de entidades estatais incluem:
- Agregações ao longo de uma janela temporal.
- Chaves únicas numa união entre dois fluxos.
Quando declara uma marca temporal, especifica um campo de timestamp e um limiar de marca temporal em um DataFrame de fluxo contínuo. À medida que novos dados chegam, o gestor de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registos dentro do limite de atraso permitido.
O exemplo seguinte aplica um limiar de marca de água de 10 minutos a uma contagem em janelas:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Neste exemplo:
- A coluna
event_timeé utilizada para definir uma marca de água de 10 minutos e uma janela deslizante de 5 minutos. - É recolhida uma contagem para cada janela de 5 minutos não sobreposta observada para cada
id. - A informação do estado é mantida para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado
event_time.
Importante
Os limiares de marca de água garantem que os registos que chegam dentro do limiar especificado são processados de acordo com a semântica da consulta definida. Registos que chegam tarde e que ultrapassam o limiar especificado podem ainda ser processados usando métricas de consulta, mas isso não é garantido.
Como é que as marcas de água afetam o tempo de processamento e o rendimento?
As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como os marcadores temporais reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz desses marcadores é essencial para uma transmissão eficiente em fluxo contínuo com monitoração de estado.
Nota
Nem todos os modos de saída são suportados para todas as operações com estado.
Marcas de água e modo de saída para agregações em janelas
Os detalhes da tabela a seguir explicam o processamento para consultas com agregação em uma marca temporal com uma marca de água definida.
| Modo de saída | Comportamento |
|---|---|
| Acrescentar | As linhas são escritas na tabela de destino depois de ultrapassado o limite de tempo. Todas as gravações são atrasadas com base no limite de atraso. O antigo estado de agregação é abandonado após o limiar ser ultrapassado. |
| Atualização | As linhas são escritas na tabela alvo à medida que os resultados são calculados, podendo ser atualizadas e sobrescrevidas à medida que chegam novos dados. O antigo estado de agregação é abandonado após o limiar ter sido ultrapassado. |
| Concluído | O estado de agregação não é eliminado. A tabela de destino é reescrita a cada gatilho. |
Marcas de água e saída para uniões entre fluxos de dados
As junções entre vários fluxos suportam apenas o modo de acréscimo, e os registros correspondentes são gravados em cada lote em que são descobertos. Para junções internas, o Databricks recomenda definir um limiar de watermark em cada fonte de dados em streaming. Isso permite que as informações de estado sejam descartadas para registros antigos. Sem watermarks, o Structured Streaming tenta juntar todas as chaves de ambos os lados da união com cada trigger.
O Streaming Estruturado tem semântica especial para suportar junções externas. A marca d'água é obrigatória para junções externas, pois indica quando uma chave deve ser escrita com um valor nulo depois de não ter correspondência. Embora as junções externas possam ser úteis para registar registos que nunca são correspondidos durante o processamento de dados, porque as junções só escrevem em tabelas como operações de adição, esses dados em falta só são registados depois de o limiar de atraso ter ultrapassado.
Controla o limite de dados atrasados com uma política de múltiplos watermarks no Structured Streaming
Ao trabalhar com múltiplas entradas de Streaming Estruturado, pode definir múltiplas marcas de água para controlar os limiares de tolerância para dados que chegam tarde. Configurar marcas d'água permite controlar as informações de estado e impacta a latência.
Uma consulta de streaming pode ter vários fluxos de entrada que são unidos ou juntados. Cada um dos fluxos de entrada pode ter um limite diferente de dados atrasados que precisa ser tolerado para operações com monitoração de estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada um dos fluxos de entrada. Segue-se um exemplo de consulta com junções stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante a execução da consulta, o Structured Streaming acompanha individualmente o tempo máximo de evento observado em cada fluxo de entrada, calcula watermarks com base no atraso correspondente e escolhe um único watermark global para ser usado em operações com estado. Por padrão, o mínimo é escolhido como limite de retenção global porque evita que os dados sejam acidentalmente descartados como muito atrasados se um dos fluxos estiver mais atrasado que os outros (por exemplo, um dos fluxos deixa de receber dados devido a falhas na origem). Ou seja, a marca d'água global move-se com segurança ao ritmo do fluxo de dados mais lento e a saída da consulta é atrasada em conformidade.
Se quiser obter resultados mais rápidos, pode definir a política de várias marcas de água para escolher o valor máximo como marca global, definindo a configuração SQL spark.sql.streaming.multipleWatermarkPolicy para max (o padrão é min). Isto permite que o indicador global de progresso avance ao ritmo da corrente mais rápida. No entanto, essa configuração descarta dados dos fluxos mais lentos. A Databricks recomenda usar esta configuração de forma criteriosa.
Aplique marcas de água a operações distintas
A distinct operação é um operador com estado que requer watermarks para evitar o crescimento ilimitado do estado. Sem watermarks, o Structured Streaming tenta acompanhar cada registo único indefinidamente, o que pode causar problemas de memória ou aumentar as latências de processamento.
Quando aplica distinct a um DataFrame em streaming, deve especificar uma marca de água num campo de data/hora. A marca de água controla quanto tempo o gestor estadual mantém registos para a deduplicação. Após a passagem do limiar da marca de água, os registos antigos são removidos do estado.
O exemplo seguinte aplica uma marca de água a uma distinct operação:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Neste exemplo, registos duplicados que chegam dentro de 1 hora após o último observado eventTime são removidos do fluxo. A informação de estado para a deduplicação é eliminada após a passagem do limite.
Importante
Se precisares de deduplicar em colunas específicas em vez de todas, usa dropDuplicates() ou dropDuplicatesWithinWatermark() em vez de distinct. Consulte a próxima seção para obter detalhes.
Colocar duplicados dentro da marca de água
No Databricks Runtime 13.3 LTS ou posteriores, pode eliminar registos duplicados dentro de um limite de "watermark" usando um identificador único.
O Structured Streaming oferece garantias de processamento exatamente uma vez, mas não duplica automaticamente registos das fontes de dados. Pode usar dropDuplicatesWithinWatermark para desduplicar registos em qualquer campo especificado, permitindo-lhe remover duplicados de um fluxo mesmo que alguns campos diferem (como hora do evento ou hora de chegada).
Registos duplicados que chegam dentro do limite de tempo especificado são garantidamente eliminados. Essa garantia é rigorosa em apenas uma direção, e registros duplicados que chegam fora do limite especificado também podem ser descartados. Deve definir o limiar de atraso da marca temporal maior do que as diferenças máximas de carimbo de tempo entre eventos duplicados para remover todos os duplicados.
É necessário especificar uma marca de água para usar o método dropDuplicatesWithinWatermark, como no seguinte exemplo:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))