共用方式為


適用於 Apache Spark 的 Azure 數據總管連接器

重要

此連接器可用於 Microsoft Fabric 中的即時智慧 。 使用本文中的指示,但有下列例外狀況:

Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。

適用於 Spark 的 Azure 數據總管連接器是可在任何 Spark 叢集上執行的 開放原始碼 專案。 它會實作數據源和數據接收器,以在 Azure 數據總管和 Spark 叢集之間行動數據。 使用 Azure 資料總管和 Apache Spark,您可以建置以數據驅動案例為目標的快速且可調整的應用程式。 例如,機器學習服務 (ML)、擷取-Transform-Load (ETL) 和 Log Analytics。 透過連接器,Azure 資料總管會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。

您可以透過佇列擷取或串流擷取寫入 Azure 資料總管。 從 Azure 數據總管讀取支援數據行剪除和述詞下推,以篩選 Azure 數據總管中的數據,減少已傳輸的數據量。

注意

如需使用適用於 Azure 數據總管的 Synapse Spark 連接器的詳細資訊,請參閱 使用適用於 Azure Synapse Analytics 的 Apache Spark 連線到 Azure 數據總管

本主題描述如何安裝和設定 Azure 數據總管 Spark 連接器,以及在 Azure 數據總管和 Apache Spark 叢集之間行動數據。

注意

雖然下列一些範例參考 Azure Databricks Spark 叢集,但 Azure 數據總管 Spark 連接器不會直接相依於 Databricks 或任何其他 Spark 散發。

必要條件

提示

Spark 2.3.x 版本也受到支援,但可能需要變更pom.xml相依性。

如何建置Spark連接器

從 2.3.0 版開始,我們引進了取代 spark-kusto-connector 的新成品標識符: kusto-spark_3.0_2.12 以 Spark 3.x 和 Scala 2.12kusto-spark_2.4_2.11 為目標,以 Spark 2.4.x 和 scala 2.11 為目標。

注意

2.5.1 之前的版本無法再用於內嵌至現有數據表,請更新為較新的版本。 此為選用步驟。 如果您使用預先建置的連結庫,例如 Maven,請參閱 Spark 叢集設定

建置必要條件

  1. 如果您未使用預先建置的連結庫,則必須安裝相依性中列出的連結庫,包括下列 Kusto Java SDK 連結庫。 若要尋找正確的安裝版本, 請查看相關版本的 pom

  2. 請參閱此來源以建置Spark連接器。

  3. 對於使用 Maven 專案定義的 Scala/Java 應用程式,請連結您的應用程式與下列成品(最新版本可能不同):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

建置命令

若要建置 jar 並執行所有測試:

mvn clean package

若要建置 jar,請執行所有測試,並將 jar 安裝到本機 Maven 存放庫:

mvn clean install

如需詳細資訊,請參閱 連接器使用方式

Spark 叢集設定

注意

建議您在執行下列步驟時使用最新的 Azure 數據總管 Spark 連接器版本。

  1. 根據使用 Spark 2.4.4 和 Scala 2.11 或 Spark 3.0.1 和 Scala 2.12 的 Azure Databricks 叢集來設定下列 Spark 叢集設定:

    Databricks 叢集設定。

  2. 從 Maven 安裝最新的 spark-kusto-connector 連結庫:

    匯入程式庫。選取 [Spark-Kusto-Connector]。

  3. 確認已安裝所有必要的連結庫:

    確認已安裝連結庫。

  4. 若要使用 JAR 檔案進行安裝,請確認已安裝其他相依性:

    新增相依性。

驗證

Azure 數據總管 Spark 連接器可讓您使用下列其中一種方法,向 Microsoft Entra ID 進行驗證:

Microsoft Entra 應用程式驗證

Microsoft Entra 應用程式驗證是最簡單且最常見的驗證方法,建議用於 Azure 數據總管 Spark 連接器。

屬性 選項字串 描述
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra 應用程式 (用戶端) 識別碼。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra 驗證授權單位。 Microsoft Entra Directory (tenant) 標識符。 選擇性 - 預設為 microsoft.com。 如需詳細資訊,請參閱 Microsoft Entra 授權單位
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft用戶端的 Entra 應用程式金鑰。

注意

舊版 API 版本 (小於 2.0.0) 具有下列命名:“kustoAADClientID”、“kustoClientAADClientPassword”、“kustoAADAuthorityID”

Azure 數據總管許可權

在 Azure 資料總管叢集上授與下列權限:

  • 若要讀取 (資料源),Microsoft Entra 身分識別必須具有 目標資料庫的查看器 許可權,或 目標數據表上的系統管理員 許可權。
  • 為了寫入 (數據接收),Microsoft Entra 身分識別必須具有 目標資料庫的擷取器 許可權。 它也必須具有 目標資料庫的用戶 許可權,才能建立新的數據表。 如果目標數據表已經存在,您必須在目標數據表上設定 系統管理員 許可權。

如需 Azure 數據總管主體角色的詳細資訊,請參閱 角色型訪問控制。 如需管理安全性角色,請參閱 安全性角色管理

Spark 接收:寫入至 Azure 數據總管

  1. 設定接收參數:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. 將 Spark DataFrame 寫入 Azure 數據總管叢集作為批次:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    或使用簡化的語法:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 寫入串流資料:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 來源:從 Azure 數據總管讀取

  1. 讀取 少量資料時,請定義資料查詢:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 選擇性:如果您提供暫時性 Blob 記憶體(而非 Azure 數據總管),則會在呼叫者的責任下建立 Blob。 這包括布建記憶體、輪替存取密鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,可根據帳戶和容器座標和帳戶認證,或具有寫入、讀取和列表許可權的完整 SAS URL 來刪除 Blob。 當不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在個別的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告讀取事務歷史記錄的一部分。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    在上述範例中,不會使用連接器介面存取 金鑰保存庫;使用 Databricks 秘密的更簡單方法。

  3. 從 Azure 數據總管讀取。

    • 如果您提供暫時性 Blob 記憶體,請從 Azure 數據總管讀取,如下所示:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • 如果 Azure 數據總管 提供暫時性 Blob 記憶體,請從 Azure 數據總管讀取,如下所示:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)