Streaming de tabela delta lê e grava

O Delta Lake está profundamente integrado ao fluxo estruturado do Spark por meio de readStream e writeStream. O Delta Lake supera muitas das limitações normalmente associadas aos arquivos e sistemas de fluxo, incluindo:

  • Coalescência de pequenos arquivos produzidos pela ingestão de baixa latência.
  • Manutenção de processamento "exatamente uma vez" com mais de um fluxo (ou trabalhos em lote simultâneos).
  • Descobrir com eficiência quais arquivos são novos ao usar arquivos como origem de um fluxo.

Observação

Esse artigo descreve o uso de tabelas Delta Lake como fontes de streaming e coletores. Para saber como carregar dados usando tabelas de streaming no Databricks SQL, consulte Carregar dados usando tabelas de streaming no Databricks SQL.

Tabela Delta como origem

O fluxo estruturado lê incrementalmente tabelas Delta. Enquanto uma consulta de streaming está ativa em uma tabela Delta, novos registros são processados idempotentemente à medida que novas versões de tabela são confirmadas na tabela de origem.

Os exemplos de código a seguir mostram a configuração de uma leitura de streaming usando o nome da tabela ou o caminho do arquivo.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Importante

Se o esquema de uma tabela Delta for alterado depois que uma leitura de fluxo começar na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar o fluxo para resolver incompatibilidade de esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e abaixo, não é possível transmitir a partir de uma tabela Delta com o mapeamento de colunas ativado que tenha passado por uma evolução de esquema não aditiva, como renomear ou eliminar colunas. Para obter mais detalhes, consulte Fluxo com mapeamento de coluna e alterações de esquema.

Limitar taxa de entrada

As seguintes opções estão disponíveis para controlar os microlotes:

  • maxFilesPerTrigger: quantos arquivos novos serão considerados em todos os microlotes. O padrão é 1000.
  • maxBytesPerTrigger: a quantidade de dados que é processada em cada microlote. Essa opção define um "máximo flexível", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer com que a consulta de streaming avance em casos em que a menor unidade de entrada é maior que esse limite. Isso não está definido por padrão.

Se você usar maxBytesPerTrigger em conjunto com maxFilesPerTrigger, o microlote processará os dados até que o limite maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

Observação

Nos casos em que as transações da tabela de origem são limpas devido à logRetentionDurationconfiguração e a consulta de streaming tenta processar essas versões, por padrão, há uma falha na consulta para evitar a perda de dados. Você pode definir a opção failOnDataLoss como false para ignorar dados perdidos e continuar o processamento.

Transmitir um feed CDA (captura de dados de alterações) do Delta Lake

O feed de dados de alterações do Delta Lake registra alterações em uma tabela Delta, incluindo atualizações e exclusões. Quando habilitado, você pode transmitir de um feed de dados de alterações e gravar lógica para processar inserções, atualizações e exclusões em tabelas downstream. Embora a saída de dados do feed de dados de alterações seja ligeiramente diferente da tabela Delta descrita, ela fornece uma solução para propagar alterações incrementais em tabelas downstream em uma arquitetura medallion.

Importante

No Databricks Runtime 12.2 LTS e abaixo, não é possível transmitir a partir do feed de dados alterados para uma tabela Delta com o mapeamento de colunas ativado que passou por uma evolução de esquema não aditiva, como renomear ou eliminar colunas. Confira Streaming com mapeamento de coluna e alterações de esquema.

Ignorar atualizações e exclusões

O Fluxo Estruturado não lida com entradas que não são anexadas, e lança uma exceção se ocorrerem modificações na tabela que está sendo usada como origem. Há duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente downstream:

  • Você pode excluir a saída e o ponto de verificação e reiniciar o fluxo desde o início.
  • Você pode definir uma dessas duas opções:
    • ignoreDeletes: ignorar transações que excluem dados em limites de partição.
    • skipChangeCommits: ignore as transações que excluem ou modificam registros existentes. skipChangeCommits incorpora ignoreDeletes.

Observação

No Databricks Runtime 12.2 LTS e superior, skipChangeCommits descontinua a configuração anterior ignoreChanges. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges é a única opção suportada.

A semântica para ignoreChanges difere muito de skipChangeCommits. Com ignoreChanges habilitado, os arquivos de dados reescritos na tabela de origem são emitidos novamente após uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. Linhas inalteradas geralmente são emitidas junto com novas linhas, portanto, os consumidores downstream devem ser capazes de lidar com duplicatas. As exclusões não são propagadas downstream. ignoreChanges incorpora ignoreDeletes.

skipChangeCommits ignora totalmente as operações de alteração de arquivo. Os arquivos de dados reescritos na tabela de origem devido à operação de alteração de dados, como UPDATE, MERGE INTO, DELETE e OVERWRITE são totalmente ignorados. Para refletir as alterações em tabelas de origem upstream, você deve implementar uma lógica separada para propagar essas alterações.

As cargas de trabalho configuradas com ignoreChanges continuam a operar usando semântica conhecida, mas o Databricks recomenda usar skipChangeCommits para todas as novas cargas de trabalho. Migrar cargas de trabalho usando ignoreChanges para skipChangeCommits exigir lógica de refatoração.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events com as colunas date, user_email e action que é particionada por date. Você transmite a saída da tabela user_events e precisa excluir dados dela devido à GDPR.

Quando você exclui em limites de partição (ou seja, WHERE está em uma coluna de partição), os arquivos já são segmentados por valor, de modo que a exclusão apenas remove esses arquivos dos metadados. Ao excluir uma partição inteira de dados, você pode usar o seguinte:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Se você excluir dados em várias partições (nesse exemplo, filtrando emuser_email) use a seguinte sintaxe:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Se você atualizar um user_email com a instrução UPDATE, o arquivo que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

Especificar posição inicial

Você pode usar as opções a seguir para especificar o ponto inicial da fonte de streaming Delta Lake sem processar a tabela inteira.

  • startingVersion: A versão do Delta Lake da qual começar. O Databricks recomenda omitir essa opção para a maioria das cargas de trabalho. Quando não definido, o fluxo começa a partir da versão mais recente disponível, incluindo um instantâneo completo da tabela naquele momento.

    Se especificado, o fluxo lê todas as alterações na tabela Delta começando com a versão especificada (inclusive). Se a versão especificada não estiver mais disponível, o fluxo não será iniciado. Você pode obter as versões de confirmação na coluna version da saída do comando DESCRIBE HISTORY.

    Para retornar apenas as alterações mais recentes, especifique latest.

  • startingTimestamp: O carimbo de data/hora do qual começar. Todas as alterações de tabela confirmadas no registro de data e hora ou depois dele (inclusive) são lidas pelo leitor de streaming. Se o carimbo de data e hora fornecido preceder todas as confirmações de tabela, a leitura de streaming começará com o carimbo de data e hora mais antigo disponível. Um destes:

    • Uma cadeia de caracteres de um carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma cadeia de caracteres de data. Por exemplo, "2019-01-01".

Não é possível definir as duas opções ao mesmo tempo. Elas entram em vigor somente ao iniciar uma nova consulta de streaming. Se uma consulta de streaming tiver sido iniciada e o progresso tiver sido registrado no ponto de verificação, essas opções serão ignoradas.

Importante

Embora você possa iniciar a fonte de streaming de uma versão ou carimbo de data/hora especificado, o esquema da fonte de streaming é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão ou o carimbo de data/hora especificados. Caso contrário, a fonte de streaming pode retornar resultados incorretos ao ler os dados com um esquema incorreto.

Exemplo

Por exemplo, vamos supor que você tenha uma tabela user_events. Se você quiser ler as alterações desde a versão 5, use:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Se você quiser ler as alterações desde 18/10/2018, use:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Processar instantâneo inicial sem que os dados sejam descartados

Observação

Essa recurso está disponível no Databricks Runtime 11.3 LTS e superior. Esse recurso está em uma versão prévia.

Ao usar uma tabela Delta como fonte de fluxo, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta nesta versão é chamada de instantâneo inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, o último tempo de modificação não representa necessariamente a ordem de tempo do evento de registro.

Em uma consulta de streaming com estado com uma marca d'água definida, processar arquivos por tempo de modificação pode resultar em registros sendo processados na ordem errada. Isso pode levar a registros caindo como eventos tardios pela marca d'água.

Você pode evitar o problema de queda de dados habilitando a seguinte opção:

  • withEventTimeOrder: se o instantâneo inicial deve ser processado com a ordem de tempo do evento.

Com a ordem de tempo do evento habilitada, o intervalo de tempo do evento dos dados iniciais do instantâneo é dividido em buckets de tempo. Cada microlote processa um bucket filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microlote, mas apenas de forma aproximada devido à natureza do processamento.

O gráfico abaixo mostra esse processo:

Instantâneo Inicial

Informações importantes sobre esse recurso:

  • O problema da queda de dados só acontece quando o instantâneo Delta inicial de uma consulta de streaming com estado é processado na ordem padrão.
  • Você não poderá alterar withEventTimeOrder depois que a consulta de fluxo for iniciada enquanto o instantâneo inicial ainda estiver sendo processado. Para reiniciar com withEventTimeOrder a alteração, você precisa excluir o ponto de verificação.
  • Se você estiver executando uma consulta de fluxo com o withEventTimeOrder habilitado, não será possível fazer downgrade para uma versão DBR que não dê suporte a esse recurso até que o processamento inicial do instantâneo seja concluído. Se você precisar fazer downgrade, poderá aguardar a conclusão do instantâneo inicial ou excluir o ponto de verificação e reiniciar a consulta.
  • Não há suporte para esse recurso nos seguintes cenários incomuns:
    • A coluna de tempo do evento é uma coluna gerada e há transformações de não projeção entre a origem Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.
  • Com a ordem de tempo do evento habilitada, o desempenho do processamento de instantâneo inicial delta pode ser mais lento.
  • Cada microlote examina o instantâneo inicial para filtrar dados dentro do intervalo de tempo de evento correspondente. Para uma ação de filtro mais rápida, é recomendável usar uma coluna de origem Delta como a hora do evento para que a falta de dados possa ser aplicada (verifique se os dados são ignorados para Data Lake quando aplicável). Além disso, o particionamento de tabela ao longo da coluna de tempo do evento pode acelerar ainda mais o processamento. Você pode verificar a interface do usuário do Spark para ver quantos arquivos delta são verificados em busca de um microlote específico.

Exemplo

Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de streaming é uma consulta de agregação. Se você quiser garantir que não haja nenhuma queda de dados durante o processamento de instantâneo inicial, poderá usar:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Observação

Você também pode habilitar isso com a configuração do Spark no cluster que se aplicará a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true

Tabela Delta como coletor

Você também pode gravar dados em uma tabela Delta usando o fluxo estruturado. O log de transações do Delta Lake garante o processamento exatamente uma vez, mesmo quando houver outros fluxos ou consultas em lote em execução simultaneamente na tabela.

Observação

A função VACUUM do Delta Lake remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretório, como <table-name>/_checkpoints.

Métricas

Você pode descobrir o número de bytes e o número de arquivos que ainda serão processados em um processo de consulta de streaming como as métricas numBytesOutstanding e numFilesOutstanding. Métricas adicionais incluem:

  • numNewListedFiles: número de arquivos do Delta Lake listados para calcular a lista de pendências para esse lote.
    • backlogEndOffset: a versão da tabela usada para calcular a lista de pendências.

Se você estiver executando o fluxo em um notebook, poderá ver essas métricas na guia Dados brutos no painel de progresso de consulta de fluxo:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Modo de acréscimo

Por padrão, os fluxos são executados no modo de acréscimo, que adiciona novos registros à tabela.

Você pode usar o método Path:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

ou o método toTable, conforme a seguir:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modo completo

Você também pode usar o fluxo estruturado para substituir a tabela inteira por cada lote. Um exemplo de caso de uso é computar um resumo usando a agregação:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.

Para aplicativos com requisitos de latência mais lenientes, você pode economizar recursos de computação com gatilhos de uso único. Use-os para atualizar tabelas de agregação de resumo em um determinado agendamento, processando apenas novos dados que chegaram desde a última atualização.

Executar junções de fluxo/estático

Você pode contar com as garantias transacionais e o protocolo de controle de versão do Delta Lake para executar junções de fluxo/estático. Uma junção de fluxo/estático une a versão válida mais recente de uma tabela Delta (os dados estáticos) a um fluxo de dados usando uma junção sem estado.

Quando o Azure Databricks processa um microlote de dados em uma junção de fluxo/estático, a versão válida mais recente dos dados da tabela Delta estática se une aos registros presentes no microlote atual. Como a junção é sem estado, você não precisa configurar a marca d'água e pode processar resultados com baixa latência. Os dados na tabela Delta estática usada na junção devem ser de alteração lenta.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Executar upsert de consultas de fluxo usando foreachBatch

Você pode usar uma combinação de merge e foreachBatch para gravar upserts complexas de uma consulta de streaming em uma tabela Delta. Confira Usar foreachBatch para gravar em coletores de dados arbitrários.

Esse padrão tem várias aplicações, incluindo as seguintes:

Observação

  • Certifique-se de que sua merge instrução dentro dele foreachBatch seja idempotente, pois as reinicializações da consulta de streaming podem aplicar a operação no mesmo lote de dados várias vezes.
  • Quando merge é usado no foreachBatch, a taxa de dados de entrada da consulta de streaming (relatada por meio StreamingQueryProgress de e visível no grafo de taxa do bloco de anotações) pode ser relatada como um múltiplo da taxa real na qual os dados são gerados na origem. Isso ocorre porque o merge lê os dados de entrada várias vezes fazendo com que as métricas de entrada sejam multiplicadas. Se esse for um gargalo, você poderá armazenar em cache o DataFrame do lote antes merge e, em seguida, desarmazená-lo em cache após merge.

O exemplo a seguir demonstra como você pode usar o SQL no foreachBatch para realizar essa tarefa:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Você também pode optar por usar as APIs do Delta Lake para executar upserts de streaming, como no exemplo a seguir:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Gravações de tabela idempotentes em foreachBatch

Observação

O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você deseja atualizar. O uso do foreachBatch para gravar em várias tabelas serializa gravações, o que reduz o paralelismo e aumenta a latência geral.

As tabelas Delta dão suporte às seguintes opções DataFrameWriter para que as gravações em várias tabelas dentro de foreachBatch sejam idempotentes:

  • txnAppId: uma cadeia de caracteres exclusiva que você pode passar em cada gravação DataFrame. Por exemplo, você pode usar a ID do StreamingQuery como txnAppId.
  • txnVersion: um número que aumenta de forma monotônica e atua como a versão da transação.

O Delta Lake usa uma combinação de txnAppId e txnVersion para identificar gravações duplicadas e ignorá-las.

Se uma gravação em lote for interrompida com uma falha, a execução novamente do lote usará o mesmo aplicativo e ID de lote para ajudar o tempo de execução a identificar corretamente as gravações duplicadas e ignorá-las. A ID do aplicativo (txnAppId) pode ser qualquer cadeia de caracteres exclusiva gerada pelo usuário, e não precisa estar relacionada à ID do fluxo. Confira Usar foreachBatch para gravar em coletores de dados arbitrários.

Aviso

Se você excluir o ponto de verificação de streaming e reiniciar a consulta com um novo ponto de verificação, deverá fornecer uma txnAppId diferente. Novos pontos de verificação começam com uma ID de lote de 0. O Delta Lake usa a ID e o txnAppId do lote como uma chave exclusiva e ignora lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}