Ler em inglês

Partilhar via


Streaming e ingestão incremental

O Azure Databricks usa o Apache Spark Structured Streaming para apoiar vários produtos associados a cargas de trabalho de ingestão, incluindo:

  • Carregador Automático
  • COPY INTO
  • Pipelines Delta Live Tables
  • Visualizações materializadas e tabelas de streaming no Databricks SQL

Este artigo discute algumas das diferenças entre a semântica de streaming e processamento em lote incremental e fornece uma visão geral de alto nível da configuração de cargas de trabalho de ingestão para a semântica desejada no Databricks.

Qual é a diferença entre streaming e ingestão incremental em lote?

As possíveis configurações de fluxo de trabalho de ingestão variam de processamento quase em tempo real a processamento em lote incremental pouco frequente. Ambos os padrões usam o Apache Spark Structured Streaming para alimentar o processamento incremental, mas têm semânticas diferentes. Para simplificar, este artigo refere-se à ingestão quase em tempo real como ingestão por streaming e ao processamento incremental mais infrequente como ingestão incremental em lote.

Ingestão em transmissão em fluxo

Streaming, no contexto de ingestão de dados e atualizações de tabela, refere-se ao processamento de dados quase em tempo real em que o Azure Databricks ingere registros da origem para o coletor em microlotes usando infraestrutura sempre ativa. Uma carga de trabalho de streaming ingere continuamente atualizações de fontes de dados configuradas, a menos que ocorra uma falha que interrompa a ingestão.

Ingestão incremental de lotes

A ingestão incremental em lote refere-se a um padrão em que todos os novos registros são processados a partir de uma fonte de dados em um trabalho de curta duração. A ingestão incremental de lotes geralmente ocorre de acordo com um cronograma, mas também pode ser acionada manualmente ou com base na chegada do arquivo.

A ingestão incremental em lote difere da ingestão em lote porque deteta automaticamente novos registros na fonte de dados e ignora os registros que já foram ingeridos.

Ingestão com Jobs

O Databricks Jobs permite orquestrar fluxos de trabalho e agendar tarefas que incluem blocos de anotações, bibliotecas, pipelines Delta Live Tables e consultas Databricks SQL.

Nota

Você pode usar todos os tipos de computação e tipos de tarefa do Azure Databricks para configurar a ingestão incremental em lote. A ingestão de streaming só é suportada na produção em trabalhos clássicos de computação e Delta Live Tables.

Os trabalhos têm dois modos principais de operação:

  • Os trabalhos contínuos voltam a tentar automaticamente se encontrarem uma falha. Este modo destina-se à ingestão de streaming.
  • Os trabalhos acionados executam tarefas quando acionados. Os gatilhos incluem:
    • Gatilhos baseados em tempo que executam trabalhos em um cronograma especificado.
    • Gatilhos baseados em arquivo que executam trabalhos quando os arquivos pousam em um local especificado.
    • Outros gatilhos, como chamadas de API REST, execução de comandos da CLI do Azure Databricks ou clicar no botão Executar agora na interface do usuário do espaço de trabalho.

Para cargas de trabalho incrementais em lote, configure seus trabalhos usando o AvailableNow modo de disparo, da seguinte maneira:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

Para cargas de trabalho de streaming, o intervalo de gatilho padrão é processingTime ="500ms". O exemplo a seguir mostra como processar um microlote a cada 5 segundos:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Importante

Os trabalhos sem servidor não suportam Scala, modo contínuo ou intervalos de gatilho baseados em tempo para Streaming Estruturado. Use trabalhos clássicos se precisar de semântica de ingestão quase em tempo real.

Ingestão com Delta Live Tables

Semelhante aos Jobs, os pipelines do Delta Live Tables podem ser executados no modo acionado ou contínuo. Para obter semântica de streaming quase em tempo real com tabelas de streaming, use o modo contínuo.

Use tabelas de streaming para configurar streaming ou ingestão incremental em lote do armazenamento de objetos na nuvem, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.

O LakeFlow Connect usa Delta Live Tables para configurar pipelines de ingestão de sistemas conectados. Veja LakeFlow Connect.

As visualizações materializadas garantem semântica de operação equivalente a cargas de trabalho em lote, mas podem otimizar muitas operações para calcular resultados de forma incremental. Consulte Atualização incremental de visualizações materializadas.

Ingestão com Databricks SQL

Você pode usar tabelas de streaming para configurar a ingestão incremental em lote do armazenamento de objetos na nuvem, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.

Você pode usar exibições materializadas para configurar o processamento incremental em lote a partir de fontes Delta. Consulte Atualização incremental para ver vistas materializadas.

COPY INTO fornece sintaxe SQL familiar para processamento em lote incremental para arquivos de dados no armazenamento de objetos em nuvem. COPY INTO O comportamento é semelhante aos padrões suportados por tabelas de streaming para armazenamento de objetos na nuvem, mas nem todas as configurações padrão são equivalentes para todos os formatos de arquivo suportados.