Delen via


Azure Data Explorer Connector voor Apache Spark

Belangrijk

Deze connector kan worden gebruikt in realtime analyses in Microsoft Fabric. Gebruik de instructies in dit artikel met de volgende uitzonderingen:

Apache Spark is een geïntegreerde analyse-engine voor gegevensverwerking op grote schaal. Azure Data Explorer is een snelle, volledig beheerde service voor gegevensanalyses waarmee grote hoeveelheden gegevens in real-time kunnen worden geanalyseerd.

De Azure Data Explorer-connector voor Spark is een open source-project dat kan worden uitgevoerd op elk Spark-cluster. Het implementeert gegevensbron en gegevenssink voor het verplaatsen van gegevens tussen Azure Data Explorer- en Spark-clusters. Met Azure Data Explorer en Apache Spark kunt u snelle en schaalbare toepassingen bouwen die zijn gericht op gegevensgestuurde scenario's. Bijvoorbeeld machine learning (ML), Extract-Transform-Load (ETL) en Log Analytics. Met de connector wordt Azure Data Explorer een geldig gegevensarchief voor standaard spark-bron- en sinkbewerkingen, zoals write, read en writeStream.

U kunt schrijven naar Azure Data Explorer via opname in de wachtrij of streamingopname. Lezen vanuit Azure Data Explorer ondersteunt kolomsnoeien en pushdown van predicaat, waarmee de gegevens in Azure Data Explorer worden gefilterd, waardoor het volume van de overgedragen gegevens wordt verminderd.

Notitie

Zie Connect to Azure Data Explorer using Apache Spark for Azure Synapse Analytics (Verbinding maken met Azure Data Explorer met behulp van Apache Spark voor Azure Synapse Analytics) voor meer informatie over het werken met de Synapse Spark-connector voor Azure Data Explorer.

In dit onderwerp wordt beschreven hoe u de Azure Data Explorer Spark-connector installeert en configureert en gegevens verplaatst tussen Azure Data Explorer- en Apache Spark-clusters.

Notitie

Hoewel sommige van de onderstaande voorbeelden verwijzen naar een Azure Databricks Spark-cluster, maakt Azure Data Explorer Spark-connector geen directe afhankelijkheden van Databricks of een andere Spark-distributie.

Vereisten

Tip

Spark 2.3.x-versies worden ook ondersteund, maar mogelijk zijn enkele wijzigingen in pom.xml afhankelijkheden vereist.

De Spark-connector bouwen

Vanaf versie 2.3.0 introduceren we nieuwe artefact-id's ter vervanging van spark-kusto-connector: kusto-spark_3.0_2.12 gericht op Spark 3.x en Scala 2.12 en kusto-spark_2.4_2.11 gericht op Spark 2.4.x en scala 2.11.

Notitie

Versies ouder dan 2.5.1 werken niet meer voor opname in een bestaande tabel. Werk deze bij naar een latere versie. Deze stap is optioneel. Als u vooraf gebouwde bibliotheken gebruikt, bijvoorbeeld Maven, raadpleegt u Spark-cluster instellen.

Vereisten voor bouwen

  1. Als u geen vooraf gebouwde bibliotheken gebruikt, moet u de bibliotheken installeren die worden vermeld in afhankelijkheden , waaronder de volgende Kusto Java SDK-bibliotheken . Als u de juiste versie wilt vinden om te installeren, kijkt u in de pom van de relevante release:

  2. Raadpleeg deze bron voor het bouwen van de Spark-connector.

  3. Voor Scala-/Java-toepassingen die gebruikmaken van Maven-projectdefinities, koppelt u uw toepassing aan het volgende artefact (de nieuwste versie kan verschillen):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Build-opdrachten

Ga als volgende te werk om JAR te bouwen en alle tests uit te voeren:

mvn clean package

Als u jar wilt bouwen, voert u alle tests uit en installeert u JAR in uw lokale Maven-opslagplaats:

mvn clean install

Zie connectorgebruik voor meer informatie.

Spark-cluster instellen

Notitie

Het wordt aanbevolen om de nieuwste versie van de Azure Data Explorer Spark-connector te gebruiken bij het uitvoeren van de volgende stappen.

  1. Configureer de volgende Spark-clusterinstellingen op basis van een Azure Databricks-cluster met Behulp van Spark 2.4.4 en Scala 2.11 of Spark 3.0.1 en Scala 2.12:

    Databricks-clusterinstellingen.

  2. Installeer de nieuwste spark-kusto-connectorbibliotheek van Maven:

    Bibliotheken importeren. Selecteer Spark-Kusto-Connector.

  3. Controleer of alle vereiste bibliotheken zijn geïnstalleerd:

    Controleer of de bibliotheken zijn geïnstalleerd.

  4. Controleer voor de installatie met behulp van een JAR-bestand of er aanvullende afhankelijkheden zijn geïnstalleerd:

    Afhankelijkheden toevoegen.

Verificatie

Met de Azure Data Explorer Spark-connector kunt u zich verifiëren met Microsoft Entra-id met behulp van een van de volgende methoden:

Microsoft Entra toepassingsverificatie

Microsoft Entra toepassingsverificatie is de eenvoudigste en meest voorkomende verificatiemethode en wordt aanbevolen voor de Azure Data Explorer Spark-connector.

Eigenschappen Optiereeks Description
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra toepassings-id (client).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra verificatie-instantie. Microsoft Entra map-id (tenant). Optioneel: standaard ingesteld op microsoft.com. Zie Microsoft Entra instantie voor meer informatie.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra toepassingssleutel voor de client.

Notitie

Oudere API-versies (kleiner dan 2.0.0) hebben de volgende naamgeving: 'kustoAADClientID', 'kustoClientAADClientPassword', 'kustoAADAuthorityID'

Azure Data Explorer-bevoegdheden

Verdeel de volgende bevoegdheden voor een Azure Data Explorer-cluster:

  • Voor het lezen (gegevensbron) moet de Microsoft Entra identiteit viewerbevoegdheden hebben voor de doeldatabase of beheerdersbevoegdheden voor de doeltabel.
  • Voor het schrijven (gegevenssink) moet de Microsoft Entra identiteit ingestor-bevoegdheden hebben voor de doeldatabase. Het moet ook gebruikersbevoegdheden hebben voor de doeldatabase om nieuwe tabellen te kunnen maken. Als de doeltabel al bestaat, moet u beheerdersbevoegdheden voor de doeltabel configureren.

Zie op rollen gebaseerd toegangsbeheer voor meer informatie over Azure Data Explorer belangrijkste rollen. Zie Beheer van beveiligingsrollen voor het beheren van beveiligingsrollen.

Spark-sink: schrijven naar Azure Data Explorer

  1. Sinkparameters instellen:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame als batch naar Azure Data Explorer-cluster schrijven:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Of gebruik de vereenvoudigde syntaxis:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Streaminggegevens schrijven:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark-bron: lezen uit Azure Data Explorer

  1. Wanneer u kleine hoeveelheden gegevens leest, definieert u de gegevensquery:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optioneel: als u de tijdelijke blobopslag opgeeft (en niet Azure Data Explorer), worden de blobs gemaakt onder de verantwoordelijkheid van de aanroeper. Dit omvat het inrichten van de opslag, het roteren van toegangssleutels en het verwijderen van tijdelijke artefacten. De KustoBlobStorageUtils-module bevat helperfuncties voor het verwijderen van blobs op basis van account- en containercoördinaten en accountreferenties, of een volledige SAS-URL met schrijf-, lees- en lijstmachtigingen. Wanneer de bijbehorende RDD niet meer nodig is, slaat elke transactie tijdelijke blobartefacten op in een afzonderlijke map. Deze map wordt vastgelegd als onderdeel van logboeken met leestransactiegegevens die worden gerapporteerd op het Spark-stuurprogrammaknooppunt.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    In het bovenstaande voorbeeld wordt de Key Vault niet geopend via de connectorinterface; een eenvoudigere methode voor het gebruik van de Databricks-geheimen wordt gebruikt.

  3. Lees uit Azure Data Explorer.

    • Als u de tijdelijke blobopslag opgeeft, leest u als volgt uit Azure Data Explorer:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Als Azure Data Explorer de tijdelijke blobopslag biedt, leest u als volgt uit Azure Data Explorer:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)