Otimizar o processamento com estado em Delta Live Tables com marcas d'água
Para gerenciar efetivamente os dados mantidos no estado, use marcas d'água ao executar o processamento de fluxo com estado em Delta Live Tables, incluindo agregações, junções e eliminação de duplicação. Este artigo descreve como usar marcas d'água em suas consultas Delta Live Tables e inclui exemplos das operações recomendadas.
Observação
Para garantir que as consultas que executam agregações sejam processadas incrementalmente e não totalmente recomputadas com cada atualização, você precisa usar marcas d'água.
O que é uma marca d'água?
No processamento de fluxo, uma marca d'água é um recurso do Apache Spark que pode definir um limite baseado em tempo para processar dados ao executar operações com estado, como agregações. Os dados que chegam são processados até que o limite seja atingido, momento em que a janela de tempo definida pelo limite é fechada. As marcas d'água podem ser usadas para evitar problemas durante o processamento de consulta, principalmente ao processar conjuntos de dados maiores ou processamento de execução prolongada. Esses problemas podem incluir alta latência na produção de resultados e até mesmo erros de OOM (memória insuficiente) devido à quantidade de dados mantidos no estado durante o processamento. Como os dados de streaming são inerentemente não ordenados, as marcas d'água também dão suporte ao cálculo correto de operações como agregações de janela de tempo.
Para saber mais sobre como usar marcas d'água no processamento de fluxo, consulte Marca d'água no streaming estruturado do Apache Spark e Aplicar marcas d'água para controlar os limites de processamento de dados.
Como definir uma marca-d'água?
Você define uma marca d'água especificando um campo de carimbo de data/hora e um valor que representa o limite de tempo para dados atrasados chegarem. Os dados serão considerados atrasados se chegarem após o limite de tempo definido. Por exemplo, se o limite for definido como dez minutos, os registros que chegarem após o limite de dez minutos poderão ser removidos.
Como os registros que chegam após o limite definido podem ser removidos, a seleção de um limite que atenda aos requisitos de latência versus correção é importante. Escolher um limite menor resulta em registros sendo emitidos mais cedo, mas também significa que registros atrasados são mais propensos a serem removidos. Um limite maior significa uma espera mais longa, mas possivelmente mais integridade dos dados. Devido ao tamanho de estado maior, um limite maior também pode exigir recursos de computação adicionais. Como o valor limite depende de seus dados e requisitos de processamento, testar e monitorar seu processamento é importante para determinar um limite ideal.
Use a função withWatermark()
no Python para definir uma marca d'água. No SQL, use a cláusula WATERMARK
para definir uma marca d'água:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Usar marcas d'água com junções fluxo-fluxo
Para junções fluxo-fluxo, você precisa definir uma marca d'água em ambos os lados da junção e uma cláusula de intervalo de tempo. Como cada fonte de junção tem uma exibição incompleta dos dados, a cláusula de intervalo de tempo é necessária para informar ao mecanismo de streaming quando não é possível fazer mais correspondências. A cláusula de intervalo de tempo precisa usar os mesmos campos usados para definir as marcas d'água.
Como pode haver momentos em que cada fluxo requer limites diferentes para marcas d'água, os fluxos não precisam ter os mesmos limites. Para evitar dados ausentes, o mecanismo de streaming mantém uma marca d'água global com base no fluxo mais lento.
O exemplo a seguir une um fluxo de impressões de anúncios e um fluxo de cliques do usuário em anúncios. Neste exemplo, um clique precisa ocorrer dentro de três minutos a partir da impressão. Após o intervalo de tempo de três minutos passar, as linhas do estado que não puderem mais ser correspondidas serão removidas.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Execute agregações em janelas com marcas d'água
Uma operação com estado comum em dados de streaming é uma agregação em janela. As agregações em janela são semelhantes às agregações agrupadas, exceto que os valores agregados são retornados para o conjunto de linhas que fazem parte da janela definida.
Uma janela pode ser definida como um determinado comprimento, e uma operação de agregação pode ser executada em todas as linhas que fazem parte dessa janela. O Spark Streaming dá suporte a três tipos de janelas:
- As janelas em cascata (fixas) são uma série de intervalos de tempo de tamanho fixo, não sobrepostos e contíguos. Um registro de entrada pertence a apenas uma janela.
- Janelas deslizantes: semelhantes às janelas em cascata, as janelas deslizantes são de tamanho fixo, mas as janelas podem se sobrepor e um registro pode cair em várias janelas.
Quando os dados chegam após o final da janela mais o comprimento da marca d'água, nenhum dado novo é aceito para a janela, o resultado da agregação é emitido e o estado da janela é removido.
O exemplo a seguir calcula uma soma de impressões a cada cinco minutos usando uma janela fixa. Neste exemplo, a cláusula select usa o alias impressions_window
e, em seguida, a janela propriamente dita é definida como parte da cláusula GROUP BY
. A janela precisa ser baseada na mesma coluna de carimbo de data/hora que a marca d'água, a coluna clickTimestamp
neste exemplo.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Um exemplo semelhante no Python para calcular o lucro em janelas fixas por hora:
import dlt
@dlt.table()
def profit_by_hour():
return (
dlt.read_stream("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Eliminação de duplicação de registros de streaming
O Streaming Estruturado tem garantias de processamento do tipo exatamente uma vez, mas não elimina automaticamente a duplicação de registros de fontes de dados. Por exemplo, como muitas filas de mensagens têm garantias do tipo pelo menos uma vez, deve-se esperar registros duplicados ao ler de uma dessas filas de mensagens. Você pode usar a função dropDuplicatesWithinWatermark()
para eliminar a duplicação de registros em qualquer campo especificado, removendo duplicatas de um fluxo mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada). Você precisa especificar uma marca d'água para usar a função dropDuplicatesWithinWatermark()
. Todos os dados duplicados que chegam dentro do intervalo de tempo especificado pela marca d'água são removidos.
Os dados ordenados são importantes porque os dados fora de ordem fazem com que o valor da marca d'água avance incorretamente. Em seguida, quando os dados mais antigos chegam, eles são considerados atrasados e são removidos. Use a opção withEventTimeOrder
para processar o instantâneo inicial em ordem com base no carimbo de data/hora especificado na marca d'água. A opção withEventTimeOrder
pode ser declarada no código que define o conjunto de dados ou nas configurações de pipeline usando spark.databricks.delta.withEventTimeOrder.enabled
. Por exemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Observação
A opção withEventTimeOrder
é compatível apenas com Python.
No exemplo a seguir, os dados são processados ordenados por clickTimestamp
, e os registros que chegam dentro de cinco segundos um do outro que contêm colunasuserId
e clickAdId
duplicadas são removidos.
clicksDedupDf = (
spark.readStream
.option("withEventTimeOrder", "true")
.table(rawClicks)
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Otimizar a configuração do pipeline para processamento com estado
Para ajudar a evitar problemas de produção e latência excessiva, o Databricks recomenda habilitar o gerenciamento de estado baseado em RocksDB para o processamento de fluxo com estado, especialmente se o processamento exigir a economia de uma grande quantidade de estado intermediário.
Os pipelines sem vários gerenciam automaticamente as configurações do repositório de estado.
Você pode habilitar o gerenciamento de estado baseado no RockDB definindo a configuração a seguir antes de implantar um pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para saber mais sobre como configurar o armazenamento de estado do RocksDB, incluindo recomendações de configuração para RocksDB, confira Configurar o armazenamento de estado do RocksDB no Azure Databricks.