適用于 Apache Spark 的 Azure Data Explorer 連接器
重要
此連接器可用於 Microsoft Fabric 中的 即時分析 。 使用本文中的指示,但有下列例外狀況:
- 如有需要,請使用 建立 KQL 資料庫中的指示來建立資料庫。
- 如有需要,請使用 建立空白資料表中的指示來建立資料表。
- 使用 複製 URI中的指示取得查詢或擷取 URI。
- 在 KQL 查詢集中執行查詢。
Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。
適用于 Spark 的 Azure Data Explorer 連接器是可在任何 Spark 叢集上執行的開放原始碼專案。 其會實作資料來源和資料接收,以便在 Azure 資料總管和 Spark 叢集間移動資料。 使用 Azure Data Explorer 和 Apache Spark,您可以建置以資料驅動案例為目標的快速且可調整的應用程式。 例如,機器學習 (ML) 、擷取-轉換-載入 (ETL) 和 Log Analytics。 透過連接器,Azure Data Explorer會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。
您可以透過佇列擷取或串流擷取寫入 Azure Data Explorer。 從 Azure 讀取Data Explorer支援資料行剪除和述詞下推,以篩選 Azure Data Explorer中的資料,以減少傳輸的資料量。
注意
如需使用適用于 Azure Data Explorer 的 Synapse Spark 連接器的相關資訊,請參閱使用 Apache Spark 進行 Azure Synapse Analytics 連線至 Azure Data Explorer。
本主題描述如何安裝和設定 Azure Data Explorer Spark 連接器,以及在 Azure Data Explorer 和 Apache Spark 叢集之間移動資料。
注意
雖然下列部分範例參考Azure Databricks Spark 叢集,但 Azure Data Explorer Spark 連接器不會直接相依于 Databricks 或任何其他 Spark 散發套件。
必要條件
- Azure 訂用帳戶。 建立 Azure 免費帳戶。
- Azure 資料總管叢集和資料庫。 建立叢集和資料庫。
- Spark 叢集
- 安裝 Azure Data Explorer 連接器程式庫:
- 已安裝 Maven 3.x
提示
也支援 Spark 2.3.x 版本,但可能需要 pom.xml 相依性進行一些變更。
如何建置 Spark 連接器
從 2.3.0 版開始,我們引進了新的成品識別碼來取代 spark-kusto-connector: kusto-spark_3.0_2.12 ,以 Spark 3.x 和 Scala 2.12 和 kusto-spark_2.4_2.11 為目標,以 Spark 2.4.x 和 scala 2.11 為目標。
注意
2.5.1 之前的版本無法再用於內嵌至現有資料表,請更新為更新版本。 此步驟是選用的。 如果您使用預先建置的程式庫,例如 Maven,請參閱 Spark 叢集設定。
建置必要條件
如果您未使用預先建置的程式庫,您必須安裝相 依性 中列出的程式庫,包括下列 Kusto JAVA SDK 程式庫。 若要尋找要安裝的正確版本, 請查看相關版本的 pom:
請參閱 此來源 以建置 Spark 連接器。
針對使用 Maven 專案定義的 Scala/JAVA 應用程式,將您的應用程式連結至下列成品 (最新版本可能會有所不同) :
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
建置命令
若要建置 jar 並執行所有測試:
mvn clean package
若要建置 jar,請執行所有測試,並將 jar 安裝到本機 Maven 存放庫:
mvn clean install
如需詳細資訊,請參閱 連接器使用方式。
Spark 叢集設定
注意
建議您在執行下列步驟時,使用最新的 Azure Data Explorer Spark 連接器版本。
根據使用 Spark 2.4.4 和 Scala 2.11 或 Spark 3.0.1 和 Scala 2.12 的 Azure Databricks 叢集設定下列 Spark 叢集設定:
從 Maven 安裝最新的 spark-kusto-connector 程式庫:
確認已安裝所有必要的程式庫:
若要使用 JAR 檔案進行安裝,請確認已安裝其他相依性:
驗證
Azure Data Explorer Spark 連接器可讓您使用下列其中一種方法向Microsoft Entra識別碼進行驗證:
- Microsoft Entra應用程式
- Microsoft Entra存取權杖
- 非生產案例的裝置驗證 ()
- Azure 金鑰保存庫若要存取金鑰保存庫資源,請安裝 azure 金鑰保存庫套件並提供應用程式認證。
Microsoft Entra應用程式驗證
Microsoft Entra應用程式驗證是最簡單且最常見的驗證方法,建議用於 Azure Data Explorer Spark 連接器。
屬性 | 選項字串 | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra應用程式 (用戶端) 識別碼。 |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra驗證授權單位。 Microsoft Entra目錄 (租使用者) 識別碼。 選擇性 - 預設為 microsoft.com。 如需詳細資訊,請參閱Microsoft Entra授權單位。 |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra用戶端的應用程式金鑰。 |
注意
舊版 API (小於 2.0.0) 具有下列命名:「kustoAADClientID」、「kustoClientAADClientPassword」、「kustoAADAuthorityID」
Azure Data Explorer許可權
在 Azure Data Explorer 叢集上授與下列許可權:
- 若要讀取 (資料來源) ,Microsoft Entra身分識別必須具有目標資料庫的檢視者許可權,或目標資料表的系統管理員許可權。
- 若要寫入 (資料接收) ,Microsoft Entra身分識別必須具有目標資料庫的擷取器許可權。 它也必須具有目標資料庫 的使用者 許可權,才能建立新的資料表。 如果目標資料表已經存在,您必須在目標資料表上設定 系統管理員 許可權。
如需 Azure Data Explorer主體角色的詳細資訊,請參閱角色型存取控制。 如需管理安全性角色,請參閱 安全性角色管理。
Spark 接收:寫入 Azure Data Explorer
設定接收參數:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
將 Spark DataFrame 寫入 Azure Data Explorer叢集作為批次:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
或使用簡化的語法:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
寫入串流資料:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark 來源:從 Azure Data Explorer讀取
讀取 少量資料時,請定義資料查詢:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
選擇性:如果您提供暫時性 Blob 儲存體 (,而不是 Azure Data Explorer) 會根據呼叫者的責任建立 Blob。 這包括布建儲存體、輪替存取金鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,可根據帳戶和容器座標和帳號憑證,或具有寫入、讀取和列出許可權的完整 SAS URL 來刪除 Blob。 當不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在不同的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告之讀取交易記錄的一部分。
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
在上述範例中,不會使用連接器介面來存取金鑰保存庫;使用 Databricks 秘密的更簡單方法。
從 Azure Data Explorer讀取。
如果您提供暫時性 Blob 儲存體,請從 Azure Data Explorer讀取,如下所示:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
如果Azure Data Explorer提供暫時性 Blob 儲存體,請從 Azure Data Explorer讀取,如下所示:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應