다음을 통해 공유


Python용 Databricks 커넥트 대한 코드 예제

참고 항목

이 문서에서는 Databricks Runtime 13.0 이상용 Databricks 커넥트 대해 설명합니다.

이 문서에서는 Python용 Databricks 커넥트 사용하는 코드 예제를 제공합니다. Databricks 커넥트 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다. Databricks 커넥트란?을 참조하세요. 이 문서의 Scala 버전은 Scala용 Databricks 커넥트 대한 코드 예제를 참조하세요.

참고 항목

Databricks 커넥트 사용하기 전에 Databricks 커넥트 클라이언트를 설정해야 합니다.

Databricks는 Databricks 커넥트 사용하는 방법을 보여 주는 몇 가지 추가 예제 애플리케이션을 제공합니다. 특히 GitHub의 Databricks 커넥트 리포지토리에 대한 예제 애플리케이션을 참조하세요.

다음 간단한 코드 예제를 사용하여 Databricks 커넥트 실험할 수도 있습니다. 이러한 예제에서는 Databricks 커넥트 클라이언트 설정에 대한 기본 인증을 사용하고 있다고 가정합니다.

이 간단한 코드 예제에서는 지정된 테이블을 쿼리한 다음 지정된 테이블의 처음 5개 행을 보여 줍니다. 다른 테이블을 사용하려면 호출을 조정합니다 spark.read.table.

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

이 긴 코드 예제는 다음을 수행합니다.

  1. 메모리 내 데이터 프레임을 만듭니다.
  2. 스키마 내에 default 이름이 zzz_demo_temps_table 있는 테이블을 만듭니다. 이 이름의 테이블이 이미 있는 경우 먼저 테이블이 삭제됩니다. 다른 스키마 또는 테이블을 사용하려면 호출spark.sqltemps.write.saveAsTable을 둘 다로 조정합니다.
  3. DataFrame의 내용을 테이블에 저장합니다.
  4. SELECT 테이블 내용에 대한 쿼리를 실행합니다.
  5. 쿼리의 결과를 표시합니다.
  6. 테이블을 삭제합니다.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date

spark = DatabricksSession.builder.getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
  StructField('AirportCode', StringType(), False),
  StructField('Date', DateType(), False),
  StructField('TempHighF', IntegerType(), False),
  StructField('TempLowF', IntegerType(), False)
])

data = [
  [ 'BLI', date(2021, 4, 3), 52, 43],
  [ 'BLI', date(2021, 4, 2), 50, 38],
  [ 'BLI', date(2021, 4, 1), 52, 41],
  [ 'PDX', date(2021, 4, 3), 64, 45],
  [ 'PDX', date(2021, 4, 2), 61, 41],
  [ 'PDX', date(2021, 4, 1), 66, 39],
  [ 'SEA', date(2021, 4, 3), 57, 43],
  [ 'SEA', date(2021, 4, 2), 54, 39],
  [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
  "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
  "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
  "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')

참고 항목

다음 예제에서는 클래스를 사용할 수 없는 환경에서 Databricks Runtime 13.3 LTS 이상용 Databricks 커넥트 간에 이식 가능한 코드를 작성하는 방법을 설명합니다DatabricksSession.

다음 예제에서는 클래스를 DatabricksSession 사용하거나 클래스를 SparkSession 사용할 수 없는 경우 DatabricksSession 클래스를 사용하여 지정된 테이블을 쿼리하고 처음 5개 행을 반환합니다. 이 예제에서는 인증에 SPARK_REMOTE 환경 변수를 사용합니다.

from pyspark.sql import SparkSession, DataFrame

def get_spark() -> SparkSession:
  try:
    from databricks.connect import DatabricksSession
    return DatabricksSession.builder.getOrCreate()
  except ImportError:
    return SparkSession.builder.getOrCreate()

def get_taxis(spark: SparkSession) -> DataFrame:
  return spark.read.table("samples.nyctaxi.trips")

get_taxis(get_spark()).show(5)