Konektor Azure Data Explorer pro Apache Spark

Důležité

Tento konektor je možné použít v analýzách v reálném čase v Microsoft Fabric. Postupujte podle pokynů v tomto článku s následujícími výjimkami:

Apache Spark je jednotný analytický modul pro zpracování dat ve velkém měřítku. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.

Konektor Azure Data Explorer pro Spark je projekt open source, který může běžet na libovolném clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi clustery Azure Data Explorer a Spark. Pomocí Azure Data Explorer a Apache Sparku můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty. Například strojové učení (ML), extrakce, transformace a načítání (ETL) a Log Analytics. S konektorem se Azure Data Explorer stane platným úložištěm dat pro standardní operace se zdrojem Sparku a jímkou, jako je zápis, čtení a zápisstream.

Do Azure Data Explorer můžete zapisovat prostřednictvím příjmu dat ve frontě nebo příjmu streamování. Čtení z Azure Data Explorer podporuje vyřezávání sloupců a nabízení predikátů, které filtruje data v Azure Data Explorer a snižuje objem přenášených dat.

Poznámka

Informace o práci s konektorem Synapse Spark pro Azure Data Explorer najdete v tématu Připojení k Azure Data Explorer pomocí Apache Sparku pro Azure Synapse Analytics.

Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Azure Data Explorer Spark a přesouvat data mezi clustery Azure Data Explorer a Apache Spark.

Poznámka

I když některé z níže uvedených příkladů odkazují na cluster Azure Databricks Spark, konektor Azure Data Explorer Spark nepřebírají přímé závislosti na Databricks ani žádné jiné distribuci Sparku.

Požadavky

Tip

Podporují se také verze Sparku 2.3.x, ale můžou vyžadovat určité změny v pom.xml závislostech.

Postup sestavení konektoru Spark

Od verze 2.3.0 zavádíme nová ID artefaktů nahrazující spark-kusto-connector: kusto-spark_3.0_2.12 , která cílí na Spark 3.x a Scala 2.12 a kusto-spark_2.4_2.11 , která cílí na Spark 2.4.x a scala 2.11.

Poznámka

Verze starší než 2.5.1 už nefungují pro příjem existující tabulky. Aktualizujte prosím na novější verzi. Tento krok je volitelný. Pokud používáte předem připravené knihovny, například Maven, přečtěte si téma Nastavení clusteru Spark.

Požadavky na sestavení

  1. Pokud nepoužíváte předem připravené knihovny, musíte nainstalovat knihovny uvedené v závislostech , včetně následujících knihoven Sady Kusto Java SDK . Pokud chcete najít správnou verzi k instalaci, podívejte se do pomu příslušné verze:

  2. Informace o vytvoření konektoru Spark najdete v tomto zdroji .

  3. V případě aplikací Scala/Java využívajících definice projektů Maven propojte aplikaci s následujícím artefaktem (nejnovější verze se může lišit):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Příkazy sestavení

Sestavení jar a spuštění všech testů:

mvn clean package

Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor jar do místního úložiště Maven:

mvn clean install

Další informace najdete v tématu Využití konektoru.

Nastavení clusteru Spark

Poznámka

Při provádění následujících kroků doporučujeme použít nejnovější verzi konektoru Azure Data Explorer Spark.

  1. Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks pomocí Sparku 2.4.4 a Scaly 2.11 nebo Sparku 3.0.1 a Scaly 2.12:

    Nastavení clusteru Databricks.

  2. Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:

    Import knihoven.Vyberte Spark-Kusto-Connector.

  3. Ověřte, že jsou nainstalované všechny požadované knihovny:

    Ověřte nainstalované knihovny.

  4. Při instalaci pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:

    Přidejte závislosti.

Authentication

Konektor Azure Data Explorer Spark umožňuje ověřování pomocí Microsoft Entra ID pomocí jedné z následujících metod:

Microsoft Entra ověřování aplikací

Microsoft Entra ověřování aplikací je nejjednodušší a nejběžnější metoda ověřování, která se doporučuje pro konektor Azure Data Explorer Spark.

Vlastnosti Řetězec možnosti Description
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra identifikátor aplikace (klienta).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra ověřovací autoritu. MICROSOFT ENTRA ID adresáře (tenanta). Volitelné – výchozí hodnota je microsoft.com. Další informace najdete v tématu Microsoft Entra autorita.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra klíč aplikace pro klienta.

Poznámka

Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Oprávnění Azure Data Explorer

Udělte clusteru Azure Data Explorer následující oprávnění:

  • Pro čtení (zdroj dat) musí mít identita Microsoft Entra oprávnění čtenáře k cílové databázi nebo oprávnění správce k cílové tabulce.
  • Pro zápis (datová jímka) musí mít Microsoft Entra identita oprávnění ingestora k cílové databázi. Aby bylo možné vytvářet nové tabulky, musí mít také uživatelská oprávnění k cílové databázi. Pokud cílová tabulka již existuje, musíte pro cílovou tabulku nakonfigurovat oprávnění správce .

Další informace o rolích objektů zabezpečení Azure Data Explorer najdete v tématu Řízení přístupu na základě role. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.

Jímka Sparku: zápis do Azure Data Explorer

  1. Nastavení parametrů jímky:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Zapište datový rámec Sparku do clusteru Azure Data Explorer dávkově:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Nebo použijte zjednodušenou syntaxi:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Zápis streamovaných dat:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Zdroj Sparku: Čtení z Azure Data Explorer

  1. Při čtení malých objemů dat definujte datový dotaz:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Volitelné: Pokud zadáte přechodné úložiště objektů blob (a ne Azure Data Explorer), vytvoří se objekty blob na odpovědnost volajícího. To zahrnuje zřízení úložiště, obměnu přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstranění objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a výpisu. Pokud už odpovídající sada RDD není potřeba, ukládá každá transakce přechodné artefakty objektů blob v samostatném adresáři. Tento adresář je zachycen jako součást protokolů s informacemi o transakcích pro čtení hlášených v uzlu ovladače Sparku.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Ve výše uvedeném příkladu se k Key Vault nepřistupuje pomocí rozhraní konektoru. Používá se jednodušší metoda použití tajných kódů Databricks.

  3. Čtení z Azure Data Explorer.

    • Pokud zadáte přechodné úložiště objektů blob, přečtěte si z Azure Data Explorer následujícím způsobem:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Pokud Azure Data Explorer poskytuje přechodné úložiště objektů blob, přečtěte si z Azure Data Explorer následujícím způsobem:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)