注意
Databricks Connect 建議您改用 Databricks Connect for Databricks Runtime 13.0 和更新版本 。
Databricks 計劃 Databricks Connect for Databricks Runtime 12.2 LTS 和以下版本沒有新功能運作。
Databricks Connect 可讓您將熱門 IDE,例如 Visual Studio Code 和 PyCharm、Notebook 伺服器和其他自定義應用程式連線至 Azure Databricks 叢集。
本文說明 Databricks Connect 的運作方式、逐步引導您完成開始使用 Databricks Connect 的步驟、說明如何針對使用 Databricks Connect 時可能發生的問題進行疑難解答,以及使用 Databricks Connect 執行與在 Azure Databricks 筆記本中執行之間的差異。
概觀
Databricks Connect 是 Databricks Runtime 的用戶端連結庫。 它可讓您使用 Spark API 撰寫作業,並在 Azure Databricks 叢集上遠端執行作業,而不是在本機 Spark 工作階段中執行。
例如,當您使用 Databricks Connect 執行 DataFrame 命令 spark.read.format(...).load(...).groupBy(...).agg(...).show()
時,命令的邏輯表示法會傳送至在 Azure Databricks 中執行的 Spark 伺服器,以在遠端叢集上執行。
使用 Databricks Connect,您可以:
- 從任何 Python、R、Scala 或 Java 應用程式執行大規模的 Spark 作業。 您
import pyspark
現在可以從require(SparkR)
import org.apache.spark
應用程式直接執行 Spark 作業,而不需要安裝任何 IDE 外掛程式或使用 Spark 提交腳本。 - 即使在使用遠端叢集時,也請逐步執行並偵錯 IDE 中的程式碼。
- 開發連結庫時會快速反覆運算。 您不需要在 Databricks Connect 中變更 Python 或 Java 連結庫相依性之後重新啟動叢集,因為每個用戶端會話都會與叢集中的彼此隔離。
- 關閉閑置叢集,而不會遺失工作。 由於用戶端應用程式與叢集分離,因此不會受到叢集重新啟動或升級的影響,這通常會導致失去筆記本中定義的所有變數、RDD 和 DataFrame 物件。
注意
針對使用 SQL 查詢進行 Python 開發的 Databricks,Databricks 建議您使用 適用於 Python 的 Databricks SQL 連接器,而不是 Databricks Connect。 適用於 Python 的 Databricks SQL Connector 比 Databricks Connect 更容易設定。 此外,Databricks Connect 會剖析並規劃本機計算機上執行的作業,而作業則會在遠端計算資源上執行。 這會使偵錯運行時間錯誤變得特別困難。 適用於 Python 的 Databricks SQL 連接器會將 SQL 查詢直接提交至遠端計算資源並擷取結果。
需求
本節列出 Databricks Connect 的需求。
僅支援下列 Databricks Runtime 版本:
- Databricks Runtime 12.2 LTS ML,Databricks Runtime 12.2 LTS
- Databricks Runtime 11.3 LTS ML、Databricks Runtime 11.3 LTS
- Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
- Databricks Runtime 9.1 LTS ML、Databricks Runtime 9.1 LTS
- Databricks Runtime 7.3 LTS
您必須在開發計算機上安裝 Python 3,且用戶端 Python 安裝的次要版本必須與 Azure Databricks 叢集的次要 Python 版本相同。 下表顯示每個 Databricks Runtime 一起安裝的 Python 版本。
Databricks Runtime 版本 Python 版本 12.2 LTS ML、12.2 LTS 3.9 11.3 LTS ML、11.3 LTS 3.9 10.4 LTS ML、10.4 LTS 3.8 9.1 LTS ML、9.1 LTS 3.8 7.3 LTS 3.7 Databricks 強烈建議您已針對與 Databricks Connect 搭配使用的每個 Python 版本啟用 Python 虛擬環境 。 Python 虛擬環境可協助您確定您使用的是正確的 Python 版本和 Databricks Connect。 這有助於減少解決相關技術問題所花費的時間。
例如,如果您在開發計算機上使用 venv ,而您的叢集正在執行 Python 3.9,則必須使用該版本建立
venv
環境。 下列範例命令會產生腳本來啟動venv
具有 Python 3.9 的環境,然後此命令會將這些腳本放在目前工作目錄中名為.venv
的隱藏資料夾中:# Linux and macOS python3.9 -m venv ./.venv # Windows python3.9 -m venv .\.venv
若要使用這些腳本來啟用此
venv
環境,請參閱 venvs 的運作方式。另一個範例是,如果您在開發計算機上使用 Conda ,而您的叢集正在執行 Python 3.9,則必須使用該版本建立 Conda 環境,例如:
conda create --name dbconnect python=3.9
若要使用此環境名稱啟動 Conda 環境,請執行
conda activate dbconnect
。Databricks Connect 主要和次要套件版本必須一律符合您的 Databricks 運行時間版本。 Databricks 建議您一律使用與 Databricks Runtime 版本相符的最新 Databricks Connect 套件。 例如,當您使用 Databricks Runtime 12.2 LTS 叢集時,您也必須使用
databricks-connect==12.2.*
套件。Java Runtime Environment (JRE) 8。 用戶端已經過 OpenJDK 8 JRE 的測試。 用戶端不支援 Java 11。
注意
在 Windows 上,如果您看到 Databricks Connect 找不到winutils.exe
的錯誤,請參閱在 Windows 上找不到 winutils.exe
。
設定用戶端
完成下列步驟以設定 Databricks Connect 的本機用戶端。
注意
開始設定本機 Databricks Connect 用戶端之前,您必須 符合 Databricks Connect 的需求 。
步驟 1:安裝 Databricks Connect 用戶端
啟用虛擬環境后,執行
uninstall
命令,以卸載已安裝 PySpark。 這是必要的,databricks-connect
因為套件與 PySpark 衝突。 如需詳細資訊,請參閱 衝突的 PySpark 安裝。 若要檢查是否已安裝 PySpark,請執行show
命令。# Is PySpark already installed? pip3 show pyspark # Uninstall PySpark pip3 uninstall pyspark
在虛擬環境仍啟用之後,請執行
install
命令來安裝 Databricks Connect 用戶端。--upgrade
使用 選項,將任何現有的用戶端安裝升級至指定的版本。pip3 install --upgrade "databricks-connect==12.2.*" # Or X.Y.* to match your cluster version.
注意
Databricks 建議您附加 「dot-asterisk」 表示法來指定
databricks-connect==X.Y.*
,而不是databricks-connect=X.Y
,以確保已安裝最新的套件。
步驟 2:設定連線屬性
收集下列組態屬性。
Azure Databricks 個別工作區 URL。 這與叢集的伺服器主機名值相同
https://
;請參閱取得 Azure Databricks 計算資源的連線詳細數據。您的 Azure Databricks 個人存取令牌 或 Microsoft Entra ID(先前稱為 Azure Active Directory) 令牌。
- 針對 Azure Data Lake Storage (ADLS) 認證傳遞,您必須使用Microsoft Entra ID 令牌。 Microsoft只有在執行 Databricks Runtime 7.3 LTS 和更新版本的標準叢集上才支援 Entra ID 認證傳遞,而且與服務主體驗證不相容。
- 如需使用 Microsoft Entra 識別元令牌進行驗證的詳細資訊,請參閱 使用 Microsoft Entra ID 令牌進行驗證。
叢集的標識碼。 您可以從 URL 取得叢集識別碼。 在這裡,叢集識別碼為
1108-201635-xxxxxxxx
。 另 請參閱叢集 URL 和標識碼。工作區的唯一組織標識碼。 請參閱 取得工作區對象的識別碼。
Databricks Connect 連線到叢集上的埠。 預設連接埠為
15001
。 如果您的叢集已設定為使用不同的埠,例如8787
先前的 Azure Databricks 指示中所提供的埠,請使用已設定的埠號碼。
設定連線,如下所示。
您可以使用 CLI、SQL 組態或環境變數。 組態方法從最高到最低優先順序是:SQL 組態索引鍵、CLI 和環境變數。
CLI(命令列介面)
執行
databricks-connect
。databricks-connect configure
授權會顯示:
Copyright (2018) Databricks, Inc. This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant to an Agreement ...
接受授權並提供組態值。 針對 Databricks 主機 和 Databricks 令牌,輸入您在步驟 1 中指出的工作區 URL 和個人存取令牌。
Do you accept the above agreement? [y/N] y Set new config values (leave input empty to accept default): Databricks Host [no current value, must start with https://]: <databricks-url> Databricks Token [no current value]: <databricks-token> Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id> Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id> Port [15001]: <port>
如果您收到一則訊息,指出Microsoft Entra ID 令牌太長,您可以將 Databricks Token 字段保留空白,並在 中
~/.databricks-connect
手動輸入令牌。
SQL 組態或環境變數。 下表顯示 SQL 組態索引鍵和環境變數,這些環境變數會對應至您在步驟 1 中指出的組態屬性。 若要設定 SQL 組態金鑰,請使用
sql("set config=value")
。 例如:sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")
。參數 SQL 組態金鑰 環境變數名稱 Databricks 主機 spark.databricks.service.address DATABRICKS_地址 Databricks 令牌 spark.databricks.service.token DATABRICKS_API_TOKEN 叢集識別碼 spark.databricks.service.clusterId DATABRICKS_CLUSTER_ID 組織識別碼 spark.databricks.service.orgId DATABRICKS_ORG_ID 連接埠 spark.databricks.service.port DATABRICKS_PORT
在虛擬環境仍啟用之後,測試 Azure Databricks 的聯機能力,如下所示。
databricks-connect test
如果您設定的叢集未執行,測試會啟動叢集,該叢集會持續執行,直到其設定的自動結束時間為止。 輸出應如下所示:
* PySpark is installed at /.../.../pyspark * Checking java version java version "1.8..." Java(TM) SE Runtime Environment (build 1.8...) Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode) * Testing scala command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2... /_/ Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi res0: Long = 4950 scala> :quit * Testing python command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
如果未顯示任何連線相關錯誤(
WARN
訊息沒問題),則您已成功連線。
使用 Databricks Connect
本節說明如何設定慣用的 IDE 或筆記本伺服器,以使用 Databricks Connect 的用戶端。
本節內容:
- JupyterLab
- 傳統 Jupyter Notebook
- PyCharm
- SparkR 和 RStudio Desktop
- sparklyr 和 RStudio Desktop
- IntelliJ (Scala 或 Java)
- PyDev with Eclipse
- 日蝕
- SBT
- Spark 殼層
JupyterLab
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 與 JupyterLab 和 Python,請遵循這些指示。
若要安裝 JupyterLab,請啟動 Python 虛擬環境,從終端機或命令提示字元執行下列命令:
pip3 install jupyterlab
若要在網頁瀏覽器中啟動 JupyterLab,請從您啟動的 Python 虛擬環境執行下列命令:
jupyter lab
如果 JupyterLab 未出現在您的網頁瀏覽器中,請從您的虛擬環境中複製以
localhost
或127.0.0.1
開頭的 URL,然後在瀏覽器的網址列中輸入。建立新的筆記本:在 JupyterLab 中,單擊 主功能表上的 [檔案 > 新 > 筆記本 ],選取 [Python 3][ipykernel], 然後按兩下 [ 選取]。
在筆記本的第一個數據格中,輸入 範例程式代碼 或您自己的程序代碼。 如果您使用自己的程式代碼,您至少必須具現化 實例
SparkSession.builder.getOrCreate()
,如範例程式 代碼所示。若要執行筆記本,請按兩下 [ 執行 > 所有儲存格]。
若要偵錯筆記本,請按下筆記本工具列中 Python 3 (ipykernel) 旁的 Bug (啟用調試程式) 圖示。 設定一或多個斷點,然後按下 [ 執行 > 所有單元格]。
若要關閉 JupyterLab,請按兩下 [ 檔案 > 關機]。 如果 JupyterLab 進程仍在終端機或命令提示字元中執行,請按
Ctrl + c
,然後輸入y
以確認來停止此程式。
如需更具體的偵錯指示,請參閱 調試程式。
傳統 Jupyter Notebook
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
Databricks Connect 的組態腳本會自動將套件新增至專案組態。 若要開始使用 Python 核心,請執行:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
若要啟用執行和視覺化 SQL 查詢的 %sql
速記,請使用下列代碼段:
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
@magics_class
class DatabricksConnectMagics(Magics):
@line_cell_magic
def sql(self, line, cell=None):
if cell and line:
raise ValueError("Line must be empty for cell magic", line)
try:
from autovizwidget.widget.utils import display_dataframe
except ImportError:
print("Please run `pip install autovizwidget` to enable the visualization widget.")
display_dataframe = lambda x: x
return display_dataframe(self.get_spark().sql(cell or line).toPandas())
def get_spark(self):
user_ns = get_ipython().user_ns
if "spark" in user_ns:
return user_ns["spark"]
else:
from pyspark.sql import SparkSession
user_ns["spark"] = SparkSession.builder.getOrCreate()
return user_ns["spark"]
ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
Visual Studio Code
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 與 Visual Studio Code,請執行下列動作:
確認 已安裝 Python 延伸模組 。
開啟命令選擇區 (macOS 上的 Command+Shift+P 和 Windows/Linux 上的 Ctrl+Shift+P )。
選取 Python 解譯器。 移至 [ 程序代碼 > 喜好 > 設定設定],然後選擇 [Python 設定]。
執行
databricks-connect get-jar-dir
。將命令傳回的目錄新增至 底下的
python.venvPath
[用戶設定 JSON]。 這應該新增至 Python 組態。停用linter。 按下右側的 ... 並 編輯 json 設定。 修改過的設定如下所示:
如果使用虛擬環境執行,這是在 VS Code 中針對 Python 進行開發的建議方式,請在命令選擇區類型
select python interpreter
中,指向符合叢集 Python 版本的環境。例如,如果您的叢集是 Python 3.9,您的開發環境應該是 Python 3.9。
PyCharm
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
Databricks Connect 的組態腳本會自動將套件新增至專案組態。
Python 3 叢集
當您建立 PyCharm 專案時,請選取 [現有解釋器]。 從下拉功能表中,選取您建立的 Conda 環境(請參閱 需求)。
移至 [ 執行 > 編輯組態]。
新增
PYSPARK_PYTHON=python3
為環境變數。
SparkR 和 RStudio Desktop
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 與 SparkR 和 RStudio Desktop,請執行下列動作:
將 開放原始碼 Spark 散發套件下載並解壓縮到您的開發電腦上。 選擇與 Azure Databricks 叢集中相同的版本(Hadoop 2.7)。
執行
databricks-connect get-jar-dir
。 此命令會傳回類似/usr/local/lib/python3.5/dist-packages/pyspark/jars
的路徑。 複製 JAR 目錄檔案路徑上方一個目錄的檔案路徑,例如 ,/usr/local/lib/python3.5/dist-packages/pyspark
這是SPARK_HOME
目錄。將 Spark 連結庫路徑和 Spark 首頁新增至 R 腳稿頂端,以設定 Spark 連結庫路徑和 Spark 首頁。 將 設定
<spark-lib-path>
為您在步驟 1 中解除封裝 開放原始碼 Spark 套件的目錄。 從步驟 2 設定<spark-home-path>
為 Databricks Connect 目錄。# Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7 library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths()))) # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark Sys.setenv(SPARK_HOME = "<spark-home-path>")
起始 Spark 工作階段並開始執行 SparkR 命令。
sparkR.session() df <- as.DataFrame(faithful) head(df) df1 <- dapply(df, function(x) { x }, schema(df)) collect(df1)
sparklyr 和 RStudio Desktop
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
重要
這項功能處於公開預覽狀態。
您可以複製您使用 Databricks Connect 在本機開發的 sparklyr 相依程式代碼,並在 Azure Databricks 筆記本中執行,或在 Azure Databricks 工作區中裝載 RStudio Server,且程式代碼變更最少或沒有任何變更。
本節內容:
需求
- sparklyr 1.2 或更新版本。
- Databricks Runtime 7.3 LTS 或更新版本與相符的 Databricks Connect 版本。
安裝、設定及使用sparklyr
在 RStudio Desktop 中,從 CRAN 安裝 sparklyr 1.2 或更新版本,或從 GitHub 安裝最新的主要版本。
# Install from CRAN install.packages("sparklyr") # Or install the latest master version from GitHub install.packages("devtools") devtools::install_github("sparklyr/sparklyr")
在已安裝正確版本的 Databricks Connect 中啟動 Python 環境,並在終端機中執行下列命令以取得
<spark-home-path>
:databricks-connect get-spark-home
起始 Spark 工作階段並開始執行 sparklyr 命令。
library(sparklyr) sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>") iris_tbl <- copy_to(sc, iris, overwrite = TRUE) library(dplyr) src_tbls(sc) iris_tbl %>% count
關閉連線。
spark_disconnect(sc)
資源
如需詳細資訊,請參閱 sparklyr GitHub 自述檔。
如需程式代碼範例,請參閱 sparklyr。
sparklyr 和 RStudio Desktop 限制
不支援下列功能:
- sparklyr 串流 API
- sparklyr ML API
- broom API
- csv_file串行化模式
- spark 提交
IntelliJ (Scala 或 Java)
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 與 IntelliJ (Scala 或 Java),請執行下列動作:
執行
databricks-connect get-jar-dir
。將相依性指向從 命令傳回的目錄。 移至 [檔案 > 專案結構 > 模組 > 相依性 > '+' 符號 > JAR 或目錄。
為避免衝突,我們強烈建議從您的 classpath 移除任何其他 Spark 安裝。 如果無法這樣做,請確定您新增的 JAR 位於 classpath 的前面。 特別是,它們必須領先於任何其他已安裝的 Spark 版本(否則您會使用其中一個其他 Spark 版本並在本機執行或擲回
ClassDefNotFoundError
)。檢查 IntelliJ 中中斷選項的設定。 默認值為 All ,如果您設定偵錯的斷點,將會導致網路逾時。 將它設定為 Thread ,以避免停止背景網路線程。
PyDev with Eclipse
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要搭配 Eclipse 使用 Databricks Connect 和 PyDev,請遵循這些指示。
- 啟動 Eclipse。
- 建立專案:按兩下 [檔案 > 新 > 專案 > PyDev > PyDev 專案],然後按 [ 下一步]。
- 指定 項目名稱。
- 針對 [項目內容],指定 Python 虛擬環境的路徑。
- 在繼續之前,按兩下 [請設定解釋器]。
- 按兩下 [ 手動設定]。
- 按兩下 [ 新增 > 瀏覽 python/pypy exe]。
- 流覽至並選取從虛擬環境參考之 Python 解釋器的完整路徑,然後按兩下 [ 開啟]。
- 在 [ 選取解釋器] 對話框中,按兩下 [ 確定]。
- 在 [ 需要選取專案] 對話框中,按兩下 [ 確定]。
- 在 [ 喜好設定 ] 對話框中,按兩下 [ 套用並關閉]。
- 在 [PyDev 專案 ] 對話框中,按兩下 [ 完成]。
- 按兩下 [ 開啟檢視方塊]。
- 將 Python 程式代碼 (
.py
) 檔案新增至專案,其中包含 範例程式代碼或您自己的程式代碼 。 如果您使用自己的程式代碼,您至少必須具現化 實例SparkSession.builder.getOrCreate()
,如範例程式 代碼所示。 - 開啟 Python 程式代碼檔案後,設定您希望程式代碼在執行時暫停的任何斷點。
- 按兩下 [執行執行>] 或 [執行偵>錯]。
如需更具體的執行和偵錯指示,請參閱 執行程式。
日蝕
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 和 Eclipse,請執行下列動作:
執行
databricks-connect get-jar-dir
。將外部 JAR 組態指向從 命令傳回的目錄。 移至 [專案] 功能表 > [屬性 > Java 建置路徑 > 庫 > ] [新增外部 Jar]。
為避免衝突,我們強烈建議從您的 classpath 移除任何其他 Spark 安裝。 如果無法這樣做,請確定您新增的 JAR 位於 classpath 的前面。 特別是,它們必須領先於任何其他已安裝的 Spark 版本(否則您會使用其中一個其他 Spark 版本並在本機執行或擲回
ClassDefNotFoundError
)。
SBT
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要搭配 SBT 使用 Databricks Connect,您必須將檔案設定 build.sbt
為針對 Databricks Connect JAR 連結,而不是一般的 Spark 連結庫相依性。 您可以使用下列範例組建檔案中的 指示詞來執行此 unmanagedBase
動作,其假設 Scala 應用程式具有 com.example.Test
主要物件:
build.sbt
name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")
Spark 殼層
注意
開始使用 Databricks Connect 之前,您必須先 符合需求 ,並 設定 Databricks Connect 的用戶端 。
若要使用 Databricks Connect 搭配 Spark 殼層和 Python 或 Scala,請遵循這些指示。
啟用虛擬環境后,請確定
databricks-connect test
命令已在 [設定用戶端] 中順利執行。啟動虛擬環境之後,啟動Spark殼層。 針對 Python,執行
pyspark
命令。 針對 Scala,執行spark-shell
命令。# For Python: pyspark
# For Scala: spark-shell
Spark 殼層隨即出現,例如 Python:
Python 3... (v3...) [Clang 6... (clang-6...)] on darwin Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.... /_/ Using Python version 3... (v3...) Spark context Web UI available at http://...:... Spark context available as 'sc' (master = local[*], app id = local-...). SparkSession available as 'spark'. >>>
針對 Scala:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3... /_/ Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala>
如需如何使用 Spark 殼層搭配 Python 或 Scala 在叢集上執行命令的相關信息,請參閱 Spark 殼層的互動式分析。
使用內
spark
建變數來代表SparkSession
您執行中的叢集上的 ,例如 Python:>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
針對 Scala:
>>> val df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
若要停止 Spark 殼層,請按
Ctrl + d
或 ,或執行 命令Ctrl + z
quit()
或 Python 或 或exit()
:q
Scala:quit
。
程式代碼範例
這個簡單的程式代碼範例會查詢指定的數據表,然後顯示指定的數據表的前 5 個數據列。 若要使用不同的數據表,請調整 對 spark.read.table
的呼叫。
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
這個較長的程式代碼範例會執行下列動作:
- 建立記憶體內部 DataFrame。
- 使用架構內
zzz_demo_temps_table
的名稱default
建立數據表。 如果已有這個名稱的數據表存在,則會先刪除數據表。 若要使用不同的架構或數據表,請調整對spark.sql
、temps.write.saveAsTable
或兩者的呼叫。 - 將 DataFrame 的內容儲存至數據表。
- 在數據表的內容上執行
SELECT
查詢。 - 顯示查詢的結果。
- 刪除資料表。
Python(程式語言)
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName('temps-demo').getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
程式語言 Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object Demo {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local").getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
爪哇島
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
temps.write().saveAsTable("zzz_demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table");
}
}
使用相依性
一般而言,您的主要類別或 Python 檔案會有其他相依性 JAR 和檔案。 您可以呼叫 sparkContext.addJar("path-to-the-jar")
或 sparkContext.addPyFile("path-to-the-file")
來新增這類相依性 JAR 和檔案。 您也可以使用 addPyFile()
介面新增 Egg 檔案和 zip 檔案。 每次在 IDE 中執行程式代碼時,相依性 JAR 和檔案都會安裝在叢集上。
Python(程式語言)
from lib import Foo
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")
print("Testing simple count")
print(spark.range(100).count())
print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
class Foo(object):
def __init__(self, x):
self.x = x
Python + Java UDF
from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column
## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
# val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}
spark = SparkSession.builder \
.config("spark.jars", "/path/to/udf.jar") \
.getOrCreate()
sc = spark.sparkContext
def plus_one_udf(col):
f = sc._jvm.com.example.Test.plusOne()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
程式語言 Scala
package com.example
import org.apache.spark.sql.SparkSession
case class Foo(x: String)
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
...
.getOrCreate();
spark.sparkContext.setLogLevel("INFO")
println("Running simple show query...")
spark.read.format("parquet").load("/tmp/x").show()
println("Running simple UDF query...")
spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
spark.udf.register("f", (x: Int) => x + 1)
spark.range(10).selectExpr("f(id)").show()
println("Running custom objects query...")
val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
println(objs.toSeq)
}
}
存取 Databricks 公用程式
本節說明如何使用 Databricks Connect 來存取 Databricks 公用程式。
您可以使用 dbutils.fs
和 dbutils.secrets
公用程式,這些是 Databricks Utilities (dbutils
) 參考模組的一部分。
支援的指令包括dbutils.fs.cp
、、dbutils.fs.head
dbutils.fs.ls
dbutils.fs.mkdirs
dbutils.fs.mv
dbutils.fs.put
、dbutils.fs.rm
、dbutils.secrets.get
、、、、 。 dbutils.secrets.getBytes
dbutils.secrets.list
dbutils.secrets.listScopes
請參閱 檔案系統公用程式 (dbutils.fs) 或 run dbutils.fs.help()
和 Secrets 公用程式 (dbutils.secrets) 或執行 dbutils.secrets.help()
。
Python(程式語言)
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
使用 Databricks Runtime 7.3 LTS 或更新版本時,若要以在本機和 Azure Databricks 叢集中運作的方式存取 DBUtils 模組,請使用下列專案 get_dbutils()
:
def get_dbutils(spark):
from pyspark.dbutils import DBUtils
return DBUtils(spark)
否則,請使用下列 get_dbutils()
:
def get_dbutils(spark):
if spark.conf.get("spark.databricks.service.client.enabled") == "true":
from pyspark.dbutils import DBUtils
return DBUtils(spark)
else:
import IPython
return IPython.get_ipython().user_ns["dbutils"]
程式語言 Scala
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
在本機和遠端檔案系統之間複製檔案
您可以使用 dbutils.fs
在用戶端與遠端檔案系統之間複製檔案。 配置 file:/
是指用戶端上的本機檔系統。
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
可透過該方式傳輸的檔案大小上限為 250 MB。
使 dbutils.secrets.get
由於安全性限制,預設會停用呼叫 dbutils.secrets.get
的能力。 請連絡 Azure Databricks 支持人員,為您的工作區啟用此功能。
設定Hadoop組態
在用戶端上,您可以使用適用於 SQL 和 DataFrame 作業的 spark.conf.set
API 來設定 Hadoop 組態。 上設定的 sparkContext
Hadoop組態必須在叢集組態或使用筆記本中設定。 這是因為上 sparkContext
設定的組態不會系結至用戶會話,但會套用至整個叢集。
疑難排解
執行 databricks-connect test
以檢查連線問題。 本節說明您在 Databricks Connect 中可能會遇到的一些常見問題,以及如何加以解決。
本節內容:
- Python 版本不符
- 伺服器未啟用
- 衝突的 PySpark 安裝
-
衝突
SPARK_HOME
-
二進位檔的衝突或遺漏
PATH
專案 - 叢集上的串行化設定衝突
-
在 Windows 上找不到
winutils.exe
- Windows 上的檔名、目錄名稱或磁碟區標籤法不正確
Python 版本不符
檢查您在本機使用的 Python 版本,至少具有與叢集上版本相同的次要版本(例如, 3.9.16
相較於 3.9.15
是 OK, 3.9
與 3.8
不是)。
如果您在本機安裝多個 Python 版本,請藉由設定 PYSPARK_PYTHON
環境變數 (例如, PYSPARK_PYTHON=python3
) 確定 Databricks Connect 使用正確的版本。
伺服器未啟用
確定叢集已啟用 Spark 伺服器。spark.databricks.service.server.enabled true
如果是,您應該會在驅動程序記錄中看到下列幾行:
../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms
衝突的 PySpark 安裝
套件 databricks-connect
與 PySpark 衝突。 在 Python 中初始化 Spark 內容時,安裝這兩者會造成錯誤。 這可以透過數種方式來顯示,包括「數據流損毀」或「找不到類別」錯誤。 如果您已在 Python 環境中安裝 PySpark,請確定它已卸載,再安裝 databricks-connect。 卸載 PySpark 之後,請務必完全重新安裝 Databricks Connect 套件:
pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*" # or X.Y.* to match your specific cluster version.
衝突 SPARK_HOME
如果您先前已在計算機上使用Spark,IDE可能會設定為使用其中一個其他版本的Spark,而不是 Databricks Connect Spark。 這可以透過數種方式來顯示,包括「數據流損毀」或「找不到類別」錯誤。 您可以藉由檢查環境變數的值 SPARK_HOME
來查看正在使用哪一個 Spark 版本:
Python(程式語言)
import os
print(os.environ['SPARK_HOME'])
程式語言 Scala
println(sys.env.get("SPARK_HOME"))
爪哇島
System.out.println(System.getenv("SPARK_HOME"));
解決方法
如果 SPARK_HOME
設定為用戶端中 Spark 以外的版本,您應該取消設定 SPARK_HOME
變數,然後再試一次。
檢查 IDE 環境變數設定、您的 .bashrc
、 .zshrc
或 .bash_profile
檔案,以及可能設定的任何其他環境變數。 您很可能必須結束並重新啟動 IDE 以清除舊狀態,而且如果問題持續發生,您甚至可能需要建立新的專案。
您不應該將 設定為新的值;取消設定 SPARK_HOME
它應該就已足夠。
二進位檔的衝突或遺漏 PATH
專案
您可以設定PATH,讓類似的 spark-shell
命令執行其他先前安裝的二進位檔,而不是 Databricks Connect 所提供的二進位檔。 這可能會導致 databricks-connect test
失敗。 您應該確定 Databricks Connect 二進位檔優先,或移除先前安裝的二進位檔。
如果您無法執行像spark-shell
這樣的命令,也可能是pip3 install
無法自動設定您的PATH,因此您必須手動將安裝目錄bin
新增至您的PATH。 即使沒有設定,仍然可以使用 Databricks Connect 和 IDE。 不過, databricks-connect test
命令將無法運作。
叢集上的串行化設定衝突
如果您在執行 databricks-connect test
時看到「數據流損毀」錯誤,這可能是因為叢集串行化設定不相容。 例如,設定 spark.io.compression.codec
設定可能會導致此問題。 若要解決此問題,請考慮從叢集設定中移除這些設定,或在 Databricks Connect 用戶端中設定組態。
在 Windows 上找不到winutils.exe
如果您在 Windows 上使用 Databricks Connect,請參閱:
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
請遵循指示在 Windows 上設定 Hadoop 路徑。
Windows 上的檔名、目錄名稱或磁碟區標籤法不正確
如果您使用 Windows 和 Databricks Connect,請參閱:
The filename, directory name, or volume label syntax is incorrect.
Java 或 Databricks Connect 已安裝到路徑中有空格的目錄。 您可以藉由安裝至不含空格的目錄路徑,或使用簡短名稱格式來設定路徑,來解決此問題。
使用 Microsoft Entra 識別碼令牌進行驗證
注意
下列資訊僅適用於 Databricks Connect 7.3.5 到 12.2.x 版。
Databricks Connect for Databricks Runtime 13.3 LTS 和更新版本目前不支援Microsoft Entra ID 令牌。
當您使用 Databricks Connect 7.3.5 到 12.2.x 版時,您可以使用 Microsoft Entra ID 令牌進行驗證,而不是使用個人存取令牌。 Microsoft Entra 標識符令牌的存留期有限。 當Microsoft Entra ID 令牌過期時,Databricks Connect 會失敗並出現 Invalid Token
錯誤。
針對 Databricks Connect 7.3.5 到 12.2.x 版,您可以在執行中的 Databricks Connect 應用程式中提供Microsoft Entra ID 令牌。 您的應用程式必須取得新的存取令牌,並將其設定為 spark.databricks.service.token
SQL 組態密鑰。
Python(程式語言)
spark.conf.set("spark.databricks.service.token", new_aad_token)
程式語言 Scala
spark.conf.set("spark.databricks.service.token", newAADToken)
更新令牌之後,應用程式可以繼續使用會話內容中建立的相同 SparkSession
和任何對象和狀態。 為避免間歇性錯誤,Databricks 建議您在舊令牌到期之前提供新的令牌。
您可以延長Microsoft Entra ID 令牌的存留期,以在應用程式執行期間保存。 若要這樣做,請將 具有適當長存留期的 TokenLifetimePolicy 附加至您用來取得存取令牌的 Microsoft Entra ID 授權應用程式。
注意
Microsoft Entra ID 傳遞使用兩個令牌:Microsoft Databricks Connect 7.3.5 版至 12.2.x 版中所設定的 Entra ID 存取令牌,以及 Databricks 處理要求時所產生特定資源的 ADLS 傳遞令牌。 您無法使用 Microsoft Entra ID 令牌存留期原則來延長 ADLS 傳遞令牌的存留期。 如果您將命令傳送至超過一小時的叢集,如果命令在一小時後存取 ADLS 資源,就會失敗。
限制
- 結構化串流。
- 在遠端叢集上執行不屬於 Spark 作業的任意程式代碼。
- 不支援 Delta 資料表作業的原生 Scala、Python 和 R API。例如,
DeltaTable.forPath
不支援 。 不過,支援在 Delta 數據表上使用 Delta Lake 作業和 Spark API 的 SQL API (spark.sql(...)
例如,spark.read.load
)。 - 複製到 。
- 使用屬於伺服器目錄一部分的 SQL 函式、Python 或 Scala UDF。 不過,本機引進的 Scala 和 Python UDF 運作正常。
- Apache Zeppelin 0.7.x 和以下版本。
- 使用 資料表訪問控制連線到叢集。
- 聯機到已啟用進程隔離的叢集(換句話說,其中
spark.databricks.pyspark.enableProcessIsolation
設定為true
)。 - Delta
CLONE
SQL 命令。 - 全域暫存檢視。
-
考拉和
pyspark.pandas
。 -
CREATE TABLE table AS SELECT ...
SQL 命令不一定會運作。 請改用spark.sql("SELECT ...").write.saveAsTable("table")
。
- Microsoft只有在執行 Databricks Runtime 7.3 LTS 和更新版本的標準叢集上才支援 Entra ID 認證傳遞,而且與服務主體驗證不相容。
- 下列 Databricks 工具(
dbutils
)參考資料: