Python용 Databricks 커넥트 대한 코드 예제
참고 항목
이 문서에서는 Databricks Runtime 13.0 이상용 Databricks 커넥트 대해 설명합니다.
이 문서에서는 Python용 Databricks 커넥트 사용하는 코드 예제를 제공합니다. Databricks 커넥트 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다. Databricks 커넥트란?을 참조하세요. 이 문서의 Scala 버전은 Scala용 Databricks 커넥트 대한 코드 예제를 참조하세요.
참고 항목
Databricks 커넥트 사용하기 전에 Databricks 커넥트 클라이언트를 설정해야 합니다.
Databricks는 Databricks 커넥트 사용하는 방법을 보여 주는 몇 가지 추가 예제 애플리케이션을 제공합니다. 특히 GitHub의 Databricks 커넥트 리포지토리에 대한 예제 애플리케이션을 참조하세요.
다음 간단한 코드 예제를 사용하여 Databricks 커넥트 실험할 수도 있습니다. 이러한 예제에서는 Databricks 커넥트 클라이언트 설정에 대한 기본 인증을 사용하고 있다고 가정합니다.
이 간단한 코드 예제에서는 지정된 테이블을 쿼리한 다음 지정된 테이블의 처음 5개 행을 보여 줍니다. 다른 테이블을 사용하려면 호출을 조정합니다 spark.read.table
.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
이 긴 코드 예제는 다음을 수행합니다.
- 메모리 내 데이터 프레임을 만듭니다.
- 스키마 내에
default
이름이zzz_demo_temps_table
있는 테이블을 만듭니다. 이 이름의 테이블이 이미 있는 경우 먼저 테이블이 삭제됩니다. 다른 스키마 또는 테이블을 사용하려면 호출spark.sql
temps.write.saveAsTable
을 둘 다로 조정합니다. - DataFrame의 내용을 테이블에 저장합니다.
SELECT
테이블 내용에 대한 쿼리를 실행합니다.- 쿼리의 결과를 표시합니다.
- 테이블을 삭제합니다.
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
참고 항목
다음 예제에서는 클래스를 사용할 수 없는 환경에서 Databricks Runtime 13.3 LTS 이상용 Databricks 커넥트 간에 이식 가능한 코드를 작성하는 방법을 설명합니다DatabricksSession
.
다음 예제에서는 클래스를 DatabricksSession
사용하거나 클래스를 SparkSession
사용할 수 없는 경우 DatabricksSession
클래스를 사용하여 지정된 테이블을 쿼리하고 처음 5개 행을 반환합니다. 이 예제에서는 인증에 SPARK_REMOTE
환경 변수를 사용합니다.
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)
피드백
https://aka.ms/ContentUserFeedback
출시 예정: 2024년 내내 콘텐츠에 대한 피드백 메커니즘으로 GitHub 문제를 단계적으로 폐지하고 이를 새로운 피드백 시스템으로 바꿀 예정입니다. 자세한 내용은 다음을 참조하세요.다음에 대한 사용자 의견 제출 및 보기