在 Azure Databricks 作业中使用 Databricks SQL

可以在 Azure Databricks 作业中使用 SQL 任务类型,以便创建、计划、操作和监视包含 Databricks SQL 对象(例如查询、旧版仪表板和警报)的工作流。 例如,工作流可以引入数据、准备数据、使用 Databricks SQL 查询执行分析,然后在旧版仪表板中显示结果。

本文提供了一个示例工作流,该工作流可以创建旧版仪表板来显示 GitHub 贡献指标。 在此示例中,你将:

  • 使用 Python 脚本和 GitHub REST API 引入 GitHub 数据。
  • 使用增量实时表管道转换 GitHub 数据。
  • 触发 Databricks SQL 查询,以便对准备好的数据执行分析。
  • 在旧版仪表板中显示分析结果。

GitHub 分析仪表板

开始之前的准备工作

若要完成本演练,你需要具备以下条件:

步骤 1:将 GitHub 令牌存储在机密中

Databricks 建议使用机密范围来安全存储和管理机密,而不要在作业中对凭据(例如 GitHub 个人访问令牌)进行硬编码。 以下 Databricks CLI 命令是创建机密范围并将 GitHub 令牌存储在该范围内的机密中的一个示例:

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • 请将 <scope-name 替换为用于存储令牌的 Azure Databricks 机密范围的名称。
  • 请将 <token-key> 替换为要分配到令牌的密钥的名称。
  • 请将 <token> 替换为 GitHub 个人访问令牌的值。

步骤 2:创建用于提取 GitHub 数据的脚本

以下 Python 脚本使用 GitHub REST API 从 GitHub 存储库提取有关提交内容和贡献的数据。 输入参数指定 GitHub 存储库。 记录将保存到 DBFS 中由另一个输入参数指定的位置。

此示例使用 DBFS 存储 Python 脚本,但你也可以使用 Databricks Git 文件夹工作区文件来存储和管理该脚本。

  • 将此脚本保存到本地磁盘上的某个位置:

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    if __name__ == "__main__":
        main()
    
  • 将该脚本上传到 DBFS:

    1. 转到 Azure Databricks 登陆页面,然后在边栏中单击 “目录”图标目录”。
    2. 单击“浏览 DBFS”。
    3. 在 DBFS 文件浏览器中单击“上传”。 此时会显示“将数据上传到 DBFS ”对话框。
    4. 在 DBFS 中输入该脚本的存储路径,单击“放入要上传的文件”,或单击“浏览”并选择该 Python 脚本。
    5. 单击“Done”(完成) 。

步骤 3:创建用于处理 GitHub 数据的增量实时表管道

在本部分,你将创建一个增量实时表管道,以将原始 GitHub 数据转换为可由 Databricks SQL 查询分析的表。 若要创建管道,请执行以下步骤:

  1. 在边栏中,单击 新建图标新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。

  2. 在“默认语言”中,输入名称并选择“Python”。 可将“群集”设置保留为默认值。 增量实时表运行时在运行管道之前创建群集。

  3. 单击“创建”。

  4. 复制 Python 代码示例并将其粘贴到新笔记本中。 可以将示例代码添加到笔记本的单个单元格或多个单元格。

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. 在边栏中,单击 工作流图标工作流”,单击“增量实时表”选项卡,然后单击“创建管道”。

  6. 为管道命名,例如 Transform GitHub data

  7. 在“笔记本库”字段中输入笔记本的路径,或单击 文件选取器图标 以选择笔记本。

  8. 单击“添加配置”。 在 Key 文本框中输入 commits-path。 在 Value 文本框中,输入要将 GitHub 记录写入到的 DBFS 路径。 这可以是你选择的任何路径,它与你在创建工作流期间配置第一个 Python 任务时使用的路径相同。

  9. 再次单击“添加配置”。 在 Key 文本框中输入 contribs-path。 在 Value 文本框中,输入要将 GitHub 记录写入到的 DBFS 路径。 这可以是你选择的任何路径,它与你在创建工作流期间配置第二个 Python 任务时使用的路径相同。

  10. 在“目标”字段中输入目标数据库,例如 github_tables。 设置目标数据库会将输出数据发布到元存储,对于分析管道生成的数据的下游查询,必须执行此步骤。

  11. 单击“ 保存”。

步骤 4:创建工作流以引入并转换 GitHub 数据

在使用 Databricks SQL 分析和可视化 GitHub 数据之前,需要引入并准备数据。 若要创建工作流以完成这些任务,请执行以下步骤:

创建 Azure Databricks 作业并添加第一个任务

  1. 转到 Azure Databricks 登陆页面并执行以下操作之一:

    • 在边栏中,单击 “工作流”图标工作流”,然后单击 创建作业按钮
    • 在边栏中,单击 新建图标新建”,然后从菜单中选择“作业”。
  2. 在“任务”选项卡上显示的任务对话框中,将“为作业添加名称...”替换为你的作业名称,例如 GitHub analysis workflow

  3. 在“任务名称”中输入任务的名称,例如 get_commits

  4. 在“类型”中选择“Python 脚本”。

  5. 在“源”中选择“DBFS/S3”。

  6. 在“路径”中,输入该脚本在 DBFS 中的路径。

  7. 在“参数”中,输入该 Python 脚本的以下参数:

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • 请将 <owner> 替换为存储库所有者的名称。 例如,若要从 github.com/databrickslabs/overwatch 存储库提取记录,请输入 databrickslabs
    • 请将 <repo> 替换为存储库名称,例如 overwatch
    • 请将 <DBFS-output-dir> 替换为从 GitHub 提取的记录在 DBFS 中的存储路径。
    • 请将 <scope-name> 替换为你创建的、用于存储 GitHub 令牌的机密范围的名称。
    • 请将 <github-token-key> 替换为分配到 GitHub 令牌的密钥的名称。
  8. 单击“保存任务”。

添加另一个任务

  1. 单击刚刚创建的任务下方的 添加任务按钮

  2. 在“任务名称”中输入任务的名称,例如 get_contributors

  3. 在“类型”中选择“Python 脚本”任务类型。

  4. 在“源”中选择“DBFS/S3”。

  5. 在“路径”中,输入该脚本在 DBFS 中的路径。

  6. 在“参数”中,输入该 Python 脚本的以下参数:

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • 请将 <owner> 替换为存储库所有者的名称。 例如,若要从 github.com/databrickslabs/overwatch 存储库提取记录,请输入 databrickslabs
    • 请将 <repo> 替换为存储库名称,例如 overwatch
    • 请将 <DBFS-output-dir> 替换为从 GitHub 提取的记录在 DBFS 中的存储路径。
    • 请将 <scope-name> 替换为你创建的、用于存储 GitHub 令牌的机密范围的名称。
    • 请将 <github-token-key> 替换为分配到 GitHub 令牌的密钥的名称。
  7. 单击“保存任务”。

添加任务以转换数据

  1. 单击刚刚创建的任务下方的 添加任务按钮
  2. 在“任务名称”中输入任务的名称,例如 transform_github_data
  3. 在“类型”中,选择“增量实时表管道”并输入任务的名称。
  4. 在“管道”中,选择在步骤 3:创建增量实时表管道以处理 GitHub 数据中创建的管道。
  5. 单击“创建”。

步骤 5:运行数据转换工作流

单击 立即运行按钮 以运行工作流。 若要查看运行详细信息,请在作业运行视图中单击该运行的“开始时间”列中的链接。 单击每个任务以查看任务运行详细信息。

步骤 6:(可选)若要在工作流运行完成后查看输出数据,请执行以下步骤:

  1. 在运行详细信息视图中,单击增量实时表任务。
  2. 在“任务运行详细信息”面板中,单击“管道”下的管道名称。 此时将显示“管道详细信息”页面。
  3. 在管道 DAG 中选择 commits_by_author 表。
  4. 在“commits_by_author”面板中单击“元存储”旁边的表名称。 此时会打开“目录资源管理器”页。

在目录资源管理器中,可以查看表架构、示例数据以及数据的其他详细信息。 按照相同的步骤查看 github_contributors_raw 表的数据。

步骤 7:删除 GitHub 数据

在实际应用程序中,你可能会持续引入和处理数据。 由于此示例会下载并处理整个数据集,因此必须删除已下载的 GitHub 数据,以防止在重新运行工作流时出错。 若要删除下载的数据,请执行以下步骤:

  1. 创建一个新笔记本,并在第一个单元格中输入以下命令:

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    请将 <commits-path><contributors-path> 替换为在创建 Python 任务时配置的 DBFS 路径。

  2. 单击 运行菜单,然后选择“运行单元格”。

还可以将此笔记本添加为工作流中的任务。

步骤 8:创建 Databricks SQL 查询

运行工作流并创建所需的表后,创建查询以分析准备好的数据。 若要创建示例查询和可视化效果,请执行以下步骤:

按月显示排名靠前的 10 位贡献者

  1. 在边栏中单击 Databricks 徽标 Databricks 徽标 下方的图标,然后选择“SQL”。

  2. 单击“创建查询”打开 Databricks SQL 查询编辑器。

  3. 确保目录设置为“hive_metastore”。 单击“hive_metastore”旁边的“default”,并将数据库设置为在增量实时表管道中设置的“目标”值。

  4. 在“新建查询”选项卡中输入以下查询:

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. 单击“新建查询”选项卡并重命名查询,例如 Commits by month top 10 contributors

  6. 默认情况下,结果显示为表。 若要更改数据的可视化方式(例如使用条形图),请在“结果”面板中单击 Kebab 菜单,然后单击“编辑”。

  7. 在“可视化效果类型”中选择“条形图”。

  8. 在“X 列”中选择“月份”。

  9. 在“Y 列”中选择“计数(1)”。

  10. 在“分组依据”中选择“名称”。

  11. 单击“ 保存”。

显示排名靠前的 20 位贡献者

  1. 单击“+ > 创建新查询”,并确保目录设置为“hive_metastore”。 单击“hive_metastore”旁边的“default”,并将数据库设置为在增量实时表管道中设置的“目标”值。

  2. 输入以下查询:

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. 单击“新建查询”选项卡并重命名查询,例如 Top 20 contributors

  4. 若要更改 default 表中的可视化效果,请在“结果”面板中单击 Kebab 菜单,然后单击“编辑”。

  5. 在“可视化效果类型”中选择“条形图”。

  6. 在“X 列”中选择“登录”。

  7. 在“Y 列”中选择“贡献”。

  8. 单击“ 保存”。

按作者显示提交内容总数

  1. 单击“+ > 创建新查询”,并确保目录设置为“hive_metastore”。 单击“hive_metastore”旁边的“default”,并将数据库设置为在增量实时表管道中设置的“目标”值。

  2. 输入以下查询:

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. 单击“新建查询”选项卡并重命名查询,例如 Total commits by author

  4. 若要更改 default 表中的可视化效果,请在“结果”面板中单击 Kebab 菜单,然后单击“编辑”。

  5. 在“可视化效果类型”中选择“条形图”。

  6. 在“X 列”中选择“名称”。

  7. 在“Y 列”中选择“提交”。

  8. 单击“ 保存”。

步骤 9:创建仪表板

  1. 在边栏中,单击 Dashboards Icon仪表板
  2. 单击创建仪表板
  3. 输入仪表板的名称,例如 GitHub analysis
  4. 对于在步骤 8:创建 Databricks SQL 查询中创建的每个查询和可视化效果,单击“添加”>“可视化效果”并选择每个可视化效果。

步骤 10:将 SQL 任务添加到工作流

若要将新查询任务添加到在创建 Azure Databricks 作业并添加第一个任务中创建的工作流,请对在步骤 8:创建 Databricks SQL 查询中创建的每个查询执行以下操作:

  1. 在边栏中,单击 工作流图标工作流”。
  2. 在“名称”列中单击作业名称。
  3. 单击“任务”选项卡。
  4. 单击最后一个任务下方的 添加任务按钮
  5. 输入任务的名称,在“类型”中选择“SQL”,然后在“SQL 任务”中选择“查询”。
  6. 在“SQL 查询”中选择查询。
  7. 在“SQL 仓库”中,选择一个无服务器 SQL 仓库或专业 SQL 仓库以运行任务。
  8. 单击“创建”。

步骤 11:添加仪表板任务

  1. 单击最后一个任务下方的 添加任务按钮
  2. 输入任务的名称,在“类型”中选择“SQL”,然后在“SQL 任务”中选择“旧版仪表板”。
  3. 选择在步骤 9:创建仪表板中创建的仪表板。
  4. 在“SQL 仓库”中,选择一个无服务器 SQL 仓库或专业 SQL 仓库以运行任务。
  5. 单击“创建”。

步骤 12:运行完整工作流

若要运行工作流,请单击 立即运行按钮。 若要查看运行详细信息,请在作业运行视图中单击该运行的“开始时间”列中的链接。

步骤 13:查看结果

若要在运行完成时查看结果,请单击最后一个仪表板任务,然后在右侧面板中单击“SQL 仪表板”下面的仪表板名称。