Compartir a través de


Transformación de datos con Delta Live Tables

En este artículo se describe cómo puede usar Delta Live Tables para declarar transformaciones en conjuntos de datos y especificar cómo se procesan los registros a través de la lógica de consulta. También contiene ejemplos de patrones de transformación comunes para crear canalizaciones de Delta Live Tables.

Puede definir un conjunto de datos en cualquier consulta que devuelva un dataframe. Puede usar las operaciones integradas de Apache Spark, las UDF, la lógica personalizada y los modelos de MLflow como transformaciones en la canalización de Delta Live Tables. Una vez que los datos se han ingerido en la canalización de Delta Live Tables, puede definir nuevos conjuntos de datos en orígenes ascendentes para crear nuevas tablas de streaming, vistas materializadas y vistas.

Para obtener información sobre cómo realizar eficazmente el procesamiento con estado con tablas dinámicas diferenciales, consulte Optimizar el procesamiento con estado en Tablas dinámicas diferenciales con marcas de agua.

Cuándo usar vistas, vistas materializadas y tablas de streaming

Al implementar las consultas de canalización, elija el mejor tipo de conjunto de datos para asegurarse de que son eficaces y fáciles de mantener.

Considere la posibilidad de usar una vista para hacer lo siguiente:

  • Interrumpa una consulta grande o compleja que desee en consultas más fáciles de administrar.
  • Valide los resultados intermedios mediante expectativas.
  • Reduzca los costos de almacenamiento y proceso de los resultados que no es necesario conservar. Como las tablas se materializan, requieren recursos adicionales de cálculo y almacenamiento.

Considere la posibilidad de usar una vista materializada cuando:

  • Haya varias consultas de bajada que consuman la tabla. Dado que las vistas se calculan bajo demanda, la vista se recalcula cada vez que se consulta.
  • Otras canalizaciones, trabajos o consultas consumen la tabla. Dado que las vistas no se materializan, solo puede usarlas en la misma canalización.
  • Quiera ver los resultados de una consulta durante el desarrollo. Dado que las tablas se materializan y se pueden ver y consultar fuera de la canalización, el uso de tablas durante el desarrollo puede ayudar a validar la exactitud de los cálculos. Después de validar, convierta en vistas las consultas que no requieran materialización.

Considere la posibilidad de usar una tabla de streaming cuando:

  • Una consulta se define en un origen de datos que crece continua o incrementalmente.
  • Los resultados de la consulta deben calcularse incrementalmente.
  • La canalización necesita un alto rendimiento y una baja latencia.

Nota:

Las tablas de streaming siempre se definen en orígenes de streaming. También puede utilizar fuentes de streaming con APPLY CHANGES INTO para aplicar las actualizaciones de los feeds CDC. Consulte API PARA APLICAR CAMBIOS: simplificación de la captura de datos modificados con Delta Live Tables.

Excluir tablas del esquema de destino

Si debe calcular tablas intermedias no diseñadas para el consumo externo, puede impedir que se publiquen en un esquema mediante la TEMPORARY palabra clave . Las tablas temporales siguen almacenando y procesando datos según la semántica de Delta Live Tables, pero no se debe tener acceso a ellas fuera de la canalización actual. Una tabla temporal persiste durante la vigencia de la canalización que la crea. Use la sintaxis siguiente para declarar tablas temporales:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Combinación de tablas de streaming y vistas materializadas en una sola canalización

Las tablas de streaming heredan las garantías de procesamiento de Apache Spark Structured Streaming y están configuradas para procesar consultas de orígenes de datos de solo anexión, donde las nuevas filas siempre se insertan en la tabla de origen en lugar de modificarse.

Nota:

Aunque, de forma predeterminada, las tablas de flujo de datos requieren orígenes de datos de solo anexión, cuando un origen de flujo es otra tabla de flujo de datos que requiere actualizaciones o eliminaciones, puede invalidar este comportamiento con la marca skipChangeCommits.

Un patrón de streaming común implica la ingesta de datos de origen para crear los conjuntos de datos iniciales en una canalización. Estos conjuntos de datos iniciales se denominan comúnmente tablas de bronce, y suelen realizar transformaciones simples.

Por el contrario, las tablas finales de una canalización, que normalmente se denominan tablas gold, suelen requerir agregaciones complicadas o leer desde destinos de una APPLY CHANGES INTO operación. Dado que estas operaciones crean actualizaciones inherentemente en lugar de anexadas, no se admiten como entradas para transmitir tablas en directo. Estas transformaciones son más adecuadas para vistas materializadas.

Al mezclar tablas de flujo y vistas materializadas en una única canalización, puede simplificar su canalización, evitar la costosa reintroducción o reprocesamiento de datos sin procesar y disponer de toda la potencia de SQL para calcular agregaciones complejas sobre un conjunto de datos codificados y filtrados de forma eficaz. En el ejemplo siguiente se muestra este tipo de procesamiento mixto:

Nota:

En estos ejemplos se usa el cargador automático para cargar archivos desde el almacenamiento en la nube. Para cargar archivos con el cargador automático en una canalización habilitada para el catálogo de Unity, debe usar ubicaciones externas. Para obtener más información sobre el uso del catálogo de Unity con Delta Live Tables, consulte Uso del catálogo de Unity con las canalizaciones de Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Obtenga más información sobre el uso de Auto Loader para ingerir incrementalmente archivos JSON desde Azure Storage.

Combinaciones estáticas de secuencias

Las combinaciones de flujo por lotes son una buena opción cuando se desnormaliza un flujo continuo de datos de tipo apéndice con una tabla de dimensiones principalmente estática.

Con cada actualización de canalización, los nuevos registros de la secuencia se unen con la instantánea más actual de la tabla estática. Si los registros se agregan o actualizan en la tabla estática una vez procesados los datos correspondientes de la tabla de streaming, los registros resultantes no se vuelven a calcular a menos que se realice una actualización completa.

En las canalizaciones configuradas para la ejecución desencadenada, la tabla estática devuelve resultados a partir de la hora en que se inició la actualización. En las canalizaciones configuradas para la ejecución continua, se consulta la versión más reciente de la tabla estática cada vez que la tabla procesa una actualización.

A continuación se muestra un ejemplo de una combinación de secuencia estática:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Calcule los agregados de forma eficaz

Puede utilizar tablas de flujo para calcular incrementalmente agregados distributivos simples como recuento, mín., máx. o suma, y agregados algebraicos como media o desviación estándar. Databricks recomienda la agregación incremental para las consultas con un número limitado de grupos, como una consulta con una GROUP BY country cláusula . Solo se leen nuevos datos de entrada con cada actualización.

Para más información sobre cómo escribir consultas de Delta Live Tables que realizan agregaciones incrementales, vea Agregaciones en ventanas con marcas de agua.

Usar modelos de MLFlow en una canalización de Delta Live Tables

Nota:

Para usar modelos MLflow en una canalización habilitada para catálogo de Unity, la canalización debe configurarse para usar el canal preview. Para usar el canal current, debe configurar la canalización para publicarla en el metastore de Hive.

Puede usar modelos entrenados por MLflow en canalizaciones de Delta Live Tables. Los modelos de MLflow se tratan como transformaciones en Azure Databricks, lo que significa que actúan sobre una entrada de DataFrame de Spark y devuelven resultados como dataFrame de Spark. Dado que Delta Live Tables define conjuntos de datos en DataFrames, puede convertir cargas de trabajo de Apache Spark que usan MLflow en Delta Live Tables con solo unas pocas líneas de código. Para obtener más información sobre MLflow, consulte Administración del ciclo de vida de Machine Learning mediante MLflow.

Si ya tiene un cuaderno de Python que llama a un modelo de MLflow, puede adaptar este código a Delta Live Tables mediante el @dlt.table decorador y asegurarse de que las funciones se definen para devolver resultados de transformación. Delta Live Tables no instala MLflow de forma predeterminada, por lo que debe comprobar que usted %pip install mlflow e importe mlflow y dlt en la parte superior del cuaderno. Para obtener una introducción a la sintaxis de Delta Live Tables, consulte Implementación de una canalización de Delta Live Tables con Python.

Para usar modelos de MLflow en Delta Live Tables, complete los pasos siguientes:

  1. Obtenga el id. de ejecución y el nombre de modelo del modelo de MLFlow. El id. de ejecución y el nombre de modelo se usan para construir el URI del modelo de MLFlow.
  2. Use el URI para definir una UDF de Spark para cargar el modelo de MLFlow.
  3. Llame a la UDF en las definiciones de tabla para usar el modelo de MLFlow.

En el ejemplo siguiente se muestra la sintaxis básica de este patrón:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

En el ejemplo siguiente se define una UDF de Spark denominada loaded_model_udf, que carga un modelo de MLFlow entrenado con datos de riesgo de préstamos. Las columnas de datos usadas para realizar la predicción se pasan como argumento a la UDF. La tabla loan_risk_predictions calcula las predicciones de cada fila de loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Conservación de las eliminaciones o actualizaciones manuales

Delta Live Tables permite eliminar o actualizar manualmente los registros de una tabla y realizar una operación de actualización para volver a calcular las tablas de bajada.

De forma predeterminada, Delta Live Tables vuelve a calcular los resultados de la tabla en función de los datos de entrada cada vez que se actualiza una canalización, por lo que debe asegurarse de que el registro eliminado no se vuelve a cargar desde los datos de origen. Establecer la pipelines.reset.allowed propiedad table en false impide que las actualizaciones se actualicen a una tabla, pero no impide que las escrituras incrementales en las tablas o los nuevos datos fluyan a la tabla.

En el diagrama siguiente se muestra un ejemplo con dos tablas de streaming:

  • raw_user_table ingiere un conjunto de datos de usuario, sin procesar, de un origen.
  • bmi_table calcula incrementalmente los niveles de IMC, mediante el peso y la altura de raw_user_table.

Desea eliminar o actualizar manualmente los registros de usuario de raw_user_table y volver a calcular el bmi_table.

Diagrama de retención de datos

El código siguiente demuestra la configuración de la propiedad de tabla de pipelines.reset.allowed a false para desactivar la actualización completa para raw_user_table de manera que los cambios previstos se conserven en el tiempo, pero las tablas descendentes se vuelvan a calcular cuando se ejecuta una actualización de canalización:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);