Compartir vía


Uso de Databricks SQL en un trabajo de Azure Databricks

Puede usar el tipo de tarea SQL en un trabajo de Azure Databricks, lo que le permite crear, programar, operar y supervisar flujos de trabajo que incluyen objetos SQL de Databricks, como consultas, paneles heredados y alertas. Por ejemplo, el flujo de trabajo puede ingerir datos, preparar los datos, realizar análisis mediante consultas de Databricks SQL y, después, mostrar los resultados en un panel heredado.

En este artículo se proporciona un flujo de trabajo de ejemplo que crea un panel heredado que muestra métricas para las contribuciones de GitHub. En este ejemplo, hará lo siguiente:

  • Ingerir datos de GitHub usando un script de Python y la API de REST de GitHub.
  • Transformar los datos de GitHub usando una canalización de Delta Live Tables.
  • Desencadenar consultas de Databricks SQL realizando análisis sobre los datos preparados.
  • Mostrará el análisis en un panel heredado.

Panel de análisis de GitHub

Antes de empezar

Necesita lo siguiente para poder llevar a cabo este tutorial:

Paso 1: Almacenar los tokens de GitHub en un secreto

En lugar de codificar credenciales como el token de acceso personal de GitHub en un trabajo, Databricks recomienda usar un ámbito de secretos para almacenarlos y administrarlos de forma segura. Los siguientes comandos de la CLI de Databricks son un ejemplo de cómo crear un ámbito de secretos y almacenar el token de GitHub en un secreto en ese ámbito:

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • Reemplace <scope-name por el nombre de un ámbito de secretos de Azure Databricks para almacenar los tokens.
  • Reemplace <token-key> por el nombre de una clave para asignar a los tokens.
  • Reemplace <token> por el valor del token de acceso personal de GitHub.

Paso 2: Crear un script para obtener los datos de GitHub

El siguiente script de Python usa la API de REST de GitHub para obtener datos sobre confirmaciones y contribuciones de un repositorio de GitHub. Los argumentos de entrada especifican el repositorio de GitHub. Los registros se guardan en una ubicación de DBFS especificada por otro argumento de entrada.

En este ejemplo se usa DBFS para almacenar el script de Python, pero también se pueden usar carpetas de Git de Databricks o archivos de área de trabajo para almacenar y administrar el script.

  • Guarde este script en una ubicación en el disco local:

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    if __name__ == "__main__":
        main()
    
  • Cargue el script en DBFS:

    1. Vaya a su página de aterrizaje de Azure Databricks y haga clic en Icono de catálogo Catálogo en la barra lateral.
    2. Haga clic en Examinar DBFS.
    3. En el explorador de archivos DBFS, haga clic en Cargar. Aparecerá el cuadro de diálogo Cargar datos en DBFS.
    4. Escriba una ruta de acceso en DBFS para almacenar el script, haga clic en Coloque los archivos que quiera cargar o haga clic para examinar y seleccione el script de Python.
    5. Haga clic en Done(Listo).

Paso 3: Crear una canalización de Delta Live Tables para procesar los datos de GitHub

En esta sección, se crea una canalización de Delta Live Tables para convertir los datos sin procesar de GitHub en tablas que puedan ser analizadas mediante consultas de Databricks SQL. Para crear la canalización, realice los pasos siguientes:

  1. En la barra lateral, haga clic en Icono Nuevo Nuevo y seleccione Cuaderno en el menú. Aparece el cuadro de diálogo Create Notebook (Crear cuaderno).

  2. En Lenguaje predeterminado, escriba un nombre y seleccione Python. Puede dejar el clúster en el valor predeterminado. El entorno de ejecución de Delta Live Tables crea un clúster antes de que ejecute la canalización.

  3. Haga clic en Crear.

  4. Copie el ejemplo de código Python y péguelo en su nuevo cuaderno. Puede agregar el código de ejemplo a una sola celda del cuaderno o a varias celdas.

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. En la barra lateral, haga clic en Icono de flujos de trabajo Flujos de trabajo, luego en la pestaña Delta Live Tables y, después, en Crear canalización.

  6. Asigne un nombre a la canalización, por ejemplo, Transform GitHub data.

  7. En el campo Bibliotecas de cuadernos, escriba la ruta de acceso al cuaderno o haga clic en Icono del selector de archivos para seleccionar el cuaderno.

  8. Haga clic en Añadir configuración. En el cuadro de texto Key, escriba commits-path. En el cuadro de texto Value, introduzca la ruta de acceso de DBFS en la que se escribirán los registros de GitHub. Puede ser cualquier ruta de acceso que elija y es la misma que usará al configurar la primera tarea de Python cuando cree el flujo de trabajo.

  9. Haga clic de nuevo en Añadir configuración. En el cuadro de texto Key, escriba contribs-path. En el cuadro de texto Value, introduzca la ruta de acceso de DBFS en la que se escribirán los registros de GitHub. Puede ser cualquier ruta de acceso que elija y es la misma que usará al configurar la segunda tarea de Python cuando cree el flujo de trabajo.

  10. En el campo Destino, escriba una base de datos de destino, por ejemplo, github_tables. Establecer una base de datos de destino publica los datos de salida en el metastore y es necesario para las consultas posteriores que analizan los datos producidos por la canalización.

  11. Haga clic en Save(Guardar).

Paso 4: Crear un flujo de trabajo para ingerir y transformar los datos de GitHub

Antes de analizar y visualizar los datos de GitHub con Databricks SQL, es necesario ingerir y preparar los datos. Para crear un flujo de trabajo para completar estas tareas, realice los pasos siguientes:

Cree un trabajo de Azure Databricks y añada la primera tarea

  1. Vaya a la página de aterrizaje de Azure Databricks y realice una de las siguientes acciones:

    • En la barra lateral, haga clic en Icono de flujos de trabajo Flujos de trabajo y, después, en Botón Crear trabajo.
    • En la barra lateral, haga clic en Icono Nuevo Nuevo y seleccione Trabajo en el menú.
  2. En el cuadro de diálogo de tarea que aparece en la pestaña Tareas, reemplace Agregar un nombre para el trabajo... por el nombre del trabajo, por ejemplo, GitHub analysis workflow.

  3. En el campo Nombre de tarea, escriba el nombre que quiera darle a la tarea, por ejemplo, get_commits.

  4. En Tipo, seleccione Script de Python.

  5. En Origen, seleccione DBFS/S3.

  6. En Ruta de acceso, escriba la ruta de acceso al script en DBFS.

  7. En Parámetros, escriba los argumentos siguientes para el script de Python:

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Reemplace <owner> por el nombre del propietario del repositorio. Por ejemplo, para capturar registros del repositorio github.com/databrickslabs/overwatch, escriba databrickslabs.
    • Reemplace <repo> por el nombre del repositorio, por ejemplo, overwatch.
    • Reemplace <DBFS-output-dir> por una ruta de acceso de DBFS para almacenar los registros capturados desde GitHub.
    • Reemplace <scope-name> por el nombre del ámbito de secreto que creó para almacenar el token de GitHub.
    • Reemplace <github-token-key> por el nombre de la clave que asignó al token de GitHub.
  8. Haga clic en Guardar tarea.

Agregar otra tarea

  1. Haga clic en botón Agregar tarea debajo de la tarea que acaba de crear.

  2. En el campo Nombre de tarea, escriba el nombre que quiera darle a la tarea, por ejemplo, get_contributors.

  3. En Tipo, seleccione el tipo de tarea Script de Python.

  4. En Origen, seleccione DBFS/S3.

  5. En Ruta de acceso, escriba la ruta de acceso al script en DBFS.

  6. En Parámetros, escriba los argumentos siguientes para el script de Python:

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Reemplace <owner> por el nombre del propietario del repositorio. Por ejemplo, para capturar registros del repositorio github.com/databrickslabs/overwatch, escriba databrickslabs.
    • Reemplace <repo> por el nombre del repositorio, por ejemplo, overwatch.
    • Reemplace <DBFS-output-dir> por una ruta de acceso de DBFS para almacenar los registros capturados desde GitHub.
    • Reemplace <scope-name> por el nombre del ámbito de secreto que creó para almacenar el token de GitHub.
    • Reemplace <github-token-key> por el nombre de la clave que asignó al token de GitHub.
  7. Haga clic en Guardar tarea.

Agregar una tarea para transformar los datos

  1. Haga clic en botón Agregar tarea debajo de la tarea que acaba de crear.
  2. En el campo Nombre de tarea, escriba el nombre que quiera darle a la tarea, por ejemplo, transform_github_data.
  3. En Tipo, seleccione Canalización de Delta Live Tables y escriba un nombre para la tarea.
  4. En Canalización, seleccione la canalización creada en el Paso 3: Crear una canalización de Delta Live Tables para procesar los datos de GitHub.
  5. Haga clic en Crear.

Paso 5: Ejecutar el flujo de trabajo de transformación de datos

Haga clic en botón Ejecutar ahora para ejecutar el flujo de trabajo. Para ver detalles de la ejecución, haga clic en el vínculo de la columna Hora de inicio de la ejecución en la vista Ejecuciones de trabajo. Haga clic en cada tarea para ver los detalles de la ejecución de la tarea.

Paso 6: (Opcional) Para ver los datos de salida una vez completada la ejecución del flujo de trabajo, realice los pasos siguientes:

  1. En la vista de detalles de ejecución, haga clic en la tarea Delta Live Tables.
  2. En el panel Detalles de ejecución de la tarea, haga clic en el nombre de la canalización en Canalización. La página Detalles de canalización aparecerá.
  3. Seleccione la tabla commits_by_author en el DAG de canalización.
  4. Haga clic en el nombre de la tabla junto a Metastore en el panel commits_by_author. Se abre la página del Explorador de catálogos.

En el Explorador de catálogos, puede ver el esquema de tabla, los datos de ejemplo y otros detalles de los datos. Siga los mismos pasos para ver los datos de la tabla github_contributors_raw.

Paso 7: Eliminar los datos de GitHub

En una aplicación real, es posible que esté continuamente ingiriendo y procesando datos. Dado que este ejemplo descarga y procesa todo el conjunto de datos, debe eliminar los datos de GitHub ya descargados para evitar un error al volver a ejecutar el flujo de trabajo. Para quitar los datos descargados, realice los pasos siguientes:

  1. Cree un cuaderno y escriba los siguientes comandos en la primera celda:

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    Reemplace <commits-path> y <contributors-path> por las rutas de acceso de DBFS que configuró al crear las tareas de Python.

  2. Haga clic en Menú Ejecutar y seleccione Ejecutar celda.

También puede agregar este cuaderno como una tarea en el flujo de trabajo.

Paso 8: Crear las consultas de Databricks SQL

Después de ejecutar el flujo de trabajo y crear las tablas necesarias, cree consultas para analizar los datos preparados. Para crear las consultas y visualizaciones de ejemplo, realice los pasos siguientes:

Mostrar los 10 principales colaboradores por mes

  1. Haga clic en el icono situado bajo el logotipo de Databricks Logotipo de Databricks en la barra lateral y seleccione SQL.

  2. Haga clic en Crear una consulta para abrir el editor de consultas de Databricks SQL.

  3. Asegúrese de que el valor se haya establecido en hive_metastore. Haga clic en valor predeterminado junto a hive_metastore y establezca la base de datos en el valor de Destino establecido en la canalización de Delta Live Tables.

  4. En la pestaña Nueva consulta, escriba la siguiente consulta:

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. Haga clic en la pestaña Nueva consulta y cambie el nombre de la consulta, por ejemplo, Commits by month top 10 contributors.

  6. De forma predeterminada, los resultados se muestran como una tabla. Para cambiar cómo se visualizan los datos, por ejemplo, mediante un gráfico de barras, en el panel Resultados, haga clic en menú Kebab y haga clic en Editar.

  7. En Tipo de visualización, seleccione Barra.

  8. En Columna X, seleccione mes.

  9. En Columnas Y, seleccione recuento(1).

  10. En Agrupar por, seleccione nombre.

  11. Haga clic en Save(Guardar).

Mostrar los 20 principales colaboradores

  1. Haga clic en + > Crear nueva consulta y asegúrese de que el catálogo está establecido en hive_metastore. Haga clic en valor predeterminado junto a hive_metastore y establezca la base de datos en el valor de Destino establecido en la canalización de Delta Live Tables.

  2. Escriba la siguiente consulta:

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. Haga clic en la pestaña Nueva consulta y cambie el nombre de la consulta, por ejemplo, Top 20 contributors.

  4. Para cambiar la visualización de la tabla predeterminada, en el panel Resultados, haga clic en los menú Kebab y haga clic en Editar.

  5. En Tipo de visualización, seleccione Barra.

  6. En la Columna X, seleccione inicio de sesión.

  7. En Columnas Y, seleccione contribuciones.

  8. Haga clic en Save(Guardar).

Mostrar el total de confirmaciones por autor

  1. Haga clic en + > Crear nueva consulta y asegúrese de que el catálogo está establecido en hive_metastore. Haga clic en valor predeterminado junto a hive_metastore y establezca la base de datos en el valor de Destino establecido en la canalización de Delta Live Tables.

  2. Escriba la siguiente consulta:

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. Haga clic en la pestaña Nueva consulta y cambie el nombre de la consulta, por ejemplo, Total commits by author.

  4. Para cambiar la visualización de la tabla predeterminada, en el panel Resultados, haga clic en los menú Kebab y haga clic en Editar.

  5. En Tipo de visualización, seleccione Barra.

  6. En la Columna X, seleccione nombre.

  7. En Columnas Y, seleccione confirmaciones.

  8. Haga clic en Save(Guardar).

Paso 9: Crear un panel

  1. En la barra lateral, haga clic en Dashboards Icon Paneles
  2. Haga clic en Crear panel.
  3. Escriba un nombre para el panel, por ejemplo, GitHub analysis.
  4. Para cada consulta y visualización creada en el Paso 8: Crear consultas de Databricks SQL, haga clic en Añadir > Visualización y seleccione cada visualización.

Paso 10: Añada las tareas SQL al flujo de trabajo

Para añadir las nuevas tareas de consulta al flujo de trabajo que creó en Crear un trabajo de Azure Databricks y añadir la primera tarea, para cada consulta que creó en el Paso 8: Crear consultas de Databricks SQL:

  1. Haga clic en Icono de flujos de trabajo Flujos de trabajo en la barra lateral.
  2. En la columna Name (Nombre), haga clic en el nombre de trabajo.
  3. Haga clic en la ficha Tareas.
  4. Haga clic en botón Agregar tarea debajo de la última tarea.
  5. Escriba un nombre para la tarea, en Tipo, seleccione SQL, y en Tarea SQL seleccione Consulta.
  6. Seleccione la consulta en consulta SQL.
  7. En Almacén SQL, seleccione un almacén SQL sin servidor o un almacén SQL pro para ejecutar la tarea.
  8. Haga clic en Crear.

Paso 11: Añadir una tarea de panel

  1. Haga clic en botón Agregar tarea debajo de la última tarea.
  2. Escriba un nombre para la tarea, en Tipo, seleccione SQL, y en Tarea SQL seleccione Panel heredado.
  3. Seleccione el panel creado en Paso 9: Crear un panel.
  4. En Almacén SQL, seleccione un almacén SQL sin servidor o un almacén SQL pro para ejecutar la tarea.
  5. Haga clic en Crear.

Paso 12: Ejecutar el flujo de trabajo completo

Para ejecutar el flujo de trabajo, haga clic en el Botón Ejecutar ahora. Para ver detalles de la ejecución, haga clic en el vínculo de la columna Hora de inicio de la ejecución en la vista Ejecuciones de trabajo.

Paso 13: Ver los resultados

Para ver los resultados cuando finalice la ejecución, haga clic en la tarea final del panel y haga clic en el nombre del panel bajo Panel de SQL en el panel derecho.