分享方式:


適用於 Python 的 Databricks SDK

注意

Databricks 建議 Databricks 資產套件組合,以原始程式碼的形式建立、開發、部署及測試作業和其他 Databricks 資源。 請參閱什麼是 Databricks Asset Bundles?

在本文中,您將瞭解如何自動化 Azure Databricks 作業,並使用適用於 PythonDatabricks SDK 加速開發。 本文補充適用於 Python 的 Databricks SDK 文件,請參閱 GitHub 中適用於 Python 的 Databricks SDK 存放庫中的文件和程式碼範例

注意

適用於 Python 的 Databricks SDK 處於 Beta 版,而且可用於生產環境。

在搶鮮版 (Beta) 期間,Databricks 建議您釘選相依於程式碼所依賴之適用於 Python 的 Databricks SDK 的特定次要版本。 例如,您可以在適用於 venvrequirements.txt,或適用於 Poetry 的 pyproject.tomlpoetry.lock 等檔案中釘選相依性。 如需釘選相依性的詳細資訊,請參閱適用於 venv虛擬環境和套件,或適用於 Poetry 的安裝相依性

開始之前

您可以從 Azure Databricks 筆記本或本機開發機器,使用適用於 Python 的 Databricks SDK。

開始使用適用於 Python 的 Databricks SDK 之前,您的開發機器必須具有:

  • 已設定 Azure Databricks 驗證
  • 已安裝 Python 3.8 或更高版本。 為了自動化 Azure Databricks 計算資源,Databricks 建議您安裝與目標 Azure Databricks 計算資源上安裝的版本相符的 Python 主要和次要版本。 本文的範例依賴使用 Databricks Runtime 13.3 LTS (已安裝 Python 3.10) 來自動化叢集。 如需正確的版本,請參閱您的叢集 Databricks Runtime 版本的 Databricks Runtime 版本資訊和相容性
  • Databricks 建議您為與適用於 Python 的 Databricks SDK 搭配使用的每個 Python 專案建立並啟用 Python 虛擬環境。 Python 虛擬環境有助於確保您的程式碼專案使用相容的 Python 和 Python 套件版本(在此案例中為適用於 Python 的 Databricks SDK 套件)。 如需 Python 虛擬環境的詳細資訊,請參閱 venvPoetry

開始使用適用於 Python 的 Databricks SDK

本節說明如何從本機開發機器開始使用適用於 Python 的 Databricks SDK。 若要從 Azure Databricks 筆記本內使用適用於 Python 的 Databricks SDK,請跳至從 Azure Databricks 筆記本使用適用於 Python 的 Databricks SDK

  1. 在已設定 Azure Databricks 驗證的開發機器上,已安裝 Python,且已啟動 Python 虛擬環境,從 Python 套件索引 (PyPI) 安裝 databricks-sdk 套件 (及其相依性),如下所示:

    Venv

    使用 pip 以安裝 databricks-sdk 套件。 (在某些系統上,您可能需要在此處和全文中將 pip3 取代為 pip。)

    pip3 install databricks-sdk
    

    Poetry

    poetry add databricks-sdk
    

    若要在適用於 Python 的 Databricks SDK 為搶鮮版 (Beta) 時安裝特定版本的 databricks-sdk 套件,請參閱套件的發行歷程記錄。 例如,安裝版本 0.1.6

    Venv

    pip3 install databricks-sdk==0.1.6
    

    Poetry

    poetry add databricks-sdk==0.1.6
    

    若要將現有安裝的適用於 Python 的 Databricks SDK 套件升級至最新版本,請使用下列命令:

    Venv

    pip3 install --upgrade databricks-sdk
    

    Poetry

    poetry add databricks-sdk@latest
    

    若要顯示適用於 Python 的 Databricks SDK 套件的目前 Version 和其他詳細資料,請執行下列命令:

    Venv

    pip3 show databricks-sdk
    

    Poetry

    poetry show databricks-sdk
    
  2. 在您的 Python 虛擬環境中,建立 Python 程式碼檔案,以匯入適用於 Python 的 Databricks SDK。 下列範例在名為 main.py 且具有下列內容的檔案中,只列出 Azure Databricks 工作區中的所有叢集:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    for c in w.clusters.list():
      print(c.cluster_name)
    
  3. 透過執行 python 命令,來執行 Python 程式碼檔案,假定檔案名為 main.py

    Venv

    python3.10 main.py
    

    Poetry

    如果您位於虛擬環境的殼層中:

    python3.10 main.py
    

    如果您不在虛擬環境的殼層中:

    poetry run python3.10 main.py
    

    注意

    透過前面對 w = WorkspaceClient() 的呼叫中不設定任何引數,適用於 Python 的 Databricks SDK 會使用其預設程式嘗試執行 Azure Databricks 驗證。 若要覆寫此預設行為,請參閱下列驗證一節。

使用 Azure Databricks 帳戶或工作區來驗證適用於 Python 的 Databricks SDK

本節說明如何從本機開發機器驗證適用於 Python 的 Databricks SDK 到 Azure Databricks 帳戶或工作區。 若要從 Azure Databricks 筆記本內驗證適用於 Python 的 Databricks SDK,請跳至從 Azure Databricks 筆記本使用適用於 Python 的 Databricks SDK

適用於 Python 的 Databricks SDK 會實作 Databricks 用戶端統一驗證 標準,這是一種整合且一致的架構和驗證程式設計驗證方法。 這種方法有助於使 Azure Databricks 的設定和自動化驗證更加集中且可預測。 可讓您只需設定 Databricks 驗證一次,然後即可在多個 Databricks 工具和 SDK 中使用該組態,而無需進一步變更驗證組態。 如需詳細資訊,包括 Python 中更完整的程式碼範例,請參閱 Databricks 用戶端整合驗證

注意

適用於 Python 的 Databricks SDK 尚未實作 Azure 受控識別驗證

使用適用於 Python 的 Databricks SDK 初始化 Databricks 驗證的一些可用編碼模式包括:

  • 透過執行下列其中一項動作,使用 Databricks 預設驗證:

    • 建立或識別具有目標 Databricks 驗證類型所需欄位的自訂 Databricks 組態設定檔。 然後將 DATABRICKS_CONFIG_PROFILE 環境變數設定為自訂組態設定檔的名稱。
    • 設定目標 Databricks 驗證類型所需的環境變數。

    然後使用 Databricks 預設驗證具現化一個 WorkspaceClient 物件,如下所示:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    # ...
    
  • 支援硬式編碼必要欄位,但不建議這麼做,因為它可能會暴露程式碼中的敏感性資訊,例如 Azure Databricks 個人存取權杖。 下列範例硬式編碼 Azure Databricks 主機和 Databricks 權杖驗證的存取權杖值:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient(
      host  = 'https://...',
      token = '...'
    )
    # ...
    

另請參閱適用於 Python 的 Databricks SDK 文件中的驗證

從 Azure Databricks 筆記本使用適用於 Python 的 Databricks SDK

您可以從已已連結 Azure Databricks 叢集並安裝適用於 Python 的 Databricks SDK 的 Azure Databricks 筆記本,呼叫適用於 Python 的 Databricks SDK 功能。 依預設,其已安裝在使用 Databricks Runtime 13.3 LTS 或更新版本的所有 Azure Databricks 叢集上。 對於使用 Databricks Runtime 12.2 LTS 及更低版本的 Azure Databricks 叢集,您必須先安裝適用於 Python 的 Databricks SDK。 請參閱步驟 1:安裝或升級適用於 Python 的 Databricks SDK

若要查看針對特定 Databricks Runtime 版本安裝的適用於 Python 的 Databricks SDK 版本,請參閱該版本的 Databricks Runtime 版本資訊中的已安裝的 Python 程式庫一節。

Databricks 建議您從 PiPy 安裝最新可用的 SDK 版本,但至少會安裝或升級至適用於 Python 0.6.0 或更新版本的 Databricks SDK,因為所有 Databricks Runtime 0.6.0 版和更新版本都會使用預設 Azure Databricks 筆記本驗證。

注意

Databricks Runtime 15.1 是第一個安裝適用於 Python (0.20.0) 的 Databricks SDK 版本,其支援預設筆記本驗證,而不需要升級。

下表概述了適用於 Python 的 Databricks SDK 和 Databricks Runtime 版本的筆記本驗證支援:

SDK/DBR 10.4 LTS 11.3 LTS 12.3 LTS 13.3 LTS 14.3 LTS 15.1 和更新版本
0.1.7 和更低版本
0.1.10
0.6.0
0.20.0 和更新版本

預設 Azure Databricks 筆記本驗證依賴 Azure Databricks 個人存取權杖,Azure Databricks 會在背景自動產生供自己使用。 Azure Databricks 會在筆記本停止執行之後刪除此暫存權杖。

重要

  • 預設的 Azure Databricks 筆記本驗證僅適用於叢集的驅動程式節點上,而不適用於任何叢集的背景工作角色或執行程式節點。
  • Azure Databricks 筆記本驗證不適用於 Azure Databricks 組態設定檔。

如果您想要呼叫 Azure Databricks 帳戶層級 API,或想要使用預設 Databricks 筆記本驗證以外的 Databricks 驗證類型,還支援下列驗證類型:

驗證類型 適用於 Python 的 Databricks SDK 版本
OAuth 機器對機器 (M2M) 驗證 0.18.0 和更新版本
OAuth 使用者對機器 (U2M) 驗證 0.19.0 和更新版本
Microsoft Entra ID 服務主體驗證 所有版本
Azure CLI 驗證 所有版本
Databricks 個人存取權杖驗證 所有版本

尚不支援 Azure 受控身分驗證

步驟 1:安裝或升級適用於 Python 的 Databricks SDK

  1. Azure Databricks Python 筆記本可以使用適用於 Python 的 Databricks SDK,就像任何其他 Python 程式庫一樣。 若要在連結的 Azure Databricks 叢集上安裝或升級適用於 Python 的 Databricks SDK 程式庫,請從筆記本儲存格執行 %pip magic 命令,如下所示:

    %pip install databricks-sdk --upgrade
    
  2. 執行 %pip magic 命令之後,您必須重新啟動 Python,筆記本才能使用安裝或升級的程式庫。 為此,請使用 %pip magic 命令,立即從筆記本儲存格執行下列命令:

    dbutils.library.restartPython()
    
  3. 若要顯示已安裝的適用於 Python 的 Databricks SDK 版本,請從筆記本儲存格執行下列命令:

    %pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
    

步驟 2:執行您的程式碼

在您的筆記本儲存格中,建立可匯入的 Python 程式碼,然後呼叫適用於 Python 的 Databricks SDK。 下列範例會使用預設的 Azure Databricks 筆記本驗證來列出 Azure Databricks 工作區中的所有叢集:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for c in w.clusters.list():
  print(c.cluster_name)

當您執行此儲存格時,Azure Databricks 工作區中所有可用叢集的名稱清單隨即出現。

若要使用不同的 Azure Databricks 驗證類型,請參閱 Azure Databricks 驗證方法,然後按下對應的連結,以取得其他技術詳細資料。

使用 Databricks 公用程式

您可以從在本機開發機器上執行的適用於 Python 的 Databricks SDK 程式碼,或從 Azure Databricks 筆記本內,呼叫 Databricks 公用程式 (dbutils) 參考

  • 從本機開發機器,Databricks 公用程式只能存取 dbutils.fsdbutils.secretsdbutils.widgetsdbutils.jobs 命令群組。
  • 從連結至 Azure Databricks 叢集的 Azure Databricks 筆記本中,Databricks 公用程式可以存取所有可用的 Databricks 公用程式命令群組,而不只是 dbutils.fsdbutils.secretsdbutils.widgets。 此外,dbutils.notebook 命令群組僅限於兩個層級的命令,例如 dbutils.notebook.rundbutils.notebook.exit

若要從本機開發機器或 Azure Databricks 筆記本呼叫 Databricks 公用程式,請在 WorkspaceClient 中使用 dbutils。 此程式碼範例會使用預設的 Azure Databricks 筆記本驗證來呼叫 WorkspaceClient 內的 dbutils,以列出工作區之 DBFS 根目錄中所有物件的路徑。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
d = w.dbutils.fs.ls('/')

for f in d:
  print(f.path)

或者,您可以直接呼叫 dbutils。 不過,您只能使用預設的 Azure Databricks 筆記本驗證。 此程式碼範例會直接呼叫 dbutils,以列出工作區之 DBFS 根目錄中的所有物件。

from databricks.sdk.runtime import *

d = dbutils.fs.ls('/')

for f in d:
  print(f.path)

若要存取 Unity 目錄磁碟區,請使用 files 內的 WorkspaceClient。 請參閱管理 Unity 目錄磁碟區中的檔案。 您無法單獨或在 WorkspaceClient 內使用 dbutils,以存取磁碟區。

另請參閱與 dbutils 的互動

程式碼範例

下列程式碼範例示範如何使用適用於 Python 的 Databricks SDK 來建立和刪除叢集,執行作業,以及列出帳戶層級群組。 這些程式碼範例會使用預設的 Azure Databricks 筆記本驗證。 如需預設 Azure Databricks 筆記本驗證的詳細資訊,請參閱從 Azure Databricks 筆記本使用適用於 Python 的 Databricks SDK。 如需筆記本外部預設驗證的詳細資訊,請參閱使用 Azure Databricks 帳戶或工作區驗證適用於 Python 的 Databricks SDK

如需其他程式碼範例,請參閱 GitHub 中適用於 Python 的 Databricks SDK 存放庫中的範例。 另請參閱:

建立叢集

此程式碼範例會建立具有指定 Databricks Runtime 版本和叢集節點類型的叢集。 此叢集有一個背景工作角色,且叢集會在閒置時間 15 分鐘後將自動終止。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

print("Attempting to create cluster. Please wait...")

c = w.clusters.create_and_wait(
  cluster_name             = 'my-cluster',
  spark_version            = '12.2.x-scala2.12',
  node_type_id             = 'Standard_DS3_v2',
  autotermination_minutes  = 15,
  num_workers              = 1
)

print(f"The cluster is now ready at " \
      f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")

永久刪除叢集

此程式碼範例會從工作區中永久刪除具有指定叢集 ID 的叢集。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')

w.clusters.permanent_delete(cluster_id = c_id)

建立作業

此程式碼範例會建立 Azure Databricks 作業,在指定的叢集上執行指定的筆記本。 當此程式碼執行時,它會從終端機的使用者取得現有的筆記本路徑、現有的叢集標識碼和相關作業設定。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job_name            = input("Some short name for the job (for example, my-job): ")
description         = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path       = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key            = input("Some key to apply to the job's tasks (for example, my-key): ")

print("Attempting to create the job. Please wait...\n")

j = w.jobs.create(
  name = job_name,
  tasks = [
    Task(
      description = description,
      existing_cluster_id = existing_cluster_id,
      notebook_task = NotebookTask(
        base_parameters = dict(""),
        notebook_path = notebook_path,
        source = Source("WORKSPACE")
      ),
      task_key = task_key
    )
  ]
)

print(f"View the job at {w.config.host}/#job/{j.job_id}\n")

建立使用無伺服器計算的作業

下列範例會建立使用適用於作業的無伺服器計算的作業:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import NotebookTask, Source, Task

w = WorkspaceClient()

j = w.jobs.create(
  name = "My Serverless Job",
  tasks = [
    Task(
      notebook_task = NotebookTask(
      notebook_path = "/Users/user@databricks.com/MyNotebook",
      source = Source("WORKSPACE")
      ),
      task_key = "MyTask",
   )
  ]
)

管理 Unity 目錄磁碟區中的檔案

此程式碼範例示範對 WorkspaceClientfiles 功能的呼叫,以存取 Unity 目錄磁碟區

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Define volume, folder, and file details.
catalog            = 'main'
schema             = 'default'
volume             = 'my-volume'
volume_path        = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder      = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file        = 'data.csv'
volume_file_path   = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path   = './data.csv'

# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)

# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
  file_bytes = file.read()
  binary_data = io.BytesIO(file_bytes)
  w.files.upload(volume_file_path, binary_data, overwrite = True)

# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
  print(item.path)

# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
  print(item.path)

# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))

# Delete a file from a volume.
w.files.delete(volume_file_path)

# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)

列出帳戶層級群組

此程式碼範例會列出 Azure Databricks 帳戶內所有可用群組的顯示名稱。

from databricks.sdk import AccountClient

a = AccountClient()

for g in a.groups.list():
  print(g.display_name)

測試

若要測試程式碼,請使用 Python 測試架構,例如 pytest。 若要在不呼叫 Azure Databricks REST API 端點或變更 Azure Databricks 帳戶或工作區的狀態的情況下,在模擬條件下測試程式碼,可以使用 Python 模擬程式庫,例如 unittest.mock

提示

Databricks Labs 提供 pytest 外掛程式 ,以簡化與 Databricks 和 pylint 外掛程式 的整合測試,以確保程式代碼品質。

名為的下列範例檔案 helpers.py 包含傳 create_cluster 回新叢集相關信息的函式:

# helpers.py

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails

def create_cluster(
  w: WorkspaceClient,
  cluster_name:            str,
  spark_version:           str,
  node_type_id:            str,
  autotermination_minutes: int,
  num_workers:             int
) -> ClusterDetails:
  response = w.clusters.create(
    cluster_name            = cluster_name,
    spark_version           = spark_version,
    node_type_id            = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers             = num_workers
  )
  return response

假設有下列名為 main.py 的檔案,該檔案會呼叫 函 create_cluster 式:

# main.py

from databricks.sdk import WorkspaceClient
from helpers import *

w = WorkspaceClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
  w = w,
  cluster_name            = 'Test Cluster',
  spark_version           = '<spark-version>',
  node_type_id            = '<node-type-id>',
  autotermination_minutes = 15,
  num_workers             = 1
)

print(response.cluster_id)

下列名為 test_helpers.py 的檔案會測試函 create_cluster 式是否傳回預期的回應。 此測試會模擬 WorkspaceClient 物件、定義模擬對象的設定,然後將模擬物件傳遞給 create_cluster 函式,而不是在目標工作區中建立叢集。 然後測試會檢查函式是否傳回新模擬叢集的預期標識碼。

# test_helpers.py

from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.

def test_create_cluster():
  # Create a mock WorkspaceClient.
  mock_workspace_client = create_autospec(WorkspaceClient)

  # Set the mock WorkspaceClient's clusters.create().cluster_id value.
  mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'

  # Call the actual function but with the mock WorkspaceClient.
  # Replace <spark-version> with the target Spark version string.
  # Replace <node-type-id> with the target node type string.
  response = create_cluster(
    w = mock_workspace_client,
    cluster_name            = 'Test Cluster',
    spark_version           = '<spark-version>',
    node_type_id            = '<node-type-id>',
    autotermination_minutes = 15,
    num_workers             = 1
  )

  # Assert that the function returned the mocked cluster ID.
  assert response.cluster_id == '123abc'

若要執行此測試,請從程式碼專案的根目錄執行 pytest 命令,應該會產生類似下列的測試結果:

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item

test_helpers.py . [100%]
======================== 1 passed ==========================

其他資源

如需詳細資訊,請參閱