أمثلة التعليمات البرمجية الاتصال Databricks ل Scala

إشعار

تتناول هذه المقالة الاتصال Databricks لوقت تشغيل Databricks 13.3 LTS وما فوق.

توفر هذه المقالة أمثلة التعليمات البرمجية التي تستخدم الاتصال Databricks ل Scala. تمكنك الاتصال Databricks من توصيل IDEs الشائعة وخوادم دفاتر الملاحظات والتطبيقات المخصصة إلى مجموعات Azure Databricks. راجع ما هو Databricks الاتصال؟. للحصول على إصدار Python من هذه المقالة، راجع أمثلة التعليمات البرمجية ل Databricks الاتصال ل Python.

إشعار

قبل البدء في استخدام الاتصال Databricks، يجب عليك إعداد عميل الاتصال Databricks.

يوفر Databricks العديد من التطبيقات الإضافية التي توضح كيفية استخدام الاتصال Databricks. راجع أمثلة التطبيقات لمستودع الاتصال Databricks في GitHub، على وجه التحديد:

يمكنك أيضا استخدام أمثلة التعليمات البرمجية الأبسط التالية لتجربة الاتصال Databricks. تفترض هذه الأمثلة أنك تستخدم المصادقة الافتراضية لإعداد عميل Databricks الاتصال.

يستعلم مثال التعليمات البرمجية البسيط هذا عن الجدول المحدد ثم يعرض أول 5 صفوف للجدول المحدد. لاستخدام جدول مختلف، اضبط الاستدعاء إلى spark.read.table.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

يقوم مثال التعليمات البرمجية الأطول هذا بالآتي:

  1. إنشاء DataFrame في الذاكرة.
  2. إنشاء جدول بالاسم zzz_demo_temps_table داخل default المخطط. إذا كان الجدول بهذا الاسم موجودا بالفعل، يتم حذف الجدول أولا. لاستخدام مخطط أو جدول مختلف، اضبط الاستدعاءات إلى spark.sqlأو temps.write.saveAsTableأو كليهما.
  3. يحفظ محتويات DataFrame في الجدول.
  4. SELECT تشغيل استعلام على محتويات الجدول.
  5. إظهار نتيجة الاستعلام.
  6. حذف الجدول.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

إشعار

يصف المثال التالي كيفية استخدام SparkSession الفئة في الحالات التي DatabricksSession تكون فيها الفئة في الاتصال Databricks غير متوفرة.

يستعلم هذا المثال عن الجدول المحدد ويعيد أول 5 صفوف. يستخدم SPARK_REMOTE هذا المثال متغير البيئة للمصادقة.

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}