다음을 통해 공유


Scala용 Databricks 커넥트 대한 코드 예제

참고 항목

이 문서에서는 Databricks Runtime 13.3 LTS 이상에 대한 Databricks 커넥트 설명합니다.

이 문서에서는 Scala에 Databricks 커넥트 사용하는 코드 예제를 제공합니다. Databricks 커넥트 사용하면 인기 있는 IDE, Notebook 서버 및 사용자 지정 애플리케이션을 Azure Databricks 클러스터에 연결할 수 있습니다. Databricks 커넥트란?을 참조하세요. 이 문서의 Python 버전은 Python용 Databricks 커넥트 대한 코드 예제를 참조하세요.

참고 항목

Databricks 커넥트 사용하기 전에 Databricks 커넥트 클라이언트를 설정해야 합니다.

Databricks는 Databricks 커넥트 사용하는 방법을 보여 주는 몇 가지 추가 예제 애플리케이션을 제공합니다. 특히 GitHub의 Databricks 커넥트 리포지토리에 대한 예제 애플리케이션을 참조하세요.

다음 간단한 코드 예제를 사용하여 Databricks 커넥트 실험할 수도 있습니다. 이러한 예제에서는 Databricks 커넥트 클라이언트 설정에 대한 기본 인증을 사용하고 있다고 가정합니다.

이 간단한 코드 예제에서는 지정된 테이블을 쿼리한 다음 지정된 테이블의 처음 5개 행을 보여 줍니다. 다른 테이블을 사용하려면 호출을 조정합니다 spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

이 긴 코드 예제는 다음을 수행합니다.

  1. 메모리 내 데이터 프레임을 만듭니다.
  2. 스키마 내에 default 이름이 zzz_demo_temps_table 있는 테이블을 만듭니다. 이 이름의 테이블이 이미 있는 경우 먼저 테이블이 삭제됩니다. 다른 스키마 또는 테이블을 사용하려면 호출spark.sqltemps.write.saveAsTable을 둘 다로 조정합니다.
  3. DataFrame의 내용을 테이블에 저장합니다.
  4. SELECT 테이블 내용에 대한 쿼리를 실행합니다.
  5. 쿼리의 결과를 표시합니다.
  6. 테이블을 삭제합니다.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

참고 항목

다음 예제에서는 Databricks 커넥트 클래스를 DatabricksSession 사용할 수 없는 경우 클래스를 사용하는 SparkSession 방법을 설명합니다.

다음은 지정한 테이블을 쿼리하고 처음 5개 행을 반환하는 예제입니다. 이 예제에서는 인증에 SPARK_REMOTE 환경 변수를 사용합니다.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}