Codevoorbeelden voor Databricks-Verbinding maken voor Scala
Notitie
Dit artikel bevat informatie over Databricks Verbinding maken voor Databricks Runtime 13.3 LTS en hoger.
Dit artikel bevat codevoorbeelden die Gebruikmaken van Databricks Verbinding maken voor Scala. Met Databricks Verbinding maken kunt u populaire IDE's, notebookservers en aangepaste toepassingen verbinden met Azure Databricks-clusters. Zie Wat is Databricks Verbinding maken? Zie Codevoorbeelden voor Databricks Verbinding maken voor Python voor de Python-versie van dit artikel.
Notitie
Voordat u Databricks Verbinding maken gaat gebruiken, moet u de Databricks Verbinding maken-client instellen.
Databricks biedt verschillende aanvullende voorbeeldtoepassingen die laten zien hoe u Databricks Verbinding maken gebruikt. Bekijk de voorbeeldtoepassingen voor Databricks Verbinding maken-opslagplaats in GitHub, met name:
U kunt ook de volgende eenvoudigere codevoorbeelden gebruiken om te experimenteren met Databricks Verbinding maken. In deze voorbeelden wordt ervan uitgegaan dat u standaardverificatie gebruikt voor het instellen van databricks Verbinding maken client.
In dit eenvoudige codevoorbeeld wordt een query uitgevoerd op de opgegeven tabel en worden vervolgens de eerste vijf rijen van de opgegeven tabel weergegeven. Als u een andere tabel wilt gebruiken, past u de aanroep aan.spark.read.table
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
Dit langere codevoorbeeld doet het volgende:
- Hiermee maakt u een DataFrame in het geheugen.
- Hiermee maakt u een tabel met de naam
zzz_demo_temps_table
in hetdefault
schema. Als de tabel met deze naam al bestaat, wordt de tabel eerst verwijderd. Als u een ander schema of een andere tabel wilt gebruiken, past u de aanroepen aanspark.sql
,temps.write.saveAsTable
of beide aan. - Slaat de inhoud van het DataFrame op in de tabel.
- Voert een
SELECT
query uit op de inhoud van de tabel. - Geeft het resultaat van de query weer.
- Hiermee verwijdert u de tabel.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
Notitie
In het volgende voorbeeld wordt beschreven hoe u de SparkSession
klasse gebruikt in gevallen waarin de DatabricksSession
klasse in Databricks Verbinding maken niet beschikbaar is.
In dit voorbeeld wordt een query uitgevoerd op de opgegeven tabel en worden de eerste vijf rijen geretourneerd. In dit voorbeeld wordt de SPARK_REMOTE
omgevingsvariabele gebruikt voor verificatie.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor