Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Nota
Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 LTS e versioni successive.
Questo articolo fornisce esempi di codice che usano Databricks Connect per Scala. Databricks Connect consente di connettere gli IDE, i server notebook e le applicazioni personalizzate più diffusi ai cluster Azure Databricks. Consultare Cos’è Databricks Connect?. Per la versione Python di questo articolo, vedere Esempi di codice per Databricks Connect per Python.
Prima di iniziare a usare Databricks Connect, è necessario configurare il client Databricks Connect.
Gli esempi seguenti presuppongono che si stia usando l'autenticazione predefinita per la configurazione del client Databricks Connect.
Esempio: Leggere una tabella
Questo semplice esempio di codice esegue una query sulla tabella specificata e quindi mostra le prime 5 righe della tabella specificata.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
Creare un dataframe
Questo esempio di codice seguente:
- Crea un dataframe in memoria.
- Crea una tabella con il nome
zzz_demo_temps_tableall'interno dello schemadefault. Se la tabella con questo nome esiste già, la tabella viene eliminata per prima. Per usare uno schema o una tabella diversa, modificare le chiamate aspark.sql,temps.write.saveAsTableo entrambe. - Salva il contenuto del dataframe nella tabella.
- Esegue una
SELECTquery sul contenuto della tabella. - Mostra il risultato della query.
- Elimina la tabella.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
Esempio: Usare DatabricksSesssion o SparkSession
Nell'esempio seguente viene descritto come usare la classe SparkSession nei casi in cui la classe DatabricksSession in Databricks Connect non è disponibile.
Questo esempio esegue una query sulla tabella specificata e restituisce le prime 5 righe. In questo esempio viene usata la SPARK_REMOTE variabile di ambiente per l'autenticazione.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}
Risorse aggiuntive
Databricks offre applicazioni di esempio aggiuntive che illustrano come usare Databricks Connect nel repository GitHub databricks Connect, tra cui: