Contoh kode untuk Databricks Connect untuk Scala

Catatan

Artikel ini membahas Databricks Connect untuk Databricks Runtime 13.3 LTS ke atas.

Artikel ini menyediakan contoh kode yang menggunakan Databricks Connect untuk Scala. Databricks Connect memungkinkan Anda menyambungkan IDE populer, server notebook, dan aplikasi kustom ke kluster Azure Databricks. Lihat Databricks Connect. Untuk artikel versi Python ini, lihat Contoh kode untuk Databricks Connect untuk Python.

Sebelum mulai menggunakan Databricks Connect, Anda harus menyiapkan klien Databricks Connect.

Contoh berikut mengasumsikan bahwa Anda menggunakan autentikasi default untuk penyiapan klien Databricks Connect.

Contoh: Membaca tabel

Contoh kode sederhana ini mengkueri tabel yang ditentukan lalu memperlihatkan 5 baris pertama tabel yang ditentukan.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Membuat DataFrame

Contoh kode di bawah ini:

  1. Membuat DataFrame dalam memori.
  2. Membuat tabel dengan nama zzz_demo_temps_table dalam default skema. Jika tabel dengan nama ini sudah ada, tabel akan dihapus terlebih dahulu. Untuk menggunakan skema atau tabel yang berbeda, sesuaikan panggilan ke spark.sql, temps.write.saveAsTable, atau keduanya.
  3. Menyimpan konten DataFrame ke tabel.
  4. SELECT Menjalankan kueri pada konten tabel.
  5. Memperlihatkan hasil kueri.
  6. Menghapus tabel.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Contoh: Menggunakan DatabricksSesssion atau SparkSession

Contoh berikut menjelaskan cara menggunakan SparkSession kelas jika DatabricksSession kelas di Databricks Connect tidak tersedia.

Contoh ini mengkueri tabel yang ditentukan dan mengembalikan 5 baris pertama. Contoh ini menggunakan SPARK_REMOTE variabel lingkungan untuk autentikasi.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}

Sumber daya tambahan

Databricks menyediakan contoh aplikasi tambahan yang menunjukkan cara menggunakan Databricks Connect di Databricks Connect GitHub repositori, termasuk: