Compartilhar via


Carregar e processar dados incrementalmente com fluxos do Delta Live Tables

Este artigo explica o que são os fluxos e como você pode usá-los em pipelines do Delta Live Tables para processar dados incrementalmente de uma fonte para uma tabela de streaming de destino. No Delta Live Tables, os fluxos são definidos de duas maneiras:

  1. Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de streaming.
  2. O Delta Live Tables também fornece funcionalidade para definir explicitamente fluxos para processamento mais complexo, como acrescentar a uma tabela de streaming de várias fontes de streaming.

Este artigo discute os fluxos implícitos criados quando você define uma consulta para atualizar uma tabela de streaming e fornece detalhes sobre a sintaxe para definir fluxos mais complexos.

O que é um fluxo?

No Delta Live Tables, um fluxo é uma consulta de streaming que processa os dados de origem incrementalmente para atualizar uma tabela de streaming de destino. A maioria dos conjuntos de dados do Delta Live Tables que você cria em um pipeline define o fluxo como parte da consulta e não exige a definição explícita do fluxo. Por exemplo, você cria uma tabela de streaming no Delta Live Tables em um único comando DDL em vez de usar instruções de tabela e fluxo separadas para criar a tabela de streaming:

Observação

Este exemplo CREATE FLOW é fornecido apenas para fins ilustrativos e inclui palavras-chave que não são sintaxe válida do Delta Live Tables.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Além do fluxo padrão definido por uma consulta, as interfaces Python e SQL do Delta Live Tables fornecem a funcionalidade de fluxo de acréscimo. O fluxo de acréscimo dá suporte ao processamento que requer a leitura de dados de várias fontes de streaming para atualizar uma única tabela de streaming. Por exemplo, você pode usar a funcionalidade de fluxo de acréscimo quando tiver uma tabela de streaming e um fluxo existentes e quiser adicionar uma nova fonte de streaming que grava nessa tabela de streaming existente.

Usar o fluxo de acréscimo para gravar em uma tabela de streaming a partir de vários streams de origem

Observação

Para usar o processamento de fluxo anexado, seu pipeline deve ser configurado para usar o canal de visualização.

Use o decorador @append_flow na interface do Python ou a cláusula CREATE FLOW na interface SQL para gravar em uma tabela de streaming de várias fontes de streaming. Use o fluxo de acréscimo para tarefas de processamento, como a seguinte:

  • Adicionar fontes de streaming que acrescentam dados a uma tabela de streaming existente sem a necessidade de uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de todas as regiões nas quais você opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Consulte Exemplo: gravar em uma tabela de streaming de vários tópicos do Kafka.
  • Atualizar uma tabela de streaming acrescentando dados históricos ausentes (provisionamento). Por exemplo, você tem uma tabela de streaming existente que é gravada por um tópico do Apache Kafka. Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de streaming e não pode transmitir os dados porque seu processamento inclui executar uma agregação complexa antes de inserir os dados. Consulte Exemplo: executar um provisionamento único de dados.
  • Combine dados de várias fontes e escreva em uma única tabela de streaming em vez de usar a cláusula UNION em uma consulta. Usar o processamento de fluxo de acréscimo em vez de UNION permite que você atualize a tabela de destino incrementalmente sem executar uma atualização completa. Veja Exemplo: use o processamento de fluxo de acréscimo em vez de UNION.

O destino da saída de registros pelo processamento de fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.

Importante

  • Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table() ou uma definição de tabela existente. Você não pode definir expectativas na definição @append_flow.
  • Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
    • Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
    • Não é possível reutilizar um nome de fluxo em um pipeline, pois o ponto de verificação existente não corresponderá à nova definição de fluxo.

A seguir está a sintaxe de @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Exemplo: gravar em uma tabela de streaming de vários tópicos do Kafka

Os exemplos a seguir criam uma tabela de streaming nomeada kafka_target e grava nessa tabela de streaming a partir dois tópicos do Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para saber mais sobre a função com valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência de linguagem SQL.

Exemplo: executar um provisionamento único de dados

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:

Observação

Para garantir um verdadeiro provisionamento único quando a consulta de provisionamento fizer parte de um pipeline que é executado de forma agendada ou continuamente, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem ao diretório de provisionamento, deixe a consulta em vigor.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION, você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de streaming. Usar consultas de fluxo de acréscimo em vez de UNION permite que você acrescente a uma tabela de streaming de várias fontes sem executar uma atualização completa.

O exemplo do Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );