Konektor Azure Data Explorer pro Apache Spark
Důležité
Tento konektor je možné použít v analýzách v reálném čase v Microsoft Fabric. Postupujte podle pokynů v tomto článku s následujícími výjimkami:
- V případě potřeby vytvořte databáze podle pokynů v tématu Vytvoření databáze KQL.
- V případě potřeby vytvořte tabulky podle pokynů v tématu Vytvoření prázdné tabulky.
- Získejte identifikátory URI dotazů nebo příjmu dat podle pokynů v tématu Kopírování identifikátoru URI.
- Spouštění dotazů v sadě dotazů KQL
Apache Spark je jednotný analytický modul pro zpracování dat ve velkém měřítku. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.
Konektor Azure Data Explorer pro Spark je projekt open source, který může běžet na libovolném clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi clustery Azure Data Explorer a Spark. Pomocí Azure Data Explorer a Apache Sparku můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty. Například strojové učení (ML), extrakce, transformace a načítání (ETL) a Log Analytics. S konektorem se Azure Data Explorer stane platným úložištěm dat pro standardní operace se zdrojem Sparku a jímkou, jako je zápis, čtení a zápisstream.
Do Azure Data Explorer můžete zapisovat prostřednictvím příjmu dat ve frontě nebo příjmu streamování. Čtení z Azure Data Explorer podporuje vyřezávání sloupců a nabízení predikátů, které filtruje data v Azure Data Explorer a snižuje objem přenášených dat.
Poznámka
Informace o práci s konektorem Synapse Spark pro Azure Data Explorer najdete v tématu Připojení k Azure Data Explorer pomocí Apache Sparku pro Azure Synapse Analytics.
Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Azure Data Explorer Spark a přesouvat data mezi clustery Azure Data Explorer a Apache Spark.
Poznámka
I když některé z níže uvedených příkladů odkazují na cluster Azure Databricks Spark, konektor Azure Data Explorer Spark nepřebírají přímé závislosti na Databricks ani žádné jiné distribuci Sparku.
Požadavky
- Předplatné Azure. Vytvořte si bezplatný účet Azure.
- Cluster a databáze Azure Data Explorer. Vytvořte cluster a databázi.
- Cluster Spark
- Nainstalujte knihovnu konektoru Azure Data Explorer:
- Předem připravené knihovny pro Spark 2.4+Scala 2.11 nebo Spark 3+scala 2.12
- Úložiště Maven
- Nainstalovaný Maven 3.x
Tip
Podporují se také verze Sparku 2.3.x, ale můžou vyžadovat určité změny v pom.xml závislostech.
Postup sestavení konektoru Spark
Od verze 2.3.0 zavádíme nová ID artefaktů nahrazující spark-kusto-connector: kusto-spark_3.0_2.12 , která cílí na Spark 3.x a Scala 2.12 a kusto-spark_2.4_2.11 , která cílí na Spark 2.4.x a scala 2.11.
Poznámka
Verze starší než 2.5.1 už nefungují pro příjem existující tabulky. Aktualizujte prosím na novější verzi. Tento krok je volitelný. Pokud používáte předem připravené knihovny, například Maven, přečtěte si téma Nastavení clusteru Spark.
Požadavky na sestavení
Pokud nepoužíváte předem připravené knihovny, musíte nainstalovat knihovny uvedené v závislostech , včetně následujících knihoven Sady Kusto Java SDK . Pokud chcete najít správnou verzi k instalaci, podívejte se do pomu příslušné verze:
Informace o vytvoření konektoru Spark najdete v tomto zdroji .
V případě aplikací Scala/Java využívajících definice projektů Maven propojte aplikaci s následujícím artefaktem (nejnovější verze se může lišit):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Příkazy sestavení
Sestavení jar a spuštění všech testů:
mvn clean package
Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor jar do místního úložiště Maven:
mvn clean install
Další informace najdete v tématu Využití konektoru.
Nastavení clusteru Spark
Poznámka
Při provádění následujících kroků doporučujeme použít nejnovější verzi konektoru Azure Data Explorer Spark.
Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks pomocí Sparku 2.4.4 a Scaly 2.11 nebo Sparku 3.0.1 a Scaly 2.12:
Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:
Ověřte, že jsou nainstalované všechny požadované knihovny:
Při instalaci pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:
Authentication
Konektor Azure Data Explorer Spark umožňuje ověřování pomocí Microsoft Entra ID pomocí jedné z následujících metod:
- Aplikace Microsoft Entra
- Přístupový token Microsoft Entra
- Ověřování zařízení (pro neprodukční scénáře)
- Azure Key Vault Pokud chcete získat přístup k prostředku Key Vault, nainstalujte balíček azure-keyvault a zadejte přihlašovací údaje aplikace.
Microsoft Entra ověřování aplikací
Microsoft Entra ověřování aplikací je nejjednodušší a nejběžnější metoda ověřování, která se doporučuje pro konektor Azure Data Explorer Spark.
Vlastnosti | Řetězec možnosti | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra identifikátor aplikace (klienta). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra ověřovací autoritu. MICROSOFT ENTRA ID adresáře (tenanta). Volitelné – výchozí hodnota je microsoft.com. Další informace najdete v tématu Microsoft Entra autorita. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra klíč aplikace pro klienta. |
Poznámka
Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Oprávnění Azure Data Explorer
Udělte clusteru Azure Data Explorer následující oprávnění:
- Pro čtení (zdroj dat) musí mít identita Microsoft Entra oprávnění čtenáře k cílové databázi nebo oprávnění správce k cílové tabulce.
- Pro zápis (datová jímka) musí mít Microsoft Entra identita oprávnění ingestora k cílové databázi. Aby bylo možné vytvářet nové tabulky, musí mít také uživatelská oprávnění k cílové databázi. Pokud cílová tabulka již existuje, musíte pro cílovou tabulku nakonfigurovat oprávnění správce .
Další informace o rolích objektů zabezpečení Azure Data Explorer najdete v tématu Řízení přístupu na základě role. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.
Jímka Sparku: zápis do Azure Data Explorer
Nastavení parametrů jímky:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Zapište datový rámec Sparku do clusteru Azure Data Explorer dávkově:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Nebo použijte zjednodušenou syntaxi:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Zápis streamovaných dat:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Zdroj Sparku: Čtení z Azure Data Explorer
Při čtení malých objemů dat definujte datový dotaz:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Volitelné: Pokud zadáte přechodné úložiště objektů blob (a ne Azure Data Explorer), vytvoří se objekty blob na odpovědnost volajícího. To zahrnuje zřízení úložiště, obměnu přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstranění objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a výpisu. Pokud už odpovídající sada RDD není potřeba, ukládá každá transakce přechodné artefakty objektů blob v samostatném adresáři. Tento adresář je zachycen jako součást protokolů s informacemi o transakcích pro čtení hlášených v uzlu ovladače Sparku.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
Ve výše uvedeném příkladu se k Key Vault nepřistupuje pomocí rozhraní konektoru. Používá se jednodušší metoda použití tajných kódů Databricks.
Čtení z Azure Data Explorer.
Pokud zadáte přechodné úložiště objektů blob, přečtěte si z Azure Data Explorer následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Pokud Azure Data Explorer poskytuje přechodné úložiště objektů blob, přečtěte si z Azure Data Explorer následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Související obsah
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro