分享方式:


適用於 Python 的 Databricks Connect 程式代碼範例

注意

本文涵蓋 Databricks Runtime 13.3 LTS 和更新版本適用的 Databricks Connect。

本文提供使用 Databricks Connect for Python 的程式代碼範例。 Databricks Connect 可讓您將熱門的 IDE、Notebook 伺服器和自定義應用程式連線到 Azure Databricks 叢集。 請參閱 什麼是 Databricks Connect?。 如需本文的 Scala 版本,請參閱 Databricks Connect for Scala 的程式碼範例。

注意

開始使用 Databricks Connect 之前,您必須先 設定 Databricks Connect 用戶端

Databricks 提供數個額外的範例應用程式,示範如何使用 Databricks Connect。 請參閱 GitHub 中 Databricks Connect 存放庫的範例應用程式,特別是:

您也可以使用下列更簡單的程式代碼範例來實驗 Databricks Connect。 這些範例假設您使用 Databricks Connect 用戶端設定的預設驗證

這個簡單的程式代碼範例會查詢指定的數據表,然後顯示指定的數據表的前 5 個數據列。 若要使用不同的數據表,請調整 對 spark.read.table的呼叫。

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

這個較長的程式代碼範例會執行下列動作:

  1. 建立記憶體內部 DataFrame。
  2. 使用架構內default的名稱zzz_demo_temps_table建立數據表。 如果已有這個名稱的數據表存在,則會先刪除數據表。 若要使用不同的架構或數據表,請調整對 spark.sqltemps.write.saveAsTable或兩者的呼叫。
  3. 將 DataFrame 的內容儲存至數據表。
  4. SELECT在數據表的內容上執行查詢。
  5. 顯示查詢的結果。
  6. 刪除資料表。
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

注意

下列範例說明如何在無法使用 類別的環境中 DatabricksSession ,撰寫 Databricks Connect for Databricks Runtime 13.3 LTS 和更新版本之間可移植的程序代碼。

下列範例會DatabricksSession使用 類別,或在類別無法使用時DatabricksSession使用 SparkSession 類別來查詢指定的數據表,並傳回前 5 個數據列。 此範例會 SPARK_REMOTE 使用環境變數進行驗證。

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)