Megosztás a következőn keresztül:


Adatok lekérése az Apache Sparkból

Az Apache Spark egy egységes elemzési motor a nagy léptékű adatfeldolgozáshoz.

A Spark Kusto-összekötője egy nyílt forráskód projekt, amely bármilyen Spark-fürtön futtatható. Adatforrást és adatnyelőt implementál az adatok áthelyezéséhez az Azure Data Explorer és a Spark fürtök között. Az Eventhouse és az Apache Spark használatával gyors és méretezhető alkalmazásokat hozhat létre, amelyek adatvezérelt forgatókönyveket céloznak meg. Például gépi tanulás (ML), Extract-Transform-Load (ETL) és Log Analytics. Az összekötő használatával az Eventhouses a standard Spark forrás és sink műveletek, például írás, olvasás és writeStream számára érvényes adattárrá válik.

Az Eventhouse-ba sorban álló betöltéssel vagy streaming betöltéssel írhat. Az Eventhouse-ból való olvasás támogatja az oszlopmetszést és a predikátumleküldést, amely szűri az adatokat az Eventhouse-ban, csökkentve az átvitt adatok mennyiségét.

Ez a cikk bemutatja, hogyan telepítheti és konfigurálhatja a Spark-összekötőt, és hogyan helyezhet át adatokat egy Eventhouse- és Apache Spark-fürtök között.

Megjegyzés

Bár az alábbi példák egy Azure Databricks Spark-fürtre hivatkoznak, a Spark-összekötő nem vesz közvetlen függőségeket a Databricks-hez vagy más Spark-disztribúcióhoz.

Előfeltételek

  • Azure-előfizetés. Hozzon létre egy ingyenes Azure-fiókot. Ez a Microsoft Entra-azonosítót használó hitelesítéshez használatos.
  • KQL-adatbázis a Microsoft Fabricben. Másolja az adatbázis URI-ját az Access meglévő KQL-adatbázisának utasításaival.
  • Spark klaszter
  • Összekötőtár telepítése:
    • A Spark 2.4+Scala 2.11 vagy a Spark 3+scala 2.12 előre összeállított kódtárai
    • Maven-adattár
  • Maven 3.x telepítve

Tipp.

A Spark 2.3.x-verziók is támogatottak, de előfordulhat, hogy pom.xml függőségek bizonyos módosításaira van szükség.

A Spark-összekötő létrehozása

A 2.3.0-s verziótól kezdődően új összetevő-azonosítókat vezetünk be a Spark-kusto-connector helyett: kusto-spark_3.0_2.12 a Spark 3.x és a Scala 2.12 használatára.

Megjegyzés

A 2.5.1-es verzió előtti verziók már nem működnek meglévő táblába való betöltéshez, frissítsen egy későbbi verzióra. Ez a lépés nem kötelező. Ha előre elkészített kódtárakat használ, például a Mavent, tekintse meg a Spark-fürt beállítását.

Felépítési előfeltételek

  1. A Spark-összekötő létrehozásához tekintse meg ezt a forrást.

  2. A Maven-projektdefiníciókat használó Scala/Java-alkalmazások esetében kapcsolja össze az alkalmazást a legújabb összetevővel. Keresse meg a legújabb összetevőt a Maven Centralon.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Ha nem használ előre összeállított kódtárakat, telepítenie kell a függőségekben felsorolt kódtárakat, beleértve a következő Kusto Java SDK-kódtárakat. A telepítéshez megfelelő verzió megkereséséhez tekintse meg a megfelelő kiadás pom-fájlját:

    1. Jar felépítése és az összes teszt futtatása:

      mvn clean package -DskipTests
      
    2. Jar létrehozásához futtassa az összes tesztet, és telepítse a jart a helyi Maven-adattárba:

      mvn clean install -DskipTests
      

További információ: összekötők használata.

Spark-fürt beállítása

Megjegyzés

Az alábbi lépések végrehajtásakor ajánlott a Kusto Spark-összekötő legújabb kiadásának használata.

  1. Konfigurálja az alábbi Spark-fürtbeállításokat az Azure Databricks Spark 3.0.1 és Scala 2.12 alapján:

    Databricks-fürt beállításai.

  2. Telepítse a spark-kusto-connector legújabb kódtárát a Mavenből:

    Könyvtárak importálása. Válassza a Spark-Kusto-Connector lehetőséget.

  3. Ellenőrizze, hogy az összes szükséges kódtár telepítve van-e:

    Ellenőrizze, hogy telepítve vannak-e a kódtárak.

  4. JAR-fájl használatával történő telepítés esetén ellenőrizze, hogy más függőségek is telepítve vannak-e:

    Adjon hozzá függőségeket.

Hitelesítés

A Kusto Spark-összekötő lehetővé teszi a Microsoft Entra-azonosítóval való hitelesítést az alábbi módszerek egyikével:

Microsoft Entra-alkalmazáshitelesítés

A Microsoft Entra alkalmazáshitelesítés a legegyszerűbb és leggyakoribb hitelesítési módszer, és a Kusto Spark-összekötőhöz ajánlott.

  1. Jelentkezzen be az Azure-előfizetésbe az Azure CLI-vel. Ezután hitelesítés a böngészőben.

    az login
    
  2. Válassza ki az előfizetést a fő tartalomgazda hosztolásához. Erre a lépésre akkor van szükség, ha több előfizetéssel rendelkezik.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Hozza létre a szolgáltatásfőt. Ebben a példában a szolgáltatásazonosítót my-service-principal néven nevezik.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. A visszaadott JSON-adatokból másolja ki a appId, passwordés tenant későbbi használatra.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Létrehozta a Microsoft Entra-alkalmazást és a szolgáltatás főszereplőjét.

A Spark-összekötő a hitelesítéshez a következő Entra-alkalmazástulajdonságokat használja:

Tulajdonságok Beállítási sztring Leírás
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra-alkalmazás (ügyfél) azonosítója.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra hitelesítési szolgáltató. Microsoft Entra Directory (tenant) azonosítója. Nem kötelező – alapértelmezés szerint microsoft.com. További információ: Microsoft Entra authority.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra alkalmazáskulcs az ügyfélhez.
KUSTO_ACCESS_TOKEN kustoAccessToken Ha már rendelkezik olyan accessToken-nel, amely Kusto-hozzáféréssel jött létre, azt hitelesítés céljából a csatlakozónak is átadhatja.

Megjegyzés

A régebbi (2.0.0-nál kisebb) API-verziók a következő elnevezéssel rendelkeznek: "kustoAAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Kusto-jogosultságok

Adja meg a következő jogosultságokat a kusto oldalon a végrehajtani kívánt Spark-művelet alapján.

Spark-művelet Jogosultságok
Olvasás – Egyéni mód Olvasó
Olvasás – Elosztott mód kényszerítése Olvasó
Írás – Várakozási mód a CreateTableIfNotExist táblalétrehozási lehetőséggel Rendszergazda
Írás – Várólistás mód a FailIfNotExist táblalétrehozási lehetőséggel Ingestor
Írás – TransactionalMode Rendszergazda

További információ a fő szerepkörökről: szerepköralapú hozzáférés-vezérlés. A biztonsági szerepkörök kezelésével kapcsolatban lásd a biztonsági szerepkörök kezelését.

Spark sink: írás Kusto-ba

  1. Fogadóparaméterek beállítása:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame írása a Kusto-fürtbe kötegként:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Vagy használja az egyszerűsített szintaxist:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    // Optional, for any extra options:
    val conf: Map[String, String] = Map()
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Streamelési adatok írása:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // As an alternative to adding .option by .option, you can provide a map:
    val conf: Map[String, String] = Map(
      KustoSinkOptions.KUSTO_CLUSTER -> cluster,
      KustoSinkOptions.KUSTO_TABLE -> table,
      KustoSinkOptions.KUSTO_DATABASE -> database,
      KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken)
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf)
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark forrás: Kustóból való olvasás

  1. Kis mennyiségű adat beolvasásakor adja meg az adat lekérdezést:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Nem kötelező: Ha az átmeneti blobtárolót adja meg (és nem a Kusto-t), a blobok a hívó felelőssége alatt jönnek létre. Ez magában foglalja a tároló kiépítését, a hozzáférési kulcsok elforgatását és az átmeneti összetevők törlését. A KustoBlobStorageUtils modul segédfüggvényeket tartalmaz a blobok törléséhez, amelyek vagy a fiók- és tárolókoordináták és fiók hitelesítő adatai alapján, vagy egy teljes SAS URL-cím használatával, írási, olvasási és listaengedélyekkel működnek. Ha már nincs szükség a megfelelő RDD-re, minden tranzakció egy külön könyvtárban tárolja az átmeneti blobösszetevőket. Ez a könyvtár a Spark-illesztőprogram csomóponton jelentett olvasási tranzakciós információs naplók részeként rögzítésre kerül.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    A fenti példában a Key Vault nem az összekötő felületén keresztül van elérve; a Databricks titkok használata történik egyszerűbb módon.

  3. Olvass a Kusto-ból.

    • Ha megadja az átmeneti blobtárolót, az alábbiak szerint olvassa el a Kusto adatait:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Ha a Kusto biztosítja az átmeneti blobtárolót, olvasson a Kustóból az alábbiak szerint:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)