分享方式:


Databricks SDK for Go

在本文中,您將瞭解如何使用 Databricks SDK for Go 將 Azure Databricks 帳戶、工作區和相關資源的作業自動化。 本文補充適用於 Go 自述檔API 參考範例的 Databricks SDK。

注意

這項功能在 Beta,而且可在生產環境中使用。

在 Beta 期間,Databricks 建議您將相依性釘選到程式代碼相依於程式代碼所相依之 Databricks SDK for Go 的特定次要版本,例如,在專案的 go.mod 檔案中。 如需釘選相依性的詳細資訊,請參閱 管理相依性

開始之前

開始使用 Databricks SDK for Go 之前,您的開發電腦必須具有:

開始使用 Databricks SDK for Go

  1. 在已安裝 Go 的開發電腦上,已建立現有的 Go 程式代碼專案,並已設定 Azure Databricks 驗證,藉由執行 go mod init 命令來建立go.mod檔案來追蹤 Go 程式代碼的相依性,例如:

    go mod init sample
    
  2. 執行 go mod edit -require 命令以相依於 Databricks SDK for Go 套件,並將 取代0.8.0為最新版的 Databricks SDK for Go 套件,如 CHANGELOG所列:

    go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
    

    您的 go.mod 檔案現在看起來應該像這樣:

    module sample
    
    go 1.18
    
    require github.com/databricks/databricks-sdk-go v0.8.0
    
  3. 在您的專案中,建立 Go 程式代碼檔案,以匯入 Databricks SDK for Go。 下列範例會在名為 main.go 且具有下列內容的檔案中,列出 Azure Databricks 工作區中的所有叢集:

    package main
    
    import (
      "context"
    
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/service/compute"
    )
    
    func main() {
      w := databricks.Must(databricks.NewWorkspaceClient())
      all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{})
      if err != nil {
        panic(err)
      }
      for _, c := range all {
        println(c.ClusterName)
      }
    }
    
  4. 執行 命令以新增任何遺漏的 go mod tidy 模組相依性:

    go mod tidy
    

    注意

    如果您收到錯誤 go: warning: "all" matched no packages,您忘記新增 Go 程式代碼檔案,以匯入 Databricks SDK for Go。

  5. 執行 go mod vendor 命令,以擷取支援模組中main封裝組建和測試所需的所有套件複本:

    go mod vendor
    
  6. 設定您的開發計算機以進行 Azure Databricks 驗證

  7. 執行 命令,以執行名為 main.gogo run 的檔案執行 Go 程式代碼檔案:

    go run main.go
    

    注意

    若未在上述呼叫w := databricks.Must(databricks.NewWorkspaceClient())中將 *databricks.Config 設定為自變數,Databricks SDK for Go 會使用其預設程式嘗試執行 Azure Databricks 驗證。 若要覆寫此預設行為,請參閱 使用 Azure Databricks 帳戶或工作區驗證 Databricks SDK for Go。

更新 Databricks SDK for Go

若要更新 Go 專案,以使用 CHANGELOG 中列出的其中一個 Databricks SDK for Go 套件,請執行下列動作:

  1. go get從專案的根目錄執行命令,指定要-u執行更新的旗標,並提供 Databricks SDK for Go 套件的名稱和目標版本號碼。 例如,若要更新為 版本 0.12.0,請執行下列命令:

    go get -u github.com/databricks/databricks-sdk-go@v0.12.0
    
  2. 執行 go mod tidy 命令,以新增和更新任何遺失和過期的模組相依性:

    go mod tidy
    
  3. 執行 go mod vendor 命令,以擷取支援模組中main套件組建和測試所需的所有新和更新套件複本:

    go mod vendor
    

使用 Azure Databricks 帳戶或工作區驗證 Databricks SDK for Go

Databricks SDK for Go 會 實作 Databricks 用戶端統一驗證 標準,這是一種整合且一致的架構和驗證程序設計方法。 這種方法有助於使用 Azure Databricks 更集中且可預測的方式來設定和自動化驗證。 它可讓您設定 Databricks 驗證一次,然後在多個 Databricks 工具和 SDK 之間使用該組態,而不需要進一步的驗證組態變更。 如需詳細資訊,包括 Go 中更完整的程式代碼範例,請參閱 Databricks 用戶端整合驗證

使用 Databricks SDK for Go 初始化 Databricks 驗證的一些可用編碼模式包括:

  • 執行下列其中一項動作,使用 Databricks 預設驗證:

    • 建立或識別具有目標 Databricks 驗證類型所需字段的自定義 Databricks 組態配置檔 。 然後將環境變數設定 DATABRICKS_CONFIG_PROFILE 為自定義組態配置檔的名稱。
    • 設定目標 Databricks 驗證類型所需的環境變數。

    然後具現化 ,例如 WorkspaceClient 具有 Databricks 預設驗證的物件,如下所示:

    import (
      "github.com/databricks/databricks-sdk-go"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient())
    
  • 支援硬式編碼所需的字段,但不建議這麼做,因為它可能會暴露程序代碼中的敏感性資訊,例如 Azure Databricks 個人存取令牌。 下列範例硬式編碼 Azure Databricks 主機和 Databricks 令牌驗證的存取令牌值:

    import (
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/config"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{
      Host:  "https://...",
      Token: "...",
    }))
    

請參閱 Databricks SDK for Go README 中的驗證

範例

下列程式代碼範例示範如何使用 Databricks SDK for Go 來建立和刪除叢集、執行作業和列出帳戶使用者。 這些程式代碼範例會使用 Databricks SDK for Go 的預設 Azure Databricks 驗證 程式。

如需其他程式代碼範例,請參閱 GitHub 中 Databricks SDK for Go 存放庫中的 examples 資料夾。

建立叢集

此程式代碼範例會建立具有最新可用 Databricks Runtime 長期支援 (LTS) 版本的叢集,以及具有本機磁碟的最小可用叢集節點類型。 此叢集有一個背景工作角色,且叢集會在閑置時間 15 分鐘後自動終止。 方法 CreateAndWait 呼叫會導致程式代碼暫停,直到新叢集在工作區中執行為止。

package main

import (
  "context"
  "fmt"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/compute"
)

func main() {
  const clusterName            = "my-cluster"
  const autoTerminationMinutes = 15
  const numWorkers             = 1

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  // Get the full list of available Spark versions to choose from.
  sparkVersions, err := w.Clusters.SparkVersions(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the latest Long Term Support (LTS) version.
  latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
    Latest:          true,
    LongTermSupport: true,
  })

  if err != nil {
    panic(err)
  }

  // Get the list of available cluster node types to choose from.
  nodeTypes, err := w.Clusters.ListNodeTypes(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the smallest available cluster node type.
  smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
    LocalDisk: true,
  })

  if err != nil {
    panic(err)
  }

  fmt.Println("Now attempting to create the cluster, please wait...")

  runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
    ClusterName:            clusterName,
    SparkVersion:           latestLTS,
    NodeTypeId:             smallestWithLocalDisk,
    AutoterminationMinutes: autoTerminationMinutes,
    NumWorkers:             numWorkers,
  })

  if err != nil {
    panic(err)
  }

  switch runningCluster.State {
  case compute.StateRunning:
    fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
      w.Config.Host,
      runningCluster.ClusterId,
    )
  default:
    fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
  }

  // Output:
  //
  // Now attempting to create the cluster, please wait...
  // The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}

永久刪除叢集

此程式代碼範例會從工作區中永久刪除具有指定叢集標識符的叢集。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/clusters"
)

func main() {
  // Replace with your cluster's ID.
  const clusterId = "1234-567890-ab123cd4"

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
    ClusterId: clusterId,
  })

  if err != nil {
    panic(err)
  }
}

執行作業

此程式代碼範例會建立 Azure Databricks 作業,以在指定的叢集上執行指定的筆記本。 當程式代碼執行時,它會從終端機的使用者取得現有的筆記本路徑、現有的叢集標識碼和相關作業設定。 方法 RunNowAndWait 呼叫會導致程式代碼暫停,直到新作業在工作區中完成執行為止。

package main

import (
  "bufio"
  "context"
  "fmt"
  "os"
  "strings"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/jobs"
)

func main() {
  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  nt := jobs.NotebookTask{
    NotebookPath: askFor("Workspace path of the notebook to run:"),
  }

  jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
    Name: askFor("Some short name for the job:"),
    Tasks: []jobs.JobTaskSettings{
      {
        Description:       askFor("Some short description for the job:"),
        TaskKey:           askFor("Some key to apply to the job's tasks:"),
        ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
        NotebookTask:      &nt,
      },
    },
  })

  if err != nil {
    panic(err)
  }

  fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
    w.Config.Host,
    jobToRun.JobId,
  )

  runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
    JobId: jobToRun.JobId,
  })

  if err != nil {
    panic(err)
  }

  jobRun, err := runningJob.Get()

  if err != nil {
    panic(err)
  }

  fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
    w.Config.Host,
    jobRun.JobId,
    jobRun.RunId,
  )

  // Output:
  //
  // Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
  // View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}

// Get job settings from the user.
func askFor(prompt string) string {
  var s string
  r := bufio.NewReader(os.Stdin)
  for {
    fmt.Fprint(os.Stdout, prompt+" ")
    s, _ = r.ReadString('\n')
    if s != "" {
      break
    }
  }
  return strings.TrimSpace(s)
}

管理 Unity 目錄磁碟區中的檔案

此程式代碼範例示範內部各種功能files呼叫,以存取 Unity 目錄磁碟區WorkspaceClient

package main

import (
  "context"
  "io"
  "os"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
  w := databricks.Must(databricks.NewWorkspaceClient())

  catalog          := "main"
  schema           := "default"
  volume           := "my-volume"
  volumePath       := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
  volumeFolder     := "my-folder"
  volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
  volumeFile       := "data.csv"
  volumeFilePath   := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
  uploadFilePath   := "./data.csv"

  // Create an empty folder in a volume.
  err := w.Files.CreateDirectory(
    context.Background(),
    files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
  )
  if err != nil {
    panic(err)
  }

  // Upload a file to a volume.
  fileUpload, err := os.Open(uploadFilePath)
  if err != nil {
    panic(err)
  }
  defer fileUpload.Close()

  w.Files.Upload(
    context.Background(),
    files.UploadRequest{
      Contents:  fileUpload,
      FilePath:  volumeFilePath,
      Overwrite: true,
    },
  )

  // List the contents of a volume.
  items := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
  )

  for {
    if items.HasNext(context.Background()) {
      item, err := items.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)

    } else {
      break
    }
  }

  // List the contents of a folder in a volume.
  itemsFolder := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
  )

  for {
    if itemsFolder.HasNext(context.Background()) {
      item, err := itemsFolder.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)
    } else {
      break
    }
  }

  // Print the contents of a file in a volume.
  file, err := w.Files.DownloadByFilePath(
    context.Background(),
    volumeFilePath,
  )
  if err != nil {
    panic(err)
  }

  bufDownload := make([]byte, file.ContentLength)

  for {
    file, err := file.Contents.Read(bufDownload)
    if err != nil && err != io.EOF {
      panic(err)
    }
    if file == 0 {
      break
    }

    println(string(bufDownload[:file]))
  }

  // Delete a file from a volume.
  w.Files.DeleteByFilePath(
    context.Background(),
    volumeFilePath,
  )

  // Delete a folder from a volume.
  w.Files.DeleteDirectory(
    context.Background(),
    files.DeleteDirectoryRequest{
      DirectoryPath: volumeFolderPath,
    },
  )
}

列出帳戶使用者

此程式代碼範例會列出 Azure Databricks 帳戶內可用的使用者。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/iam"
)

func main() {
  a := databricks.Must(databricks.NewAccountClient())
  all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
  if err != nil {
    panic(err)
  }
  for _, u := range all {
    println(u.UserName)
  }
}

其他資源

如需詳細資訊,請參閱