Tabelas de streaming

Uma tabela de streaming é uma tabela Delta com suporte adicional para streaming ou processamento de dados incrementais. Uma tabela de streaming pode ser direcionada por um ou mais fluxos em um pipeline.

As tabelas de streaming são uma boa opção para ingestão de dados pelos seguintes motivos:

  • Cada linha de entrada é manipulada apenas uma vez, o que modela a grande maioria das cargas de trabalho de ingestão (ou seja, acrescentando ou inserindo linhas em uma tabela).
  • Elas podem lidar com grandes volumes de dados somente acréscimo.

As tabelas de streaming também são uma boa opção para transformações de streaming de baixa latência porque podem raciocinar sobre linhas e janelas de tempo, lidar com grandes volumes de dados e fornecer processamento de baixa latência.

O diagrama a seguir mostra como os fluxos leem de fontes de streaming e gravam incrementalmente em uma tabela de streaming dentro de um pipeline.

Diagrama mostrando fontes de streaming S3, Kafka e Pub/Sub conectadas através de fluxos individuais que leem novos dados para um pipeline que contém uma tabela de streaming.

Em cada atualização, os fluxos associados a uma tabela de streaming leem as informações alteradas em uma fonte de streaming e acrescentam novas informações a essa tabela.

As tabelas de streaming são de propriedade e atualizadas por um único pipeline. Você define explicitamente tabelas de streaming no código-fonte do pipeline. Tabelas definidas por um pipeline não podem ser alteradas ou atualizadas por nenhum outro pipeline. Você pode definir vários fluxos para acrescentar a uma única tabela de streaming.

Azure Databricks cria tabelas internas para suportar o processamento de tabelas de streaming. Essas tabelas aparecem em system.information_schema.tables, mas não estão visíveis no Gerenciador de Catálogos ou em outras páginas da interface do usuário do ambiente de trabalho.

Observação

Quando você cria uma tabela de streaming fora de um pipeline usando o Databricks SQL, Azure Databricks cria um pipeline usado para atualizar a tabela. Você pode ver o pipeline selecionando Trabalhos & Pipelines no menu à esquerda em seu espaço de trabalho. Você pode adicionar a coluna Tipo de pipeline à sua exibição. As tabelas de streaming definidas em um pipeline têm um tipo de ETL. As tabelas de streaming criadas no Databricks SQL têm um tipo de MV/ST.

Para obter mais informações sobre fluxos, consulte Carregar e processar dados incrementalmente com fluxos de Pipelines Declarativos do Lakeflow Spark.

Tabelas de streaming para ingestão

As tabelas de streaming são projetadas para fontes de dados somente acréscimo e entradas de processo apenas uma vez. Isso os torna adequados para cargas de trabalho de ingestão em que os dados chegam continuamente e devem ser capturados de forma confiável sem reprocessar registros existentes. Azure Databricks dá suporte à ingestão de dados do armazenamento em nuvem e de barramentos de mensagens de streaming.

Ingerir arquivos do armazenamento em nuvem

Você pode usar uma tabela de streaming para ingerir novos arquivos do armazenamento em nuvem. Esses exemplos usam o Carregador Automático para processar incrementalmente novos arquivos à medida que chegam.

Python

from pyspark import pipelines as dp

# Create a streaming table
@dp.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

Para criar uma tabela de Streaming, a definição do conjunto de dados deve ser um tipo de fluxo. Quando você usa a spark.readStream função em uma definição de conjunto de dados, ela retorna um conjunto de dados de streaming.

SQL

-- Create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

As tabelas de streaming exigem conjuntos de dados de streaming. A STREAM palavra-chave antes de read_files informa à expressão de consulta para tratar o conjunto de dados como um fluxo.

Ingerir mensagens de streaming

Você também pode usar tabelas de streaming para ingerir dados de barramentos de mensagens. O exemplo a seguir demonstra como criar uma tabela de Streaming que lê de um tópico Pub/Sub.

Python

@dp.table
def pubsub_raw():
  auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
  }
  return (
    spark.readStream
      .format("pubsub")
      .option("subscriptionId", "my-subscription")
      .option("topicId", "my-topic")
      .option("projectId", "my-project")
      .options(auth_options)
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'my-subscription',
  projectId => 'my-project',
  topicId => 'my-topic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

O Databricks recomenda o uso de segredos ao fornecer opções de autorização. Consulte Configurar o acesso ao Pub/Sub para todas as opções de autenticação.

Para obter mais detalhes sobre como carregar dados na tabela de streaming, consulte Carregar dados em pipelines.

O diagrama a seguir ilustra como as tabelas de streaming somente acréscimo funcionam.

Diagrama que mostra como as tabelas de streaming somente acréscimo funcionam

Uma linha que já foi anexada a uma tabela de Streaming não será consultada novamente com atualizações posteriores para o pipeline. Se você modificar a consulta (por exemplo, de SELECT LOWER (name) para SELECT UPPER (name)), as linhas existentes não serão atualizadas para serem maiúsculas, mas novas linhas serão maiúsculas. Você pode disparar uma atualização completa para consultar novamente todos os dados anteriores da tabela de origem, a fim de atualizar todas as linhas na tabela de Streaming.

Tabelas de streaming e streaming de baixa latência

As tabelas de streaming são projetadas para streaming de baixa latência em relação ao estado limitado. As tabelas de streaming usam o gerenciamento de ponto de verificação, o que as torna adequadas para streaming de baixa latência. No entanto, eles esperam fluxos que são naturalmente limitados ou limitados com uma marca d'água.

Um fluxo naturalmente limitado é produzido por uma fonte de dados de streaming que tem um início e um término bem definidos. Um exemplo de um fluxo naturalmente limitado é ler dados de um diretório de arquivos em que nenhum novo arquivo está sendo adicionado depois que um lote inicial de arquivos é colocado. O fluxo é considerado limitado porque o número de arquivos é finito e o fluxo termina depois que todos os arquivos são processados.

Você também pode usar uma marca d'água para associar um fluxo. Uma marca d'água no Streaming Estruturado é um mecanismo que ajuda a lidar com dados atrasados, especificando por quanto tempo o sistema deve esperar por eventos atrasados antes de considerar a janela de tempo como concluída. Um fluxo ilimitado que não tem uma marca d'água pode fazer com que um pipeline falhe devido à pressão de memória.

Para obter mais informações sobre o processamento de fluxo com estado, consulte Otimizar o processamento com estado usando marcas d'água.

Junções de instantâneo do fluxo

As junções de fluxo e instantâneo conectam um dataset em streaming a uma tabela de dimensão que é capturada no início do fluxo. Como a tabela de dimensões é tratada como fixa nesse ponto no tempo, as alterações feitas nela após o início do fluxo não são refletidas na junção. Isso é aceitável quando pequenas discrepâncias não importam – por exemplo, quando o número de transações é várias ordens de magnitude maior que o número de clientes.

O exemplo de código a seguir une uma tabela de dimensões com duas linhas chamadas customers com um conjunto de dados cada vez maior. transactions Ele materializa uma junção entre esses dois conjuntos de dados em uma tabela chamada sales_report. Se um processo externo atualizar a tabela de clientes adicionando uma nova linha (customer_id=3, name=Zoya), essa nova linha não estará presente na junção porque a tabela de dimensões estática foi capturada quando os fluxos foram iniciados.

from pyspark import pipelines as dp

@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
  return spark.read.table("customers")

@dp.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return facts.join(dims, on="customer_id", how="inner")

Limitações da tabela de streaming

As tabelas de streaming têm as seguintes limitações:

  • Evolução limitada: Você pode alterar a consulta sem recompusar todo o conjunto de dados. Sem uma atualização completa, uma tabela streaming só vê cada linha uma vez, portanto, consultas diferentes terão linhas diferentes processadas. Por exemplo, se você adicionar UPPER() a um campo na consulta, somente as linhas processadas após a alteração estarão em letras maiúsculas. Isso significa que você deve estar ciente de todas as versões anteriores da consulta que estão em execução no conjunto de dados. Para reprocessar linhas existentes que foram processadas antes da alteração, uma atualização completa é necessária.
  • Gerenciamento de estado: As tabelas de streaming são de baixa latência e exigem streams naturalmente limitados ou delimitados com uma marca d'água. Para obter mais informações, consulte Otimizar o processamento stateful com watermarks.
  • As junções não são recomputadas: As junções em tabelas de streaming não são recomputadas quando as dimensões são alteradas. Essa característica pode ser boa para cenários "rápidos, mas errados". Se você quiser que sua visão esteja sempre correta, talvez queira usar uma visão materializada. As exibições materializadas estão sempre corretas porque recomputam automaticamente as junções quando as dimensões são alteradas. Para obter mais informações, veja Exibições materializadas.
  • Sem CLONE suporte: as tabelas de streaming não podem ser usadas como origem nem como destino de um clone profundo ou superficial. Para outros comandos sem suporte, consulte Limitações.