適用于 Apache Spark 的 Azure Data Explorer 連接器

重要

此連接器可用於 Microsoft Fabric 中的 即時分析 。 使用本文中的指示,但有下列例外狀況:

Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。

適用于 Spark 的 Azure Data Explorer 連接器是可在任何 Spark 叢集上執行的開放原始碼專案。 其會實作資料來源和資料接收,以便在 Azure 資料總管和 Spark 叢集間移動資料。 使用 Azure Data Explorer 和 Apache Spark,您可以建置以資料驅動案例為目標的快速且可調整的應用程式。 例如,機器學習 (ML) 、擷取-轉換-載入 (ETL) 和 Log Analytics。 透過連接器,Azure Data Explorer會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。

您可以透過佇列擷取或串流擷取寫入 Azure Data Explorer。 從 Azure 讀取Data Explorer支援資料行剪除和述詞下推,以篩選 Azure Data Explorer中的資料,以減少傳輸的資料量。

注意

如需使用適用于 Azure Data Explorer 的 Synapse Spark 連接器的相關資訊,請參閱使用 Apache Spark 進行 Azure Synapse Analytics 連線至 Azure Data Explorer

本主題描述如何安裝和設定 Azure Data Explorer Spark 連接器,以及在 Azure Data Explorer 和 Apache Spark 叢集之間移動資料。

注意

雖然下列部分範例參考Azure Databricks Spark 叢集,但 Azure Data Explorer Spark 連接器不會直接相依于 Databricks 或任何其他 Spark 散發套件。

必要條件

提示

也支援 Spark 2.3.x 版本,但可能需要 pom.xml 相依性進行一些變更。

如何建置 Spark 連接器

從 2.3.0 版開始,我們引進了新的成品識別碼來取代 spark-kusto-connector: kusto-spark_3.0_2.12 ,以 Spark 3.x 和 Scala 2.12 和 kusto-spark_2.4_2.11 為目標,以 Spark 2.4.x 和 scala 2.11 為目標。

注意

2.5.1 之前的版本無法再用於內嵌至現有資料表,請更新為更新版本。 此步驟是選用的。 如果您使用預先建置的程式庫,例如 Maven,請參閱 Spark 叢集設定

建置必要條件

  1. 如果您未使用預先建置的程式庫,您必須安裝相 依性 中列出的程式庫,包括下列 Kusto JAVA SDK 程式庫。 若要尋找要安裝的正確版本, 請查看相關版本的 pom

  2. 請參閱 此來源 以建置 Spark 連接器。

  3. 針對使用 Maven 專案定義的 Scala/JAVA 應用程式,將您的應用程式連結至下列成品 (最新版本可能會有所不同) :

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

建置命令

若要建置 jar 並執行所有測試:

mvn clean package

若要建置 jar,請執行所有測試,並將 jar 安裝到本機 Maven 存放庫:

mvn clean install

如需詳細資訊,請參閱 連接器使用方式

Spark 叢集設定

注意

建議您在執行下列步驟時,使用最新的 Azure Data Explorer Spark 連接器版本。

  1. 根據使用 Spark 2.4.4 和 Scala 2.11 或 Spark 3.0.1 和 Scala 2.12 的 Azure Databricks 叢集設定下列 Spark 叢集設定:

    Databricks 叢集設定。

  2. 從 Maven 安裝最新的 spark-kusto-connector 程式庫:

    匯入程式庫。選取 [Spark-Kusto-Connector]。

  3. 確認已安裝所有必要的程式庫:

    確認程式庫已安裝。

  4. 若要使用 JAR 檔案進行安裝,請確認已安裝其他相依性:

    新增相依性。

驗證

Azure Data Explorer Spark 連接器可讓您使用下列其中一種方法向Microsoft Entra識別碼進行驗證:

Microsoft Entra應用程式驗證

Microsoft Entra應用程式驗證是最簡單且最常見的驗證方法,建議用於 Azure Data Explorer Spark 連接器。

屬性 選項字串 Description
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra應用程式 (用戶端) 識別碼。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra驗證授權單位。 Microsoft Entra目錄 (租使用者) 識別碼。 選擇性 - 預設為 microsoft.com。 如需詳細資訊,請參閱Microsoft Entra授權單位
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra用戶端的應用程式金鑰。

注意

舊版 API (小於 2.0.0) 具有下列命名:「kustoAADClientID」、「kustoClientAADClientPassword」、「kustoAADAuthorityID」

Azure Data Explorer許可權

在 Azure Data Explorer 叢集上授與下列許可權:

  • 若要讀取 (資料來源) ,Microsoft Entra身分識別必須具有目標資料庫的檢視者許可權,或目標資料表的系統管理員許可權。
  • 若要寫入 (資料接收) ,Microsoft Entra身分識別必須具有目標資料庫的擷取器許可權。 它也必須具有目標資料庫 的使用者 許可權,才能建立新的資料表。 如果目標資料表已經存在,您必須在目標資料表上設定 系統管理員 許可權。

如需 Azure Data Explorer主體角色的詳細資訊,請參閱角色型存取控制。 如需管理安全性角色,請參閱 安全性角色管理

Spark 接收:寫入 Azure Data Explorer

  1. 設定接收參數:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. 將 Spark DataFrame 寫入 Azure Data Explorer叢集作為批次:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    或使用簡化的語法:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 寫入串流資料:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 來源:從 Azure Data Explorer讀取

  1. 讀取 少量資料時,請定義資料查詢:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 選擇性:如果您提供暫時性 Blob 儲存體 (,而不是 Azure Data Explorer) 會根據呼叫者的責任建立 Blob。 這包括布建儲存體、輪替存取金鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,可根據帳戶和容器座標和帳號憑證,或具有寫入、讀取和列出許可權的完整 SAS URL 來刪除 Blob。 當不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在不同的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告之讀取交易記錄的一部分。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    在上述範例中,不會使用連接器介面來存取金鑰保存庫;使用 Databricks 秘密的更簡單方法。

  3. 從 Azure Data Explorer讀取。

    • 如果您提供暫時性 Blob 儲存體,請從 Azure Data Explorer讀取,如下所示:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • 如果Azure Data Explorer提供暫時性 Blob 儲存體,請從 Azure Data Explorer讀取,如下所示:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)