Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se describe cómo puede usar canalizaciones para declarar transformaciones en conjuntos de datos y especificar cómo se procesan los registros a través de la lógica de consulta. También contiene ejemplos de patrones de transformación comunes para construir canalizaciones.
Puede definir un conjunto de datos en cualquier consulta que devuelva un DataFrame. Puede usar operaciones integradas de Apache Spark, UDFs, lógica personalizada y modelos de MLflow como transformaciones en las canalizaciones declarativas de Lakeflow Spark. Una vez que los datos se han ingerido en la canalización, puede definir nuevos conjuntos de datos a partir de orígenes ascendentes para crear nuevas tablas de transmisión en streaming, vistas materializadas y vistas.
Para obtener información sobre cómo realizar eficazmente el procesamiento con estado en una canalización, consulte Optimizar el procesamiento con estado usando marcas de agua.
Cuándo usar vistas, vistas materializadas y tablas de streaming
Al implementar las consultas de canalización, elija el mejor tipo de conjunto de datos para asegurarse de que son eficaces y fáciles de mantener.
Considere la posibilidad de usar una vista para hacer lo siguiente:
- Interrumpa una consulta grande o compleja que desee en consultas más fáciles de administrar.
- Valide los resultados intermedios mediante expectativas.
- Reduzca los costos de almacenamiento y computación para los resultados que no necesita conservar. Como las tablas se materializan, requieren recursos adicionales de cálculo y almacenamiento.
Considere la posibilidad de usar una vista materializada cuando:
- Hay varias consultas descendentes que consumen la tabla. Dado que las vistas se calculan bajo demanda, la vista se recalcula cada vez que se consulta.
- Otras canalizaciones, trabajos o consultas consumen la tabla. Dado que las vistas no se materializan, solo puede usarlas en la misma canalización.
- Te gustaría ver los resultados de una consulta durante el desarrollo. Dado que las tablas se materializan y se pueden ver y consultar fuera de la canalización, el uso de tablas durante el desarrollo puede ayudar a validar la exactitud de los cálculos. Después de validar, convierta en vistas las consultas que no requieran materialización.
Considere la posibilidad de usar una tabla de streaming cuando:
- Una consulta se define en un origen de datos que crece continua o incrementalmente.
- Los resultados de la consulta deben calcularse incrementalmente.
- La canalización necesita un alto rendimiento y una baja latencia.
Nota:
Las tablas de streaming siempre se definen en orígenes de streaming. También puede usar orígenes de streaming con AUTO CDC ... INTO para aplicar actualizaciones de fuentes CDC. Consulte Las API DE AUTO CDC: Simplificación de la captura de datos modificados con canalizaciones.
Excluir tablas del esquema de destino
Si debe calcular tablas intermedias no diseñadas para el consumo externo, puede impedir que se publiquen en un esquema mediante la TEMPORARY palabra clave . Las tablas temporales siguen almacenando y procesando datos según la semántica de canalizaciones declarativas de Spark de Lakeflow, pero no se debe tener acceso a ellos fuera de la canalización actual. Una tabla temporal persiste durante la vigencia de la canalización que la crea. Use la sintaxis siguiente para declarar tablas temporales:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Pitón
@dp.table(
temporary=True)
def temp_table():
return ("...")
Combinación de tablas de streaming y vistas materializadas en una sola canalización
Las tablas de streaming heredan las garantías de procesamiento de Apache Spark Structured Streaming y están configuradas para procesar consultas de orígenes de datos de solo anexión, donde las nuevas filas siempre se insertan en la tabla de origen en lugar de modificarse.
Nota:
Aunque, de manera predeterminada, las tablas de streaming requieren orígenes de datos de solo anexión, cuando un origen de streaming es otra tabla de streaming que requiere actualizaciones o eliminaciones, puede invalidar este comportamiento con la marca skipChangeCommits
Un patrón de streaming común implica la ingesta de datos de origen para crear los conjuntos de datos iniciales en una canalización. Estos conjuntos de datos iniciales se denominan comúnmente tablas de bronce, y suelen realizar transformaciones simples.
Por el contrario, las tablas finales de una canalización, comúnmente llamadas tablas de oro, a menudo requieren complicadas agregaciones o se leen desde objetivos de una operación AUTO CDC ... INTO. Dado que estas operaciones crean actualizaciones inherentemente en lugar de anexarse, no se admiten como entradas para las tablas de streaming. Estas transformaciones son más adecuadas para vistas materializadas.
Al mezclar tablas de flujo y vistas materializadas en una única canalización, puede simplificar su canalización, evitar la costosa reintroducción o reprocesamiento de datos sin procesar y disponer de toda la potencia de SQL para calcular agregaciones complejas sobre un conjunto de datos codificados y filtrados de forma eficaz. En el ejemplo siguiente se muestra este tipo de procesamiento mixto:
Nota:
En estos ejemplos se usa el cargador automático para cargar archivos desde el almacenamiento en la nube. Para cargar archivos con el autocargador en una canalización habilitada para el catálogo de Unity, debe usar ubicaciones externas. Para más información sobre el uso del catálogo de Unity con canalizaciones, consulte Uso del catálogo de Unity con canalizaciones.
Pitón
@dp.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dp.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dp.materialized_view
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.read.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Obtenga más información sobre el uso de Auto Loader para ingerir incrementalmente archivos JSON desde Azure Storage.
combinaciones estáticas de stream
Las combinaciones de flujo por lotes son una buena opción cuando se desnormaliza un flujo continuo de datos de tipo apéndice con una tabla de dimensiones principalmente estática.
Con cada actualización de canalización, los nuevos registros de la secuencia se unen con la instantánea más actual de la tabla estática. Si los registros se agregan o actualizan en la tabla estática una vez procesados los datos correspondientes de la tabla de streaming, los registros resultantes no se vuelven a calcular a menos que se realice una actualización completa.
En las canalizaciones configuradas para la ejecución desencadenada, la tabla estática devuelve resultados a partir de la hora en que se inició la actualización. En las canalizaciones configuradas para la ejecución continua, se consulta la versión más reciente de la tabla estática cada vez que la tabla procesa una actualización.
A continuación se muestra un ejemplo de una combinación de secuencia estática:
Pitón
@dp.table
def customer_sales():
return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Calcule los agregados de forma eficaz
Puede utilizar tablas de flujo para calcular incrementalmente agregados distributivos simples como recuento, mín., máx. o suma, y agregados algebraicos como media o desviación estándar. Databricks recomienda la agregación incremental para las consultas con un número limitado de grupos, como una consulta con una GROUP BY country cláusula . Solo se leen nuevos datos de entrada con cada actualización.
Para obtener más información sobre cómo redactar consultas de canalizaciones declarativas Lakeflow Spark que realizan agregaciones incrementales, consulte Realizar agregaciones en ventanas con marcas de agua.
Uso de modelos de MLflow en canalizaciones declarativas de Spark de Lakeflow
Nota:
Para usar modelos MLflow en una canalización habilitada para catálogo de Unity, la canalización debe configurarse para usar el canal preview. Para usar el canal de current, debe configurar la canalización para publicarla en el metastore de Hive.
Puede usar modelos entrenados por MLflow en canalizaciones. Los modelos de MLflow se tratan como transformaciones en Azure Databricks, lo que significa que actúan sobre una entrada de DataFrame de Spark y devuelven resultados como un DataFrame de Spark. Dado que Lakeflow Spark Declarative Pipelines define conjuntos de datos en DataFrames, puede convertir cargas de trabajo de Apache Spark que usan MLflow en canalizaciones con solo unas pocas líneas de código. Para más información sobre MLflow, consulte MLflow para el ciclo de vida del modelo de ML.
Si ya tiene un script de Python que llama a un modelo de MLflow, puede adaptar este código a una canalización utilizando el decorador @dp.table o @dp.materialized_view y asegurándose de que las funciones estén definidas para devolver los resultados de la transformación. Las canalizaciones declarativas de Spark de Lakeflow no instalan MLflow de forma predeterminada, por lo que debe confirmar que ha instalado las bibliotecas de MLFlow con %pip install mlflow y que ha importado mlflow y dp en la parte superior del origen. Para obtener una introducción a la sintaxis de canalización, consulte Desarrollo de código de canalización con Python.
Para usar modelos de MLflow en canalizaciones, complete los pasos siguientes:
- Obtenga el identificador de ejecución y el nombre del modelo de MLflow. El id. de ejecución y el nombre de modelo se usan para construir el URI del modelo de MLFlow.
- Use el URI para definir una UDF de Spark para cargar el modelo de MLFlow.
- Llame a la UDF en las definiciones de tabla para usar el modelo de MLflow.
En el ejemplo siguiente se muestra la sintaxis básica de este patrón:
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dp.materialized_view
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
En el ejemplo siguiente se define una UDF de Spark denominada loaded_model_udf, que carga un modelo de MLFlow entrenado con datos de riesgo de préstamos. Las columnas de datos usadas para realizar la predicción se pasan como argumento a la UDF. La tabla loan_risk_predictions calcula las predicciones de cada fila de loan_risk_input_data.
%pip install mlflow
from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dp.materialized_view(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Conservación de las eliminaciones o actualizaciones manuales
Las canalizaciones declarativas de Spark de Lakeflow permiten eliminar o actualizar manualmente los registros de una tabla y realizar una operación de actualización para volver a calcular las tablas de bajada.
De forma predeterminada, las canalizaciones vuelven a calcular los resultados de la tabla en función de los datos de entrada cada vez que se actualizan, por lo que debe asegurarse de que el registro eliminado no se vuelve a cargar de los datos de origen. Establecer la pipelines.reset.allowed propiedad table en false impide que las actualizaciones se actualicen a una tabla, pero no impide que las escrituras incrementales en las tablas o los nuevos datos fluyan a la tabla.
En el diagrama siguiente se muestra un ejemplo con dos tablas de streaming:
-
raw_user_tableingiere un conjunto de datos de usuario, sin procesar, de un origen. -
bmi_tablecalcula incrementalmente las puntuaciones del IMC mediante el peso y la altura deraw_user_table.
Desea eliminar o actualizar manualmente los registros de usuario de raw_user_table y volver a calcular el bmi_table.
En el código siguiente se muestra cómo establecer la propiedad de tabla pipelines.reset.allowed en false para deshabilitar la actualización completa de raw_user_table para que los cambios previstos se conserven con el tiempo, pero las tablas de bajada se vuelven a calcular cuando se ejecuta una actualización de canalización:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);