Ejecución de la primera carga de trabajo de ETL en Azure Databricks

Obtenga información sobre cómo usar herramientas listas para producción de Azure Databricks a fin de desarrollar e implementar sus primeras canalizaciones de extracción, transformación y carga (ETL) para la orquestación de datos.

Al final de este artículo, podrá hacer lo siguiente:

  1. Inicio de un clúster de proceso multiuso de Databricks.
  2. Crear un cuaderno de Databricks.
  3. Configuración de la ingesta de datos incremental en Delta Lake con el cargador automático.
  4. Ejecutar celdas de cuaderno para procesar, consultar y obtener una versión preliminar de los datos.
  5. Programar un cuaderno como un trabajo de Databricks.

En este tutorial se usan cuadernos interactivos para completar tareas comunes de ETL en Python o Scala.

También puede usar Delta Live Tables para compilar canalizaciones de ETL. Databricks creó Delta Live Tables para reducir la complejidad de la creación, implementación y mantenimiento de canalizaciones de ETL de producción. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.

También puede usar el proveedor de Terraform de Databricks para crear los recursos de este artículo. Consulte Creación de clústeres, cuadernos y trabajos con Terraform.

Requisitos

Nota:

Aunque no tenga privilegios de control de clúster, puede completar la mayoría de los pasos siguientes siempre que tenga acceso a un clúster.

Paso 1: creación de un clúster

Para realizar análisis de datos exploratorios e ingeniería de datos, puede crear un clúster a fin de proporcionar los recursos de proceso necesarios para ejecutar comandos.

  1. Haga clic en Icono ProcesoProceso en la barra lateral.
  2. En la página de proceso, haga clic en Crear clúster. Se abrirá la página Nuevo clúster.
  3. Especifique un nombre único para el clúster, deje los valores restantes en su estado predeterminado y haga clic en Crear clúster.

Para obtener más información sobre los clústeres de Databricks, consulte Compute.

Paso 2: creación de un cuaderno de Databricks

Para empezar a escribir y ejecutar código interactivo en Azure Databricks, cree un cuaderno.

  1. Haga clic en Icono NuevoNuevo en la barra lateral y, luego, en Cuaderno.
  2. En la página Crear cuaderno:
    • Especifique un nombre único para el cuaderno.
    • Asegúrese de que el idioma predeterminado esté establecido en Python o Scala.
    • Seleccione el clúster que creó en el paso 1 en la lista desplegable Clúster.
    • Haga clic en Crear.

Se abre un cuaderno con una celda vacía en la parte superior.

Para obtener más información sobre cómo crear y administrar cuadernos, consulte Administración de cuadernos.

Paso 3: configuración de Auto Loader para ingerir datos en Delta Lake

Databricks recomienda usar el Auto Loader para la ingesta de datos incremental. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube.

Databricks recomienda el almacenamiento de datos con Delta Lake. Delta Lake es una capa de almacenamiento de código abierto que proporciona transacciones ACID y habilita el almacén de lago de datos. Delta Lake es el formato predeterminado para las tablas que se crean en Databricks.

Para configurar Auto Loader a fin de ingerir datos en Delta Lake, copie y pegue el código siguiente en la celda vacía del cuaderno:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Nota

Las variables que se definen en este código deben permitirle que la ejecute de forma segura, sin correr el riesgo de entrar en conflicto con los recursos del área de trabajo existentes u otros usuarios. Los permisos de red o almacenamiento restringidos producirán errores cuando ejecuten este código; póngase en contacto con el administrador del área de trabajo para solucionar problemas de estas restricciones.

Para obtener más información sobre Auto Loader, consulte Auto Loader.

Paso 4: procesamiento e interacción con datos

Los cuadernos ejecutan la lógica celda por celda. Para ejecutar la lógica en la celda:

  1. Para ejecutar la celda que completó en el paso anterior, selecciónela y presione SHIFT+ENTER.

  2. Para consultar la tabla que acaba de crear, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Para obtener una versión preliminar de los datos de DataFrame, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.

    Python

    display(df)
    

    Scala

    display(df)
    

Para obtener más información sobre las opciones interactivas para visualizar los datos, consulte Visualizaciones en cuadernos de Databricks.

Paso 5: programación de un trabajo

Puede ejecutar cuadernos de Databricks como scripts de producción al agregarlos como una tarea en un trabajo de Databricks. En este paso, creará un nuevo trabajo que podrá desencadenar manualmente.

Para programar el cuaderno como una tarea:

  1. Haga clic en Programación en el lado derecho de la barra de encabezado.
  2. Introduzca un nombre único para el nombre del trabajo.
  3. Haga clic en Manual.
  4. En la lista desplegable Clúster, seleccione el clúster que creó en el paso 1.
  5. Haga clic en Crear.
  6. En la ventana que aparece, haga clic en Ejecutar ahora.
  7. Para ver los resultados de la ejecución del trabajo, haga clic en el icono Vínculo externo junto a la marca de tiempo Última ejecución.

Para obtener más información sobre los trabajos, consulte ¿Qué son los trabajos de Azure Databricks?.

Otras integraciones

Obtenga más información sobre las integraciones y herramientas para la ingeniería de datos con Azure Databricks: