Apache Spark용 Azure Data Explorer 커넥터

중요

이 커넥터는 Microsoft Fabric 의 실시간 분석 에서 사용할 수 있습니다. 다음 예외를 제외하고 이 문서의 지침을 사용합니다.

Apache Spark는 대규모 데이터를 처리하기 위한 통합 분석 엔진입니다. Azure Data Explorer는 대량의 데이터를 실시간으로 분석할 수 있는 빠르고 완전히 관리되는 데이터 분석 서비스입니다.

Spark용 Azure Data Explorer 커넥터는 모든 Spark 클러스터에서 실행할 수 있는 오픈 소스 프로젝트입니다. Azure Data Explorer 및 Spark 클러스터에서 데이터를 이동하기 위한 데이터 원본 및 데이터 싱크를 구현합니다. Azure Data Explorer 및 Apache Spark를 사용하여 데이터 기반 시나리오를 대상으로 하는 빠르고 확장 가능한 애플리케이션을 구축할 수 있습니다. 예를 들어 ML(기계 학습), ETL(추출-변환-로드) 및 Log Analytics가 있습니다. 커넥터를 사용하여 Azure Data Explorer는 쓰기, 읽기 및 writeStream과 같은 표준 Spark 원본 및 싱크 작업에 대한 유효한 데이터 저장소가 됩니다.

대기 중인 수집 또는 스트리밍 수집을 통해 Azure Data Explorer 쓸 수 있습니다. Azure Data Explorer에서 읽기는 열 정리 및 조건자 푸시다운을 지원하며, 이는 Azure Data Explorer에서 데이터를 필터링하여 전송된 데이터의 양을 줄입니다.

참고

Azure Data Explorer용 Synapse Spark 커넥터를 사용하는 방법에 대한 자세한 내용은 Azure Synapse Analytics용 Apache Spark를 사용하여 Azure Data Explorer에 연결을 참조하세요.

이 항목에서는 Azure Data Explorer Spark 커넥터를 설치 및 구성하고 Azure Data Explorer와 Apache Spark 클러스터 간에 데이터를 이동하는 방법을 설명합니다.

참고

아래 예 중 일부는 Azure Databricks Spark 클러스터를 참조하지만 Azure Data Explorer Spark 커넥터에는 Databricks 또는 기타 Spark 배포에 대한 직접적인 종속성이 있지 않습니다.

사전 요구 사항

Spark 2.3.x 버전도 지원되지만 pom.xml 종속성을 일부 변경해야 할 수 있습니다.

Spark 커넥터를 구축하는 방법

버전 2.3.0부터 spark-kusto-커넥터(Spark 3.x 및 Scala 2.12를 대상으로 하는 kusto-spark_3.0_2.12 및 Spark 2.4.x 및 scala 2.11을 대상으로 하는 kusto-spark_2.4_2.11)를 대체하는 새로운 아티팩트 ID를 도입합니다.

참고

2\.5.1 이전 버전을 사용하여 기존 테이블에 더 이상 수집할 수 없습니다. 이후 버전으로 업데이트하세요. 이 단계는 선택 사항입니다. Maven과 같이 미리 빌드된 라이브러리를 사용하는 경우 Spark 클러스터 설정을 참조하세요.

빌드 필수 조건

  1. 미리 빌드된 라이브러리를 사용하지 않는 경우 다음 Kusto Java SDK 라이브러리를 포함하여 종속성에 나열된 라이브러리를 설치해야 합니다. 설치할 올바른 버전을 찾으려면 관련 릴리스의 pom을 확인하세요.

  2. Spark 커넥터를 빌드하려면 이 원본을 참조하세요.

  3. Maven 프로젝트 정의를 사용하는 Scala/Java 애플리케이션의 경우 애플리케이션을 다음 아티팩트와 연결합니다(최신 버전은 다를 수 있음).

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

빌드 명령

jar를 빌드하고 모든 테스트를 실행하려면:

mvn clean package

jar를 빌드하고 모든 테스트를 실행하고 jar를 로컬 Maven 리포지토리에 설치하려면:

mvn clean install

자세한 내용은 커넥터 사용을 참조하세요.

Spark 클러스터 설정

참고

다음 단계를 수행할 때 최신 Azure Data Explorer Spark 커넥터 릴리스를 사용하는 것이 좋습니다.

  1. Spark 2.4.4 및 Scala 2.11 또는 Spark 3.0.1 및 Scala 2.12를 사용하여 Azure Databricks 클러스터를 기반으로 다음 Spark 클러스터 설정을 구성합니다.

    Databricks 클러스터 설정입니다.

  2. Maven에서 최신 spark-kusto-connector 라이브러리를 설치합니다.

    라이브러리를 가져옵니다.Spark-Kusto-Connector를 선택합니다.

  3. 필요한 모든 라이브러리가 설치되었는지 확인합니다.

    설치된 라이브러리를 확인합니다.

  4. JAR 파일을 사용하여 설치하는 경우 추가 종속성이 설치되었는지 확인합니다.

    종속성을 추가합니다.

인증

Azure Data Explorer Spark 커넥터를 사용하면 다음 방법 중 하나를 사용하여 Microsoft Entra ID로 인증할 수 있습니다.

애플리케이션 인증 Microsoft Entra

Microsoft Entra 애플리케이션 인증은 가장 간단하고 일반적인 인증 방법이며 Azure Data Explorer Spark 커넥터에 권장됩니다.

속성 옵션 문자열 설명
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra 애플리케이션(클라이언트) 식별자입니다.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID 인증 기관을 Microsoft Entra. Microsoft Entra 디렉터리(테넌트) ID입니다. 선택 사항 - 기본값은 microsoft.com. 자세한 내용은 Microsoft Entra 기관을 참조하세요.
KUSTO_AAD_APP_SECRET kustoAadAppSecret 클라이언트에 대한 애플리케이션 키를 Microsoft Entra.

참고

이전 API 버전(2.0.0 미만)의 이름은 "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"입니다.

Azure Data Explorer 권한

Azure Data Explorer 클러스터에 대해 다음 권한을 부여합니다.

  • 읽기(데이터 원본)의 경우 Microsoft Entra ID에는 대상 데이터베이스에 대한 뷰어 권한 또는 대상 테이블에 대한 관리자 권한이 있어야 합니다.
  • 쓰기(데이터 싱크)의 경우 Microsoft Entra ID에는 대상 데이터베이스에 대한 수집기 권한이 있어야 합니다. 또한 새 테이블을 만들려면 대상 데이터베이스에 대한 사용자 권한이 있어야 합니다. 대상 테이블이 이미 있는 경우 대상 테이블에 대한 관리자 권한을 구성해야 합니다.

Azure Data Explorer 주 역할에 대한 자세한 내용은 역할 기반 액세스 제어를 참조하세요. 보안 역할 관리는 보안 역할 관리를 참조하세요.

Spark 싱크: Azure Data Explorer에 쓰기

  1. 싱크 매개 변수 설정:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame을 Azure Data Explorer 클러스터에 일괄 처리로 씁니다.

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    또는 단순화된 구문을 사용합니다.

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 스트리밍 데이터 쓰기:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 원본: Azure Data Explorer에서 읽기

  1. 소량의 데이터를 읽을 때 데이터 쿼리를 정의합니다.

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 선택 사항: 사용자가 임시 Blob Storage(Azure Data Explorer가 아님)를 제공하는 경우 Blob이 호출자의 책임하에 만들어집니다. 여기에는 스토리지 프로비저닝, 액세스 키 순환, 임시 아티팩트 삭제가 포함됩니다. KustoBlobStorageUtils 모듈에는 계정 및 컨테이너 좌표와 계정 자격 증명 또는 쓰기, 읽기 및 나열 권한이 있는 전체 SAS URL을 기반으로 Blob을 삭제하기 위한 도우미 함수가 포함되어 있습니다. 해당 RDD가 더 이상 필요하지 않으면 각 트랜잭션은 임시 Blob 아티팩트를 별도의 디렉터리에 저장합니다. 이 디렉터리는 Spark 드라이버 노드에 보고된 읽기-트랜잭션 정보 로그의 일부로 캡처됩니다.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    위의 예에서 커넥터 인터페이스를 사용하여 Key Vault에 액세스하지 않습니다. Databricks 비밀을 사용하는 더 간단한 방법이 사용됩니다.

  3. Azure Data Explorer에서 읽습니다.

    • 사용자가 임시 Blob Storage를 제공하는 경우 Azure Data Explorer에서 다음과 같이 읽습니다.

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Azure Data Explorer가 임시 Blob Storage를 제공하는 경우 Azure Data Explorer에서 다음과 같이 읽습니다.

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)