Konektor Azure Data Explorer untuk Apache Spark

Penting

Konektor ini dapat digunakan dalam Analitik Real-Time di Microsoft Fabric. Gunakan instruksi dalam artikel ini dengan pengecualian berikut:

Apache Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar. Azure Data Explorer adalah layanan analitik data yang cepat dan dikelola sepenuhnya untuk analisis real-time pada data bervolume besar.

Konektor Azure Data Explorer untuk Spark adalah proyek sumber terbuka yang dapat berjalan pada kluster Spark apa pun. Ini mengimplementasikan sumber data dan sink data untuk memindahkan data di seluruh kluster Azure Data Explorer dan Spark. Dengan menggunakan Azure Data Explorer dan Apache Spark, Anda dapat membangun aplikasi yang cepat dan dapat diskalakan yang menargetkan skenario berbasis data. Misalnya, pembelajaran mesin (ML), Extract-Transform-Load (ETL), dan Log Analytics. Dengan konektor, Azure Data Explorer menjadi penyimpanan data yang valid untuk operasi sumber dan sink Spark standar, seperti menulis, membaca, dan menulisStream.

Anda dapat menulis ke Azure Data Explorer melalui penyerapan antrean atau penyerapan streaming. Membaca dari Azure Data Explorer mendukung pemangkasan kolom dan pushdown predikat, yang memfilter data di Azure Data Explorer, mengurangi volume data yang ditransfer.

Catatan

Untuk informasi tentang bekerja dengan konektor Synapse Spark untuk Azure Data Explorer, lihat Menyambungkan ke Azure Data Explorer menggunakan Apache Spark untuk Azure Synapse Analytics.

Topik ini menjelaskan cara menginstal dan mengonfigurasi konektor Azure Data Explorer Spark dan memindahkan data antara kluster Azure Data Explorer dan Apache Spark.

Catatan

Meskipun beberapa contoh di bawah ini mengacu pada kluster Azure Databricks Spark, konektor Azure Data Explorer Spark tidak mengambil dependensi langsung pada Databricks atau distribusi Spark lainnya.

Prasyarat

Tip

Versi Spark 2.3.x juga didukung, tetapi mungkin memerlukan beberapa perubahan dalam dependensi pom.xml.

Cara membangun konektor Spark

Mulai versi 2.3.0, kami memperkenalkan Id artefak baru yang menggantikan spark-kusto-connector: kusto-spark_3.0_2.12 yang menargetkan Spark 3.x dan Scala 2.12 dan kusto-spark_2.4_2.11 menargetkan Spark 2.4.x dan scala 2.11.

Catatan

Versi sebelum 2.5.1 tidak berfungsi lagi untuk menyerap ke tabel yang sudah ada, harap perbarui ke versi yang lebih baru. Langkah ini bersifat opsional. Jika Anda menggunakan pustaka bawaan, misalnya, Maven, lihat Penyiapan kluster Spark.

Membangun prasyarat

  1. Jika Anda tidak menggunakan pustaka bawaan, Anda perlu menginstal pustaka yang tercantum dalam dependensi termasuk pustaka Kusto Java SDK berikut. Untuk menemukan versi yang tepat untuk diinstal, lihat di pom rilis yang relevan:

  2. Lihat sumber ini untuk membangun Konektor Spark.

  3. Untuk aplikasi Scala/Java yang menggunakan definisi proyek Maven, tautkan aplikasi Anda dengan artefak berikut (versi terbaru mungkin berbeda):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Perintah build

Untuk membangun jar dan menjalankan semua pengujian:

mvn clean package

Untuk membangun jar, jalankan semua pengujian, dan instal jar ke repositori Maven lokal Anda:

mvn clean install

Untuk informasi selengkapnya, lihat penggunaan konektor.

Penyiapan kluster Spark

Catatan

Disarankan untuk menggunakan rilis konektor Azure Data Explorer Spark terbaru saat melakukan langkah-langkah berikut.

  1. Konfigurasikan pengaturan kluster Spark berikut, berdasarkan kluster Azure Databricks menggunakan Spark 2.4.4 dan Scala 2.11 atau Spark 3.0.1 dan Scala 2.12:

    Pengaturan kluster Databricks.

  2. Instal pustaka spark-kusto-connector terbaru dari Maven:

    Mengimpor pustaka.Pilih Spark-Kusto-Connector.

  3. Verifikasi bahwa semua pustaka yang diperlukan diinstal:

    Verifikasi pustaka yang terinstal.

  4. Untuk penginstalan menggunakan file JAR, verifikasi bahwa dependensi tambahan telah diinstal:

    Tambahkan dependensi.

Autentikasi

Konektor Azure Data Explorer Spark memungkinkan Anda mengautentikasi dengan ID Microsoft Entra menggunakan salah satu metode berikut:

Microsoft Entra autentikasi aplikasi

Microsoft Entra autentikasi aplikasi adalah metode autentikasi yang paling sederhana dan paling umum dan direkomendasikan untuk konektor Azure Data Explorer Spark.

Properti String Opsi Deskripsi
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra pengidentifikasi aplikasi (klien).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra otoritas autentikasi. MICROSOFT ENTRA ID Direktori (penyewa). Opsional - default ke microsoft.com. Untuk informasi selengkapnya, lihat otoritas Microsoft Entra.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra kunci aplikasi untuk klien.

Catatan

Versi API yang lebih lama (kurang dari 2.0.0) memiliki penamaan berikut: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Hak istimewa Azure Data Explorer

Berikan hak istimewa berikut pada kluster Azure Data Explorer:

  • Untuk membaca (sumber data), identitas Microsoft Entra harus memiliki hak istimewa penampil pada database target, atau hak istimewa admin pada tabel target.
  • Untuk menulis (sink data), identitas Microsoft Entra harus memiliki hak istimewa penyerap pada database target. Ini juga harus memiliki hak istimewa pengguna pada database target untuk membuat tabel baru. Jika tabel target sudah ada, Anda harus mengonfigurasi hak istimewa admin pada tabel target.

Untuk informasi selengkapnya tentang peran utama Azure Data Explorer, lihat kontrol akses berbasis peran. Untuk mengelola peran keamanan, lihat manajemen peran keamanan.

Spark sink: menulis ke Azure Data Explorer

  1. Siapkan parameter sink:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Tulis Spark DataFrame ke kluster Azure Data Explorer sebagai batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Atau gunakan sintaks yang disederhanakan:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Menulis data streaming:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Sumber Spark: membaca dari Azure Data Explorer

  1. Saat membaca sejumlah kecil data, tentukan kueri data:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Opsional: Jika Anda menyediakan penyimpanan blob sementara (dan bukan Azure Data Explorer) blob dibuat di bawah tanggung jawab pemanggil. Ini termasuk memprovisikan penyimpanan, memutar kunci akses, dan menghapus artefak sementara. Modul KustoBlobStorageUtils berisi fungsi pembantu untuk menghapus blob berdasarkan koordinat akun dan kontainer dan kredensial akun, atau URL SAS lengkap dengan izin tulis, baca, dan daftar. Ketika RDD yang sesuai tidak lagi diperlukan, setiap transaksi menyimpan artefak blob sementara dalam direktori terpisah. Direktori ini ditangkap sebagai bagian dari log informasi transaksi baca yang dilaporkan pada simpul Driver Spark.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Dalam contoh di atas, Key Vault tidak diakses menggunakan antarmuka konektor; metode yang lebih sederhana untuk menggunakan rahasia Databricks digunakan.

  3. Baca dari Azure Data Explorer.

    • Jika Anda menyediakan penyimpanan blob sementara, baca dari Azure Data Explorer sebagai berikut:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Jika Azure Data Explorer menyediakan penyimpanan blob sementara, baca dari Azure Data Explorer sebagai berikut:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)