Azure Data Explorer Connector för Apache Spark

Viktigt

Den här anslutningsappen kan användas i realtidsanalys i Microsoft Fabric. Följ anvisningarna i den här artikeln med följande undantag:

Apache Spark är en enhetlig analysmotor för storskalig databearbetning. Azure Data Explorer är en snabb, fullständigt hanterad dataanalystjänst för realtidsanalys av stora mängder data.

Azure Data Explorer-anslutningsappen för Spark är ett öppen källkod projekt som kan köras på alla Spark-kluster. Den implementerar datakälla och datamottagare för att flytta data mellan Azure Data Explorer- och Spark-kluster. Med Hjälp av Azure Data Explorer och Apache Spark kan du skapa snabba och skalbara program för datadrivna scenarier. Till exempel maskininlärning (ML), Extract-Transform-Load (ETL) och Log Analytics. Med anslutningsappen blir Azure Data Explorer ett giltigt datalager för vanliga Spark-käll- och mottagaråtgärder, till exempel skrivning, läsning och writeStream.

Du kan skriva till Azure Data Explorer via köad inmatning eller strömningsinmatning. Läsning från Azure Data Explorer stöder kolumnrensning och predikat-pushdown, vilket filtrerar data i Azure Data Explorer, vilket minskar mängden överförda data.

Anteckning

Information om hur du arbetar med Synapse Spark-anslutningsappen för Azure Data Explorer finns i Ansluta till Azure Data Explorer med Apache Spark för Azure Synapse Analytics.

Det här avsnittet beskriver hur du installerar och konfigurerar Azure Data Explorer Spark-anslutningsappen och flyttar data mellan Azure Data Explorer- och Apache Spark-kluster.

Anteckning

Även om några av exemplen nedan refererar till ett Azure Databricks Spark-kluster, tar Azure Data Explorer Spark-anslutningsprogrammet inte direkta beroenden till Databricks eller någon annan Spark-distribution.

Förutsättningar

Tips

Spark 2.3.x-versioner stöds också, men kan kräva vissa ändringar i pom.xml beroenden.

Så här skapar du Spark-anslutningsappen

Från och med version 2.3.0 introducerar vi nya artefakt-ID:n som ersätter spark-kusto-connector: kusto-spark_3.0_2.12 för Spark 3.x och Scala 2.12 och kusto-spark_2.4_2.11 för Spark 2.4.x och scala 2.11.

Anteckning

Versioner före 2.5.1 fungerar inte längre för inmatning till en befintlig tabell. Uppdatera till en senare version. Det här är valfritt. Om du använder fördefinierade bibliotek, till exempel Maven, läser du Konfiguration av Spark-kluster.

Skapa förutsättningar

  1. Om du inte använder fördefinierade bibliotek måste du installera biblioteken som anges i beroenden , inklusive följande Kusto Java SDK-bibliotek . Om du vill hitta rätt version att installera kan du titta i den relevanta versionens pom:

  2. Se den här källan för att skapa Spark-anslutningsappen.

  3. För Scala-/Java-program som använder Maven-projektdefinitioner länkar du ditt program med följande artefakt (den senaste versionen kan skilja sig åt):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Skapa kommandon

Så här skapar du jar-filen och kör alla tester:

mvn clean package

Skapa jar-filen genom att köra alla tester och installera jar-filen på din lokala Maven-lagringsplats:

mvn clean install

Mer information finns i Användning av anslutningsappar.

Konfiguration av Spark-kluster

Anteckning

Vi rekommenderar att du använder den senaste versionen av Azure Data Explorer Spark-anslutningsappen när du utför följande steg.

  1. Konfigurera följande Spark-klusterinställningar baserat på Azure Databricks-kluster med spark 2.4.4 och Scala 2.11 eller Spark 3.0.1 och Scala 2.12:

    Inställningar för Databricks-kluster.

  2. Installera det senaste spark-kusto-connector-biblioteket från Maven:

    Importera bibliotek.Välj Spark-Kusto-Connector.

  3. Kontrollera att alla bibliotek som krävs är installerade:

    Kontrollera att biblioteken är installerade.

  4. För installation med hjälp av en JAR-fil kontrollerar du att ytterligare beroenden har installerats:

    Lägg till beroenden.

Autentisering

Med Azure Data Explorer Spark-anslutningsprogrammet kan du autentisera med Microsoft Entra-ID med någon av följande metoder:

Microsoft Entra programautentisering

Microsoft Entra programautentisering är den enklaste och vanligaste autentiseringsmetoden och rekommenderas för Azure Data Explorer Spark-anslutningsappen.

Egenskaper Alternativsträng Description
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra programidentifierare (klient).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra utfärdare av autentisering. Microsoft Entra katalog-ID (klientorganisation). Valfritt – standardvärdet är microsoft.com. Mer information finns i Microsoft Entra utfärdare.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra programnyckel för klienten.

Anteckning

Äldre API-versioner (mindre än 2.0.0) har följande namn: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Azure Data Explorer-behörigheter

Bevilja följande behörigheter i ett Azure Data Explorer-kluster:

  • För läsning (datakälla) måste den Microsoft Entra identiteten ha visningsbehörighet för måldatabasen eller administratörsbehörigheter i måltabellen.
  • För att kunna skriva (datamottagare) måste den Microsoft Entra identiteten ha ingestor-behörigheter på måldatabasen. Den måste också ha användarbehörighet för måldatabasen för att skapa nya tabeller. Om måltabellen redan finns måste du konfigurera administratörsbehörigheter i måltabellen.

Mer information om Huvudroller för Azure Data Explorer finns i rollbaserad åtkomstkontroll. Information om hur du hanterar säkerhetsroller finns i Hantering av säkerhetsroller.

Spark-mottagare: skriva till Azure Data Explorer

  1. Konfigurera mottagarparametrar:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Skriv Spark DataFrame till Azure Data Explorer kluster som batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Eller använd den förenklade syntaxen:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Skriva strömmande data:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-källa: läsa från Azure Data Explorer

  1. När du läser små mängder data definierar du datafrågan:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Valfritt: Om du anger den tillfälliga bloblagringen (och inte Azure Data Explorer) skapas blobarna under anroparens ansvar. Detta omfattar etablering av lagring, roterande åtkomstnycklar och borttagning av tillfälliga artefakter. Modulen KustoBlobStorageUtils innehåller hjälpfunktioner för att ta bort blobar baserat på antingen konto- och containerkoordinater och kontoautentiseringsuppgifter, eller en fullständig SAS-URL med skriv-, läs- och listbehörigheter. När motsvarande RDD inte längre behövs lagrar varje transaktion tillfälliga blobartefakter i en separat katalog. Den här katalogen registreras som en del av loggarna för lästransaktionsinformation som rapporteras på Spark-drivrutinsnoden.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    I exemplet ovan används inte Key Vault med anslutningsgränssnittet. En enklare metod för att använda Databricks-hemligheter används.

  3. Läs från Azure Data Explorer.

    • Om du anger den tillfälliga bloblagringen läser du från Azure Data Explorer på följande sätt:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Om Azure Data Explorer tillhandahåller den tillfälliga bloblagringen läser du från Azure Data Explorer på följande sätt:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)