Příklady kódu pro databricks Připojení pro Scala
Poznámka:
Tento článek se zabývá Připojení Databricks pro Databricks Runtime 13.3 LTS a vyšší.
Tento článek obsahuje příklady kódu, které používají Databricks Připojení pro Scala. Databricks Připojení umožňuje připojit k clusterům Azure Databricks oblíbené prostředí IDEs, servery poznámkových bloků a vlastní aplikace. Podívejte se, co je Databricks Připojení?. Verze pythonu tohoto článku najdete v příkladech kódu pro Databricks Připojení pro Python.
Poznámka:
Než začnete používat databricks Připojení, musíte nastavit klienta Připojení Databricks.
Databricks poskytuje několik dalších ukázkových aplikací, které ukazují, jak používat Databricks Připojení. Podívejte se na ukázkové aplikace pro úložiště Databricks Připojení v GitHubu, konkrétně:
K experimentování s Připojení Databricks můžete použít také následující jednodušší příklady kódu. Tyto příklady předpokládají, že používáte výchozí ověřování pro nastavení klienta Databricks Připojení.
Tento jednoduchý příklad kódu dotazuje zadanou tabulku a pak zobrazí prvních 5 řádků zadané tabulky. Pokud chcete použít jinou tabulku, upravte volání spark.read.table
na .
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
Tento delší příklad kódu dělá toto:
- Vytvoří datový rámec v paměti.
- Vytvoří tabulku s názvem
zzz_demo_temps_table
v rámci schématudefault
. Pokud tabulka s tímto názvem již existuje, tabulka se nejprve odstraní. Chcete-li použít jiné schéma nebo tabulku, upravte volání naspark.sql
,temps.write.saveAsTable
nebo obojí. - Uloží obsah datového rámce do tabulky.
SELECT
Spustí dotaz na obsah tabulky.- Zobrazí výsledek dotazu.
- Odstraní tabulku.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
Poznámka:
Následující příklad popisuje, jak používat SparkSession
třídu v případech, kdy DatabricksSession
třída v Databricks Připojení není k dispozici.
Tento příklad dotazuje zadanou tabulku a vrátí prvních 5 řádků. V tomto příkladu SPARK_REMOTE
se pro ověřování používá proměnná prostředí.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}