Apache Spark 用の Azure Data Explorer コネクタ

重要

このコネクタは、Microsoft Fabric の リアルタイム分析 で使用できます。 次の例外を含め、この記事の手順を使用します。

Apache Spark は、"大規模なデータ処理のための統合された分析エンジン" です。 Azure Data Explorer は、大量のデータのリアルタイム分析を実現するフル マネージドの高速データ分析サービスです。

Spark 用の Azure Data Explorer コネクタは、あらゆる Spark クラスターで実行できるオープン ソース プロジェクトです。 Azure Data Explorer と Spark クラスター間でデータを移動するためのデータ ソースとデータ シンクを実装します。 Azure Data Explorer と Apache Spark を使用して、データ ドリブン シナリオをターゲットとする、高速でスケーラブルなアプリケーションを作成することができます。 たとえば、機械学習 (ML)、ETL (抽出 - 読み込み - 変換)、および Log Analytics などです。 このコネクタにより、Azure Data Explorer は、書き込み、読み取り、writeStream などの標準的な Spark のソースおよびシンク操作に有効なデータ ストアになります。

キューインジェストまたはストリーミング インジェストを使用して、Azure Data Explorerに書き込むことができます。 Azure Data Explorer からの読み取りでは、列の排除と述語のプッシュダウンがサポートされています。これにより、Azure Data Explorer でデータがフィルター処理されるため、転送されるデータの量が減ります。

注意

Azure Data Explorer 用の Synapse Spark コネクタを使用する方法については、「Apache Spark for Azure Synapse Analytics を使用して Azure Data Explorer に接続する」を参照してください。

このトピックでは、Azure Data Explorer Spark コネクタをインストールして構成し、Azure Data Explorer と Apache Spark クラスター間でデータを移動する方法について説明します。

注意

下の例のいくつかでは Azure Databricks Spark クラスターが登場しますが、Azure Data Explorer Spark コネクタは、Databricks やその他の Spark ディストリビューションに直接依存しません。

前提条件

ヒント

Spark 2.3.x バージョンもサポートされていますが、pom.xml の依存関係の変更が必要になる場合があります。

Spark コネクタのビルド方法

バージョン 2.3.0 以降では、spark-kusto-connector に置き換わる新しい成果物 ID として、kusto-spark_3.0_2.12 (Spark 3.x と Scala 2.12 が対象) および kusto-spark_2.4_2.11 (Spark 2.4.x と Scala 2.11 が対象) が導入されます。

注意

2\.5.1 より前のバージョンでは、既存のテーブルへの取り込みは機能しなくなったため、新しいバージョンに更新してください。 この手順は省略可能です。 Maven などの事前構築されたライブラリを使用する場合は、「Spark クラスターの設定」を参照してください。

構築の前提条件

  1. 事前構築されたライブラリを使用しない場合は、次の Kusto Java SDK ライブラリを含む、「依存関係」に記載されているライブラリをインストールします。 インストールする適切なバージョンを確認するには、関連するリリースの POM を確認します。

  2. Spark コネクタの構築については、こちらのソースを参照してください。

  3. Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合は、アプリケーションを次の成果物にリンクしてください (最新版は異なる場合があります)。

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

ビルド コマンド

Jar をビルドして、すべてのテストの実行するには:

mvn clean package

Jar をビルドし、すべてのテストを実行し、ローカルの Maven リポジトリに jar をインストールするには:

mvn clean install

詳細については、コネクタの使用に関するページを参照してください。

Spark クラスターの設定

注意

次の手順を実行するときは、最新の Azure Data Explorer Spark コネクタ リリースを使用することをお勧めします。

  1. Spark 2.4.4 と Scala 2.11 または Spark 3.0.1 と Scala 2.12 を使用し、Azure Databricks クラスターに基づいて Spark クラスターを次のように構成します。

    Databricks クラスターの設定。

  2. Maven から最新の spark-kusto-connector ライブラリをインストールします。

    ライブラリのインポート。Spark-Kusto-Connector の選択。

  3. 必要なライブラリがすべてインストールされていることを確認します。

    ライブラリがインストールされていることを確認する。

  4. JAR ファイルを使用したインストールの場合は、追加の依存関係がインストールされていることを確認します。

    依存関係を追加します。

認証

Azure Data Explorer Spark コネクタを使用すると、次のいずれかの方法を使用してMicrosoft Entra ID で認証できます。

Microsoft Entra アプリケーション認証

Microsoft Entraアプリケーション認証は最も単純で最も一般的な認証方法であり、Azure Data Explorer Spark コネクタに推奨されます。

Properties オプション文字列 説明
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra アプリケーション (クライアント) 識別子。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra認証機関。 Microsoft Entra ディレクトリ (テナント) ID。 省略可能 - 既定値は microsoft.com です。 詳細については、「Microsoft Entra機関」を参照してください。
KUSTO_AAD_APP_SECRET kustoAadAppSecret クライアントのアプリケーション キーをMicrosoft Entraします。

注意

以前の API バージョン (2.0.0 未満) には、"kustoAADClientID"、"kustoClientAADClientPassword"、"kustoAADAuthorityID" という名前が付けられています。

Azure Data Explorer の特権

Azure Data Explorer クラスターに対して次の特権を付与します。

  • 読み取り (データ ソース) の場合、Microsoft Entra ID にはターゲット データベースに対するビューアー特権、またはターゲット テーブルに対する管理者特権が必要です。
  • 書き込み (データ シンク) の場合、Microsoft Entra ID には、ターゲット データベースに対する取り込み権限が必要です。 新しいテーブルを作成するには、対象データベースのユーザー特権も与える必要があります。 ターゲット テーブルが既に存在する場合、ターゲット テーブルに対する "管理者" 特権を構成する必要があります。

Azure Data Explorer プリンシパル ロールの詳細については、「ロールベースのアクセス制御」を参照してください。 セキュリティ ロールの管理については、「security roles management」(セキュリティ ロールの管理) を参照してください。

Spark シンク: Azure Data Explorer に書き込む

  1. シンクのパラメーターの設定:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame を Azure Data Explorer クラスターにバッチとして書き込む:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    または、簡略化された構文を使用します。

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. ストリーミング データを書き込む:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark ソース: Azure Data Explorer から読み取る

  1. 少量のデータを読み込む場合は、データ クエリを定義します。

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 省略可能: 自分で一時的な BLOB ストレージ (Azure Data Explorer ではなく) を提供する場合は、作成される BLOB は呼び出し元の責任になります。 これには、ストレージのプロビジョニング、アクセス キーのローテーション、および一時的な成果物の削除が含まれます。 KustoBlobStorageUtils モジュールには、アカウントとコンテナーの座標およびアカウントの資格情報、または書き込み、読み取り、リストの権限を持つ完全な SAS URL のいずれかに基づいて、BLOB を削除するためのヘルパー関数が含まれています。 対応する RDD が不要になると、トランザクションごとに一時的な BLOB 成果物が別のディレクトリに格納されます。 このディレクトリは、Spark ドライバー ノードで報告される読み取りトランザクション情報ログの一部としてキャプチャされます。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    上記の例では、コネクタ インターフェイスを使用して Key Vault にアクセスすることはできません。より簡単な Databricks シークレットを使用する方法が使用されます。

  3. Azure Data Explorer から読み取ります。

    • 自分で一時的な BLOB ストレージを提供する場合は、次のように Azure Data Explorer から読み取ります。

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Azure Data Explorer によって一時的な BLOB ストレージが提供される場合は、次のように Azure Data Explorer から読み取ります。

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)