Ejecución de la primera carga de trabajo de ETL en Azure Databricks
Artigo
Obtenga información sobre cómo usar herramientas listas para producción de Azure Databricks a fin de desarrollar e implementar sus primeras canalizaciones de extracción, transformación y carga (ETL) para la orquestación de datos.
Al final de este artículo, podrá hacer lo siguiente:
En este tutorial se usan cuadernos interactivos para completar tareas comunes de ETL en Python o Scala.
También puede usar Delta Live Tables para compilar canalizaciones de ETL. Databricks creó Delta Live Tables para reducir la complejidad de la creación, implementación y mantenimiento de canalizaciones de ETL de producción. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.
Aunque no tenga privilegios de control de clúster, puede completar la mayoría de los pasos siguientes siempre que tenga acceso a un clúster.
Paso 1: creación de un clúster
Para realizar análisis de datos exploratorios e ingeniería de datos, puede crear un clúster a fin de proporcionar los recursos de proceso necesarios para ejecutar comandos.
Haga clic en Proceso en la barra lateral.
En la página de proceso, haga clic en Crear clúster. Se abrirá la página Nuevo clúster.
Especifique un nombre único para el clúster, deje los valores restantes en su estado predeterminado y haga clic en Crear clúster.
Para obtener más información sobre los clústeres de Databricks, consulte Compute.
Paso 2: creación de un cuaderno de Databricks
Para crear un cuaderno en el área de trabajo, haga clic en Nuevo en la barra lateral y, después, haga clic en Cuaderno. Se abre un cuaderno en blanco en el área de trabajo.
Para obtener más información sobre cómo crear y administrar cuadernos, consulte Administración de cuadernos.
Paso 3: configuración de Auto Loader para ingerir datos en Delta Lake
Databricks recomienda usar el Auto Loader para la ingesta de datos incremental. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube.
Databricks recomienda el almacenamiento de datos con Delta Lake. Delta Lake es una capa de almacenamiento de código abierto que proporciona transacciones ACID y habilita el almacén de lago de datos. Delta Lake es el formato predeterminado para las tablas que se crean en Databricks.
Para configurar Auto Loader a fin de ingerir datos en Delta Lake, copie y pegue el código siguiente en la celda vacía del cuaderno:
Python
Python
# Import functionsfrom pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
Scala
// Importsimport org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Triggerimport spark.implicits._
// Define variables used in code belowval file_path = "/databricks-datasets/structured-streaming/events"val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"val checkpoint_path = s"/tmp/${username}/_checkpoint"// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
Nota
Las variables que se definen en este código deben permitirle que la ejecute de forma segura, sin correr el riesgo de entrar en conflicto con los recursos del área de trabajo existentes u otros usuarios. Los permisos de red o almacenamiento restringidos producirán errores cuando ejecuten este código; póngase en contacto con el administrador del área de trabajo para solucionar problemas de estas restricciones.
Para obtener más información sobre Auto Loader, consulte Auto Loader.
Paso 4: procesamiento e interacción con datos
Los cuadernos ejecutan la lógica celda por celda. Para ejecutar la lógica en la celda:
Para ejecutar la celda que completó en el paso anterior, selecciónela y presione SHIFT+ENTER.
Para consultar la tabla que acaba de crear, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.
Python
Python
df = spark.read.table(table_name)
Scala
Scala
val df = spark.read.table(table_name)
Para obtener una versión preliminar de los datos de DataFrame, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.
Puede ejecutar cuadernos de Databricks como scripts de producción al agregarlos como una tarea en un trabajo de Databricks. En este paso, creará un nuevo trabajo que podrá desencadenar manualmente.
Para programar el cuaderno como una tarea:
Haga clic en Programación en el lado derecho de la barra de encabezado.
Introduzca un nombre único para el nombre del trabajo.
Haga clic en Manual.
En la lista desplegable Clúster, seleccione el clúster que creó en el paso 1.
Haga clic en Crear.
En la ventana que aparece, haga clic en Ejecutar ahora.
Para ver los resultados de la ejecución del trabajo, haga clic en el icono junto a la marca de tiempo Última ejecución.
Demostrar la comprensión de las tareas comunes de ingeniería de datos para implementar y administrar cargas de trabajo de ingeniería de datos en Microsoft Azure mediante una serie de servicios de Azure.