Condividi tramite


Usare Databricks SQL in un processo di Azure Databricks

È possibile usare il tipo di attività SQL in un processo di Azure Databricks per creare, pianificare, gestire e monitorare flussi di lavoro che includono oggetti SQL di Databricks, ad esempio query, dashboard legacy e avvisi. Ad esempio, il flusso di lavoro può inserire dati, preparare i dati, eseguire l'analisi usando query SQL di Databricks e quindi visualizzare i risultati in un dashboard legacy.

Questo articolo fornisce un esempio di flusso di lavoro che crea un dashboard legacy che visualizza le metriche per i contributi di GitHub. Nel seguente esempio, si eseguirà quanto segue:

  • Inserire dati GitHub usando uno script Python e l'API REST di GitHub.
  • Trasformare i dati di GitHub usando una pipeline di tabelle Delta Live.
  • Attivare query SQL di Databricks che eseguono l'analisi sui dati preparati.
  • Visualizzare l'analisi in un dashboard legacy.

Dashboard di analisi di GitHub

Prima di iniziare

Per completare questa procedura dettagliata, sono necessari i seguenti elementi:

Passaggio 1: Archiviare il token GitHub in un segreto

Anziché impostare le credenziali come hardcoding, ad esempio il token di accesso personale GitHub in un processo, Databricks consiglia di usare un ambito segreto per archiviare e gestire i segreti in modo sicuro. I seguenti comandi dell'interfaccia della riga di comando di Databricks sono un esempio di creazione di un ambito segreto e di archiviazione del token GitHub in un segreto in tale ambito:

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • Sostituire <scope-name con il nome di un ambito segreto di Azure Databricks per archiviare il token.
  • Sostituire <token-key> con il nome di una chiave da assegnare al token.
  • Sostituire <token> con il token di accesso personale di GitHub.

Passaggio 2: Creare uno script per recuperare i dati di GitHub

Il seguente script Python usa l'API REST di GitHub per recuperare i dati sui commit e i contributi da un repository GitHub. Gli argomenti di input specificano il repository GitHub. I record vengono salvati in un percorso in DBFS specificato da un altro argomento di input.

Questo esempio usa DBFS per archiviare lo script Python, ma è anche possibile usare cartelle Git di Databricks o file dell'area di lavoro per archiviare e gestire lo script.

  • Salvare questo script in un percorso nel disco locale:

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    if __name__ == "__main__":
        main()
    
  • Caricare lo script in DBFS:

    1. Andare alla pagina di destinazione di Azure Databricks e fare clic su Icona catalogo Catalogo nella barra laterale.
    2. Fare clic su Sfoglia DBFS.
    3. Nel browser file DBFS fare clic su Carica. Viene visualizzata la finestra di dialogo Carica dati in DBFS.
    4. Immettere un percorso in DBFS per archiviare lo script, fare clic su Rilascia file da caricare oppure fare clic per esplorare e selezionare lo script Python.
    5. Fare clic su Fatto.

Passaggio 3: Creare una pipeline di tabelle Delta Live per elaborare i dati di GitHub

In questa sezione viene creata una pipeline di tabelle Delta Live per convertire i dati GitHub non elaborati in tabelle che possono essere analizzate dalle query SQL di Databricks. Per creare la pipeline, seguire questa procedura:

  1. Nella barra laterale, fare clic su Nuova icona Nuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.

  2. In Lingua predefinita immettere un nome e selezionare Python. È possibile lasciare il valore predefinito per Cluster. Il runtime di DLT crea un cluster prima di eseguire la pipeline.

  3. Cliccare su Crea.

  4. Copiare l'esempio di codice Python e incollarlo nel nuovo notebook. È possibile aggiungere il codice di esempio a una singola cella o a più celle del notebook.

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. Nella barra laterale, fare clic su Fare clic su Icona Flussi di lavoro Flussi di lavoro, poi fare clic sulla scheda Delta Live Tables e infine su Crea Pipeline.

  6. Assegnare alla pipeline un nome, ad esempio Transform GitHub data.

  7. Nel campo Librerie del notebook immettere il percorso del notebook o fare clic su Icona della selezione file per selezionare il notebook.

  8. Fare clic su Aggiungi configurazione. Nella casella di testo Key immettere commits-path. Nella casella di testo Value immettere il percorso DBFS in cui verranno scritti i record GitHub. Si può scegliere qualsiasi percorso ed è lo stesso percorso che verrà usato durante la configurazione della prima attività Python quando si crea il flusso di lavoro.

  9. Fare di nuovo clic su Aggiungi configurazione. Nella casella di testo Key immettere contribs-path. Nella casella di testo Value immettere il percorso DBFS in cui verranno scritti i record GitHub. Si può scegliere qualsiasi percorso ed è lo stesso percorso che verrà usato durante la configurazione della seconda attività Python quando si crea il flusso di lavoro.

  10. Nel campo Destinazione immettere un database di destinazione, ad esempio github_tables. L'impostazione di un database di destinazione pubblica i dati di output nel metastore ed è necessaria per le query downstream che analizzano i dati prodotti dalla pipeline.

  11. Fare clic su Salva.

Passaggio 4: Creare un flusso di lavoro per inserire e trasformare i dati di GitHub

Prima di analizzare e visualizzare i dati di GitHub con Databricks SQL, è necessario inserire e preparare i dati. Per creare un flusso di lavoro per completare queste attività, seguire questa procedura:

Creare un processo di Azure Databricks e aggiungere la prima attività

  1. Passare alla pagina di destinazione di Azure Databricks ed eseguire una delle operazioni seguenti:

    • Nella barra laterale, fare clic su Icona Flussi di lavoro Flussi di lavoro e quindi Pulsante Crea processo.
    • Nella barra laterale, fare clic su Nuova icona Nuovo e selezionare Processo dal menu.
  2. Nella finestra di dialogo delle attività visualizzata nella scheda Attività sostituire Aggiungi un nome per il processo... con il nome del processo, ad esempio GitHub analysis workflow.

  3. In Nome attività immettere un nome per l'attività, ad esempio get_commits.

  4. In Tipo selezionare Script Python.

  5. In Origine selezionare DBFS/S3.

  6. In Percorso immettere il percorso dello script in DBFS.

  7. In Parametri immettere gli argomenti seguenti per lo script Python:

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Sostituire <owner> con il nome del proprietario del repository. Ad esempio, per recuperare i record dal repository github.com/databrickslabs/overwatch, immettere databrickslabs.
    • Sostituire <repo> con il nome del repository, ad esempio overwatch.
    • Sostituire <DBFS-output-dir> con un percorso in DBFS per archiviare i record recuperati da GitHub.
    • Sostituire <scope-name> con il nome dell'ambito segreto creato per archiviare il token GitHub.
    • Sostituire <github-token-key> con il nome della chiave assegnata al token GitHub.
  8. Fare clic su Salva attività.

Aggiungere un'altra attività

  1. Cliccare su Pulsante Aggiungi attività sotto la task appena creata.

  2. In Nome attività immettere un nome per l'attività, ad esempio get_contributors.

  3. In Tipo selezionare il tipo di attività Script Python.

  4. In Origine selezionare DBFS/S3.

  5. In Percorso immettere il percorso dello script in DBFS.

  6. In Parametri immettere gli argomenti seguenti per lo script Python:

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Sostituire <owner> con il nome del proprietario del repository. Ad esempio, per recuperare i record dal repository github.com/databrickslabs/overwatch, immettere databrickslabs.
    • Sostituire <repo> con il nome del repository, ad esempio overwatch.
    • Sostituire <DBFS-output-dir> con un percorso in DBFS per archiviare i record recuperati da GitHub.
    • Sostituire <scope-name> con il nome dell'ambito segreto creato per archiviare il token GitHub.
    • Sostituire <github-token-key> con il nome della chiave assegnata al token GitHub.
  7. Fare clic su Salva attività.

Aggiungere un'attività per trasformare i dati

  1. Cliccare su Pulsante Aggiungi attività sotto la task appena creata.
  2. In Nome attività immettere un nome per l'attività, ad esempio transform_github_data.
  3. In Tipo selezionare Pipeline tabelle Delta Live e immettere un nome per l'attività.
  4. In Pipeline selezionare la pipeline creata nel Passaggio 3: Creare una pipeline di tabelle Delta Live per elaborare i dati di GitHub.
  5. Cliccare su Crea.

Passaggio 5: Eseguire il flusso di lavoro di trasformazione dei dati

Fare clic su Pulsante Esegui ora per eseguire il flusso di lavoro. Per visualizzare i dettagli per l'esecuzione, fare clic sul collegamento nella colonna Ora di inizio per l'esecuzione nella visualizzazione esecuzioni del processo . Fare clic su ciascuna attività per visualizzare i dettagli per l'esecuzione dell'attività.

Passaggio 6: (Facoltativo) Per visualizzare i dati di output al termine dell'esecuzione del flusso di lavoro, seguire questa procedura:

  1. Nella visualizzazione dei dettagli dell’esecuzione fare clic sull'attività tabelle Delta Live.
  2. Nel pannello Dettagli esecuzione attività fare clic sul nome della pipeline in Pipeline. Verrà visualizzata la pagina Dettagli pipeline.
  3. Selezionare la tabella commits_by_author nel DAG della pipeline.
  4. Fare clic sul nome della tabella accanto a Metastore nel pannello commits_by_author. Verrà visualizzata la pagina Esplora cataloghi.

In Esplora cataloghi è possibile visualizzare lo schema della tabella, i dati di esempio e altri dettagli per i dati. Seguire la stessa procedura per visualizzare i dati per la tabella github_contributors_raw.

Passaggio 7: Rimuovere i dati di GitHub

In un'applicazione reale, è possibile inserire ed elaborare continuamente i dati. Poiché questo esempio scarica ed elabora l'intero set di dati, è necessario rimuovere i dati GitHub già scaricati per evitare un errore durante l'esecuzione del flusso di lavoro. Per rimuovere i dati scaricati, seguire questa procedura:

  1. Creare un nuovo notebook e immettere nella prima cella i comandi seguenti:

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    Sostituire <commits-path> e <contributors-path> con i percorsi DBFS configurati durante la creazione delle attività Python.

  2. Fare clic su Esegui Menu e selezionare Esegui cella.

È anche possibile aggiungere questo notebook come attività nel flusso di lavoro.

Passaggio 8: Creare le query SQL di Databricks

Dopo aver eseguito il flusso di lavoro e creato le tabelle necessarie, creare query per analizzare i dati preparati. Per creare le query e le visualizzazioni di esempio, seguire questa procedura:

Visualizzare i primi 10 collaboratori per mese

  1. Fare clic sull'icona sotto il logo Logo di Databricks di Databricks nella barra laterale e selezionare SQL.

  2. Fare clic su Crea una query per aprire l'editor di query SQL di Databricks.

  3. Assicurarsi che il catalogo sia impostato su hive_metastore. Fare clic sul valore predefinito accanto a hive_metastore e impostare il database sul valore di Destinazione impostato nella pipeline tabelle Delta Live.

  4. Nella scheda Nuova Query, immettere la query seguente:

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. Fare clic sulla scheda Nuova query e rinominare la query, ad esempio Commits by month top 10 contributors.

  6. Per impostazione predefinita, i risultati vengono visualizzati sotto forma di tabella. Per modificare la modalità di visualizzazione dei dati, ad esempio usando un grafico a barre, nel pannello Risultati fare clic su Menu kebab e poi su Modifica.

  7. In Tipo di visualizzazione selezionare Barra.

  8. Nella colonna X, selezionare il mese.

  9. Nelle colonne Y selezionare count(1).

  10. In Raggruppa per, selezionare nome.

  11. Fare clic su Salva.

Visualizzare i primi 20 collaboratori

  1. Fare clic su + > Crea nuova query e verificare che il catalogo sia impostato su hive_metastore. Fare clic sul valore predefinito accanto a hive_metastore e impostare il database sul valore di Destinazione impostato nella pipeline tabelle Delta Live.

  2. Immettere la query seguente:

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. Fare clic sulla scheda Nuova query e rinominare la query, ad esempio Top 20 contributors.

  4. Per modificare la visualizzazione dalla tabella predefinita, nel pannello Risultati fare clic su Menu kebab e poi su Modifica.

  5. In Tipo di visualizzazione selezionare Barra.

  6. Nella colonna X, selezionare login.

  7. Nelle colonne Y selezionare contributi.

  8. Fare clic su Salva.

Visualizzare i commit totali per autore

  1. Fare clic su + > Crea nuova query e verificare che il catalogo sia impostato su hive_metastore. Fare clic sul valore predefinito accanto a hive_metastore e impostare il database sul valore di Destinazione impostato nella pipeline tabelle Delta Live.

  2. Immettere la query seguente:

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. Fare clic sulla scheda Nuova query e rinominare la query, ad esempio Total commits by author.

  4. Per modificare la visualizzazione dalla tabella predefinita, nel pannello Risultati fare clic su Menu kebab e poi su Modifica.

  5. In Tipo di visualizzazione selezionare Barra.

  6. Nella colonna X, selezionare nome.

  7. Nelle colonne Y selezionare commits.

  8. Fare clic su Salva.

Passaggio 9: Creare un dashboard

  1. Nella barra laterale, cliccare su icona del dashboard Dashboard
  2. Fare clic su Crea dashboard.
  3. Immettere un nome per il dashboard, ad esempio GitHub analysis.
  4. Per ogni query e visualizzazione creata nel Passaggio 8: Creare le query SQL di Databricks, fare clic su Aggiungi > Visualizzazione e selezionare ciascuna visualizzazione.

Passaggio 10: Aggiungere le attività SQL al flusso di lavoro

Per aggiungere le nuove attività di query al flusso di lavoro creato in Creare un processo di Azure Databricks e aggiungere la prima attività, per ogni query creata nel Passaggio 8: Creare le query SQL di Databricks:

  1. Fare clic su Icona Flussi di lavoro Flussi di lavoro nella barra laterale.
  2. Nella colonna Nome, fare clic sul nome di un processo.
  3. Fare clic sulla scheda Attività.
  4. Fare clic su Pulsante Aggiungi attività sotto l'ultima attività.
  5. Immettere un nome per l'attività, in Tipo selezionare SQL e in attività SQL selezionare Query.
  6. Selezionare la query nella query SQL.
  7. In SQL Warehouse selezionare una warehouse SQL serverless o pro per eseguire l'attività.
  8. Cliccare su Crea.

Passaggio 11: Aggiungere un'attività dashboard

  1. Fare clic su Pulsante Aggiungi attività sotto l'ultima attività.
  2. Immettere un nome per l'attività, in Tipo selezionare SQL e in attività SQL selezionare Dashboard legacy.
  3. Selezionare il dashboard creato nel Passaggio 9: Creare un dashboard.
  4. In SQL Warehouse selezionare una warehouse SQL serverless o pro per eseguire l'attività.
  5. Cliccare su Crea.

Passaggio 12: Eseguire il flusso di lavoro completo

Fare clic su Pulsante Esegui ora per eseguire il flusso di lavoro. Per visualizzare i dettagli per l'esecuzione, fare clic sul collegamento nella colonna Ora di inizio per l'esecuzione nella visualizzazione esecuzioni del processo .

Passaggio 13: Visualizzare i risultati

Per visualizzare i risultati al termine dell'esecuzione, fare clic sull'attività del dashboard finale e poi fare clic sul nome del dashboard in Dashboard SQL nel pannello a destra.