Partilhar via


Transforme dados com Delta Live Tables

Este artigo descreve como você pode usar Delta Live Tables para declarar transformações em conjuntos de dados e especificar como os registros são processados por meio da lógica de consulta. Ele também contém alguns exemplos de padrões de transformação comuns que podem ser úteis ao criar pipelines Delta Live Tables.

Você pode definir um conjunto de dados em relação a qualquer consulta que retorna um DataFrame. Você pode usar operações internas do Apache Spark, UDFs, lógica personalizada e modelos MLflow como transformações em seu pipeline Delta Live Tables. Depois que os dados forem ingeridos em seu pipeline Delta Live Tables, você poderá definir novos conjuntos de dados em relação a fontes upstream para criar novas tabelas de streaming, exibições materializadas e exibições.

Para saber como executar efetivamente o processamento stateful com Delta Live Tables, consulte Otimizar o processamento stateful em Delta Live Tables com marcas d'água.

Quando usar modos de exibição, exibições materializadas e tabelas de streaming

Para garantir que seus pipelines sejam eficientes e fáceis de manter, escolha o melhor tipo de conjunto de dados ao implementar suas consultas de pipeline.

Considere o uso de um modo de exibição quando:

  • Você tem uma consulta grande ou complexa que deseja dividir em consultas mais fáceis de gerenciar.
  • Você deseja validar resultados intermediários usando expectativas.
  • Você deseja reduzir os custos de armazenamento e computação e não requer a materialização dos resultados da consulta. Como as tabelas são materializadas, elas exigem recursos adicionais de computação e armazenamento.

Considere o uso de uma visão materializada quando:

  • Várias consultas downstream consomem a tabela. Como os modos de exibição são computados sob demanda, o modo de exibição é recalculado toda vez que o modo de exibição é consultado.
  • Outros pipelines, trabalhos ou consultas consomem a tabela. Como as exibições não são materializadas, você só pode usá-las no mesmo pipeline.
  • Você deseja exibir os resultados de uma consulta durante o desenvolvimento. Como as tabelas são materializadas e podem ser visualizadas e consultadas fora do pipeline, o uso de tabelas durante o desenvolvimento pode ajudar a validar a correção dos cálculos. Após a validação, converta consultas que não exigem materialização em visualizações.

Considere o uso de uma tabela de streaming quando:

  • Uma consulta é definida em relação a uma fonte de dados que está crescendo contínua ou incrementalmente.
  • Os resultados da consulta devem ser calculados incrementalmente.
  • Alta taxa de transferência e baixa latência são desejadas para o pipeline.

Nota

As tabelas de streaming são sempre definidas em relação às fontes de streaming. Você também pode usar fontes de streaming com APPLY CHANGES INTO para aplicar atualizações de feeds CDC. Consulte APPLY CHANGES API: Simplifique a captura de dados de alteração no Delta Live Tables.

Combine tabelas de streaming e visualizações materializadas em um único pipeline

As tabelas de streaming herdam as garantias de processamento do Apache Spark Structured Streaming e são configuradas para processar consultas de fontes de dados somente acréscimo, onde novas linhas são sempre inseridas na tabela de origem em vez de modificadas.

Um padrão de streaming comum inclui a ingestão de dados de origem para criar os conjuntos de dados iniciais em um pipeline. Esses conjuntos de dados iniciais são comumente chamados de tabelas de bronze e geralmente executam transformações simples.

Por outro lado, as tabelas finais em um pipeline, comumente referidas como tabelas de ouro , muitas vezes exigem agregações complicadas ou leitura de fontes que são os alvos de uma APPLY CHANGES INTO operação. Como essas operações criam inerentemente atualizações em vez de acréscimos, elas não são suportadas como entradas para tabelas de streaming. Essas transformações são mais adequadas para visões materializadas.

Ao misturar tabelas de streaming e exibições materializadas em um único pipeline, você pode simplificar seu pipeline, evitar a dispendiosa reingestão ou reprocessamento de dados brutos e ter todo o poder do SQL para calcular agregações complexas em um conjunto de dados codificado e filtrado de forma eficiente. O exemplo a seguir ilustra esse tipo de processamento misto:

Nota

Estes exemplos usam o Auto Loader para carregar arquivos do armazenamento em nuvem. Para carregar ficheiros com o Carregador Automático num pipeline ativado para o Catálogo do Unity, deve usar localizações externas. Para saber mais sobre como utilizar o Catálogo unity com Tabelas Delta Live, veja Utilizar o Catálogo Unity com os pipelines do Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Saiba mais sobre como usar o Auto Loader para ler com eficiência arquivos JSON do armazenamento do Azure para processamento incremental.

Junções estáticas de fluxo

As junções estáticas de fluxo são uma boa opção ao desnormalizar um fluxo contínuo de dados somente de acréscimo com uma tabela de dimensões principalmente estática.

A cada atualização de pipeline, novos registros do fluxo são unidos ao instantâneo mais atual da tabela estática. Se os registros forem adicionados ou atualizados na tabela estática depois que os dados correspondentes da tabela de streaming tiverem sido processados, os registros resultantes não serão recalculados, a menos que uma atualização completa seja executada.

Em pipelines configurados para execução acionada, a tabela estática retorna resultados a partir do momento em que a atualização foi iniciada. Em pipelines configurados para execução contínua, cada vez que a tabela processa uma atualização, a versão mais recente da tabela estática é consultada.

Segue-se um exemplo de uma junção estática de fluxo:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Calcule agregados de forma eficiente

Você pode usar tabelas de streaming para calcular incrementalmente agregados distributivos simples, como contagem, min, max ou soma, e agregados algébricos, como média ou desvio padrão. O Databricks recomenda a agregação incremental para consultas com um número limitado de grupos, por exemplo, uma consulta com uma GROUP BY country cláusula. Apenas novos dados de entrada são lidos com cada atualização.

Para saber mais sobre como escrever consultas Delta Live Tables que executam agregações incrementais, consulte Executar agregações em janela com marcas d'água.

Usar modelos MLflow em um pipeline Delta Live Tables

Nota

Para usar modelos MLflow em um pipeline habilitado para Unity Catalog, seu pipeline deve ser configurado para usar o preview canal. Para usar o current canal, você deve configurar seu pipeline para publicar no metastore do Hive.

Você pode usar modelos treinados por MLflow em pipelines Delta Live Tables. Os modelos MLflow são tratados como transformações no Azure Databricks, o que significa que atuam sobre uma entrada do Spark DataFrame e retornam resultados como um Spark DataFrame. Como o Delta Live Tables define conjuntos de dados em relação a DataFrames, você pode converter cargas de trabalho do Apache Spark que aproveitam o MLflow para Delta Live Tables com apenas algumas linhas de código. Para obter mais informações sobre MLflow, consulte Gerenciamento do ciclo de vida de ML usando MLflow.

Se você já tiver um bloco de anotações Python chamando um modelo MLflow, poderá adaptar esse código ao Delta Live Tables usando o decorador e garantindo que as funções sejam definidas para retornar os resultados da @dlt.table transformação. Delta Live Tables não instala o MLflow por padrão, portanto, certifique-se %pip install mlflow de importar mlflow e dlt na parte superior do seu notebook. Para obter uma introdução à sintaxe Delta Live Tables, consulte Implementar um pipeline Delta Live Tables com Python.

Para usar modelos MLflow em Delta Live Tables, conclua as seguintes etapas:

  1. Obtenha a ID de execução e o nome do modelo MLflow. O ID de execução e o nome do modelo são usados para construir o URI do modelo MLflow.
  2. Use o URI para definir um UDF do Spark para carregar o modelo MLflow.
  3. Chame o UDF em suas definições de tabela para usar o modelo MLflow.

O exemplo a seguir mostra a sintaxe básica para esse padrão:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Como um exemplo completo, o código a seguir define um UDF do Spark chamado loaded_model_udf que carrega um modelo MLflow treinado em dados de risco de empréstimo. As colunas de dados usadas para fazer a previsão são passadas como um argumento para a UDF. A tabela loan_risk_predictions calcula previsões para cada linha no loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Manter exclusões ou atualizações manuais

O Delta Live Tables permite excluir ou atualizar manualmente registros de uma tabela e fazer uma operação de atualização para recalcular tabelas downstream.

Por padrão, o Delta Live Tables recalcula os resultados da tabela com base nos dados de entrada cada vez que um pipeline é atualizado, portanto, você deve garantir que o registro excluído não seja recarregado dos dados de origem. Definir a pipelines.reset.allowed propriedade table para false impedir atualizações em uma tabela, mas não impede gravações incrementais nas tabelas ou impede que novos dados fluam para a tabela.

O diagrama a seguir ilustra um exemplo usando duas tabelas de streaming:

  • raw_user_table ingere dados brutos do usuário de uma fonte.
  • bmi_table calcula incrementalmente os escores do IMC usando peso e altura de raw_user_table.

Você deseja excluir ou atualizar manualmente os registros de usuário do raw_user_table e recalcular o bmi_table.

Reter diagrama de dados

O código a seguir demonstra a configuração da propriedade table para false desabilitar a pipelines.reset.allowed atualização completa para raw_user_table que as alterações pretendidas sejam mantidas ao longo do tempo, mas as tabelas downstream são recalculadas quando uma atualização de pipeline é executada:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);