Compartir a través de


Desarrollo de código de canalización con SQL

Las canalizaciones declarativas de Lakeflow presentan varias nuevas funciones y palabras clave SQL para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad con SQL para desarrollar canalizaciones se basa en los conceptos básicos de Spark SQL y agrega soporte a la funcionalidad de Structured Streaming.

Los usuarios familiarizados con dataFrames de PySpark pueden preferir desarrollar código de canalización con Python. Python admite pruebas y operaciones más amplias que son difíciles de implementar con SQL, como las operaciones de metaprogramación. Consulte Desarrollo de código de canalización con Python.

Para obtener una referencia completa de la sintaxis SQL de las Canalizaciones Declarativas de Lakeflow, consulte Referencia del Lenguaje SQL de Canalizaciones Declarativas de Lakeflow.

Conceptos básicos de SQL para el desarrollo de canalizaciones

El código SQL que crea conjuntos de datos de las Lakeflow Declarative Pipelines utiliza la sintaxis CREATE OR REFRESH para definir vistas materializadas y tablas de streaming sobre los resultados de las consultas.

La STREAM palabra clave indica si el origen de datos al que se hace referencia en una SELECT cláusula debe leerse con la semántica de streaming.

Lee y escribe de forma predeterminada el catálogo y el esquema especificados durante la configuración de la canalización. Consulte Definir el catálogo y el esquema de destino.

El código fuente de las canalizaciones declarativas de Lakeflow difiere críticamente de los scripts SQL: Las canalizaciones declarativas de Lakeflow evalúan todas las definiciones de conjuntos de datos en todos los archivos de código fuente configurados en una canalización y compilan un grafo de flujo de datos antes de que se ejecuten las consultas. El orden de las consultas que aparecen en un cuaderno o script define el orden de evaluación de código, pero no el orden de ejecución de la consulta.

Creación de una vista materializada con SQL

En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada con SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Creación de una tabla de streaming con SQL

En el ejemplo de código siguiente se muestra la sintaxis básica para crear una tabla de streaming con SQL. Al leer un origen para una tabla de streaming, la STREAM palabra clave indica que se usa la semántica de streaming para el origen. No use la STREAM palabra clave al crear una vista materializada:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Nota:

Usa la palabra clave STREAM para aplicar la semántica de streaming al leer desde el origen. Si la lectura encuentra un cambio o eliminación en un registro existente, se produce un error. Es más seguro leer de orígenes estáticos o de solo anexión. Para ingerir datos que tienen confirmaciones de cambios, puede usar Python y la SkipChangeCommits opción para controlar errores.

Carga de datos desde el almacenamiento de objetos

Lakeflow Declarative Pipelines admite la carga de datos desde todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

Nota:

En estos ejemplos se usan datos disponibles en /databricks-datasets, montados automáticamente en el área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.

Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.

SQL usa la read_files función para invocar la funcionalidad del cargador automático. También debe usar la palabra clave STREAM para configurar una lectura en streaming con read_files.

A continuación se describe la sintaxis de read_files en SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

Las opciones del cargador automático son pares clave-valor. Para obtener más información sobre los formatos y opciones admitidos, consulte Opciones.

En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

La read_files función también admite la semántica por lotes para crear vistas materializadas. En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

Validación de datos con expectativas

Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con las expectativas de canalización.

El código siguiente define una expectativa denominada valid_data que quita los registros que son NULL durante la ingesta de datos:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consulte las vistas materializadas y las tablas de streaming definidas en su canalización

En el ejemplo siguiente se definen cuatro conjuntos de datos:

  • Una tabla de streaming denominada orders que carga datos JSON.
  • Una vista materializada denominada customers que carga datos CSV.
  • Una vista materializada denominada customer_orders que combina registros de los conjuntos de datos orders y customers, convierte la marca de tiempo de pedido en una fecha y selecciona los campos customer_id, order_number, state y order_date.
  • Una vista materializada denominada daily_orders_by_state que agrega el recuento diario de pedidos para cada estado.

Nota:

Al consultar vistas o tablas en la canalización, puede especificar directamente el catálogo y el esquema, o puede usar los valores predeterminados configurados en la canalización. En este ejemplo, las tablas orders, customersy customer_orders se escriben y leen desde el catálogo y el esquema predeterminados configurados para la canalización.

El modo de publicación heredado usa el esquema de LIVE para consultar otras vistas materializadas y tablas de streaming definidas en la canalización. En las nuevas canalizaciones, se omite silenciosamente la sintaxis de esquema LIVE. Consulte Esquema de LIVE (heredado).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Definición de una tabla privada

Puede usar la PRIVATE cláusula al crear una vista materializada o una tabla de streaming. Al crear una tabla privada, se crea la tabla, pero no se crean los metadatos de la tabla. La cláusula PRIVATE instruye a Lakeflow Declarative Pipelines a crear una tabla que esté disponible para la tubería de proceso, pero a la que no se debe acceder fuera de ella. Para reducir el tiempo de procesamiento, una tabla privada se mantiene durante toda la duración de la canalización que la crea, y no se limita solo a una sola actualización.

Las tablas privadas pueden tener el mismo nombre que las tablas del catálogo. Si especifica un nombre no cualificado para una tabla dentro de una canalización, en caso de que haya tanto una tabla privada como una tabla de catálogo con ese nombre, se usará la tabla privada.

Anteriormente, las tablas privadas se hacían referencia a como tablas temporales.

Eliminación permanente de registros de una vista materializada o una tabla de streaming

Para eliminar permanentemente los registros de una tabla de streaming con vectores de eliminación habilitados, como para el cumplimiento del RGPD, se deben realizar operaciones adicionales en las tablas delta subyacentes del objeto. Para garantizar la eliminación de registros de una tabla de streaming, consulte Eliminación permanente de registros de una tabla de streaming.

Las vistas materializadas siempre reflejan los datos de las tablas subyacentes cuando se actualizan. Para eliminar datos en una vista Materializada, debe eliminar los datos del origen y actualizar la vista materializada.

Parametrización de valores usados al declarar tablas o vistas con SQL

Use SET para especificar un valor de configuración en una consulta que declare una tabla o una vista, incluidas las configuraciones de Spark. Cualquier tabla o vista que defina en un cuaderno después de la instrucción SET tiene acceso al valor definido. Las configuraciones de Spark especificadas mediante la instrucción SET se usan al ejecutar la consulta de Spark para cualquier tabla o vista siguiendo la instrucción SET. Para leer un valor de configuración en una consulta, use la sintaxis de interpolación de cadenas ${}. En el ejemplo siguiente, se establece un valor de configuración de Spark denominado startDate y se usa ese valor en una consulta:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar varios valores de configuración, use una instrucción SET aparte para cada valor.

Limitaciones

No se admite la cláusula PIVOT. La operación pivot en Spark requiere una carga diligente de los datos de entrada para calcular el esquema de salida. Esta funcionalidad no se admite en canalizaciones declarativas de Lakeflow.

Nota:

La sintaxis CREATE OR REFRESH LIVE TABLE para crear una vista materializada está en desuso. En su lugar, utilice CREATE OR REFRESH MATERIALIZED VIEW.