Compartir a través de


Desarrollo de código de canalización con Python

Lakeflow Declarative Pipelines presenta varias nuevas construcciones de código de Python para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad de Python para desarrollar canalizaciones se basa en los conceptos básicos de las API de DataFrame y Structured Streaming de PySpark.

Para los usuarios que no están familiarizados con Python y DataFrames, Databricks recomienda usar la interfaz de SQL. Consulte Desarrollo de código de canalización con SQL.

Para obtener una referencia completa de la sintaxis de Python de Las canalizaciones declarativas de Lakeflow, consulte Referencia del lenguaje Python de Canalizaciones declarativas de Lakeflow.

Conceptos básicos de Python para el desarrollo de canalizaciones

El código de Python que crea conjuntos de datos de canalizaciones declarativas de Lakeflow debe devolver dataframes.

Las API de Python de las canalizaciones declarativas de Lakeflow están implementadas en el módulo dlt. El código de canalizaciones declarativas de Lakeflow implementado con Python debe importar explícitamente el dlt módulo en la parte superior de los cuadernos y archivos de Python.

Lee y escribe de forma predeterminada el catálogo y el esquema especificados durante la configuración de la canalización. Consulte Definir el catálogo y el esquema de destino.

El código de Python específico de Las canalizaciones declarativas de Lakeflow difiere de otros tipos de código de Python de una manera crítica: el código de canalización de Python no llama directamente a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos de canalizaciones declarativas de Lakeflow. En su lugar, Lakeflow Declarative Pipelines interpreta las funciones de decorador del dlt módulo en todos los archivos de código fuente configurados en una canalización y crea un grafo de flujo de datos.

Importante

Para evitar un comportamiento inesperado cuando se ejecuta la canalización, no incluya código que pueda tener efectos secundarios en las funciones que definen conjuntos de datos. Para obtener más información, vea la referencia de Python.

Creación de una vista materializada o una tabla de streaming con Python

El @dlt.table decorador indica a Lakeflow Declarative Pipelines que cree una vista materializada o una tabla de streaming en función de los resultados devueltos por una función. Los resultados de una lectura por lotes crean una vista materializada, mientras que los resultados de una lectura de streaming crean una tabla de streaming.

De forma predeterminada, los nombres de tabla de streaming y vista materializada se deducen de los nombres de función. En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada y una tabla de streaming:

Nota:

Ambas funciones hacen referencia a la misma tabla del samples catálogo y usan la misma función de decorador. Estos ejemplos resaltan que la única diferencia en la sintaxis básica para las vistas materializadas y las tablas de streaming se usa spark.read frente a spark.readStream.

No todos los orígenes de datos admiten lecturas de streaming. Algunos orígenes de datos siempre deben procesarse con la semántica de streaming.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcionalmente, puede especificar el nombre de la tabla mediante el name argumento en el @dlt.table decorador. En el ejemplo siguiente se muestra este patrón para una vista materializada y una tabla de streaming:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Carga de datos desde el almacenamiento de objetos

Lakeflow Declarative Pipelines admite la carga de datos desde todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

Nota:

En estos ejemplos se usan datos disponibles en /databricks-datasets, montados automáticamente en el área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.

Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.

En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Validación de datos con expectativas

Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con las expectativas de canalización.

El código siguiente usa @dlt.expect_or_drop para definir una expectativa denominada valid_data que quita registros que son NULL durante la ingesta de datos:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Consulte las vistas materializadas y las tablas de streaming definidas en su canalización

En el ejemplo siguiente se definen cuatro conjuntos de datos:

  • Una tabla de streaming denominada orders que carga datos JSON.
  • Una vista materializada denominada customers que carga datos CSV.
  • Una vista materializada denominada customer_orders que combina registros de los conjuntos de datos orders y customers, convierte la marca de tiempo de pedido en una fecha y selecciona los campos customer_id, order_number, state y order_date.
  • Una vista materializada denominada daily_orders_by_state que agrega el recuento diario de pedidos para cada estado.

Nota:

Al consultar vistas o tablas en la canalización, puede especificar directamente el catálogo y el esquema, o puede usar los valores predeterminados configurados en la canalización. En este ejemplo, las tablas orders, customersy customer_orders se escriben y leen desde el catálogo y el esquema predeterminados configurados para la canalización.

El modo de publicación heredado usa el esquema de LIVE para consultar otras vistas materializadas y tablas de streaming definidas en la canalización. En las nuevas canalizaciones, se omite silenciosamente la sintaxis de esquema LIVE. Consulte Esquema LIVE (heredado).

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Crear tablas en un bucle de for

Puede usar bucles de Python for para crear varias tablas mediante programación. Esto puede ser útil cuando tiene muchos orígenes de datos o conjuntos de datos de destino que varían en solo unos pocos parámetros, lo que da como resultado un código menos total para mantener y menos redundancia de código.

El for bucle evalúa la lógica en orden serie, pero una vez completado el planeamiento de los conjuntos de datos, la canalización ejecuta lógica en paralelo.

Importante

Al usar este patrón para definir conjuntos de datos, asegúrese de que la lista de valores pasados al for bucle siempre es suma. Si un conjunto de datos previamente definido en una canalización se omite en una ejecución futura de esa canalización, dicho conjunto de datos se elimina automáticamente del esquema de destino.

En el ejemplo siguiente se crean cinco tablas que filtran los pedidos de clientes por región. Aquí, el nombre de la región se usa para establecer el nombre de las vistas materializadas de destino y para filtrar los datos de origen. Las vistas temporales se usan para definir combinaciones de las tablas de origen usadas para construir las vistas materializadas finales.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

A continuación se muestra un ejemplo del gráfico de flujo de datos para esta canalización:

Gráfico de flujo de datos de dos vistas que conducen a cinco tablas regionales.

Solución de problemas: for el bucle crea muchas tablas con los mismos valores

El modelo de ejecución diferida que las canalizaciones usan para evaluar el código de Python requiere que la lógica haga referencia directamente a valores individuales cuando se invoque la función decorada por @dlt.table() .

En el ejemplo siguiente se muestran dos enfoques correctos para definir tablas con un for bucle . En ambos ejemplos, se hace referencia explícita a cada nombre de tabla de la tables lista dentro de la función decorada por @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

El ejemplo siguiente no se refiere a los valores de referencia correctamente. En este ejemplo se crean tablas con nombres distintos, pero todas las tablas cargan datos del último valor del for bucle:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)

Eliminación permanente de registros de una vista materializada o una tabla de streaming

Para eliminar permanentemente los registros de una vista materializada o una tabla de streaming con vectores de eliminación habilitados, como para el cumplimiento del RGPD, se deben realizar operaciones adicionales en las tablas delta subyacentes del objeto. Para garantizar la eliminación de registros de una vista materializada, consulte Eliminación permanente de registros de una vista materializada con vectores de eliminación habilitados. Para garantizar la eliminación de registros de una tabla de streaming, consulte Eliminación permanente de registros de una tabla de streaming.