次の方法で共有


Databricks Connect for Python のコード例

Note

この記事では、Databricks Runtime 13.0 以降用の Databricks Connect について説明します。

この記事では、Databricks Connect for Python を使用するコード例を紹介します。 Databricks Connect を使用すると、一般的な IDE、ノートブック サーバー、カスタム アプリケーションを Azure Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。 この記事の Scala バージョンについては、「Databricks Connect for Scala のコード例」を参照してください。

Note

Databricks Connect の使用を開始する前に、Databricks Connect クライアントを設定する必要があります。

Databricks には、Databricks Connect の使用方法を示すいくつかのサンプル アプリケーションが用意されています。 具体的には、GitHub にある Databricks Connect のサンプル アプリケーション リポジトリを参照してください。

また、次のより単純なコード サンプルを使用して、Databricks Connect を実験することもできます。 これらの例では、Databricks Connect のクライアントのセットアップに既定の認証を使用していることを前提としています。

この単純なコード例は、指定されたテーブルに対してクエリを実行して、指定されたテーブルの最初の 5 行を表示します。 異なるテーブルを使用するには、spark.read.table への呼び出しを調整します。

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

次の長いコード例の実行内容は次のとおりです。

  1. メモリ内 DataFrame を作成します。
  2. default スキーマ内に zzz_demo_temps_table という名前のテーブルを作成します。 この名前のテーブルが既に存在する場合は、まずテーブルが削除されます。 異なるスキーマまたはテーブルを使用するには、spark.sqltemps.write.saveAsTable、または両方への呼び出しを調整します。
  3. DataFrame の内容をテーブルに保存します。
  4. テーブルの内容に対して SELECT クエリを実行します。
  5. クエリの結果を表示します。
  6. テーブルを削除します。
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

Note

次の例では、DatabricksSession クラスが使用できない環境で、Databricks Connect for Databricks Runtime 13.3 LTS 以上の間で移植可能なコードを記述する方法について説明します。

次の例では、DatabricksSession クラスを使用するか、DatabricksSession クラスが使用できない場合は SparkSession クラスを使用して、指定したテーブルに対してクエリを実行し、最初の 5 行を返します。 この例では、認証に SPARK_REMOTE 環境変数を使用します。

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)