Azure Data Explorer Connector voor Apache Spark
Belangrijk
Deze connector kan worden gebruikt in realtime analyses in Microsoft Fabric. Gebruik de instructies in dit artikel met de volgende uitzonderingen:
- Maak indien nodig databases aan de hand van de instructies in Een KQL-database maken.
- Maak indien nodig tabellen aan de hand van de instructies in Een lege tabel maken.
- Haal query- of opname-URI's op met behulp van de instructies in URI kopiëren.
- Query's uitvoeren in een KQL-queryset.
Apache Spark is een geïntegreerde analyse-engine voor gegevensverwerking op grote schaal. Azure Data Explorer is een snelle, volledig beheerde service voor gegevensanalyses waarmee grote hoeveelheden gegevens in real-time kunnen worden geanalyseerd.
De Azure Data Explorer-connector voor Spark is een open source-project dat kan worden uitgevoerd op elk Spark-cluster. Het implementeert gegevensbron en gegevenssink voor het verplaatsen van gegevens tussen Azure Data Explorer- en Spark-clusters. Met Azure Data Explorer en Apache Spark kunt u snelle en schaalbare toepassingen bouwen die zijn gericht op gegevensgestuurde scenario's. Bijvoorbeeld machine learning (ML), Extract-Transform-Load (ETL) en Log Analytics. Met de connector wordt Azure Data Explorer een geldig gegevensarchief voor standaard spark-bron- en sinkbewerkingen, zoals write, read en writeStream.
U kunt schrijven naar Azure Data Explorer via opname in de wachtrij of streamingopname. Lezen vanuit Azure Data Explorer ondersteunt kolomsnoeien en pushdown van predicaat, waarmee de gegevens in Azure Data Explorer worden gefilterd, waardoor het volume van de overgedragen gegevens wordt verminderd.
Notitie
Zie Connect to Azure Data Explorer using Apache Spark for Azure Synapse Analytics (Verbinding maken met Azure Data Explorer met behulp van Apache Spark voor Azure Synapse Analytics) voor meer informatie over het werken met de Synapse Spark-connector voor Azure Data Explorer.
In dit onderwerp wordt beschreven hoe u de Azure Data Explorer Spark-connector installeert en configureert en gegevens verplaatst tussen Azure Data Explorer- en Apache Spark-clusters.
Notitie
Hoewel sommige van de onderstaande voorbeelden verwijzen naar een Azure Databricks Spark-cluster, maakt Azure Data Explorer Spark-connector geen directe afhankelijkheden van Databricks of een andere Spark-distributie.
Vereisten
- Een Azure-abonnement. Maak een gratis Azure-account.
- Een Azure Data Explorer-cluster en -database. Maak een cluster en database.
- Een Spark-cluster
- Azure Data Explorer-connectorbibliotheek installeren:
- Vooraf gebouwde bibliotheken voor Spark 2.4+Scala 2.11 of Spark 3+scala 2.12
- Maven-opslagplaats
- Maven 3.x geïnstalleerd
Tip
Spark 2.3.x-versies worden ook ondersteund, maar mogelijk zijn enkele wijzigingen in pom.xml afhankelijkheden vereist.
De Spark-connector bouwen
Vanaf versie 2.3.0 introduceren we nieuwe artefact-id's ter vervanging van spark-kusto-connector: kusto-spark_3.0_2.12 gericht op Spark 3.x en Scala 2.12 en kusto-spark_2.4_2.11 gericht op Spark 2.4.x en scala 2.11.
Notitie
Versies ouder dan 2.5.1 werken niet meer voor opname in een bestaande tabel. Werk deze bij naar een latere versie. Deze stap is optioneel. Als u vooraf gebouwde bibliotheken gebruikt, bijvoorbeeld Maven, raadpleegt u Spark-cluster instellen.
Vereisten voor bouwen
Als u geen vooraf gebouwde bibliotheken gebruikt, moet u de bibliotheken installeren die worden vermeld in afhankelijkheden , waaronder de volgende Kusto Java SDK-bibliotheken . Als u de juiste versie wilt vinden om te installeren, kijkt u in de pom van de relevante release:
Raadpleeg deze bron voor het bouwen van de Spark-connector.
Voor Scala-/Java-toepassingen die gebruikmaken van Maven-projectdefinities, koppelt u uw toepassing aan het volgende artefact (de nieuwste versie kan verschillen):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Build-opdrachten
Ga als volgende te werk om JAR te bouwen en alle tests uit te voeren:
mvn clean package
Als u jar wilt bouwen, voert u alle tests uit en installeert u JAR in uw lokale Maven-opslagplaats:
mvn clean install
Zie connectorgebruik voor meer informatie.
Spark-cluster instellen
Notitie
Het wordt aanbevolen om de nieuwste versie van de Azure Data Explorer Spark-connector te gebruiken bij het uitvoeren van de volgende stappen.
Configureer de volgende Spark-clusterinstellingen op basis van een Azure Databricks-cluster met Behulp van Spark 2.4.4 en Scala 2.11 of Spark 3.0.1 en Scala 2.12:
Installeer de nieuwste spark-kusto-connectorbibliotheek van Maven:
Controleer of alle vereiste bibliotheken zijn geïnstalleerd:
Controleer voor de installatie met behulp van een JAR-bestand of er aanvullende afhankelijkheden zijn geïnstalleerd:
Verificatie
Met de Azure Data Explorer Spark-connector kunt u zich verifiëren met Microsoft Entra-id met behulp van een van de volgende methoden:
- Een Microsoft Entra-toepassing
- Een Microsoft Entra-toegangstoken
- Apparaatverificatie (voor niet-productiescenario's)
- Een Azure-Key Vault Voor toegang tot de Key Vault-resource installeert u het azure-keyvault-pakket en geeft u de toepassingsreferenties op.
Microsoft Entra toepassingsverificatie
Microsoft Entra toepassingsverificatie is de eenvoudigste en meest voorkomende verificatiemethode en wordt aanbevolen voor de Azure Data Explorer Spark-connector.
Eigenschappen | Optiereeks | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra toepassings-id (client). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra verificatie-instantie. Microsoft Entra map-id (tenant). Optioneel: standaard ingesteld op microsoft.com. Zie Microsoft Entra instantie voor meer informatie. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra toepassingssleutel voor de client. |
Notitie
Oudere API-versies (kleiner dan 2.0.0) hebben de volgende naamgeving: 'kustoAADClientID', 'kustoClientAADClientPassword', 'kustoAADAuthorityID'
Azure Data Explorer-bevoegdheden
Verdeel de volgende bevoegdheden voor een Azure Data Explorer-cluster:
- Voor het lezen (gegevensbron) moet de Microsoft Entra identiteit viewerbevoegdheden hebben voor de doeldatabase of beheerdersbevoegdheden voor de doeltabel.
- Voor het schrijven (gegevenssink) moet de Microsoft Entra identiteit ingestor-bevoegdheden hebben voor de doeldatabase. Het moet ook gebruikersbevoegdheden hebben voor de doeldatabase om nieuwe tabellen te kunnen maken. Als de doeltabel al bestaat, moet u beheerdersbevoegdheden voor de doeltabel configureren.
Zie op rollen gebaseerd toegangsbeheer voor meer informatie over Azure Data Explorer belangrijkste rollen. Zie Beheer van beveiligingsrollen voor het beheren van beveiligingsrollen.
Spark-sink: schrijven naar Azure Data Explorer
Sinkparameters instellen:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Spark DataFrame als batch naar Azure Data Explorer-cluster schrijven:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Of gebruik de vereenvoudigde syntaxis:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Streaminggegevens schrijven:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark-bron: lezen uit Azure Data Explorer
Wanneer u kleine hoeveelheden gegevens leest, definieert u de gegevensquery:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Optioneel: als u de tijdelijke blobopslag opgeeft (en niet Azure Data Explorer), worden de blobs gemaakt onder de verantwoordelijkheid van de aanroeper. Dit omvat het inrichten van de opslag, het roteren van toegangssleutels en het verwijderen van tijdelijke artefacten. De KustoBlobStorageUtils-module bevat helperfuncties voor het verwijderen van blobs op basis van account- en containercoördinaten en accountreferenties, of een volledige SAS-URL met schrijf-, lees- en lijstmachtigingen. Wanneer de bijbehorende RDD niet meer nodig is, slaat elke transactie tijdelijke blobartefacten op in een afzonderlijke map. Deze map wordt vastgelegd als onderdeel van logboeken met leestransactiegegevens die worden gerapporteerd op het Spark-stuurprogrammaknooppunt.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
In het bovenstaande voorbeeld wordt de Key Vault niet geopend via de connectorinterface; een eenvoudigere methode voor het gebruik van de Databricks-geheimen wordt gebruikt.
Lees uit Azure Data Explorer.
Als u de tijdelijke blobopslag opgeeft, leest u als volgt uit Azure Data Explorer:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Als Azure Data Explorer de tijdelijke blobopslag biedt, leest u als volgt uit Azure Data Explorer:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)