참고
이 문서에서는 Databricks Runtime 13.3 LTS 이상에 대한 Databricks Connect에 대해 설명합니다.
이 문서에서는 Scala용 Databricks Connect를 사용하는 코드 예제를 제공합니다. Databricks Connect를 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다.
Databricks Connect를 참조하세요. 이 문서의 Python 버전은
Databricks Connect 사용을 시작하기 전에 Databricks Connect 클라이언트을 설정해야 합니다.
다음 예제에서는 Databricks Connect 클라이언트 설정에 기본 인증을 사용한다고 가정합니다.
예: 테이블 읽기
이 간단한 코드 예제에서는 지정된 테이블을 쿼리한 다음 지정된 테이블의 처음 5개 행을 보여 줍니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
DataFrame 만들기
아래 코드 예제:
- 메모리 내 데이터 프레임을 만듭니다.
-
zzz_demo_temps_table스키마 내에 이름이default인 테이블을 생성합니다. 이 이름의 테이블이 이미 있는 경우 먼저 테이블이 삭제됩니다. 다른 스키마 또는 테이블을 사용하려면 호출을spark.sql,temps.write.saveAsTable또는 둘 다로 조정합니다. - DataFrame의 내용을 테이블에 저장합니다.
-
SELECT테이블 내용에 대한 쿼리를 실행합니다. - 쿼리의 결과를 표시합니다.
- 테이블을 삭제합니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
예: DatabricksSession 또는 SparkSession 사용
다음 예제에서는 Databricks Connect의 SparkSession 클래스를 사용할 수 없는 경우 DatabricksSession 클래스를 사용하는 방법을 설명합니다.
다음은 지정한 테이블을 쿼리하고 처음 5개 행을 반환하는 예제입니다. 이 예제에서는 인증에 SPARK_REMOTE 환경 변수를 사용합니다.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}
추가 리소스
Databricks는 다음을 포함하여 Databricks Connect GitHub 리포지토리 Databricks Connect를 사용하는 방법을 보여 주는 추가 예제 애플리케이션을 제공합니다.