共用方式為


Databricks Runtime 12.2 LTS 和以下的 Databricks Connect

注意

Databricks Connect 建議您改用 Databricks Connect for Databricks Runtime 13.0 和更新版本

Databricks 計劃 Databricks Connect for Databricks Runtime 12.2 LTS 和以下版本沒有新功能運作。

Databricks Connect 可讓您將熱門 IDE,例如 Visual Studio Code 和 PyCharm、Notebook 伺服器和其他自定義應用程式連線至 Azure Databricks 叢集。

本文說明 Databricks Connect 的運作方式、逐步引導您完成開始使用 Databricks Connect 的步驟、說明如何針對使用 Databricks Connect 時可能發生的問題進行疑難解答,以及使用 Databricks Connect 執行與在 Azure Databricks 筆記本中執行之間的差異。

概觀

Databricks Connect 是 Databricks Runtime 的用戶端連結庫。 它可讓您使用 Spark API 撰寫作業,並在 Azure Databricks 叢集上遠端執行作業,而不是在本機 Spark 工作階段中執行。

例如,當您使用 Databricks Connect 執行 DataFrame 命令 spark.read.format(...).load(...).groupBy(...).agg(...).show() 時,命令的邏輯表示法會傳送至在 Azure Databricks 中執行的 Spark 伺服器,以在遠端叢集上執行。

使用 Databricks Connect,您可以:

  • 從任何 Python、R、Scala 或 Java 應用程式執行大規模的 Spark 作業。 您 import pyspark現在可以從 require(SparkR)import org.apache.spark應用程式直接執行 Spark 作業,而不需要安裝任何 IDE 外掛程式或使用 Spark 提交腳本。
  • 即使在使用遠端叢集時,也請逐步執行並偵錯 IDE 中的程式碼。
  • 開發連結庫時會快速反覆運算。 您不需要在 Databricks Connect 中變更 Python 或 Java 連結庫相依性之後重新啟動叢集,因為每個用戶端會話都會與叢集中的彼此隔離。
  • 關閉閑置叢集,而不會遺失工作。 由於用戶端應用程式與叢集分離,因此不會受到叢集重新啟動或升級的影響,這通常會導致失去筆記本中定義的所有變數、RDD 和 DataFrame 物件。

注意

針對使用 SQL 查詢進行 Python 開發的 Databricks,Databricks 建議您使用 適用於 Python 的 Databricks SQL 連接器,而不是 Databricks Connect。 適用於 Python 的 Databricks SQL Connector 比 Databricks Connect 更容易設定。 此外,Databricks Connect 會剖析並規劃本機計算機上執行的作業,而作業則會在遠端計算資源上執行。 這會使偵錯運行時間錯誤變得特別困難。 適用於 Python 的 Databricks SQL 連接器會將 SQL 查詢直接提交至遠端計算資源並擷取結果。

需求

本節列出 Databricks Connect 的需求。

  • 僅支援下列 Databricks Runtime 版本:

    • Databricks Runtime 12.2 LTS ML,Databricks Runtime 12.2 LTS
    • Databricks Runtime 11.3 LTS ML、Databricks Runtime 11.3 LTS
    • Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
    • Databricks Runtime 9.1 LTS ML、Databricks Runtime 9.1 LTS
    • Databricks Runtime 7.3 LTS
  • 您必須在開發計算機上安裝 Python 3,且用戶端 Python 安裝的次要版本必須與 Azure Databricks 叢集的次要 Python 版本相同。 下表顯示每個 Databricks Runtime 一起安裝的 Python 版本。

    Databricks Runtime 版本 Python 版本
    12.2 LTS ML、12.2 LTS 3.9
    11.3 LTS ML、11.3 LTS 3.9
    10.4 LTS ML、10.4 LTS 3.8
    9.1 LTS ML、9.1 LTS 3.8
    7.3 LTS 3.7

    Databricks 強烈建議您已針對與 Databricks Connect 搭配使用的每個 Python 版本啟用 Python 虛擬環境 。 Python 虛擬環境可協助您確定您使用的是正確的 Python 版本和 Databricks Connect。 這有助於減少解決相關技術問題所花費的時間。

    例如,如果您在開發計算機上使用 venv ,而您的叢集正在執行 Python 3.9,則必須使用該版本建立 venv 環境。 下列範例命令會產生腳本來啟動 venv 具有 Python 3.9 的環境,然後此命令會將這些腳本放在目前工作目錄中名為 .venv 的隱藏資料夾中:

    # Linux and macOS
    python3.9 -m venv ./.venv
    
    # Windows
    python3.9 -m venv .\.venv
    

    若要使用這些腳本來啟用此 venv 環境,請參閱 venvs 的運作方式。

    另一個範例是,如果您在開發計算機上使用 Conda ,而您的叢集正在執行 Python 3.9,則必須使用該版本建立 Conda 環境,例如:

    conda create --name dbconnect python=3.9
    

    若要使用此環境名稱啟動 Conda 環境,請執行 conda activate dbconnect

  • Databricks Connect 主要和次要套件版本必須一律符合您的 Databricks 運行時間版本。 Databricks 建議您一律使用與 Databricks Runtime 版本相符的最新 Databricks Connect 套件。 例如,當您使用 Databricks Runtime 12.2 LTS 叢集時,您也必須使用 databricks-connect==12.2.* 套件。

  • Java Runtime Environment (JRE) 8。 用戶端已經過 OpenJDK 8 JRE 的測試。 用戶端不支援 Java 11。

注意

在 Windows 上,如果您看到 Databricks Connect 找不到winutils.exe的錯誤,請參閱在 Windows 上找不到 winutils.exe

設定用戶端

完成下列步驟以設定 Databricks Connect 的本機用戶端。

注意

開始設定本機 Databricks Connect 用戶端之前,您必須 符合 Databricks Connect 的需求

步驟 1:安裝 Databricks Connect 用戶端

  1. 啟用虛擬環境后,執行 uninstall 命令,以卸載已安裝 PySpark。 這是必要的, databricks-connect 因為套件與 PySpark 衝突。 如需詳細資訊,請參閱 衝突的 PySpark 安裝。 若要檢查是否已安裝 PySpark,請執行 show 命令。

    # Is PySpark already installed?
    pip3 show pyspark
    
    # Uninstall PySpark
    pip3 uninstall pyspark
    
  2. 在虛擬環境仍啟用之後,請執行 install 命令來安裝 Databricks Connect 用戶端。 --upgrade使用 選項,將任何現有的用戶端安裝升級至指定的版本。

    pip3 install --upgrade "databricks-connect==12.2.*"  # Or X.Y.* to match your cluster version.
    

    注意

    Databricks 建議您附加 「dot-asterisk」 表示法來指定 databricks-connect==X.Y.* ,而不是 databricks-connect=X.Y,以確保已安裝最新的套件。

步驟 2:設定連線屬性

  1. 收集下列組態屬性。

  2. 設定連線,如下所示。

    您可以使用 CLI、SQL 組態或環境變數。 組態方法從最高到最低優先順序是:SQL 組態索引鍵、CLI 和環境變數。

    • CLI(命令列介面)

      1. 執行 databricks-connect

        databricks-connect configure
        

        授權會顯示:

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. 接受授權並提供組態值。 針對 Databricks 主機Databricks 令牌,輸入您在步驟 1 中指出的工作區 URL 和個人存取令牌。

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        

        如果您收到一則訊息,指出Microsoft Entra ID 令牌太長,您可以將 Databricks Token 字段保留空白,並在 中~/.databricks-connect手動輸入令牌。

    • SQL 組態或環境變數。 下表顯示 SQL 組態索引鍵和環境變數,這些環境變數會對應至您在步驟 1 中指出的組態屬性。 若要設定 SQL 組態金鑰,請使用 sql("set config=value")。 例如: sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")

      參數 SQL 組態金鑰 環境變數名稱
      Databricks 主機 spark.databricks.service.address DATABRICKS_地址
      Databricks 令牌 spark.databricks.service.token DATABRICKS_API_TOKEN
      叢集識別碼 spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID
      組織識別碼 spark.databricks.service.orgId DATABRICKS_ORG_ID
      連接埠 spark.databricks.service.port DATABRICKS_PORT
  3. 在虛擬環境仍啟用之後,測試 Azure Databricks 的聯機能力,如下所示。

    databricks-connect test
    

    如果您設定的叢集未執行,測試會啟動叢集,該叢集會持續執行,直到其設定的自動結束時間為止。 輸出應如下所示:

    * PySpark is installed at /.../.../pyspark
    * Checking java version
    java version "1.8..."
    Java(TM) SE Runtime Environment (build 1.8...)
    Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode)
    * Testing scala command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2...
          /_/
    
    Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    
  4. 如果未顯示任何連線相關錯誤(WARN 訊息沒問題),則您已成功連線。

使用 Databricks Connect

本節說明如何設定慣用的 IDE 或筆記本伺服器,以使用 Databricks Connect 的用戶端。

本節內容:

JupyterLab

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 與 JupyterLab 和 Python,請遵循這些指示。

  1. 若要安裝 JupyterLab,請啟動 Python 虛擬環境,從終端機或命令提示字元執行下列命令:

    pip3 install jupyterlab
    
  2. 若要在網頁瀏覽器中啟動 JupyterLab,請從您啟動的 Python 虛擬環境執行下列命令:

    jupyter lab
    

    如果 JupyterLab 未出現在您的網頁瀏覽器中,請從您的虛擬環境中複製以 localhost127.0.0.1 開頭的 URL,然後在瀏覽器的網址列中輸入。

  3. 建立新的筆記本:在 JupyterLab 中,單擊 主功能表上的 [檔案 > 新 > 筆記本 ],選取 [Python 3][ipykernel], 然後按兩下 [ 選取]。

  4. 在筆記本的第一個數據格中,輸入 範例程式代碼 或您自己的程序代碼。 如果您使用自己的程式代碼,您至少必須具現化 實例 SparkSession.builder.getOrCreate(),如範例程式 代碼所示。

  5. 若要執行筆記本,請按兩下 [ 執行 > 所有儲存格]。

  6. 若要偵錯筆記本,請按下筆記本工具列中 Python 3 (ipykernel) 旁的 Bug (啟用調試程式) 圖示。 設定一或多個斷點,然後按下 [ 執行 > 所有單元格]。

  7. 若要關閉 JupyterLab,請按兩下 [ 檔案 > 關機]。 如果 JupyterLab 進程仍在終端機或命令提示字元中執行,請按 Ctrl + c ,然後輸入 y 以確認來停止此程式。

如需更具體的偵錯指示,請參閱 調試程式

傳統 Jupyter Notebook

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

Databricks Connect 的組態腳本會自動將套件新增至專案組態。 若要開始使用 Python 核心,請執行:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

若要啟用執行和視覺化 SQL 查詢的 %sql 速記,請使用下列代碼段:

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

Visual Studio Code

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 與 Visual Studio Code,請執行下列動作:

  1. 確認 已安裝 Python 延伸模組

  2. 開啟命令選擇區 (macOS 上的 Command+Shift+PWindows/Linux 上的 Ctrl+Shift+P )。

  3. 選取 Python 解譯器。 移至 [ 程序代碼 > 喜好 > 設定設定],然後選擇 [Python 設定]。

  4. 執行 databricks-connect get-jar-dir

  5. 將命令傳回的目錄新增至 底下的 python.venvPath[用戶設定 JSON]。 這應該新增至 Python 組態。

  6. 停用linter。 按下右側的 ...編輯 json 設定。 修改過的設定如下所示:

    VS Code 組態

  7. 如果使用虛擬環境執行,這是在 VS Code 中針對 Python 進行開發的建議方式,請在命令選擇區類型select python interpreter中,指向符合叢集 Python 版本的環境

    選取 Python 解釋器

    例如,如果您的叢集是 Python 3.9,您的開發環境應該是 Python 3.9。

    Python 版本

PyCharm

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

Databricks Connect 的組態腳本會自動將套件新增至專案組態。

Python 3 叢集

  1. 當您建立 PyCharm 專案時,請選取 [現有解釋器]。 從下拉功能表中,選取您建立的 Conda 環境(請參閱 需求)。

    選取解釋器

  2. 移至 [ 執行 > 編輯組態]。

  3. 新增 PYSPARK_PYTHON=python3 為環境變數。

    Python 3 叢集設定

SparkR 和 RStudio Desktop

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 與 SparkR 和 RStudio Desktop,請執行下列動作:

  1. 將 開放原始碼 Spark 散發套件下載並解壓縮到您的開發電腦上。 選擇與 Azure Databricks 叢集中相同的版本(Hadoop 2.7)。

  2. 執行 databricks-connect get-jar-dir。 此命令會傳回類似 /usr/local/lib/python3.5/dist-packages/pyspark/jars的路徑。 複製 JAR 目錄檔案路徑上方一個目錄的檔案路徑,例如 ,/usr/local/lib/python3.5/dist-packages/pyspark這是SPARK_HOME目錄。

  3. 將 Spark 連結庫路徑和 Spark 首頁新增至 R 腳稿頂端,以設定 Spark 連結庫路徑和 Spark 首頁。 將 設定<spark-lib-path>為您在步驟 1 中解除封裝 開放原始碼 Spark 套件的目錄。 從步驟 2 設定 <spark-home-path> 為 Databricks Connect 目錄。

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. 起始 Spark 工作階段並開始執行 SparkR 命令。

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyr 和 RStudio Desktop

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

重要

這項功能處於公開預覽狀態

您可以複製您使用 Databricks Connect 在本機開發的 sparklyr 相依程式代碼,並在 Azure Databricks 筆記本中執行,或在 Azure Databricks 工作區中裝載 RStudio Server,且程式代碼變更最少或沒有任何變更。

本節內容:

需求

  • sparklyr 1.2 或更新版本。
  • Databricks Runtime 7.3 LTS 或更新版本與相符的 Databricks Connect 版本。

安裝、設定及使用sparklyr

  1. 在 RStudio Desktop 中,從 CRAN 安裝 sparklyr 1.2 或更新版本,或從 GitHub 安裝最新的主要版本。

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. 在已安裝正確版本的 Databricks Connect 中啟動 Python 環境,並在終端機中執行下列命令以取得 <spark-home-path>

    databricks-connect get-spark-home
    
  3. 起始 Spark 工作階段並開始執行 sparklyr 命令。

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. 關閉連線。

    spark_disconnect(sc)
    

資源

如需詳細資訊,請參閱 sparklyr GitHub 自述檔

如需程式代碼範例,請參閱 sparklyr

sparklyr 和 RStudio Desktop 限制

不支援下列功能:

  • sparklyr 串流 API
  • sparklyr ML API
  • broom API
  • csv_file串行化模式
  • spark 提交

IntelliJ (Scala 或 Java)

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 與 IntelliJ (Scala 或 Java),請執行下列動作:

  1. 執行 databricks-connect get-jar-dir

  2. 將相依性指向從 命令傳回的目錄。 移至 [檔案 > 專案結構 > 模組 > 相依性 > '+' 符號 > JAR 或目錄

    IntelliJ JAR

    為避免衝突,我們強烈建議從您的 classpath 移除任何其他 Spark 安裝。 如果無法這樣做,請確定您新增的 JAR 位於 classpath 的前面。 特別是,它們必須領先於任何其他已安裝的 Spark 版本(否則您會使用其中一個其他 Spark 版本並在本機執行或擲回 ClassDefNotFoundError)。

  3. 檢查 IntelliJ 中中斷選項的設定。 默認值為 All ,如果您設定偵錯的斷點,將會導致網路逾時。 將它設定為 Thread ,以避免停止背景網路線程。

    IntelliJ 線程

PyDev with Eclipse

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要搭配 Eclipse 使用 Databricks Connect 和 PyDev,請遵循這些指示。

  1. 啟動 Eclipse。
  2. 建立專案:按兩下 [檔案 > 新 > 專案 > PyDev > PyDev 專案],然後按 [ 下一步]。
  3. 指定 項目名稱
  4. 針對 [項目內容],指定 Python 虛擬環境的路徑。
  5. 在繼續之前,按兩下 [請設定解釋器]。
  6. 按兩下 [ 手動設定]。
  7. 按兩下 [ 新增 > 瀏覽 python/pypy exe]。
  8. 流覽至並選取從虛擬環境參考之 Python 解釋器的完整路徑,然後按兩下 [ 開啟]。
  9. 在 [ 選取解釋器] 對話框中,按兩下 [ 確定]。
  10. 在 [ 需要選取專案] 對話框中,按兩下 [ 確定]。
  11. 在 [ 喜好設定 ] 對話框中,按兩下 [ 套用並關閉]。
  12. [PyDev 專案 ] 對話框中,按兩下 [ 完成]。
  13. 按兩下 [ 開啟檢視方塊]。
  14. 將 Python 程式代碼 (.py) 檔案新增至專案,其中包含 範例程式代碼或您自己的程式代碼 。 如果您使用自己的程式代碼,您至少必須具現化 實例 SparkSession.builder.getOrCreate(),如範例程式 代碼所示。
  15. 開啟 Python 程式代碼檔案後,設定您希望程式代碼在執行時暫停的任何斷點。
  16. 按兩下 [執行執行>] 或 [執行偵>錯]。

如需更具體的執行和偵錯指示,請參閱 執行程式

日蝕

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 和 Eclipse,請執行下列動作:

  1. 執行 databricks-connect get-jar-dir

  2. 將外部 JAR 組態指向從 命令傳回的目錄。 移至 [專案] 功能表 > [屬性 > Java 建置路徑 > 庫 > ] [新增外部 Jar]。

    Eclipse 外部 JAR 組態

    為避免衝突,我們強烈建議從您的 classpath 移除任何其他 Spark 安裝。 如果無法這樣做,請確定您新增的 JAR 位於 classpath 的前面。 特別是,它們必須領先於任何其他已安裝的 Spark 版本(否則您會使用其中一個其他 Spark 版本並在本機執行或擲回 ClassDefNotFoundError)。

    Eclipse Spark 設定

SBT

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要搭配 SBT 使用 Databricks Connect,您必須將檔案設定 build.sbt 為針對 Databricks Connect JAR 連結,而不是一般的 Spark 連結庫相依性。 您可以使用下列範例組建檔案中的 指示詞來執行此 unmanagedBase 動作,其假設 Scala 應用程式具有 com.example.Test 主要物件:

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

Spark 殼層

注意

開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端

若要使用 Databricks Connect 搭配 Spark 殼層和 Python 或 Scala,請遵循這些指示。

  1. 啟用虛擬環境后,請確定databricks-connect test命令已在 [設定用戶端] 中順利執行。

  2. 啟動虛擬環境之後,啟動Spark殼層。 針對 Python,執行 pyspark 命令。 針對 Scala,執行 spark-shell 命令。

    # For Python:
    pyspark
    
    # For Scala:
    spark-shell
    
  3. Spark 殼層隨即出現,例如 Python:

    Python 3... (v3...)
    [Clang 6... (clang-6...)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
           ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3....
          /_/
    
    Using Python version 3... (v3...)
    Spark context Web UI available at http://...:...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    SparkSession available as 'spark'.
    >>>
    

    針對 Scala:

    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3...
          /_/
    
    Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
  4. 如需如何使用 Spark 殼層搭配 Python 或 Scala 在叢集上執行命令的相關信息,請參閱 Spark 殼層的互動式分析。

    使用內 spark 建變數來代表 SparkSession 您執行中的叢集上的 ,例如 Python:

    >>> df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    

    針對 Scala:

    >>> val df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    
  5. 若要停止 Spark 殼層,請按 Ctrl + d 或 ,或執行 命令Ctrl + zquit()或 Python 或 或 exit():q Scala:quit

程式代碼範例

這個簡單的程式代碼範例會查詢指定的數據表,然後顯示指定的數據表的前 5 個數據列。 若要使用不同的數據表,請調整 對 spark.read.table的呼叫。

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

這個較長的程式代碼範例會執行下列動作:

  1. 建立記憶體內部 DataFrame。
  2. 使用架構內zzz_demo_temps_table的名稱default建立數據表。 如果已有這個名稱的數據表存在,則會先刪除數據表。 若要使用不同的架構或數據表,請調整對 spark.sqltemps.write.saveAsTable或兩者的呼叫。
  3. 將 DataFrame 的內容儲存至數據表。
  4. 在數據表的內容上執行SELECT查詢。
  5. 顯示查詢的結果。
  6. 刪除資料表。

Python(程式語言)

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

程式語言 Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
      temps.write.saveAsTable("zzz_demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

爪哇島

import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
        temps.write().saveAsTable("zzz_demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE zzz_demo_temps_table");
    }
}

使用相依性

一般而言,您的主要類別或 Python 檔案會有其他相依性 JAR 和檔案。 您可以呼叫 sparkContext.addJar("path-to-the-jar")sparkContext.addPyFile("path-to-the-file")來新增這類相依性 JAR 和檔案。 您也可以使用 addPyFile() 介面新增 Egg 檔案和 zip 檔案。 每次在 IDE 中執行程式代碼時,相依性 JAR 和檔案都會安裝在叢集上。

Python(程式語言)

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x

Python + Java UDF

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()

程式語言 Scala

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

存取 Databricks 公用程式

本節說明如何使用 Databricks Connect 來存取 Databricks 公用程式

您可以使用 dbutils.fsdbutils.secrets 公用程式,這些是 Databricks Utilities (dbutils) 參考模組的一部分。 支援的指令包括dbutils.fs.cp、、dbutils.fs.headdbutils.fs.lsdbutils.fs.mkdirsdbutils.fs.mvdbutils.fs.putdbutils.fs.rmdbutils.secrets.get、、、、 。 dbutils.secrets.getBytesdbutils.secrets.listdbutils.secrets.listScopes 請參閱 檔案系統公用程式 (dbutils.fs) 或 run dbutils.fs.help()Secrets 公用程式 (dbutils.secrets) 或執行 dbutils.secrets.help()

Python(程式語言)

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

使用 Databricks Runtime 7.3 LTS 或更新版本時,若要以在本機和 Azure Databricks 叢集中運作的方式存取 DBUtils 模組,請使用下列專案 get_dbutils()

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

否則,請使用下列 get_dbutils()

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

程式語言 Scala

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

在本機和遠端檔案系統之間複製檔案

您可以使用 dbutils.fs 在用戶端與遠端檔案系統之間複製檔案。 配置 file:/ 是指用戶端上的本機檔系統。

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

可透過該方式傳輸的檔案大小上限為 250 MB。

使 dbutils.secrets.get

由於安全性限制,預設會停用呼叫 dbutils.secrets.get 的能力。 請連絡 Azure Databricks 支持人員,為您的工作區啟用此功能。

設定Hadoop組態

在用戶端上,您可以使用適用於 SQL 和 DataFrame 作業的 spark.conf.set API 來設定 Hadoop 組態。 上設定的 sparkContext Hadoop組態必須在叢集組態或使用筆記本中設定。 這是因為上 sparkContext 設定的組態不會系結至用戶會話,但會套用至整個叢集。

疑難排解

執行 databricks-connect test 以檢查連線問題。 本節說明您在 Databricks Connect 中可能會遇到的一些常見問題,以及如何加以解決。

本節內容:

Python 版本不符

檢查您在本機使用的 Python 版本,至少具有與叢集上版本相同的次要版本(例如, 3.9.16 相較於 3.9.15 是 OK, 3.93.8 不是)。

如果您在本機安裝多個 Python 版本,請藉由設定 PYSPARK_PYTHON 環境變數 (例如, PYSPARK_PYTHON=python3) 確定 Databricks Connect 使用正確的版本。

伺服器未啟用

確定叢集已啟用 Spark 伺服器。spark.databricks.service.server.enabled true 如果是,您應該會在驅動程序記錄中看到下列幾行:

../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms

衝突的 PySpark 安裝

套件 databricks-connect 與 PySpark 衝突。 在 Python 中初始化 Spark 內容時,安裝這兩者會造成錯誤。 這可以透過數種方式來顯示,包括「數據流損毀」或「找不到類別」錯誤。 如果您已在 Python 環境中安裝 PySpark,請確定它已卸載,再安裝 databricks-connect。 卸載 PySpark 之後,請務必完全重新安裝 Databricks Connect 套件:

pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*"  # or X.Y.* to match your specific cluster version.

衝突 SPARK_HOME

如果您先前已在計算機上使用Spark,IDE可能會設定為使用其中一個其他版本的Spark,而不是 Databricks Connect Spark。 這可以透過數種方式來顯示,包括「數據流損毀」或「找不到類別」錯誤。 您可以藉由檢查環境變數的值 SPARK_HOME 來查看正在使用哪一個 Spark 版本:

Python(程式語言)

import os
print(os.environ['SPARK_HOME'])

程式語言 Scala

println(sys.env.get("SPARK_HOME"))

爪哇島

System.out.println(System.getenv("SPARK_HOME"));

解決方法

如果 SPARK_HOME 設定為用戶端中 Spark 以外的版本,您應該取消設定 SPARK_HOME 變數,然後再試一次。

檢查 IDE 環境變數設定、您的 .bashrc.zshrc.bash_profile 檔案,以及可能設定的任何其他環境變數。 您很可能必須結束並重新啟動 IDE 以清除舊狀態,而且如果問題持續發生,您甚至可能需要建立新的專案。

您不應該將 設定為新的值;取消設定 SPARK_HOME 它應該就已足夠。

二進位檔的衝突或遺漏 PATH 專案

您可以設定PATH,讓類似的 spark-shell 命令執行其他先前安裝的二進位檔,而不是 Databricks Connect 所提供的二進位檔。 這可能會導致 databricks-connect test 失敗。 您應該確定 Databricks Connect 二進位檔優先,或移除先前安裝的二進位檔。

如果您無法執行像spark-shell這樣的命令,也可能是pip3 install無法自動設定您的PATH,因此您必須手動將安裝目錄bin新增至您的PATH。 即使沒有設定,仍然可以使用 Databricks Connect 和 IDE。 不過, databricks-connect test 命令將無法運作。

叢集上的串行化設定衝突

如果您在執行 databricks-connect test時看到「數據流損毀」錯誤,這可能是因為叢集串行化設定不相容。 例如,設定 spark.io.compression.codec 設定可能會導致此問題。 若要解決此問題,請考慮從叢集設定中移除這些設定,或在 Databricks Connect 用戶端中設定組態。

在 Windows 上找不到winutils.exe

如果您在 Windows 上使用 Databricks Connect,請參閱:

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

請遵循指示在 Windows 上設定 Hadoop 路徑。

Windows 上的檔名、目錄名稱或磁碟區標籤法不正確

如果您使用 Windows 和 Databricks Connect,請參閱:

The filename, directory name, or volume label syntax is incorrect.

Java 或 Databricks Connect 已安裝到路徑中有空格的目錄。 您可以藉由安裝至不含空格的目錄路徑,或使用簡短名稱格式來設定路徑,來解決此問題。

使用 Microsoft Entra 識別碼令牌進行驗證

注意

下列資訊僅適用於 Databricks Connect 7.3.5 到 12.2.x 版。

Databricks Connect for Databricks Runtime 13.3 LTS 和更新版本目前不支援Microsoft Entra ID 令牌。

當您使用 Databricks Connect 7.3.5 到 12.2.x 版時,您可以使用 Microsoft Entra ID 令牌進行驗證,而不是使用個人存取令牌。 Microsoft Entra 標識符令牌的存留期有限。 當Microsoft Entra ID 令牌過期時,Databricks Connect 會失敗並出現 Invalid Token 錯誤。

針對 Databricks Connect 7.3.5 到 12.2.x 版,您可以在執行中的 Databricks Connect 應用程式中提供Microsoft Entra ID 令牌。 您的應用程式必須取得新的存取令牌,並將其設定為 spark.databricks.service.token SQL 組態密鑰。

Python(程式語言)

spark.conf.set("spark.databricks.service.token", new_aad_token)

程式語言 Scala

spark.conf.set("spark.databricks.service.token", newAADToken)

更新令牌之後,應用程式可以繼續使用會話內容中建立的相同 SparkSession 和任何對象和狀態。 為避免間歇性錯誤,Databricks 建議您在舊令牌到期之前提供新的令牌。

您可以延長Microsoft Entra ID 令牌的存留期,以在應用程式執行期間保存。 若要這樣做,請將 具有適當長存留期的 TokenLifetimePolicy 附加至您用來取得存取令牌的 Microsoft Entra ID 授權應用程式。

注意

Microsoft Entra ID 傳遞使用兩個令牌:Microsoft Databricks Connect 7.3.5 版至 12.2.x 版中所設定的 Entra ID 存取令牌,以及 Databricks 處理要求時所產生特定資源的 ADLS 傳遞令牌。 您無法使用 Microsoft Entra ID 令牌存留期原則來延長 ADLS 傳遞令牌的存留期。 如果您將命令傳送至超過一小時的叢集,如果命令在一小時後存取 ADLS 資源,就會失敗。

限制

  • 結構化串流。
  • 在遠端叢集上執行不屬於 Spark 作業的任意程式代碼。
  • 不支援 Delta 資料表作業的原生 Scala、Python 和 R API。例如, DeltaTable.forPath不支援 。 不過,支援在 Delta 數據表上使用 Delta Lake 作業和 Spark API 的 SQL API (spark.sql(...)例如, spark.read.load)。
  • 複製到 。
  • 使用屬於伺服器目錄一部分的 SQL 函式、Python 或 Scala UDF。 不過,本機引進的 Scala 和 Python UDF 運作正常。
  • Apache Zeppelin 0.7.x 和以下版本。
  • 使用 資料表訪問控制連線到叢集。
  • 聯機到已啟用進程隔離的叢集(換句話說,其中 spark.databricks.pyspark.enableProcessIsolation 設定為 true)。
  • Delta CLONE SQL 命令。
  • 全域暫存檢視。
  • 考拉和pyspark.pandas
  • CREATE TABLE table AS SELECT ... SQL 命令不一定會運作。 請改用 spark.sql("SELECT ...").write.saveAsTable("table")