Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Lakeflow Spark Declarative Pipelines (SDP) presenta varias nuevas construcciones de código de Python para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad de Python para desarrollar canalizaciones se basa en los conceptos básicos de las API de DataFrame y Structured Streaming de PySpark.
Para los usuarios que no están familiarizados con Python y DataFrames, Databricks recomienda usar la interfaz de SQL. Vea Desarrollo de código de canalizaciones declarativas de Lakeflow Spark con SQL.
Para obtener una referencia completa de la sintaxis de Python de Lakeflow SDP, consulte Lakeflow Spark Declarative Pipelines Python Language Reference.
Conceptos básicos de Python para el desarrollo de canalizaciones
El código de Python que crea conjuntos de datos pipline debe devolver DataFrames.
Todas las API de Python de las Canalizaciones Declarativas de Spark de Lakeflow están implementadas en el módulo pyspark.pipelines. El código de canalización implementado con Python debe importar explícitamente el módulo pipelines al principio del código fuente de Python. En nuestros ejemplos, usamos el siguiente comando de importación y usamos dp en ejemplos para hacer referencia a pipelines.
from pyspark import pipelines as dp
Nota:
Apache Spark™ incluye canalizaciones declarativas a partir de Spark 4.1, disponibles a través del módulo pyspark.pipelines. Databricks Runtime amplía estas funcionalidades de código abierto con api adicionales e integraciones para uso de producción administrado.
El código escrito con el módulo de código pipelines abierto se ejecuta sin modificaciones en Azure Databricks. Las siguientes características no forman parte de Apache Spark:
dp.create_auto_cdc_flowdp.create_auto_cdc_from_snapshot_flow@dp.expect(...)@dp.temporary_view
La canalización lee y escribe de forma predeterminada el catálogo y el esquema especificados durante la configuración de la canalización. Consulte Definir el catálogo y el esquema de destino.
El código de Python específico de la canalización difiere de otros tipos de código de Python de una manera crítica: el código de canalización de Python no llama directamente a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos. En su lugar, SDP interpreta las funciones de decorador del dp módulo en todos los archivos de código fuente configurados en una canalización y crea un gráfico de flujo de datos.
Importante
Para evitar un comportamiento inesperado cuando se ejecuta la canalización, no incluya código que pueda tener efectos secundarios en las funciones que definen conjuntos de datos. Para obtener más información, vea la referencia de Python.
Creación de una vista materializada o una tabla de streaming con Python
Use @dp.table para crear una tabla de streaming a partir de los resultados de una lectura de streaming. Use @dp.materialized_view para crear una vista materializada a partir de los resultados de una lectura por lotes.
De forma predeterminada, los nombres de tabla de streaming y vista materializada se deducen de los nombres de función. En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada y una tabla de streaming:
Nota:
Ambas funciones hacen referencia a la misma tabla del samples catálogo y usan la misma función de decorador. Estos ejemplos resaltan que la única diferencia en la sintaxis básica para las vistas materializadas y las tablas de streaming se usa spark.read frente a spark.readStream.
No todos los orígenes de datos admiten lecturas de streaming. Algunos orígenes de datos siempre deben procesarse con la semántica de streaming.
from pyspark import pipelines as dp
@dp.materialized_view()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Opcionalmente, puede especificar el nombre de la tabla mediante el name argumento en el @dp.table decorador. En el ejemplo siguiente se muestra este patrón para una vista materializada y una tabla de streaming:
from pyspark import pipelines as dp
@dp.materialized_view(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dp.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
Carga de datos desde el almacenamiento de objetos
Las canalizaciones admiten la carga de datos desde todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.
Nota:
En estos ejemplos se usan datos disponibles en /databricks-datasets, montados automáticamente en el área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.
Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.
En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:
from pyspark import pipelines as dp
@dp.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:
from pyspark import pipelines as dp
@dp.materialized_view()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Validación de datos con expectativas
Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con las expectativas de canalización.
El código siguiente usa @dp.expect_or_drop para definir una expectativa denominada valid_data que quita registros que son NULL durante la ingesta de datos:
from pyspark import pipelines as dp
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
Consulte las vistas materializadas y las tablas de streaming definidas en su canalización
En el ejemplo siguiente se definen cuatro conjuntos de datos:
- Una tabla de streaming denominada
ordersque carga datos JSON. - Una vista materializada denominada
customersque carga datos CSV. - Una vista materializada denominada
customer_ordersque combina registros de los conjuntos de datosordersycustomers, convierte la marca de tiempo de pedido en una fecha y selecciona los camposcustomer_id,order_number,stateyorder_date. - Una vista materializada denominada
daily_orders_by_stateque agrega el recuento diario de pedidos para cada estado.
Nota:
Al consultar vistas o tablas en la canalización, puede especificar directamente el catálogo y el esquema, o puede usar los valores predeterminados configurados en la canalización. En este ejemplo, las tablas orders, customersy customer_orders se escriben y leen desde el catálogo y el esquema predeterminados configurados para la canalización.
El modo de publicación heredado usa el esquema de LIVE para consultar otras vistas materializadas y tablas de streaming definidas en la canalización. En las nuevas canalizaciones, se omite silenciosamente la sintaxis de esquema LIVE. Consulte el esquema LIVE (heredado).
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table()
@dp.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dp.materialized_view()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dp.materialized_view()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dp.materialized_view()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
Crear tablas en un bucle de for
Puede usar bucles de Python for para crear varias tablas mediante programación. Esto puede ser útil cuando tiene muchos orígenes de datos o conjuntos de datos de destino que varían en solo unos pocos parámetros, lo que da como resultado un código menos total para mantener y menos redundancia de código.
El for bucle evalúa la lógica en orden serie, pero una vez completado el planeamiento de los conjuntos de datos, la canalización ejecuta lógica en paralelo.
Importante
Al usar este patrón para definir conjuntos de datos, asegúrese de que la lista de valores pasados al for bucle siempre es suma. Si un conjunto de datos previamente definido en una canalización se omite en una ejecución futura de esa canalización, dicho conjunto de datos se elimina automáticamente del esquema de destino.
En el ejemplo siguiente se crean cinco tablas que filtran los pedidos de clientes por región. Aquí, el nombre de la región se usa para establecer el nombre de las vistas materializadas de destino y para filtrar los datos de origen. Las vistas temporales se usan para definir combinaciones de las tablas de origen usadas para construir las vistas materializadas finales.
from pyspark import pipelines as dp
from pyspark.sql.functions import collect_list, col
@dp.temporary_view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dp.temporary_view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dp.materialized_view(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
A continuación se muestra un ejemplo del gráfico de flujo de datos para esta canalización:
Solución de problemas: for el bucle crea muchas tablas con los mismos valores
El modelo de ejecución diferida que las canalizaciones usan para evaluar el código de Python requiere que la lógica haga referencia directamente a valores individuales cuando se invoque la función decorada por @dp.materialized_view() .
En el ejemplo siguiente se muestran dos enfoques correctos para definir tablas con un for bucle . En ambos ejemplos, se hace referencia explícita a cada nombre de tabla de la tables lista dentro de la función decorada por @dp.materialized_view().
from pyspark import pipelines as dp
# Create a parent function to set local variables
def create_table(table_name):
@dp.materialized_view(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dp.materialized_view()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized_view(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
El ejemplo siguiente no se refiere a los valores de referencia correctamente. En este ejemplo se crean tablas con nombres distintos, pero todas las tablas cargan datos del último valor del for bucle:
from pyspark import pipelines as dp
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dp.materialized(name=t_name)
def create_table():
return spark.read.table(t_name)
Eliminación permanente de registros de una vista materializada o una tabla de streaming
Para eliminar permanentemente los registros de una vista materializada o una tabla de streaming con vectores de eliminación habilitados, como para el cumplimiento del RGPD, se deben realizar operaciones adicionales en las tablas delta subyacentes del objeto. Para garantizar la eliminación de registros de una vista materializada, consulte Eliminación permanente de registros de una vista materializada con vectores de eliminación habilitados. Para garantizar la eliminación de registros de una tabla de streaming, consulte Eliminación permanente de registros de una tabla de streaming.