Compartilhar via


Tabelas de streaming

Uma tabela de streaming é uma tabela Delta com suporte adicional para streaming ou processamento de dados incrementais. Uma tabela de streaming pode ser direcionada por um ou mais fluxos em um pipeline de ETL.

As tabelas de streaming são uma boa opção para ingestão de dados pelos seguintes motivos:

  • Cada linha de entrada é manipulada apenas uma vez, o que modela a grande maioria das cargas de trabalho de ingestão (ou seja, acrescentando ou inserindo linhas em uma tabela).
  • Elas podem lidar com grandes volumes de dados somente acréscimo.

As tabelas de streaming também são uma boa opção para transformações de streaming de baixa latência pelos seguintes motivos:

  • Analisar em linhas e janelas de tempo
  • Lidar com grandes volumes de dados
  • Baixa latência

O diagrama a seguir ilustra como as tabelas de streaming funcionam.

Diagrama que mostra como as tabelas de streaming funcionam

Em cada atualização, os fluxos associados a uma tabela de streaming leem as informações alteradas em uma fonte de streaming e acrescentam novas informações a essa tabela.

As tabelas de streaming são definidas e atualizadas por um único pipeline. Você define explicitamente tabelas de streaming no código-fonte do pipeline. Tabelas definidas por um pipeline não podem ser alteradas ou atualizadas por nenhum outro pipeline. Você pode definir vários fluxos para acrescentar a uma única tabela de streaming.

Quando você cria uma tabela de streaming fora de um pipeline no Databricks SQL, o Databricks cria um pipeline oculto que é usado para atualizar essa tabela.

Para obter mais informações sobre fluxos, consulte Carregar e processar dados incrementalmente com fluxos de Pipelines Declarativos do Lakeflow.

Tabelas de streaming para ingestão

As tabelas de streaming são projetadas para fontes de dados somente acréscimo e entradas de processo apenas uma vez.

O exemplo a seguir mostra como usar uma tabela de streaming para ingerir novos arquivos do armazenamento em nuvem.

Pitão

import dlt

# create a streaming table
@dlt.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

Quando você usa a spark.readStream função em uma definição de conjunto de dados, ela faz com que os Pipelines Declarativos do Lakeflow tratem o conjunto de dados como um fluxo e a tabela criada é uma tabela de streaming.

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

Para obter mais detalhes sobre como carregar dados em tabelas de streaming, consulte Carregar dados com Pipelines Declarativos do Lakeflow.

O diagrama a seguir ilustra como as tabelas de streaming somente acréscimo funcionam.

Diagrama que mostra como as tabelas de streaming somente acréscimo funcionam

Uma linha que já foi anexada a uma tabela de streaming não será consultada novamente com atualizações posteriores para o pipeline. Se você modificar a consulta (por exemplo, de SELECT LOWER (name) para SELECT UPPER (name)), as linhas existentes não serão atualizadas para maiúsculas, mas novas linhas serão maiúsculas. Você pode disparar uma atualização completa para todos os dados anteriores da tabela de origem para atualizar todas as linhas na tabela de streaming.

Transmissão de tabelas e streaming de baixa latência

As tabelas de streaming são projetadas para streaming de baixa latência em relação ao estado limitado. As tabelas de streaming usam o gerenciamento de ponto de verificação, o que as torna adequadas para streaming de baixa latência. No entanto, eles esperam fluxos que são naturalmente limitados ou limitados com uma marca d'água.

Um fluxo naturalmente limitado é produzido por uma fonte de dados de streaming que tem um início e um término bem definidos. Um exemplo de um fluxo naturalmente limitado é ler dados de um diretório de arquivos em que nenhum novo arquivo está sendo adicionado depois que um lote inicial de arquivos é colocado. O fluxo é considerado limitado porque o número de arquivos é finito e, em seguida, o fluxo termina depois que todos os arquivos são processados.

Você também pode usar uma marca d'água para associar um fluxo. Uma marca d'água no Streaming Estruturado do Spark é um mecanismo que ajuda a lidar com dados atrasados especificando por quanto tempo o sistema deve aguardar eventos atrasados antes de considerar a janela de tempo como concluída. Um fluxo ilimitado que não tem uma marca d'água pode fazer com que um pipeline falhe devido à pressão de memória.

Para mais informações sobre processamento de fluxo com estado, consulte Otimizar o processamento de fluxo com estado em Pipelines Declarativas do Lakeflow com marcas d'água.

Junções de instantâneo do fluxo

As junções de instantâneo do fluxo são junções entre um fluxo e uma dimensão que é instantâneo quando os fluxos são iniciados. Essas junções não serão recomputadas se a dimensão for alterada após o início do fluxo, pois a tabela de dimensões é tratada como um instantâneo no tempo e as alterações na tabela de dimensão após o início do fluxo não serão refletidas, a menos que você recarregue ou atualize a tabela de dimensões. Esse é um comportamento razoável se você pode aceitar pequenas discrepâncias em uma junção. Por exemplo, uma união aproximada é aceitável quando o número de transações é muitas ordens de magnitude maior que o número de clientes.

No exemplo de código a seguir, ingressamos em uma tabela de dimensões, clientes, com duas linhas com um conjunto de dados cada vez maior, transações. Materializamos uma junção entre esses dois conjuntos de dados em uma tabela chamada sales_report. Observe que, se um processo externo atualizar a tabela de clientes adicionando uma nova linha (customer_id=3, name=Zoya), essa nova linha NÃO estará presente no join porque a tabela de dimensões estática foi fotografada quando os fluxos foram iniciados.

import dlt

@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
  return spark.read.table("customers")

@dlt.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return (
    facts.join(dims, on="customer_id", how="inner"
  )

Limitações da tabela de streaming

As tabelas de streaming têm as seguintes limitações:

  • Evolução limitada: Você pode alterar a consulta sem recompusar todo o conjunto de dados. Como uma tabela de streaming só vê uma linha uma vez, você pode ter consultas diferentes operando em linhas diferentes. Isso significa que você deve estar ciente de todas as versões anteriores da consulta que estão em execução em seu conjunto de dados. Uma atualização completa é necessária para fazer com que a tabela de streaming atualize os dados que já foram processados.
  • Gerenciamento de estado: as tabelas de streaming são de baixa latência, portanto, você precisa garantir que os fluxos em que operam sejam naturalmente limitados ou limitados com marca d'água. Para obter mais informações, consulte Otimizar o processamento com estado em Pipelines Declarativos do Lakeflow com marcas d'água.
  • As junções não são recomputadas: As junções em tabelas de streaming não são recomputadas quando as dimensões são alteradas. Essa característica pode ser boa para cenários "rápidos, mas errados". Se você quiser que sua visão esteja sempre correta, pode utilizar uma visão materializada. As exibições materializadas estão sempre corretas porque recomputam automaticamente as junções quando as dimensões são alteradas. Para obter mais informações, veja Exibições materializadas.