Ejercicio: Integración de un cuaderno en canalizaciones de Azure Synapse
En esta unidad, creará un cuaderno de Spark de Azure Synapse para analizar y transformar los datos cargados por un flujo de datos de asignación y los almacenará en un lago de datos. Creará una celda de parámetro que acepte un parámetro de cadena que defina el nombre de la carpeta para los datos que el cuaderno escribe en el lago de datos.
Después, agregará este cuaderno a una canalización de Synapse y pasará el identificador de ejecución de canalización único al parámetro de cuaderno para que posteriormente pueda poner en correlación la ejecución de la canalización con los datos guardados por la actividad del cuaderno.
Por último, usará el centro de supervisión de Synapse Studio para supervisar la ejecución de la canalización, obtener el identificador de ejecución y, después, buscar los archivos correspondientes almacenados en el lago de datos.
Acerca de Apache Spark y los cuadernos
Apache Spark es una plataforma de procesamiento paralelo que admite el procesamiento en memoria para mejorar el rendimiento de aplicaciones de análisis de macrodatos. Apache Spark en Azure Synapse Analytics es una de las implementaciones de Microsoft de Apache Spark en la nube.
Un cuaderno de Apache Spark en Synapse Studio es una interfaz web para crear archivos que contienen código dinámico, visualizaciones y texto narrativo. Los cuadernos son un buen lugar para validar ideas y aplicar experimentos rápidos para sacar conclusiones a partir de los datos. Los cuadernos también se usan ampliamente en la preparación de datos, la visualización de datos, el aprendizaje automático y otros escenarios de macrodatos.
Creación de un cuaderno de Synapse Spark
Imagine que ha creado un flujo de datos de asignación en Synapse Analytics para procesar, combinar e importar datos de perfil de usuario. Ahora, quiere encontrar los cinco productos principales para cada usuario, en función de cuáles son los preferidos y los principales, y las compras principales en los últimos 12 meses. Después, quiere calcular los cinco productos principales en general.
En este paso, creará un cuaderno Spark de Synapse para realizar estos cálculos.
Abra Synapse Analytics Studio (https://web.azuresynapse.net/) y vaya al centro Data (Datos).
Seleccione la pestaña Linked (Vinculado) (1) y expanda la cuenta de almacenamiento de lago de datos principal (2) debajo de Azure Data Lake Storage Gen2. Seleccione el contenedor wwi-02(3) y abra la carpeta top-products(4). Haga clic con el botón derecho en cualquier archivo de Parquet (5), seleccione el elemento de menú New notebook (Nuevo cuaderno) (6) y después Load to DataFrame (7) (Cargar en DataFrame). Si no ve la carpeta, seleccione
Refresh
.Asegúrese de que el cuaderno está adjunto al grupo de Spark.
Reemplace el nombre del archivo de Parquet por
*.parquet
(1) para seleccionar todos los archivos de Parquet de la carpetatop-products
. Por ejemplo, la ruta de acceso debe ser similar a:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Seleccione Ejecutar todo en la barra de herramientas del cuaderno para ejecutarlo.
Nota
La primera vez que se ejecuta un cuaderno en un grupo de Spark, Synapse crea una nueva sesión. Este proceso puede tardar aproximadamente entre 3 y 5 minutos.
Nota
Para ejecutar solo la celda, mantenga el mouse sobre ella y seleccione el icono Run cell (Ejecutar celda) situado a la izquierda de la celda, o bien seleccione la celda y presione Ctrl+Entrar.
Cree una celda debajo; para ello, seleccione el botón + y después el elemento Code cell (Celda de código). El botón + se encuentra debajo de la celda del cuaderno de la izquierda. Como alternativa, también puede expandir el menú + Cell (+ Celda) en la barra de herramientas del cuaderno y seleccionar el elemento Code cell.
Ejecute el comando siguiente en la nueva celda para rellenar una nueva trama de datos denominada
topPurchases
, crear una vista temporal denominadatop_purchases
y mostrar las primeras 100 filas:PythontopPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
La salida debe tener una apariencia similar a la siguiente:
text+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Ejecute el comando siguiente en una nueva celda para crear una vista temporal mediante SQL:
SQL%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Nota
No hay ninguna salida para esta consulta.
La consulta usa la vista temporal
top_purchases
como origen y aplica un métodorow_number() over
para aplicar un número de fila para los registros de cada usuario dondeItemsPurchasedLast12Months
sea mayor. La cláusulawhere
filtra los resultados para que solo se recuperen hasta cinco productos en los que tantoIsTopProduct
comoIsPreferredProduct
estén establecidos en true. Esto proporciona los cinco productos más comprados para cada usuario en los que esos productos también se han identificado como sus favoritos, según su perfil de usuario almacenado en Azure Cosmos DB.Ejecute el comando siguiente en una nueva celda para crear y mostrar un nuevo elemento DataFrame que almacena los resultados de la vista temporal
top_5_products
que ha creado en la celda anterior:Pythontop5Products = sqlContext.table("top_5_products") top5Products.show(100)
Debería ver una salida similar a la siguiente, en la que se muestran los cinco productos principales preferidos por cada usuario:
Calcule los cinco productos principales generales, en función de los que prefieran los clientes y que más hayan comprado. Para ello, ejecute el comando siguiente en una nueva celda:
Pythontop5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
En esta celda, se agrupan los cinco productos principales preferidos por identificador de producto, se suma el total de artículos comprados en los últimos 12 meses, se ordena ese valor en orden descendente y se devuelven los cinco primeros resultados. La salida debe ser similar a la siguiente:
text+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Creación de una celda de parámetros
Las canalizaciones de Azure Synapse buscan la celda de parámetros y la tratan como el valor predeterminado para los parámetros que se pasan en tiempo de ejecución. El motor de ejecución agregará una nueva celda debajo de la celda de parámetros con los parámetros de entrada que van a sobrescribir los valores predeterminados. Cuando no se designa ninguna celda de parámetros, la celda insertada se insertará en la parte superior del cuaderno.
Ahora se ejecutará este cuaderno desde una canalización. El objetivo es pasar un parámetro que establece un valor variable
runId
que se usará para dar nombre al archivo de Parquet. Ejecute el comando siguiente en una nueva celda:Pythonimport uuid # Generate random GUID runId = uuid.uuid4()
Se usa la biblioteca
uuid
incluida en Spark para generar un GUID aleatorio. La intención es reemplazar la variablerunId
con un parámetro pasado por la canalización. Para ello, es necesario alternar esto como una celda de parámetro.Seleccione los puntos suspensivos de acciones (...) en la esquina superior derecha de la celda (1) y, después, seleccione Toggle parameter cell (2) (Alternar celda de parámetros).
Después de alternar esta opción, verá la etiqueta Parameters (Parámetros) en la celda.
Pegue el código siguiente en una nueva celda para usar la variable
runId
como nombre de archivo de Parquet en la ruta de acceso/top5-products/
de la cuenta del lago de datos principal. ReemplaceYOUR_DATALAKE_NAME
en la ruta de acceso con el nombre de la cuenta del lago de datos principal. Para encontrarla, desplácese hasta la Celda 1 en la parte superior de la página (1). Copie la cuenta de almacenamiento de lago de datos de la ruta de acceso (2). Pegue este valor como reemplazo deYOUR_DATALAKE_NAME
en la ruta (3) dentro de la nueva celda y, después, ejecute la celda.Python%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Compruebe que el archivo se ha escrito en el lago de datos. Vaya al centro Data (Datos) y seleccione la pestaña Linked (Vinculado) (1). Expanda la cuenta de almacenamiento de lago de datos principal y seleccione el contenedor wwi-02(2). Vaya a la carpeta top5-products(3). Debería ver una carpeta para el archivo de Parquet en el directorio con un GUID como nombre de archivo (4).
El método de escritura de Parquet en la trama de datos de la celda del cuaderno ha creado este directorio porque no existía antes.
Adición del cuaderno a una canalización de Synapse
Volviendo al flujo de datos de asignación que se ha descrito al principio del ejercicio, imagine que quiere ejecutar este cuaderno después de que el flujo de datos se ejecute como parte del proceso de orquestación. Para ello, agregue este cuaderno a una canalización como una nueva actividad de cuaderno.
Vuelva al cuaderno. Seleccione Propiedades (1) en la esquina superior derecha del cuaderno y, después, escriba
Calculate Top 5 Products
en Nombre (2).Seleccione Add to pipeline (Agregar a la canalización) (1) en la esquina superior derecha del cuaderno y, después, seleccione Existing pipeline (2) (Canalización existente).
Seleccione la canalización Write User Profile Data to ASA (Escribir datos de perfil de usuario en ASA) (1) y después Agregar *(2).
Synapse Studio agrega la actividad Cuaderno a la canalización. Reorganice la actividad Cuaderno para ubicarla a la derecha de la actividad Flujo de datos. Seleccione la actividad Flujo de datos y arrastre un cuadro verde de conexión de canalización de actividad Success (Correcta) a la actividad Cuaderno.
La flecha de actividad Success (Correcta) indica a la canalización que ejecute la actividad Cuaderno después de que la actividad Flujo de datos se ejecute correctamente.
Seleccione la actividad Cuaderno (1), después la pestaña Settings (Configuración) (2), expanda Base parameters (3) (Parámetros base) y seleccione + New (4) (+ Nuevo). Escriba
runId
en el campo Nombre(5). Seleccione String (Cadena) para Type (6) (Tipo). En Value (Valor), seleccione Add dynamic content (7) (Agregar contenido).Seleccione Pipeline run ID (Identificador de ejecución de canalización) en System variables (1) (Variables del sistema). Esto agrega
@pipeline().RunId
al cuadro de contenido dinámico (2). Seleccione Finish (3) (Finalizar) para cerrar el cuadro de diálogo.El valor de identificador de ejecución de canalización es un GUID único asignado a cada ejecución de canalización. Se usará este valor para el nombre del archivo de Parquet, y se pasará como parámetro del cuaderno
runId
. Después, se puede examinar el historial de ejecución de la canalización y buscar el archivo de Parquet específico creado para cada ejecución de canalización.Seleccione Publish all (Publicar todo) y después Publish (Publicar) para guardar los cambios.
Una vez que se ha completado la publicación, seleccione Add trigger (1) (Agregar desencadenador) y después Trigger now (2) (Desencadenar ahora) para ejecutar la canalización actualizada.
Seleccione OK (Aceptar) para ejecutar el desencadenador.
Supervisión de la ejecución de la canalización
El centro de supervisión permite supervisar las actividades actuales e históricas de SQL, Apache Spark y Pipelines.
Vaya al centro de conectividad Supervisión.
Seleccione Pipeline runs (1) (Ejecuciones de canalización) y espere a que la ejecución de la canalización se complete correctamente (2). Es posible que tenga que actualizar (3) la vista.
Seleccione el nombre de la canalización para ver sus ejecuciones de actividad.
Observe tanto la actividad Flujo de datos como la nueva actividad Cuaderno(1). Anote el valor de Pipeline run ID (Identificador de ejecución de canalización) (2). Se comparará con el nombre de archivo de Parquet generado por el cuaderno. Seleccione el nombre del cuaderno Calculate Top 5 Products (Calcular los cinco productos principales) para ver sus detalles (3).
Aquí se ven los detalles de ejecución del cuaderno. Puede seleccionar Reproducir (1) para ver una reproducción del progreso mediante los trabajos (2). En la parte inferior, puede ver Diagnósticos y Registros con diferentes opciones de filtro (3). A la derecha, se pueden ver los detalles de la ejecución, como la duración, el identificador de Livy, los detalles del grupo de Spark, entre otros. Seleccione el vínculo Ver detalles de un trabajo para ver sus detalles (5).
La interfaz de usuario de la aplicación Spark se abre en una nueva pestaña donde se pueden ver los detalles de la fase. Expanda la visualización de DAG para ver los detalles de la fase.
Vuelva al centro Data (Datos).
Seleccione la pestaña Linked (Vinculado) (1), después seleccione el contenedor wwi-02(2) en la cuenta de almacenamiento de lago de datos principal, vaya a la carpeta top5-products(3) y compruebe que existe una carpeta para el archivo de Parquet cuyo nombre coincide con el identificador de ejecución de canalización.
Como puede ver, hay un archivo cuyo nombre coincide con el identificador de ejecución de canalización que se ha anotado antes:
Estos valores coinciden porque se ha pasado el identificador de ejecución de canalización al parámetro
runId
en la actividad Cuaderno.