O streaming de tabela delta lê e grava

Esta página descreve como usar tabelas Delta como fontes e sinks para Spark Structured Streaming com readStream e writeStream. O Delta Lake resolve problemas comuns de desempenho e fiabilidade para sistemas e ficheiros de streaming. Os benefícios incluem:

  • Condensam ficheiros pequenos produzidos por ingestimento de baixa latência e melhoram o desempenho.
  • Manter o processamento "exatamente uma vez" com mais do que um fluxo (ou trabalhos em lote simultâneos).
  • Descubra novos ficheiros de forma eficiente ao usar ficheiros como fonte de fluxo.

Para saber como carregar dados usando tabelas de streaming no Databricks SQL, consulte Usar tabelas de streaming no Databricks SQL.

Para junções estáticas em cursos de água com o Lago Delta, veja Junções estáticas em cursos de água.

Usa as tabelas Delta como sumidouro

Pode escrever dados numa tabela Delta usando Structured Streaming. O registo de transações do Delta Lake garante um processamento exato uma vez, mesmo quando existem outros fluxos ou consultas em lote a serem executados simultaneamente na tabela.

Quando escreve numa tabela Delta usando um sink de Structured Streaming, poderá observar commits vazios marcados com epochId = -1. Estes são esperados e normalmente ocorrem:

  • No primeiro lote de cada execução da consulta de streaming (isso acontece a cada lote para Trigger.AvailableNow).
  • Quando um esquema é alterado (como adicionar uma coluna).

Estes commits vazios são intencionais e não indicam erro. Não afetam de forma significativa a correção ou o desempenho da consulta.

Note

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.

Monitorizar o backlog com métricas

Use as seguintes métricas para monitorizar o atraso de um processo de consulta em streaming:

  • numBytesOutstanding: Número de bytes ainda por processar na fila de espera.
  • numFilesOutstanding: Número de ficheiros ainda por serem processados no acumulado.
  • numNewListedFiles: Número de ficheiros Delta Lake listados para calcular o atraso deste lote.
  • backlogEndOffset: A versão da tabela Delta usada para calcular o backlog.

Num caderno, veja estas métricas no separador Dados Brutos no painel de progresso da consulta em streaming:

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

Modo de acréscimo

Por defeito, os fluxos correm em modo de anexação e só adicionam novos registos à tabela.

Use o toTable método ao transmitir para tabelas:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modo completo

Utilize o Structured Streaming em modo completo para substituir a tabela inteira após cada lote. Por exemplo, pode atualizar continuamente uma tabela resumida agregada de eventos por cliente:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

Para aplicações sem requisitos rigorosos de latência, pode poupar recursos e custos computacionais com gatilhos únicos como AvailableNow. Por exemplo, use este gatilho para atualizar tabelas de agregação de resumos de acordo com um determinado cronograma, processando apenas os novos dados que foram recebidos desde a última atualização. Ver AvailableNow: Processamento em lote incremental.

Gerir alterações às tabelas Delta de origem

O Streaming estruturado lê incrementalmente tabelas Delta. Quando uma consulta de streaming lê de uma tabela Delta, novos registos são processados idempotentemente à medida que novas versões da tabela são confirmadas na tabela de origem. O Structured Streaming só aceita entradas adicionais e lança uma exceção caso ocorram modificações na tabela Delta de origem. Por exemplo, se uma UPDATE, DELETE, MERGE INTO, ou OVERWRITE operação modificar uma tabela Delta de origem lida por uma consulta de streaming, o fluxo falha com um erro.

Existem quatro abordagens típicas para lidar com alterações a montante das tabelas Delta de origem, dependendo do seu caso de uso. Uma tabela de referência e detalhes sobre cada um são fornecidos abaixo:

Abordagem Vantagens Cons
skipChangeCommits Simples, não exige que escrevas lógica complexa. Útil para processamento apenas de adição, onde as alterações a montante são tratadas separadamente, ou para tratar temporariamente de um registo incorreto. Não propaga alterações e apenas processa acréscimos.
Atualização completa Também simples, não exige que escrevas lógica complexa. Útil para conjuntos de dados pequenos com raras alterações a montante. É caro para conjuntos de dados grandes. Requer reprocessar todas as tabelas a jusante.
Alterar feed de dados Processar todos os tipos de alterações (inserções, atualizações e eliminações). O Databricks recomenda fazer streaming a partir do feed CDC de uma tabela Delta em vez de diretamente da tabela sempre que possível. Requer que escrevas uma lógica mais complexa para lidar com cada tipo de alteração.
Visões materializadas Alternativa simples ao Streaming Estruturado que tem propagação automática de alterações. Maior latência. Disponível apenas em Lakeflow Spark Declarative Pipelines e Databricks SQL.

Saltar commits de alterações a montante com skipChangeCommits

Definir skipChangeCommits para ignorar transações que eliminam ou modificar registos existentes, e para processar apenas anexos. Isto é útil quando as alterações aos dados existentes não precisam de ser propagadas pelo fluxo, ou quando prefere lógica separada para lidar com essas alterações. Podes ligar e desligar skipChangeCommits se precisares de ignorar temporariamente alterações pontuais.

Databricks recomenda usar skipChangeCommits para a maioria das tarefas que não utilizam fluxos de dados de alteração.

Python

(spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")
)

Scala

spark.readStream
  .option("skipChangeCommits", "true")
  .table("source_table")

Important

Se o esquema de uma tabela Delta for alterado após uma leitura de transmissão em fluxo começar na tabela, a consulta falhará. Para a maioria das alterações de esquema, pode reiniciar o fluxo para resolver o erro de correspondência do esquema e continuar o processamento.

No Databricks Runtime 12.2 LTS e inferior, não é possível transmitir de uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução de esquema não aditiva, como renomear ou soltar colunas. Para mais detalhes, veja Mapeamento de colunas e streaming.

Note

No Databricks Runtime 12.2 LTS e superiores, skipChangeCommits substitui ignoreChanges. No Databricks Runtime 11.3 LTS e inferior, ignoreChanges é a única opção suportada. Consulte a opção Herança: ignoreChanges para mais detalhes.

Opção legado: ignoreDeletes

ignoreDeletes é uma opção legada que só trata transações que apagam dados nos limites de partição (ou seja, eliminação completa da partição). Se precisares de lidar com eliminações, atualizações ou outras modificações sem partição, usa skipChangeCommits em vez disso.

Python
(spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")
)
Scala
spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Opção legado: ignoreChanges

ignoreChanges está disponível em Databricks Runtime 11.3 LTS e inferiores. No Databricks Runtime 12.2 LTS e superiores, é substituído por skipChangeCommits.

Com ignoreChanges ativado, os ficheiros de dados reescritos na tabela de origem são reemitidos após uma operação de modificação de dados como UPDATE, MERGE INTO, DELETE (dentro das partições), ou OVERWRITE. Muitas vezes, as linhas inalteradas são emitidas juntamente com novas linhas, pelo que os consumidores posteriores devem ser capazes de lidar com duplicados. As eliminações não são propagadas mais abaixo. ignoreChanges tem precedência sobre ignoreDeletes.

Em contraste, skipChangeCommits ignora completamente as operações de alteração de ficheiros. Ficheiros de dados reescritos na tabela de origem devido a operações de modificação de dados como UPDATE, MERGE INTO, DELETE, e OVERWRITE são completamente ignorados. Para refletir alterações nas tabelas de fontes de fluxo, deve implementar lógica separada para propagar essas alterações.

O Databricks recomenda usar skipChangeCommits para todas as cargas de trabalho novas. Para migrar uma carga de trabalho de ignoreChanges para skipChangeCommits, refatora a lógica do streaming.

Atualização completa das tabelas a jusante

Se as alterações a montante forem raras e os dados forem suficientemente pequenos para serem reprocessados, podes eliminar o checkpoint de streaming e a tabela de saída, e depois reiniciar o stream do início. Isto faz com que o fluxo reprocesse todos os dados da tabela de origem. Esteja ciente de que esta abordagem também requer reprocessar todas as tabelas a jusante que dependem da saída deste fluxo de dados.

Esta abordagem é mais adequada para conjuntos de dados mais pequenos ou cargas de trabalho onde as alterações a montante são pouco frequentes e o custo de uma atualização completa é aceitável.

Utilizar o feed de dados de alteração

Para cargas de trabalho que processam todos os tipos de alterações (inserções, atualizações e eliminações), use o feed de dados de alterações do Delta Lake. O feed de dados de alterações regista alterações ao nível das linhas numa tabela Delta, permitindo-te transmitir essas alterações e escrever lógica para gerir cada tipo de alteração nas tabelas a jusante. Esta é a abordagem mais robusta porque o seu código lida explicitamente com todos os tipos de eventos de alteração. Veja Use o feed de dados de alterações do Delta Lake em Azure Databricks.

Se estiver a usar Lakeflow Spark Declarative Pipelines, consulte As APIs AUTO CDC: Simplifique a captura de dados de alterações com pipelines.

Important

No Databricks Runtime 12.2 LTS e versões anteriores, não é possível fazer streaming do feed de dados de alteração para uma tabela Delta com mapeamento de colunas habilitado que tenha sofrido evolução do esquema não aditiva, como renomear ou remover colunas. Ver Mapeamento de colunas e streaming.

Utilize vistas materializadas

As visualizações materializadas tratam automaticamente das alterações a montante, recalculando os resultados quando os dados de origem mudam. Se não precisar da menor latência possível e quiser evitar gerir a complexidade do streaming, uma visualização materializada pode simplificar a sua arquitetura. As vistas materializadas estão disponíveis nos pipelines do Lakeflow Spark Declarative Pipelines e no Databricks SQL. Ver Vistas materializadas.

Example

Por exemplo, suponha que você tenha uma tabela user_events com date, user_emaile action colunas particionada por date. Você sai da user_events tabela e precisa excluir dados dela devido ao GDPR.

skipChangeCommits permite eliminar dados em múltiplas partições (neste exemplo, filtrando em user_email). Utilize a seguinte sintaxe:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Se atualizar o user_email com a declaração UPDATE, o ficheiro que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

Databricks recomenda utilizar skipChangeCommits em vez de ignoreDeletes, a menos que tenha a certeza de que as eliminações são sempre eliminação total de partições.

Utilização foreachBatch para escritas de tabelas idempotentes

Note

O Databricks recomenda configurar uma gravação de streaming separada para cada coletor que você deseja atualizar em vez de usar foreachBatcho . Escritas para múltiplos sinks em foreachBatch reduzem a paralelização e aumentam a latência global porque escritas para múltiplas tabelas são serializadas em foreachBatch.

As tabelas delta suportam as seguintes DataFrameWriter opções para fazer gravações em várias tabelas dentro foreachBatch do idempotent:

  • txnAppId: Uma cadeia de caracteres exclusiva que tu podes passar a cada gravação de DataFrame. Por exemplo, você pode usar a ID StreamingQuery como txnAppId. txnAppId pode ser qualquer cadeia única gerada pelo utilizador e não tem de estar relacionada com o ID do fluxo.
  • txnVersion: Um número monotonicamente crescente que atua como versão de transação.

O Delta Lake utiliza txnAppId e txnVersion para identificar e ignorar escritas duplicadas. Por exemplo, após uma falha interromper uma escrita em lote, pode reexecutar o lote com o mesmo txnAppId e txnVersion para identificar e ignorar corretamente os duplicados. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Warning

Caso exclua o ponto de verificação de streaming e reinicie a consulta com um novo ponto de verificação, deverá fornecer um txnAppId. Novos pontos de verificação começam com um ID de lote de 0. O Delta Lake usa o ID do lote e txnAppId como uma chave exclusiva, e ignora os lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}

Upsert de consultas em streaming usando foreachBatch

Pode usar merge e foreachBatch para escrever upserts complexos a partir de uma query de streaming para uma tabela Delta. Veja Utilizar foreachBatch para escrever em sinks de dados arbitrários.

Esta abordagem tem muitas aplicações:

Note

  • Verifique se a sua merge afirmação interior foreachBatch é idempotente. Caso contrário, reiniciar a consulta de streaming pode aplicar a operação ao mesmo lote de dados várias vezes. Ver Utilização foreachBatch para escritas de tabelas idempotentes.

  • Quando merge é usado em foreachBatch, a métrica de taxa de dados de entrada pode devolver um múltiplo da taxa real à qual os dados são gerados na fonte. merge Lê os dados de entrada várias vezes, o que multiplica as métricas. Para evitar a multiplicação de métricas, armazene em cache o DataFrame em lote antes de merge e remova-o do cache depois de merge.

    A taxa de dados de entrada está disponível através de StreamingQueryProgress e do gráfico de taxa de transmissão do notebook. Consulte Monitorização de consultas de streaming estruturado no Azure Databricks.

Por exemplo, pode usar MERGE instruções SQL dentro de foreachBatch:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Também pode usar as APIs do Delta Lake para realizar transmissões de upserts:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Definir a versão inicial da tabela para processar alterações

Por defeito, as transmissões começam com a versão mais recente disponível da tabela Delta. Isto inclui uma imagem completa da tabela naquele momento e todas as alterações futuras. O Databricks recomenda que use a versão inicial padrão da tabela para a maioria das cargas de trabalho.

Opcionalmente, pode usar as seguintes opções para especificar o ponto de partida da fonte de streaming do Delta Lake sem processar toda a tabela.

  • startingVersion: A versão da tabela Delta para começar a ler. Todas as alterações de tabela cometidas na versão especificada ou após são lidas pelo fluxo. Se a versão especificada não estiver disponível, a transmissão não começa.

    Para encontrar as versões de commit disponíveis, execute DESCRIBE HISTORY e verifique o version. Para retornar apenas as alterações mais recentes, especifique latest. Para informações sobre versões de tabelas Delta, veja Trabalhar com o histórico de tabelas.

  • startingTimestamp: O carimbo temporal para começar a ler. Todas as alterações de tabela cometidas no ou após o carimbo temporal especificado são lidas pelo fluxo de dados. Se o carimbo de data/hora fornecido preceder todas as confirmações da tabela, a leitura de streaming começará com o carimbo de data/hora mais antigo disponível. Configure uma das opções:

    • Uma cadeia de caracteres que representa um carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".
    • Uma cadeia de caracteres de data. Por exemplo, "2019-01-01".

Não podes definir tanto startingVersion como startingTimestamp ao mesmo tempo. Estas definições aplicam-se apenas a novas consultas de streaming. Se uma consulta de streaming já foi iniciada e o progresso foi registado no seu checkpoint, estas definições são ignoradas.

Important

Embora tu possas iniciar a fonte de streaming a partir de uma versão especificada ou de um timestamp, o esquema da fonte de streaming é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou o timestamp. Caso contrário, a fonte de streaming pode devolver resultados incorretos ao ler os dados com um esquema incorreto.

Example

Por exemplo, suponha que você tenha uma tabela user_events. Se você quiser ler as alterações desde a versão 5, use:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Se quiser ler as alterações desde 2018-10-18, utilize:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Processar instantâneo inicial sem perder dados

Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.

Numa consulta de fluxo com estado e marca de progresso temporal definida, processar ficheiros com base no tempo de modificação pode levar a que os registos sejam processados na ordem errada. Isto pode fazer com que a marca d'água marque incorretamente os registos como eventos tardios e os descarte. Isto só pode ocorrer quando o snapshot Delta inicial é processado na ordem padrão.

Para fluxos com uma tabela de origem Delta, a consulta processa primeiro todos os dados presentes na tabela e cria uma versão chamada snapshot inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.

Para evitar quedas de dados durante o processamento inicial do snapshot, ative a withEventTimeOrder opção. withEventTimeOrder divide o intervalo temporal dos dados do instantâneo inicial em intervalos de tempo. Cada micro-lote processa um balde filtrando dados dentro do intervalo de tempo. As opções maxFilesPerTrigger e maxBytesPerTrigger continuam a ser aplicáveis para controlar o tamanho do micro-lote, mas apenas de forma aproximada devido à abordagem de processamento.

O diagrama seguinte mostra este processo:

Instantâneo inicial

Restrições

  • Não pode alterar withEventTimeOrder se a consulta do fluxo já começou e o snapshot inicial estiver a ser processado ativamente. Para reiniciar com withEventTimeOrder alterado, tens de apagar o checkpoint.
  • Se withEventTimeOrder estiver ativado, não pode fazer downgrade de um fluxo para uma versão Databricks Runtime que não suporta esta funcionalidade até que o processamento inicial do snapshot esteja concluído. Para fazer downgrade, espere que o snapshot inicial termine ou apague o checkpoint e reinicie a consulta.
  • Esta funcionalidade não é suportada nos seguintes cenários:
    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.
    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.

Desempenho

Se withEventTimeOrder estiver ativado, o desempenho inicial do processamento de snapshots pode ser mais lento. Cada micro-batch analisa a captura instantânea inicial para filtrar os dados dentro do intervalo de tempo de eventos correspondente. Para melhorar o desempenho da filtragem:

  • Use uma coluna de origem Delta como hora do evento para que o salto de dados possa ser aplicado. Ver Salto de dados.
  • Particione a tabela ao longo da coluna de tempo do evento.

Use a interface do Spark para ver quantos ficheiros Delta são digitalizados para um micro-lote específico.

Example

Suponha que você tenha uma tabela user_events com uma event_time coluna. Sua consulta de streaming é uma consulta de agregação. Se quiser garantir que não haja queda de dados durante o processamento inicial do snapshot, você pode usar:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Pode configurar withEventTimeOrder , com uma configuração Spark no cluster, para a aplicar a todas as consultas de streaming: spark.databricks.delta.withEventTimeOrder.enabled true.

Limitar a taxa de entrada para melhorar o desempenho do processamento

Por defeito, o Structured Streaming processa o maior número possível de ficheiros em cada micro-lote. Para limitar a quantidade de dados processados por lote e gerir o uso de memória, estabilizar a latência ou reduzir custos de armazenamento na cloud, utilize as seguintes opções:

  • maxFilesPerTrigger: O número de novos arquivos a serem considerados em cada microlote. A predefinição é 1000.
  • maxBytesPerTrigger: A quantidade de dados que são processados em cada microlote. Esta opção define um "soft max", significando que um lote processa aproximadamente esta quantidade de dados e pode processar mais do que o limite para que a consulta de streaming avance nos casos em que a menor unidade de entrada seja maior do que esse limite. Isso não é definido por padrão.

Se utilizares ambos maxBytesPerTrigger e maxFilesPerTrigger, o micro-batch processa dados até que o limite de maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

Note

Por padrão, se logRetentionDuration remove transações na tabela de origem e a consulta de streaming tentar processar essas versões, a consulta falha para evitar a perda de dados. Você pode definir a opção failOnDataLoss para false ignorar dados perdidos e continuar o processamento. Consulte Configuração de retenção de dados para consultas de viagem no tempo.

Controla o custo do armazenamento na nuvem

As consultas de streaming têm vários modos de gatilho disponíveis que permitem equilibrar custo e latência, incluindo processingTime, availableNow, e realTime. Veja Controlar o custo do armazenamento na nuvem.