Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatbejelentkezni vagymódosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatjamódosítani a címtárat.
Az Apache Spark egy egységes elemzési motor a nagy léptékű adatfeldolgozáshoz.
A Spark Kusto-összekötője egy nyílt forráskód projekt, amely bármilyen Spark-fürtön futtatható. Adatforrást és adatnyelőt implementál az adatok áthelyezéséhez az Azure Data Explorer és a Spark fürtök között. Az Eventhouse és az Apache Spark használatával gyors és méretezhető alkalmazásokat hozhat létre, amelyek adatvezérelt forgatókönyveket céloznak meg. Például gépi tanulás (ML), Extract-Transform-Load (ETL) és Log Analytics. Az összekötő használatával az Eventhouses a standard Spark forrás és sink műveletek, például írás, olvasás és writeStream számára érvényes adattárrá válik.
Az Eventhouse-ba sorban álló betöltéssel vagy streaming betöltéssel írhat. Az Eventhouse-ból való olvasás támogatja az oszlopmetszést és a predikátumleküldést, amely szűri az adatokat az Eventhouse-ban, csökkentve az átvitt adatok mennyiségét.
Ez a cikk bemutatja, hogyan telepítheti és konfigurálhatja a Spark-összekötőt, és hogyan helyezhet át adatokat egy Eventhouse- és Apache Spark-fürtök között.
Megjegyzés
Bár az alábbi példák egy Azure Databricks Spark-fürtre hivatkoznak, a Spark-összekötő nem vesz közvetlen függőségeket a Databricks-hez vagy más Spark-disztribúcióhoz.
Előfeltételek
- Azure-előfizetés. Hozzon létre egy ingyenes Azure-fiókot. Ez a Microsoft Entra-azonosítót használó hitelesítéshez használatos.
- KQL-adatbázis a Microsoft Fabricben. Másolja az adatbázis URI-ját az Access meglévő KQL-adatbázisának utasításaival.
- Spark klaszter
- Összekötőtár telepítése:
- A Spark 2.4+Scala 2.11 vagy a Spark 3+scala 2.12 előre összeállított kódtárai
- Maven-adattár
- Maven 3.x telepítve
Tipp.
A Spark 2.3.x-verziók is támogatottak, de előfordulhat, hogy pom.xml függőségek bizonyos módosításaira van szükség.
A Spark-összekötő létrehozása
A 2.3.0-s verziótól kezdődően új összetevő-azonosítókat vezetünk be a Spark-kusto-connector helyett: kusto-spark_3.0_2.12 a Spark 3.x és a Scala 2.12 használatára.
Megjegyzés
A 2.5.1-es verzió előtti verziók már nem működnek meglévő táblába való betöltéshez, frissítsen egy későbbi verzióra. Ez a lépés nem kötelező. Ha előre elkészített kódtárakat használ, például a Mavent, tekintse meg a Spark-fürt beállítását.
Felépítési előfeltételek
A Spark-összekötő létrehozásához tekintse meg ezt a forrást.
A Maven-projektdefiníciókat használó Scala/Java-alkalmazások esetében kapcsolja össze az alkalmazást a legújabb összetevővel. Keresse meg a legújabb összetevőt a Maven Centralon.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).Ha nem használ előre összeállított kódtárakat, telepítenie kell a függőségekben felsorolt kódtárakat, beleértve a következő Kusto Java SDK-kódtárakat. A telepítéshez megfelelő verzió megkereséséhez tekintse meg a megfelelő kiadás pom-fájlját:
Jar felépítése és az összes teszt futtatása:
mvn clean package -DskipTestsJar létrehozásához futtassa az összes tesztet, és telepítse a jart a helyi Maven-adattárba:
mvn clean install -DskipTests
További információ: összekötők használata.
Spark-fürt beállítása
Megjegyzés
Az alábbi lépések végrehajtásakor ajánlott a Kusto Spark-összekötő legújabb kiadásának használata.
Konfigurálja az alábbi Spark-fürtbeállításokat az Azure Databricks Spark 3.0.1 és Scala 2.12 alapján:
Telepítse a spark-kusto-connector legújabb kódtárát a Mavenből:
Ellenőrizze, hogy az összes szükséges kódtár telepítve van-e:
JAR-fájl használatával történő telepítés esetén ellenőrizze, hogy más függőségek is telepítve vannak-e:
Hitelesítés
A Kusto Spark-összekötő lehetővé teszi a Microsoft Entra-azonosítóval való hitelesítést az alábbi módszerek egyikével:
- Microsoft Entra-alkalmazás
- Microsoft Entra hozzáférési jogkivonat
- Eszközhitelesítés (nem gyártási forgatókönyvekhez)
- Azure Key Vault eléréséhez telepítse az azure-keyvault csomagot, és adja meg az alkalmazás hitelesítő adatait.
Microsoft Entra-alkalmazáshitelesítés
A Microsoft Entra alkalmazáshitelesítés a legegyszerűbb és leggyakoribb hitelesítési módszer, és a Kusto Spark-összekötőhöz ajánlott.
Jelentkezzen be az Azure-előfizetésbe az Azure CLI-vel. Ezután hitelesítés a böngészőben.
az loginVálassza ki az előfizetést a fő tartalomgazda hosztolásához. Erre a lépésre akkor van szükség, ha több előfizetéssel rendelkezik.
az account set --subscription YOUR_SUBSCRIPTION_GUIDHozza létre a szolgáltatásfőt. Ebben a példában a szolgáltatásazonosítót
my-service-principalnéven nevezik.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}A visszaadott JSON-adatokból másolja ki a
appId,passwordéstenantkésőbbi használatra.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Létrehozta a Microsoft Entra-alkalmazást és a szolgáltatás főszereplőjét.
A Spark-összekötő a hitelesítéshez a következő Entra-alkalmazástulajdonságokat használja:
| Tulajdonságok | Beállítási sztring | Leírás |
|---|---|---|
| KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra-alkalmazás (ügyfél) azonosítója. |
| KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra hitelesítési szolgáltató. Microsoft Entra Directory (tenant) azonosítója. Nem kötelező – alapértelmezés szerint microsoft.com. További információ: Microsoft Entra authority. |
| KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra alkalmazáskulcs az ügyfélhez. |
| KUSTO_ACCESS_TOKEN | kustoAccessToken | Ha már rendelkezik olyan accessToken-nel, amely Kusto-hozzáféréssel jött létre, azt hitelesítés céljából a csatlakozónak is átadhatja. |
Megjegyzés
A régebbi (2.0.0-nál kisebb) API-verziók a következő elnevezéssel rendelkeznek: "kustoAAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Kusto-jogosultságok
Adja meg a következő jogosultságokat a kusto oldalon a végrehajtani kívánt Spark-művelet alapján.
| Spark-művelet | Jogosultságok |
|---|---|
| Olvasás – Egyéni mód | Olvasó |
| Olvasás – Elosztott mód kényszerítése | Olvasó |
| Írás – Várakozási mód a CreateTableIfNotExist táblalétrehozási lehetőséggel | Rendszergazda |
| Írás – Várólistás mód a FailIfNotExist táblalétrehozási lehetőséggel | Ingestor |
| Írás – TransactionalMode | Rendszergazda |
További információ a fő szerepkörökről: szerepköralapú hozzáférés-vezérlés. A biztonsági szerepkörök kezelésével kapcsolatban lásd a biztonsági szerepkörök kezelését.
Spark sink: írás Kusto-ba
Fogadóparaméterek beállítása:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"Spark DataFrame írása a Kusto-fürtbe kötegként:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()Vagy használja az egyszerűsített szintaxist:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ // Optional, for any extra options: val conf: Map[String, String] = Map() val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)Streamelési adatok írása:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // As an alternative to adding .option by .option, you can provide a map: val conf: Map[String, String] = Map( KustoSinkOptions.KUSTO_CLUSTER -> cluster, KustoSinkOptions.KUSTO_TABLE -> table, KustoSinkOptions.KUSTO_DATABASE -> database, KustoSourceOptions.KUSTO_ACCESS_TOKEN -> accessToken) // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark forrás: Kustóból való olvasás
Kis mennyiségű adat beolvasásakor adja meg az adat lekérdezést:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)Nem kötelező: Ha az átmeneti blobtárolót adja meg (és nem a Kusto-t), a blobok a hívó felelőssége alatt jönnek létre. Ez magában foglalja a tároló kiépítését, a hozzáférési kulcsok elforgatását és az átmeneti összetevők törlését. A KustoBlobStorageUtils modul segédfüggvényeket tartalmaz a blobok törléséhez, amelyek vagy a fiók- és tárolókoordináták és fiók hitelesítő adatai alapján, vagy egy teljes SAS URL-cím használatával, írási, olvasási és listaengedélyekkel működnek. Ha már nincs szükség a megfelelő RDD-re, minden tranzakció egy külön könyvtárban tárolja az átmeneti blobösszetevőket. Ez a könyvtár a Spark-illesztőprogram csomóponton jelentett olvasási tranzakciós információs naplók részeként rögzítésre kerül.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")A fenti példában a Key Vault nem az összekötő felületén keresztül van elérve; a Databricks titkok használata történik egyszerűbb módon.
Olvass a Kusto-ból.
Ha megadja az átmeneti blobtárolót, az alábbiak szerint olvassa el a Kusto adatait:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)Ha a Kusto biztosítja az átmeneti blobtárolót, olvasson a Kustóból az alábbiak szerint:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)