Példakódok a Scala Databricks Csatlakozás-hoz
Feljegyzés
Ez a cikk a Databricks-Csatlakozás a Databricks Runtime 13.3 LTS-hez és újabb verziókhoz.
Ez a cikk példákat tartalmaz a Databricks Csatlakozás Scalához való használatára. A Databricks Csatlakozás lehetővé teszi népszerű azonosítók, notebook-kiszolgálók és egyéni alkalmazások Azure Databricks-fürtökhöz való csatlakoztatását. Lásd: Mi az a Databricks Csatlakozás?. A cikk Python-verziójával kapcsolatban lásd a Databricks Csatlakozás Pythonhoz készült kód példáit.
Feljegyzés
A Databricks Csatlakozás használatának megkezdése előtt be kell állítania a Databricks Csatlakozás-ügyfelet.
A Databricks számos további példaalkalmazást kínál, amelyek bemutatják a Databricks Csatlakozás használatát. Tekintse meg a Databricks Csatlakozás GitHub-adattárra vonatkozó példaalkalmazásokat, különösen a következőt:
A Databricks Csatlakozás az alábbi egyszerűbb kód példákkal is kísérletezhet. Ezek a példák feltételezik, hogy alapértelmezett hitelesítést használ a Databricks Csatlakozás ügyfélbeállításhoz.
Ez az egyszerű példakód lekérdezi a megadott táblát, majd megjeleníti a megadott tábla első 5 sorát. Ha másik táblát szeretne használni, módosítsa a hívást a következőre spark.read.table
: .
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
Ez a hosszabb példakód a következőket teszi:
- Létrehoz egy memóriabeli DataFrame-et.
- Létrehoz egy táblát a
zzz_demo_temps_table
sémán belüldefault
. Ha már létezik ilyen nevű tábla, a rendszer először törli a táblát. Ha másik sémát vagy táblát szeretne használni, módosítsa a hívásokat az ,temps.write.saveAsTable
vagy mindkettőrespark.sql
. - Menti a DataFrame tartalmát a táblába.
- Lekérdezést
SELECT
futtat a tábla tartalmán. - Megjeleníti a lekérdezés eredményét.
- Törli a táblát.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
Feljegyzés
Az alábbi példa bemutatja, hogyan használható az SparkSession
osztály olyan esetekben, amikor a DatabricksSession
Databricks Csatlakozás osztálya nem érhető el.
Ez a példa lekérdezi a megadott táblát, és az első 5 sort adja vissza. Ez a példa a környezeti változót használja a SPARK_REMOTE
hitelesítéshez.
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}