Tutorial: ejecute su primera canalización de Delta Live Tables

Importante

Las canalizaciones de DLT sin servidor se encuentran en versión preliminar pública. Para más información sobre cómo habilitar canalizaciones de DLT sin servidor, póngase en contacto con el equipo de la cuenta de Azure Databricks.

En este tutorial se muestra cómo configurar una canalización de Delta Live Tables a partir del código de un cuaderno de Databricks y cómo ejecutar la canalización mediante la activación de una actualización de canalización. En este tutorial se incluye una canalización de ejemplo para ingerir y procesar un conjunto de datos de ejemplo con código de ejemplo mediante las interfaces de Python y SQL. También puede usar las instrucciones de este tutorial para crear una canalización con cualquier cuaderno con sintaxis de Delta Live Tables definida correctamente.

Puede configurar canalizaciones de Delta Live Tables y desencadenar actualizaciones mediante la interfaz de usuario del área de trabajo de Azure Databricks u opciones de herramientas automatizadas, como la API, la CLI, los conjuntos de recursos de Databricks o como una tarea en un flujo de trabajo de Databricks. Para familiarizarse con la funcionalidad y las características de Delta Live Tables, Databricks recomienda primero usar la interfaz de usuario para crear y ejecutar canalizaciones. Además, al configurar una canalización en la interfaz de usuario, Delta Live Tables genera una configuración JSON para la canalización que se puede usar para implementar los flujos de trabajo mediante programación.

Para demostrar la funcionalidad de Delta Live Tables, los ejemplos de este tutorial descargan un conjunto de datos disponible públicamente. Sin embargo, Databricks tiene varias maneras de conectarse a orígenes de datos e ingerir datos que las canalizaciones que implementan casos de uso reales usarán. Consulte Ingesta de datos con Delta Live Tables.

Requisitos

  • Para iniciar una canalización sin servidor, debe tener permiso de creación de clústeres o acceso a una directiva de clúster que defina un clúster de Delta Live Tables. El entorno de ejecución de Delta Live Tables crea un clúster antes de que ejecute la canalización y se produce un error si no tiene el permiso correcto.

  • Para usar los ejemplos de este tutorial, el área de trabajo debe tener habilitado Unity Catalog.

  • Debe tener estos permisos en Unity Catalog:

    • READ VOLUME y WRITE VOLUME, o ALL PRIVILEGES, para el volumen my-volume.
    • USE SCHEMA o ALL PRIVILEGES para el esquema default.
    • USE CATALOG o ALL PRIVILEGES para el catálogo main.

    Para establecer estos permisos, consulte el administrador de Databricks o los privilegios de Unity Catalog y objetos protegibles.

  • En los ejemplos de este tutorial se usa un volumen de Unity Catalog para almacenar datos de ejemplo. Para usar estos ejemplos, cree un volumen y use los nombres de catálogo, esquema y volumen de ese volumen para establecer la ruta de acceso del volumen usada por los ejemplos.

Nota:

Si el área de trabajo no tiene habilitado Unity Catalog, se adjuntan a este artículo cuadernos con ejemplos que no requieren Unity Catalog. Para usar estos ejemplos, seleccione Hive metastore como opción de almacenamiento al crear la canalización.

¿Dónde se ejecutan las consultas de Delta Live Tables?

Las consultas de Delta Live Tables se implementan principalmente en cuadernos de Databricks, pero Delta Live Tables no está diseñada para ejecutarse interactivamente en celdas de cuaderno. La ejecución de una celda que contiene la sintaxis de Delta Live Tables en un cuaderno de Databricks genera un mensaje de error. Para ejecutar las consultas, debe configurar los cuadernos como parte de una canalización.

Importante

  • No se puede confiar en el orden de ejecución de celda por celda de cuadernos al escribir consultas para Delta Live Tables. Delta Live Tables evalúa y ejecuta todo el código definido en cuadernos, pero tiene un modelo de ejecución diferente que un comando Run all del cuaderno.
  • No se pueden mezclar lenguajes en un único archivo de código fuente de Delta Live Tables. Por ejemplo, un cuaderno solo puede contener consultas de Python o consultas SQL. Si debe usar varios lenguajes en una canalización, use cuadernos o archivos específicos para cada lenguaje.

También puede usar código de Python almacenado en archivos. Por ejemplo, puede crear un módulo de Python que se pueda importar en las canalizaciones de Python o definir funciones definidas por el usuario (UDF) de Python para usarlas en consultas SQL. Para obtener información sobre cómo importar módulos, consulte Importación de módulos de Python desde carpetas de Git o archivos de área de trabajo. Para obtener información sobre el uso de UDF de Python, consulte Funciones escalares definidas por el usuario: Python.

Ejemplo: Ingesta y procesamiento de datos de nombres de bebés de Nueva York

En el ejemplo de este artículo se usa un conjunto de datos disponible públicamente que contiene registros de los nombres de bebés del estado de Nueva York. En estos ejemplos se muestra cómo usar una canalización de Delta Live Tables para:

  • Leer los datos CSV sin procesar de un conjunto de datos disponible públicamente en una tabla.
  • Lea los registros de la tabla de datos sin procesar y use las expectativas de Delta Live Tables para crear una nueva tabla que contenga datos limpios.
  • Usar los registros limpios como entrada para las consultas de Delta Live Tables que crean conjuntos de datos derivados.

Este código muestra un ejemplo simplificado de la arquitectura medallion. Consulte ¿Qué es la arquitectura de la casa del lago medallion?.

Las implementaciones de este ejemplo se proporcionan para las interfaces de Python y SQL. Puede seguir los pasos para crear nuevos cuadernos que contengan el código de ejemplo o puede ir directamente a Crear una canalización y usar uno de los cuadernos proporcionados en esta página.

Implementación de una canalización de Delta Live Tables con Python

El código de Python que crea conjuntos de datos de Delta Live Tables debe devolver DataFrames, familiares para los usuarios con experiencia en PySpark o Pandas para Spark. Para los usuarios que no están familiarizados con DataFrames, Databricks recomienda usar la interfaz de SQL. Consulte Implementación de una canalización de Delta Live Tables con SQL.

Todas las API de Python de Delta Live Tables se implementan en el módulo dlt. El código de canalización de Delta Live Tables implementado con Python debe importar explícitamente el módulo dlt en la parte superior de los cuadernos y archivos de Python. Delta Live Tables difiere de muchos scripts de Python de una manera clave: no se llama a las funciones que realizan la ingesta y transformación de datos para crear conjuntos de datos de Delta Live Tables. En su lugar, Delta Live Tables interpreta las funciones de decorador del módulo dlt en todos los archivos cargados en una canalización y crea un gráfico de flujo de datos.

Para implementar el ejemplo de este tutorial, copie y pegue el siguiente código de Python en un nuevo cuaderno de Python. Debe agregar cada fragmento de código de ejemplo a su propia celda en el cuaderno en el orden descrito. Para revisar las opciones para crear cuadernos, consulte Creación de un cuaderno.

Nota:

Al crear una canalización con la interfaz de Python, de forma predeterminada, los nombres de tabla se definen mediante nombres de función. Por ejemplo, en el ejemplo de Python siguiente se crean tres tablas denominadas baby_names_raw, baby_names_prepared y top_baby_names_2021. Puede invalidar el nombre de la tabla mediante el parámetro name. Consulte Creación de una vista materializada de delta Live Tables o tabla de streaming.

Importación del módulo Delta Live Tables

Todas las API de Python de Delta Live Tables se implementan en el módulo dlt. Importe explícitamente el módulo dlt en la parte superior de los archivos y cuadernos de Python.

En el ejemplo siguiente se muestra esta importación, junto con instrucciones import para pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Descarga de los datos

Para obtener los datos de este ejemplo, descargue un archivo CSV y guárdelo en el volumen de la siguiente manera:

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog.

Creación de una tabla a partir de archivos en el almacenamiento de objetos

Delta Live Tables admite la carga de datos de todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

El decorador @dlt.table indica a Delta Live Tables que cree una tabla que contenga el resultado de un DataFrame devuelto por una función. Agregue el decorador @dlt.table antes de cualquier definición de función de Python que devuelva un DataFrame de Spark para registrar una nueva tabla en Delta Live Tables. En el ejemplo siguiente se muestra cómo usar el nombre de la función como nombre de tabla y agregar un comentario descriptivo a la tabla:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Adición de una tabla desde un conjunto de datos ascendente en la canalización

Puede usar dlt.read() para leer datos de otros conjuntos de datos declarados en la canalización de Delta Live Tables actual. Declarar nuevas tablas de esta manera crea una dependencia que Delta Live Tables resuelve automáticamente antes de ejecutar actualizaciones. El código siguiente también incluye ejemplos de supervisión y aplicación de la calidad de los datos con expectativas. Consulte Administración de la calidad de los datos con Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

Creación de una tabla con vistas de datos enriquecidas

Dado que Delta Live Tables procesa las actualizaciones de las canalizaciones como una serie de gráficos de dependencias, puede declarar vistas altamente enriquecidas que potencian paneles, BI y análisis declarando tablas con lógica empresarial específica.

Las tablas de Delta Live Tables son equivalentes conceptualmente a las vistas materializadas. Mientras que las vistas tradicionales en Spark ejecutan la lógica cada vez que se consulta la vista, una tabla de Delta Live Tables almacena la versión más reciente de los resultados de la consulta en archivos de datos. Dado que Delta Live Tables administra las actualizaciones de todos los conjuntos de datos de una canalización, puede programar actualizaciones de canalización para que coincidan con los requisitos de latencia de las vistas materializadas y saber que las consultas de estas tablas contienen la versión más reciente de los datos disponibles.

La tabla definida por el código siguiente muestra la similitud conceptual con una vista materializada derivada de los datos ascendentes de la canalización:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

Para configurar una canalización que use el cuaderno, consulte Creación de una canalización.

Implementación de una canalización de Delta Live Tables con SQL

Databricks recomienda Delta Live Tables con SQL como la forma preferida para que los usuarios de SQL creen nuevas canalizaciones de ETL, ingestión y transformación en Azure Databricks. La interfaz de SQL para Delta Live Tables amplía Spark SQL estándar con muchas nuevas palabras clave, construcciones y funciones con valores de tabla. Estas adiciones a SQL estándar permiten a los usuarios declarar dependencias entre conjuntos de datos e implementar la infraestructura de nivel de producción sin aprender nuevas herramientas ni conceptos adicionales.

Para los usuarios familiarizados con los DataFrames de Spark y que necesitan compatibilidad con pruebas y operaciones más amplias que son difíciles de implementar con SQL, como las operaciones de metaprogramación, Databricks recomienda usar la interfaz de Python. Consulte Ejemplo: Ingesta y procesamiento de datos de los nombres de bebés de Nueva York.

Descarga de los datos

Para obtener los datos de este ejemplo, copie el siguiente código, péguelo en un nuevo cuaderno y, a continuación, ejecute el cuaderno. Para revisar las opciones para crear cuadernos, consulte Creación de un cuaderno.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog.

Creación de una tabla a partir de archivos en Unity Catalog

Para el resto de este ejemplo, copie los siguientes fragmentos de código SQL y péguelos en un nuevo cuaderno de SQL, independiente del cuaderno de la sección anterior. Debe agregar cada fragmento de código de SQL a su propia celda en el cuaderno en el orden descrito.

Delta Live Tables admite la carga de datos de todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

Todas las instrucciones SQL de Delta Live Tables usan la sintaxis y la semántica CREATE OR REFRESH. Al actualizar una canalización, Delta Live Tables determina si el resultado lógicamente correcto para la tabla puede realizarse a través del procesamiento incremental o si se requiere un nuevo cálculo completo.

En el siguiente ejemplo se crea una tabla cargando datos desde el archivo CSV almacenado en el volumen de Unity Catalog:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog.

Adición de una tabla desde un conjunto de datos ascendente a la canalización

Puede usar el esquema virtual live para consultar datos de otros conjuntos de datos declarados en la canalización de Delta Live Tables actual. Declarar nuevas tablas de esta manera crea una dependencia que Delta Live Tables resuelve automáticamente antes de ejecutar actualizaciones. El esquema live es una palabra clave personalizada implementada en Delta Live Tables que se puede sustituir por un esquema de destino si quiere publicar los conjuntos de datos. Consulte Uso de Unity Catalog con las canalizaciones de Delta Live Tables y Publicación de datos de canalizaciones de Delta Live Tables en el metastore de Hive.

El código siguiente también incluye ejemplos de supervisión y aplicación de la calidad de los datos con expectativas. Consulte Administración de la calidad de los datos con Delta Live Tables.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

Crear una vista de datos enriquecida

Dado que Delta Live Tables procesa las actualizaciones de las canalizaciones como una serie de gráficos de dependencias, puede declarar vistas altamente enriquecidas que potencian paneles, BI y análisis declarando tablas con lógica empresarial específica.

Las tablas dinámicas son equivalentes conceptualmente a vistas materializadas. Mientras que las vistas tradicionales de Spark ejecutan lógica cada vez que se consulta la vista, las tablas dinámicas almacenan la versión más reciente de la consulta da como resultado archivos de datos. Dado que Delta Live Tables administra las actualizaciones de todos los conjuntos de datos de una canalización, puede programar actualizaciones de canalización para que coincidan con los requisitos de latencia de las vistas materializadas y saber que las consultas de estas tablas contienen la versión más reciente de los datos disponibles.

El código siguiente crea una vista materializada enriquecida de los datos ascendentes:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Para configurar una canalización que use el cuaderno, continúe en Creación de una canalización.

Creación de una canalización

Delta Live Tables crea canalizaciones mediante la resolución de dependencias definidas en cuadernos o archivos (denominados código fuente o bibliotecas) mediante la sintaxis Delta Live Tables. Cada archivo de código fuente solo puede contener un idioma, pero puede mezclar bibliotecas de distintos idiomas en la canalización.

  1. Haga clic en Delta Live Tables en la barra lateral y haga clic en Crear canalización.
  2. Asigne un nombre a la canalización.
  3. (Opcional) Active la casilla Sin servidor para usar el proceso totalmente administrado para esta canalización. Al seleccionar Sin servidor, la configuración de Compute se quita de la interfaz de usuario.
  4. (Opcional) Seleccione una edición del producto.
  5. Seleccione Desencadenado en Modo de canalización.
  6. Configure uno o varios cuadernos que contengan el código fuente de la canalización. En el cuadro de texto Rutas de acceso, escriba la ruta de acceso a un cuaderno o haga clic en Icono del selector de archivos para seleccionar un cuaderno.
  7. Seleccione un destino para los conjuntos de datos publicados por la canalización, ya sea el metastore de Hive o el catálogo de Unity. Consulte Publicación de conjuntos de datos.
    • Metastore de Hive:
      • (Opcional) Escriba una ubicación de almacenamiento para los datos de salida de la canalización. El sistema usa una ubicación predeterminada si deja Ubicación de almacenamiento vacía.
      • (Opcional) Especifique un esquema de destino para publicar el conjunto de datos en el metastore de Hive.
    • Catálogo de Unity: especifique un Catálogo y un esquema de destino para publicar el conjunto de datos en Unity Catalog.
  8. (Opcional) Si no ha seleccionado Sin servidor, puede configurar las opciones de proceso para la canalización. Para obtener información sobre las opciones de configuración de proceso, consulte Configuración de las opciones de canalización para Delta Live Tables.
  9. (Opcional) Haga clic en Agregar notificación para configurar una o varias direcciones de correo electrónico para recibir notificaciones para eventos de canalización. Consulte Agregar notificaciones por correo electrónico para eventos de canalización.
  10. (Opcional) Configure opciones avanzadas para la canalización. Para obtener información sobre las opciones de configuración avanzada, consulte Configuración de las opciones de canalización para Delta Live Tables.
  11. Haga clic en Crear.

El sistema muestra la página Detalles de canalización después de hacer clic en Crear. También puede acceder a la canalización haciendo clic en el nombre de la canalización en la pestaña Delta Live Tables.

Inicio de la actualización de una canalización

Para iniciar una actualización para una canalización, haga clic en el botón Delta Live Tables Icon en el panel superior. El sistema devuelve un mensaje que confirma que se está iniciando la canalización.

Después de iniciar correctamente la actualización, el sistema Delta Live Tables:

  1. Inicia un clúster mediante una configuración de clúster creada por el sistema Delta Live Tables. También se puede especificar una configuración de clúster personalizada.
  2. Crea las tablas que no existen y garantiza que el esquema es correcto para las tablas existentes.
  3. Actualiza las tablas con los datos más recientes disponibles.
  4. Cierra el clúster cuando se completa la actualización.

Nota:

El modo de ejecución se establece en Producción de forma predeterminada, lo que implementa recursos de proceso efímeros para cada actualización. Puede usar el modo Desarrollo para cambiar este comportamiento, lo que permite usar los mismos recursos de proceso para varias actualizaciones de canalización durante el desarrollo y las pruebas. Consulte Modos de desarrollo y producción.

Publicación de conjuntos de datos

Puede hacer que los conjuntos de datos de Delta Live Tables estén disponibles para realizar consultas mediante la publicación de tablas en el metastore de Hive o Unity Catalog. Si no especifica un destino para publicar los datos, solo se podrá acceder a las tablas creadas en las canalizaciones Delta Live Tables mediante otras operaciones en esa misma canalización. Consulte Publicación de datos de canalizaciones de Delta Live Tables en el metastore de Hive y Uso de Unity Catalog con las canalizaciones de Delta Live Tables.

Cuadernos de código fuente de ejemplo

Puede importar estos cuadernos en un área de trabajo de Azure Databricks y usarlos para implementar una canalización de Delta Live Tables. Consulte Creación de una canalización.

Introducción al cuaderno de Python de Delta Live Tables

Obtener el cuaderno

Introducción al cuaderno SQL de Python de Delta Live Tables

Obtener el cuaderno

Cuadernos de código fuente de ejemplo para áreas de trabajo sin Unity Catalog

Puede importar estos cuadernos en un área de trabajo de Azure Databricks sin Unity Catalog habilitado y usarlos para implementar una canalización de Delta Live Tables. Consulte Creación de una canalización.

Introducción al cuaderno de Python de Delta Live Tables

Obtener el cuaderno

Introducción al cuaderno SQL de Python de Delta Live Tables

Obtener el cuaderno