Megosztás a következőn keresztül:


Példakódok a Scala Databricks Csatlakozás-hoz

Feljegyzés

Ez a cikk a Databricks-Csatlakozás a Databricks Runtime 13.3 LTS-hez és újabb verziókhoz.

Ez a cikk példákat tartalmaz a Databricks Csatlakozás Scalához való használatára. A Databricks Csatlakozás lehetővé teszi népszerű azonosítók, notebook-kiszolgálók és egyéni alkalmazások Azure Databricks-fürtökhöz való csatlakoztatását. Lásd: Mi az a Databricks Csatlakozás?. A cikk Python-verziójával kapcsolatban lásd a Databricks Csatlakozás Pythonhoz készült kód példáit.

Feljegyzés

A Databricks Csatlakozás használatának megkezdése előtt be kell állítania a Databricks Csatlakozás-ügyfelet.

A Databricks számos további példaalkalmazást kínál, amelyek bemutatják a Databricks Csatlakozás használatát. Tekintse meg a Databricks Csatlakozás GitHub-adattárra vonatkozó példaalkalmazásokat, különösen a következőt:

A Databricks Csatlakozás az alábbi egyszerűbb kód példákkal is kísérletezhet. Ezek a példák feltételezik, hogy alapértelmezett hitelesítést használ a Databricks Csatlakozás ügyfélbeállításhoz.

Ez az egyszerű példakód lekérdezi a megadott táblát, majd megjeleníti a megadott tábla első 5 sorát. Ha másik táblát szeretne használni, módosítsa a hívást a következőre spark.read.table: .

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Ez a hosszabb példakód a következőket teszi:

  1. Létrehoz egy memóriabeli DataFrame-et.
  2. Létrehoz egy táblát a zzz_demo_temps_table sémán belül default . Ha már létezik ilyen nevű tábla, a rendszer először törli a táblát. Ha másik sémát vagy táblát szeretne használni, módosítsa a hívásokat az , temps.write.saveAsTablevagy mindkettőrespark.sql.
  3. Menti a DataFrame tartalmát a táblába.
  4. Lekérdezést SELECT futtat a tábla tartalmán.
  5. Megjeleníti a lekérdezés eredményét.
  6. Törli a táblát.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Feljegyzés

Az alábbi példa bemutatja, hogyan használható az SparkSession osztály olyan esetekben, amikor a DatabricksSession Databricks Csatlakozás osztálya nem érhető el.

Ez a példa lekérdezi a megadott táblát, és az első 5 sort adja vissza. Ez a példa a környezeti változót használja a SPARK_REMOTE hitelesítéshez.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}