Condividi tramite


Esempi di codice per databricks Connessione per Scala

Nota

Questo articolo illustra databricks Connessione per Databricks Runtime 13.3 LTS e versioni successive.

Questo articolo fornisce esempi di codice che usano databricks Connessione per Scala. Databricks Connessione consente di connettere gli IDE, i server notebook e le applicazioni personalizzate più diffusi ai cluster Azure Databricks. Vedere Che cos'è Databricks Connessione?. Per la versione Python di questo articolo, vedere Esempi di codice per Databricks Connessione per Python.

Nota

Prima di iniziare a usare Databricks Connessione, è necessario configurare il client databricks Connessione.

Databricks offre diverse applicazioni di esempio aggiuntive che illustrano come usare Databricks Connessione. Vedere le applicazioni di esempio per Databricks Connessione repository in GitHub, in particolare:

È anche possibile usare gli esempi di codice più semplici seguenti per sperimentare con Databricks Connessione. Questi esempi presuppongono che si usi l'autenticazione predefinita per Databricks Connessione configurazione del client.

Questo semplice esempio di codice esegue una query sulla tabella specificata e quindi mostra le prime 5 righe della tabella specificata. Per usare una tabella diversa, modificare la chiamata a spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Questo esempio di codice più lungo esegue le operazioni seguenti:

  1. Crea un dataframe in memoria.
  2. Crea una tabella con il nome zzz_demo_temps_table all'interno dello default schema. Se la tabella con questo nome esiste già, la tabella viene eliminata per prima. Per usare uno schema o una tabella diversa, modificare le chiamate a spark.sql, temps.write.saveAsTableo entrambe.
  3. Salva il contenuto del dataframe nella tabella.
  4. Esegue una SELECT query sul contenuto della tabella.
  5. Mostra il risultato della query.
  6. Elimina la tabella.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Nota

L'esempio seguente descrive come usare la SparkSession classe nei casi in cui la DatabricksSession classe in Databricks Connessione non è disponibile.

Questo esempio esegue una query sulla tabella specificata e restituisce le prime 5 righe. In questo esempio viene usata la SPARK_REMOTE variabile di ambiente per l'autenticazione.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}