Dela via


Kodexempel för Databricks Anslut för Scala

Kommentar

Den här artikeln beskriver Databricks Anslut för Databricks Runtime 13.3 LTS och senare.

Den här artikeln innehåller kodexempel som använder Databricks Anslut för Scala. Med Databricks Anslut kan du ansluta populära ID:er, notebook-servrar och anpassade program till Azure Databricks-kluster. Se Vad är Databricks Anslut?. Python-versionen av den här artikeln finns i Kodexempel för Databricks Anslut för Python.

Kommentar

Innan du börjar använda Databricks Anslut måste du konfigurera Databricks-Anslut-klienten.

Databricks innehåller flera ytterligare exempelprogram som visar hur du använder Databricks Anslut. Se exempelprogram för Databricks Anslut lagringsplats i GitHub, mer specifikt:

Du kan också använda följande enklare kodexempel för att experimentera med Databricks Anslut. Dessa exempel förutsätter att du använder standardautentisering för Databricks Anslut klientkonfiguration.

Det här enkla kodexemplet frågar den angivna tabellen och visar sedan den angivna tabellens första 5 rader. Om du vill använda en annan tabell justerar du anropet till spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Det här längre kodexemplet gör följande:

  1. Skapar en minnesintern DataFrame.
  2. Skapar en tabell med namnet zzz_demo_temps_table i default schemat. Om tabellen med det här namnet redan finns tas tabellen bort först. Om du vill använda ett annat schema eller en annan tabell justerar du anropen till spark.sql, temps.write.saveAsTableeller båda.
  3. Sparar dataramens innehåll i tabellen.
  4. Kör en SELECT fråga i tabellens innehåll.
  5. Visar frågans resultat.
  6. Tar bort tabellen.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Kommentar

I följande exempel beskrivs hur du använder SparkSession klassen i fall där DatabricksSession klassen i Databricks Anslut inte är tillgänglig.

Det här exemplet frågar den angivna tabellen och returnerar de första 5 raderna. I det SPARK_REMOTE här exemplet används miljövariabeln för autentisering.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}