Databricks SDK for Python

注意

Databricks 建议使用 Databricks 资产捆绑包作为源代码来创建、开发、部署和测试作业和其他 Databricks 资源。 请参阅什么是 Databricks 资产捆绑包?

本文介绍如何使用 Databricks SDK for Python 自动执行 Azure Databricks 操作并加速开发。 本文补充了“阅读文档”中适用于 Python 的 Databricks SDK 文档,并补充了 GitHub 中适用于 Python 的 Databricks SDK 存储库中的代码示例

注意

Databricks SDK for Python 处于 Beta 版,可以在生产环境中使用。

在 Beta 版本期间,Databricks 建议将依赖项固定到代码所依赖的 Databricks SDK for Python 的特定次要版本。 例如,可以在文件中固定依赖项,例如 venvrequirements.txt,或 Poetry 的 pyproject.tomlpoetry.lock。 有关固定依赖项的详细信息,请参阅 venv虚拟环境和包,或 Poetry 的安装依赖项

开始之前

可以从 Azure Databricks 笔记本内或从本地开发计算机使用 Databricks SDK for Python。

在开始使用 Databricks SDK for Python 之前,开发计算机必须满足以下要求:

  • 已配置 Azure Databricks 身份验证
  • 已安装 Python 3.8 或更高版本。 要自动处理 Azure Databricks 计算资源,Databricks 建议安装与目标 Azure Databricks 计算资源上安装的版本匹配的 Python 主要和次要版本。 本文的示例依赖于使用装有 Python 3.10 的 Databricks Runtime 13.3 LTS 来实现群集自动化。 有关正确的版本,请参阅群集 Databricks Runtime 版本的 Databricks Runtime 发行说明版本和兼容性
  • Databricks 建议为与 Databricks SDK for Python 配合使用的每个 Python 项目创建并激活 Python 虚拟环境。 Python 虚拟环境有助于确保代码项目使用兼容版本的 Python 和 Python 包(在本例中为 Databricks SDK for Python 包)。 有关 Python 虚拟环境的详细信息,请参阅 venvPoetry

开始使用 Databricks SDK for Python

本部分介绍如何从本地开发计算机开始使用 Databricks SDK for Python。 若要从 Azure Databricks 笔记本内使用 Databricks SDK for Python,请跳到从 Azure Databricks 笔记本内使用 Databricks SDK for Python

  1. 在已激活 Python 虚拟环境的情况下,在配置了 Azure Databricks 身份验证并安装了 Python 的开发计算机上,从 Python 包索引 (PyPI) 安装 databricks-sdk 包(及其依赖项),如下所示:

    Venv

    使用 pip 安装 databricks-sdk 包。 (在一些系统上,可能需要在此处和全文中将 pip3 替换为 pip。)

    pip3 install databricks-sdk
    

    诗歌

    poetry add databricks-sdk
    

    若要安装特定版本的 databricks-sdk 包(当 Databricks SDK for Python 处于 Beta 测试状态时),请参阅该包的发行历史记录。 例如,若要安装版本 0.1.6,请运行:

    Venv

    pip3 install databricks-sdk==0.1.6
    

    诗歌

    poetry add databricks-sdk==0.1.6
    

    若要将现有的 Databricks SDK for Python 包安装升级到最新版本,请运行以下命令:

    Venv

    pip3 install --upgrade databricks-sdk
    

    诗歌

    poetry add databricks-sdk@latest
    

    若要显示 Databricks SDK for Python 包的当前 Version 和其他详细信息,请运行以下命令:

    Venv

    pip3 show databricks-sdk
    

    诗歌

    poetry show databricks-sdk
    
  2. 在 Python 虚拟环境中,创建一个用于导入 Databricks SDK for Python 的 Python 代码文件。 以下示例位于包含以下内容的名为 main.py 的文件中,它仅列出 Azure Databricks 工作区中的所有群集:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    for c in w.clusters.list():
      print(c.cluster_name)
    
  3. 通过运行 python 命令来运行 Python 代码文件(假设文件名为 main.py):

    Venv

    python3.10 main.py
    

    诗歌

    如果你在虚拟环境 shell 中:

    python3.10 main.py
    

    如果你不在虚拟环境 shell 中:

    poetry run python3.10 main.py
    

    注意

    如果不在前面的 w = WorkspaceClient() 调用中设置任何参数,Databricks SDK for Python 将使用其默认进程来尝试执行 Azure Databricks 身份验证。 若要替代此默认行为,请参阅下面的身份验证部分。

使用 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证

本部分介绍如何从本地开发计算机到 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证。 若要从 Azure Databricks 笔记本内对 Databricks SDK for Python 进行身份验证,请跳到从 Azure Databricks 笔记本内使用 Databricks SDK for Python

Databricks SDK for Python 实施 Databricks 客户端统一身份验证标准,这是一种整合且一致的体系结构和编程身份验证方法。 此方法有助于使 Azure Databricks 的身份验证设置和自动化更加集中和可预测。 借助此方法,你只需配置 Databricks 身份验证一次,然后即可在多个 Databricks 工具和 SDK 中使用该配置,而无需进一步更改身份验证配置。 有关详细信息,包括更完整的 Python 代码示例,请参阅 Databricks 客户端统一身份验证

注意

Databricks SDK for Python 尚未实现 Azure 托管标识身份验证

使用 Databricks SDK for Python 初始化 Databricks 身份验证的一些可用编码模式包括:

  • 通过以下操作之一使用 Databricks 默认身份验证:

    • 使用必填字段为目标 Databricks 身份验证类型创建或标识自定义 Databricks 配置文件。 然后将 DATABRICKS_CONFIG_PROFILE 环境变量设置为该自定义配置文件的名称。
    • 为目标 Databricks 身份验证类型设置所需的环境变量。

    然后实例化一个具有 Databricks 默认身份验证的 WorkspaceClient 对象,如下所示:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    # ...
    
  • 支持但不建议对必填字段进行硬编码,因为这可能会透露代码中的敏感信息,例如 Azure Databricks 个人访问令牌。 以下示例对用于 Databricks 令牌身份验证的 Azure Databricks 主机和访问令牌值进行硬编码:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient(
      host  = 'https://...',
      token = '...'
    )
    # ...
    

另请参阅适用于 Python 的 Databricks SDK 文档中的身份验证

从 Azure Databricks 笔记本内使用 Databricks SDK for Python

可以从附加了 Azure Databricks 群集并且安装了 Databricks SDK for Python 的 Azure Databricks 笔记本调用 Databricks SDK for Python 功能。 它默认安装在所有使用 Databricks Runtime 13.3 LTS 或更高版本的 Azure Databricks 群集上。 对于使用 Databricks Runtime 12.2 LTS 及更低版本的 Azure Databricks 群集,必须先安装 Databricks SDK for Python。 请参阅步骤 1:安装或升级 Databricks SDK for Python

若要查看为特定 Databricks Runtime 版本安装的 Databricks SDK for Python 版本,请参阅该版本的 Databricks Runtime 发行说明中的“已安装的 Python 库”部分。

Databricks 建议从 PiPy 安装最新的 SDK 可用版本,但至少安装或升级到 Databricks SDK for Python 0.6.0 或更高版本,因为所有 Databricks Runtime 版本上的 0.6.0 及更高版本均使用默认 Azure Databricks 笔记本身份验证。

注意

Databricks Runtime 15.1 是第一个安装了 Databricks SDK for Python (0.20.0) 版本的 Databricks Runtime,它支持默认笔记本身份验证,无需升级。

下表概述了 Databricks SDK for Python 和 Databricks Runtime 版本的笔记本身份验证支持:

SDK/DBR 10.4 LTS 11.3 LTS 12.3 LTS 13.3 LTS 14.3 LTS 15.1 及更高版本
0.1.7 及更低版本
0.1.10
0.6.0
0.20.0 及更高版本

默认 Azure Databricks 笔记本身份验证依赖于临时 Azure Databricks 个人访问令牌,Azure Databricks 在后台自动生成该令牌供自己使用。 Azure Databricks 会在笔记本停止运行后删除此临时令牌。

重要

  • 默认 Azure Databricks 笔记本身份验证仅适用于群集的驱动程序节点,不适用于任何群集的工作器节点或执行程序节点。
  • Azure Databricks 笔记本身份验证不适用于 Azure Databricks 配置文件。

如果要调用 Azure Databricks 帐户级 API,或者想要使用除默认 Databricks 笔记本身份验证以外的 Databricks 身份验证类型,也支持下面的身份验证类型:

身份验证类型 Databricks SDK for Python 版本
OAuth 计算机到计算机 (M2M) 身份验证 0.18.0 及更高版本
OAuth 用户到计算机 (U2M) 身份验证 0.19.0 及更高版本
Microsoft Entra ID 服务主体身份验证 所有版本
Azure CLI 身份验证 所有版本
Databricks 个人访问令牌身份验证 所有版本

尚不支持 Azure 托管标识身份验证

步骤 1:安装或升级 Databricks SDK for Python

  1. Azure Databricks Python 笔记本可以像使用任何其他 Python 库一样使用 Databricks SDK for Python。 若要在附加的 Azure Databricks 群集上安装或升级 Databricks SDK for Python 库,请从笔记本单元格运行 %pip magic 命令,如下所示:

    %pip install databricks-sdk --upgrade
    
  2. 运行 %pip magic 命令后必须重启 Python,使已安装或升级的库可供笔记本使用。 为此,请在使用 %pip magic 命令运行单元格后立即从笔记本单元格运行以下命令:

    dbutils.library.restartPython()
    
  3. 若要显示已安装的 Databricks SDK for Python 版本,请从笔记本单元格运行以下命令:

    %pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
    

步骤 2:运行代码

在笔记本单元格中,创建可导入然后调用 Databricks SDK for Python 的 Python 代码。 以下示例使用默认的 Azure Databricks 笔记本身份验证列出 Azure Databricks 工作区中的所有群集:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for c in w.clusters.list():
  print(c.cluster_name)

运行此单元格时,会显示 Azure Databricks 工作区中所有可用群集的名称列表。

若要使用其他 Azure Databricks 身份验证类型,请参阅 Azure Databricks 身份验证方法,然后单击相应的链接以获取其他技术详细信息。

使用 Databricks 实用工具

可以从本地开发计算机上运行的 Databricks SDK for Python 代码,也可以从 Azure Databricks 笔记本内调用 Databricks 实用工具 (dbutils) 参考

  • 在本地开发计算机上,Databricks 实用工具只能访问 dbutils.fsdbutils.secretsdbutils.widgetsdbutils.jobs 命令组。
  • 从附加到 Azure Databricks 群集的 Azure Databricks 笔记本中,Databricks 实用工具可以访问所有可用的 Databricks 实用工具命令组,而不仅仅是 dbutils.fsdbutils.secretsdbutils.widgets。 此外,dbutils.notebook 命令组仅限于两个级别的命令,例如 dbutils.notebook.rundbutils.notebook.exit

若要从本地开发计算机或 Azure Databricks 笔记本调用 Databricks 实用工具,请使用 WorkspaceClient 内的 dbutils。 以下代码示例使用默认的 Azure Databricks 笔记本身份验证调用 WorkspaceClient 中的 dbutils,以列出工作区的 DBFS 根中所有对象的路径。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
d = w.dbutils.fs.ls('/')

for f in d:
  print(f.path)

另外,也可以直接调用 dbutils。 但是,只能使用默认的 Azure Databricks 笔记本身份验证。 此代码示例直接调用 dbutils 以列出工作区的 DBFS 根目录中所有对象的路径。

from databricks.sdk.runtime import *

d = dbutils.fs.ls('/')

for f in d:
  print(f.path)

若要访问 Unity Catalog ,请在 WorkspaceClient 中使用 files。 请参阅管理 Unity Catalog 卷中的文件。 不能单独使用(或在 WorkspaceClient 内使用)dbutils 来访问卷。

另请参阅与 dbutils 交互

代码示例

以下代码示例演示如何使用 Databricks SDK for Python 创建和删除群集、运行作业以及列出帐户级组。 这些代码示例使用默认的 Azure Databricks 笔记本身份验证。 有关默认 Azure Databricks 笔记本身份验证的详细信息,请参阅从 Azure Databricks 笔记本内使用 Databricks SDK for Python。 有关笔记本外部的默认身份验证的详细信息,请参阅使用 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证

有关其他代码示例,请参阅 GitHub 上的 Databricks SDK for Python 存储库中的示例。 另请参阅:

创建群集

此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,在空闲 15 分钟后自动终止。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

print("Attempting to create cluster. Please wait...")

c = w.clusters.create_and_wait(
  cluster_name             = 'my-cluster',
  spark_version            = '12.2.x-scala2.12',
  node_type_id             = 'Standard_DS3_v2',
  autotermination_minutes  = 15,
  num_workers              = 1
)

print(f"The cluster is now ready at " \
      f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")

永久删除群集

此代码示例从工作区中永久删除具有指定群集 ID 的群集。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')

w.clusters.permanent_delete(cluster_id = c_id)

创建作业

此代码示例创建一个在指定群集上运行指定笔记本的 Azure Databricks 作业。 当代码运行时,它会从终端处的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job_name            = input("Some short name for the job (for example, my-job): ")
description         = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path       = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key            = input("Some key to apply to the job's tasks (for example, my-key): ")

print("Attempting to create the job. Please wait...\n")

j = w.jobs.create(
  name = job_name,
  tasks = [
    Task(
      description = description,
      existing_cluster_id = existing_cluster_id,
      notebook_task = NotebookTask(
        base_parameters = dict(""),
        notebook_path = notebook_path,
        source = Source("WORKSPACE")
      ),
      task_key = task_key
    )
  ]
)

print(f"View the job at {w.config.host}/#job/{j.job_id}\n")

创建使用无服务器计算的作业

以下示例创建一个作业,该作业使用适用于作业的无服务器计算

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import NotebookTask, Source, Task

w = WorkspaceClient()

j = w.jobs.create(
  name = "My Serverless Job",
  tasks = [
    Task(
      notebook_task = NotebookTask(
      notebook_path = "/Users/user@databricks.com/MyNotebook",
      source = Source("WORKSPACE")
      ),
      task_key = "MyTask",
   )
  ]
)

管理 Unity Catalog 卷中的文件

此代码示例演示了对 WorkspaceClient 中用于访问 Unity Catalog files 功能的各种调用。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Define volume, folder, and file details.
catalog            = 'main'
schema             = 'default'
volume             = 'my-volume'
volume_path        = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder      = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file        = 'data.csv'
volume_file_path   = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path   = './data.csv'

# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)

# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
  file_bytes = file.read()
  binary_data = io.BytesIO(file_bytes)
  w.files.upload(volume_file_path, binary_data, overwrite = True)

# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
  print(item.path)

# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
  print(item.path)

# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))

# Delete a file from a volume.
w.files.delete(volume_file_path)

# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)

列出帐户级组

此代码示例列出 Azure Databricks 帐户中所有可用组的显示名称。

from databricks.sdk import AccountClient

a = AccountClient()

for g in a.groups.list():
  print(g.display_name)

测试

若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,请使用 Python 模拟库(如 unittest.mock)。

提示

Databricks Labs 提供了一个 pytest 插件 ,用于简化与 Databricks 的集成测试,以及一个 pylint 插件 ,以确保代码质量。

名为以下示例的文件 helpers.py 包含一个 create_cluster 函数,该函数返回有关新群集的信息:

# helpers.py

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails

def create_cluster(
  w: WorkspaceClient,
  cluster_name:            str,
  spark_version:           str,
  node_type_id:            str,
  autotermination_minutes: int,
  num_workers:             int
) -> ClusterDetails:
  response = w.clusters.create(
    cluster_name            = cluster_name,
    spark_version           = spark_version,
    node_type_id            = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers             = num_workers
  )
  return response

给定调用函数create_cluster的以下名为main.py的文件:

# main.py

from databricks.sdk import WorkspaceClient
from helpers import *

w = WorkspaceClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
  w = w,
  cluster_name            = 'Test Cluster',
  spark_version           = '<spark-version>',
  node_type_id            = '<node-type-id>',
  autotermination_minutes = 15,
  num_workers             = 1
)

print(response.cluster_id)

以下名为 test_helpers.py 的文件测试 create_cluster 函数是否返回预期的响应。 此测试模拟 WorkspaceClient 对象,定义模拟对象的设置,然后将模拟对象传递给 create_cluster 函数,而不是在目标工作区中创建群集。 然后,测试将检查函数是否返回新模拟群集的预期 ID。

# test_helpers.py

from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.

def test_create_cluster():
  # Create a mock WorkspaceClient.
  mock_workspace_client = create_autospec(WorkspaceClient)

  # Set the mock WorkspaceClient's clusters.create().cluster_id value.
  mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'

  # Call the actual function but with the mock WorkspaceClient.
  # Replace <spark-version> with the target Spark version string.
  # Replace <node-type-id> with the target node type string.
  response = create_cluster(
    w = mock_workspace_client,
    cluster_name            = 'Test Cluster',
    spark_version           = '<spark-version>',
    node_type_id            = '<node-type-id>',
    autotermination_minutes = 15,
    num_workers             = 1
  )

  # Assert that the function returned the mocked cluster ID.
  assert response.cluster_id == '123abc'

若要运行此测试,请从代码项目的根目录运行 pytest 命令,该命令应生成如下所示的测试结果:

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item

test_helpers.py . [100%]
======================== 1 passed ==========================

其他资源

有关详细信息,请参阅: