Sdílet prostřednictvím


Konektor Azure Data Exploreru pro Apache Spark

Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.

Konektor Kusto pro Spark je opensourcový projekt , který se dá spustit v jakémkoli clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi Azure Data Explorerem a clustery Spark. Pomocí Azure Data Exploreru a Apache Sparku můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty. Například strojové učení (ML), extrakce-transformace-načítání (ETL) a Log Analytics. S konektorem se Azure Data Explorer stane platným úložištěm dat pro standardní operace zdroje Sparku a jímky, například write, reada writeStream.

Do Azure Data Exploreru můžete zapisovat prostřednictvím frontovaného příjmu nebo streamovaného příjmu. Čtení z Azure Data Exploreru podporuje pruning sloupců a predikátové zpracování, což filtruje data v Azure Data Exploreru a snižuje objem přenášených dat.

Tento článek popisuje, jak nainstalovat a nakonfigurovat konektor Spark Azure Data Exploreru a přesouvat data mezi Azure Data Explorerem a clustery Apache Spark.

Poznámka:

I když některé z příkladů v tomto článku odkazují na cluster Azure Databricks Spark, konektor Spark v Azure Data Exploreru nepřebírají přímé závislosti na Databricks ani v žádné jiné distribuci Sparku.

Požadavky

Tip

Podporují se také verze Sparku 2.3.x, ale možná budete muset změnit některé závislosti v pom.xml.

Postup sestavení konektoru Spark

Počínaje verzí 2.3.0 zavádíme nové ID artefaktů, které nahrazují spark-kusto-connector: kusto-spark_3.0_2.12 , které cílí na Spark 3.x a Scala 2.12.

Poznámka:

Verze starší než 2.5.1 už nefungují pro příjem do existující tabulky, aktualizujte prosím na novější verzi. Tento krok je nepovinný. Pokud používáte například předpřipravené knihovny, podívejte se na nastavení clusteru Spark.

Předpoklady pro sestavení

  1. Informace o vytvoření konektoru Spark najdete v tomto zdroji .

  2. V případě aplikací Scala/Java využívajících definice projektu Maven propojte aplikaci s nejnovějším artefaktem. Najděte nejnovější artefakt v Centru Mavenu.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Pokud nepoužíváte předem připravené knihovny, musíte nainstalovat knihovny uvedené v závislostech , včetně následujících knihoven sady Kusto Java SDK . Správnou verzi, která se má nainstalovat, najdete v pom příslušné verze:

    1. Sestavení jar a spuštění všech testů:

      mvn clean package -DskipTests
      
    2. Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor JAR do místního úložiště Maven:

      mvn clean install -DskipTests
      

Další informace najdete v tématu Využití konektoru.

Nastavení clusteru Spark

Poznámka:

Při provádění následujících kroků doporučujeme použít nejnovější verzi konektoru Kusto Spark.

  1. Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks Spark 3.0.1 a Scala 2.12:

    Nastavení clusteru Databricks

  2. Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:

    Import knihoven Vyberte Spark-Kusto-Connector.

  3. Ověřte, že jsou nainstalované všechny požadované knihovny:

    Ověřte nainstalované knihovny.

  4. V případě instalace pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:

    Přidejte závislosti.

Ověřování

Konektor Kusto Spark umožňuje ověřování pomocí ID Microsoft Entra pomocí jedné z následujících metod:

Ověřování aplikací Microsoft Entra

Ověřování aplikací Microsoft Entra je nejjednodušší a nejběžnější metodou ověřování a doporučuje se pro konektor Kusto Spark.

  1. Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.

    az login
    
  2. Zvolte předplatné pro hostování hlavního objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Vytvořte objekt služby. V tomto příkladu se instanční objekt nazývá my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z vrácených dat JSON zkopírujte appId, password a tenant pro budoucí použití.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Vytvořili jste aplikaci Microsoft Entra a služebního principála.

Konektor Spark používá k ověřování následující vlastnosti aplikace Entra:

Vlastnosti Řetězec možnosti Popis
KUSTO_AAD_APP_ID KustoAadAppId Identifikátor aplikace Microsoft Entra (klient).
KUSTO_AAD_AUTHORITY_ID KustoAadAuthorityID Ověřovací autorita Microsoft Entra. ID adresáře Microsoft Entra (tenanta). Volitelné – výchozí hodnota je microsoft.com. Další informace naleznete v tématu Microsoft Entra authority.
KUSTO_AAD_APP_SECRET KustoAadAppSecret Klíč aplikace Microsoft Entra pro klienta
KUSTO_ACCESS_TOKEN KustoAccessToken Pokud již máte accessToken, který byl vytvořen s přístupem k Kusto, můžete jej také použít pro autentizaci v konektoru.

Poznámka:

Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: kustoAADClientID, kustoClientAADClientPassword, KustoAADAuthorityID.

Oprávnění Kusto

Na straně Kusto udělte následující oprávnění podle operace Spark, kterou chcete provést.

Operace Spark Oprávnění
Čtení – jeden režim Čtenář
Čtení – nucený distribuovaný režim Čtenář
Zápis – frontový režim s volbou vytvoření tabulky CreateTableIfNotExist Správce
Zápis – režim Queue s možností vytvoření tabulky s volbou FailIfNotExist Ingestor
Zápis – Transakční režim Správce

Další informace o hlavních rolích najdete v tématu Řízení přístupu na základě role. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.

Výstupní kanál Sparku: zápis do Kusto

  1. Nastavení parametrů jímky:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Zapište datový rámec Sparku do clusteru Kusto jako dávkový proces:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Nebo použijte zjednodušenou syntaxi:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    // Optional, for any extra options:
    val conf: Map[String, String] = Map()
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Zápis streamovaných dat:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // As an alternative to adding .option by .option, you can provide a map:
    val conf: Map[String, String] = Map(
      KustoSinkOptions.KUSTO_CLUSTER -> cluster,
      KustoSinkOptions.KUSTO_TABLE -> table,
      KustoSinkOptions.KUSTO_DATABASE -> database,
      KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken)
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf)
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Zdroj Sparku: čtení z Kusto

  1. Při čtení malých objemů dat definujte datový dotaz:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Volitelné: Pokud poskytnete přechodné úložiště objektů blob (a ne Kusto), objekty blob se vytvoří v rámci odpovědnosti volajícího. To zahrnuje zřízení úložiště, obměnu přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstraňování objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a seznamu. Pokud už odpovídající RDD není potřeba, každá transakce ukládá dočasné artefakty blobů do samostatného adresáře. Tento adresář se zaznamenává jako součást protokolů informací o transakcích pro čtení hlášených v uzlu ovladače Sparku.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    V předchozím příkladu není služba Key Vault přístupná pomocí rozhraní konektoru; Používá se jednodušší metoda použití tajných kódů Databricks.

  3. Přečtěte si z Kusto.

    • Pokud poskytnete přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Pokud Kusto poskytuje přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)