Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Lakeflow Declarative Pipelines presenta varias nuevas construcciones de código de Python para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad de Python para desarrollar canalizaciones se basa en los conceptos básicos de las API de DataFrame y Structured Streaming de PySpark.
Para los usuarios que no están familiarizados con Python y DataFrames, Databricks recomienda usar la interfaz de SQL. Consulte Desarrollo de código de canalización con SQL.
Para obtener una referencia completa de la sintaxis de Python de Las canalizaciones declarativas de Lakeflow, consulte Referencia del lenguaje Python de Canalizaciones declarativas de Lakeflow.
Conceptos básicos de Python para el desarrollo de canalizaciones
El código de Python que crea conjuntos de datos de canalizaciones declarativas de Lakeflow debe devolver dataframes.
Las API de Python de las canalizaciones declarativas de Lakeflow están implementadas en el módulo dlt
. El código de canalizaciones declarativas de Lakeflow implementado con Python debe importar explícitamente el dlt
módulo en la parte superior de los cuadernos y archivos de Python.
Lee y escribe de forma predeterminada el catálogo y el esquema especificados durante la configuración de la canalización. Consulte Definir el catálogo y el esquema de destino.
El código de Python específico de Las canalizaciones declarativas de Lakeflow difiere de otros tipos de código de Python de una manera crítica: el código de canalización de Python no llama directamente a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos de canalizaciones declarativas de Lakeflow. En su lugar, Lakeflow Declarative Pipelines interpreta las funciones de decorador del dlt
módulo en todos los archivos de código fuente configurados en una canalización y crea un grafo de flujo de datos.
Importante
Para evitar un comportamiento inesperado cuando se ejecuta la canalización, no incluya código que pueda tener efectos secundarios en las funciones que definen conjuntos de datos. Para obtener más información, vea la referencia de Python.
Creación de una vista materializada o una tabla de streaming con Python
El @dlt.table
decorador indica a Lakeflow Declarative Pipelines que cree una vista materializada o una tabla de streaming en función de los resultados devueltos por una función. Los resultados de una lectura por lotes crean una vista materializada, mientras que los resultados de una lectura de streaming crean una tabla de streaming.
De forma predeterminada, los nombres de tabla de streaming y vista materializada se deducen de los nombres de función. En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada y una tabla de streaming:
Nota:
Ambas funciones hacen referencia a la misma tabla del samples
catálogo y usan la misma función de decorador. Estos ejemplos resaltan que la única diferencia en la sintaxis básica para las vistas materializadas y las tablas de streaming se usa spark.read
frente a spark.readStream
.
No todos los orígenes de datos admiten lecturas de streaming. Algunos orígenes de datos siempre deben procesarse con la semántica de streaming.
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opcionalmente, puede especificar el nombre de la tabla mediante el name
argumento en el @dlt.table
decorador. En el ejemplo siguiente se muestra este patrón para una vista materializada y una tabla de streaming:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Carga de datos desde el almacenamiento de objetos
Lakeflow Declarative Pipelines admite la carga de datos desde todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.
Nota:
En estos ejemplos se usan datos disponibles en /databricks-datasets
, montados automáticamente en el área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.
Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.
En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Validación de datos con expectativas
Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con las expectativas de canalización.
El código siguiente usa @dlt.expect_or_drop
para definir una expectativa denominada valid_data
que quita registros que son NULL durante la ingesta de datos:
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Consulte las vistas materializadas y las tablas de streaming definidas en su canalización
En el ejemplo siguiente se definen cuatro conjuntos de datos:
- Una tabla de streaming denominada
orders
que carga datos JSON. - Una vista materializada denominada
customers
que carga datos CSV. - Una vista materializada denominada
customer_orders
que combina registros de los conjuntos de datosorders
ycustomers
, convierte la marca de tiempo de pedido en una fecha y selecciona los camposcustomer_id
,order_number
,state
yorder_date
. - Una vista materializada denominada
daily_orders_by_state
que agrega el recuento diario de pedidos para cada estado.
Nota:
Al consultar vistas o tablas en la canalización, puede especificar directamente el catálogo y el esquema, o puede usar los valores predeterminados configurados en la canalización. En este ejemplo, las tablas orders
, customers
y customer_orders
se escriben y leen desde el catálogo y el esquema predeterminados configurados para la canalización.
El modo de publicación heredado usa el esquema de LIVE
para consultar otras vistas materializadas y tablas de streaming definidas en la canalización. En las nuevas canalizaciones, se omite silenciosamente la sintaxis de esquema LIVE
. Consulte Esquema LIVE (heredado).
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Crear tablas en un bucle de for
Puede usar bucles de Python for
para crear varias tablas mediante programación. Esto puede ser útil cuando tiene muchos orígenes de datos o conjuntos de datos de destino que varían en solo unos pocos parámetros, lo que da como resultado un código menos total para mantener y menos redundancia de código.
El for
bucle evalúa la lógica en orden serie, pero una vez completado el planeamiento de los conjuntos de datos, la canalización ejecuta lógica en paralelo.
Importante
Al usar este patrón para definir conjuntos de datos, asegúrese de que la lista de valores pasados al for
bucle siempre es suma. Si un conjunto de datos previamente definido en una canalización se omite en una ejecución futura de esa canalización, dicho conjunto de datos se elimina automáticamente del esquema de destino.
En el ejemplo siguiente se crean cinco tablas que filtran los pedidos de clientes por región. Aquí, el nombre de la región se usa para establecer el nombre de las vistas materializadas de destino y para filtrar los datos de origen. Las vistas temporales se usan para definir combinaciones de las tablas de origen usadas para construir las vistas materializadas finales.
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
A continuación se muestra un ejemplo del gráfico de flujo de datos para esta canalización:
Solución de problemas: for
el bucle crea muchas tablas con los mismos valores
El modelo de ejecución diferida que las canalizaciones usan para evaluar el código de Python requiere que la lógica haga referencia directamente a valores individuales cuando se invoque la función decorada por @dlt.table()
.
En el ejemplo siguiente se muestran dos enfoques correctos para definir tablas con un for
bucle . En ambos ejemplos, se hace referencia explícita a cada nombre de tabla de la tables
lista dentro de la función decorada por @dlt.table()
.
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
El ejemplo siguiente no se refiere a los valores de referencia correctamente. En este ejemplo se crean tablas con nombres distintos, pero todas las tablas cargan datos del último valor del for
bucle:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)
Eliminación permanente de registros de una vista materializada o una tabla de streaming
Para eliminar permanentemente los registros de una vista materializada o una tabla de streaming con vectores de eliminación habilitados, como para el cumplimiento del RGPD, se deben realizar operaciones adicionales en las tablas delta subyacentes del objeto. Para garantizar la eliminación de registros de una vista materializada, consulte Eliminación permanente de registros de una vista materializada con vectores de eliminación habilitados. Para garantizar la eliminación de registros de una tabla de streaming, consulte Eliminación permanente de registros de una tabla de streaming.