Partager via


Exemples de code Databricks Connect pour Scala

Remarque

Cet article présente Databricks Connect pour Databricks Runtime 13.3 LTS et les versions ultérieures.

Cet article fournit des exemples de code qui utilisent Databricks Connect pour Scala. Databricks Connect vous permet de connecter des IDE populaires, des serveurs de notebook et des applications personnalisées aux clusters Azure Databricks. Consultez Qu’est-ce que Databricks Connect ?. Pour la version Python de cet article, consultez Databricks Connect pour Python.

Remarque

Avant de commencer à utiliser Databricks Connect, vous devez configurer le client Databricks Connect.

Databricks fournit plusieurs exemples d'applications supplémentaires qui montrent comment utiliser Databricks Connect. Reportez-vous au référentiel d’exemples d’applications pour Databricks Connect dans GitHub, spécifiquement :

Vous pouvez également utiliser les exemples de code plus simples suivants pour expérimenter Databricks Connect. Ces exemples supposent que vous utilisez l’authentification par défaut pour la configuration du client Databricks Connect.

Cet exemple de code simple interroge la table spécifiée, puis affiche les 5 premières lignes de la table spécifiée. Pour utiliser une autre table, ajustez l’appel sur spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

Cet exemple de code plus long effectue les opérations suivantes :

  1. Crée un DataFrame en mémoire.
  2. Crée une table portant le nom zzz_demo_temps_table dans le schéma default. Si la table portant ce nom existe déjà, la table est d’abord supprimée. Pour utiliser un autre schéma ou table, ajustez les appels sur spark.sql, temps.write.saveAsTable ou les deux.
  3. Enregistre le contenu du DataFrame dans la table.
  4. Exécute une requête SELECT sur le contenu de la table.
  5. Affiche le résultat de la requête.
  6. Supprime la table.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

Remarque

L’exemple suivant décrit comment utiliser la classe SparkSession, dans les cas où la classe DatabricksSession n’est pas disponible dans Databricks Connect.

Cet exemple interroge la table spécifiée et retourne les cinq premières lignes. Cet exemple utilise la variable d’environnement SPARK_REMOTE pour l’authentification.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}