Azure Data Explorer Connector for Apache Spark
Apache Spark is a unified analytics engine for large-scale data processing. Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data.
The Kusto connector for Spark is an open source project that can run on any Spark cluster. It implements data source and data sink for moving data across Azure Data Explorer and Spark clusters. Using Azure Data Explorer and Apache Spark, you can build fast and scalable applications targeting data driven scenarios. For example, machine learning (ML), Extract-Transform-Load (ETL), and Log Analytics. With the connector, Azure Data Explorer becomes a valid data store for standard Spark source and sink operations, such as write, read, and writeStream.
You can write to Azure Data Explorer via queued ingestion or streaming ingestion. Reading from Azure Data Explorer supports column pruning and predicate pushdown, which filters the data in Azure Data Explorer, reducing the volume of transferred data.
Note
For information about working with the Synapse Spark connector for Azure Data Explorer, see Connect to Azure Data Explorer using Apache Spark for Azure Synapse Analytics.
This topic describes how to install and configure the Azure Data Explorer Spark connector and move data between Azure Data Explorer and Apache Spark clusters.
Note
Although some of the examples below refer to an Azure Databricks Spark cluster, Azure Data Explorer Spark connector does not take direct dependencies on Databricks or any other Spark distribution.
Prerequisites
- An Azure subscription. Create a free Azure account.
- An Azure Data Explorer cluster and database. Create a cluster and database.
- A Spark cluster
- Install connector library:
- Pre-built libraries for Spark 2.4+Scala 2.11 or Spark 3+scala 2.12
- Maven repo
- Maven 3.x installed
Tip
Spark 2.3.x versions are also supported, but may require some changes in pom.xml dependencies.
How to build the Spark connector
Starting version 2.3.0 we introduce new artifact Ids replacing spark-kusto-connector: kusto-spark_3.0_2.12 targeting Spark 3.x and Scala 2.12.
Note
Versions prior to 2.5.1 do not work anymore for ingest to an existing table, please update to a later version. This step is optional. If you are using pre-built libraries, for example, Maven, see Spark cluster setup.
Build prerequisites
Refer to this source for building the Spark Connector.
For Scala/Java applications using Maven project definitions, link your application with the latest artifact. Find the latest artifact on Maven Central.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
If you aren't using prebuilt libraries, you need to install the libraries listed in dependencies including the following Kusto Java SDK libraries. To find the right version to install, look in the relevant release's pom:
To build jar and run all tests:
mvn clean package -DskipTests
To build jar, run all tests, and install jar to your local Maven repository:
mvn clean install -DskipTests
For more information, see connector usage.
Spark cluster setup
Note
It's recommended to use the latest Kusto Spark connector release when performing the following steps.
Configure the following Spark cluster settings, based on Azure Databricks cluster Spark 3.0.1 and Scala 2.12:
Install the latest spark-kusto-connector library from Maven:
Verify that all required libraries are installed:
For installation using a JAR file, verify that other dependencies were installed:
Authentication
Kusto Spark connector enables you to authenticate with Microsoft Entra ID using one of the following methods:
- An Microsoft Entra application
- An Microsoft Entra access token
- Device authentication (for nonproduction scenarios)
- An Azure Key Vault To access the Key Vault resource, install the azure-keyvault package and provide application credentials.
Microsoft Entra application authentication
Microsoft Entra application authentication is the simplest and most common authentication method and is recommended for the Kusto Spark connector.
Sign in to your Azure subscription via Azure CLI. Then authenticate in the browser.
az login
Choose the subscription to host the principal. This step is needed when you have multiple subscriptions.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Create the service principal. In this example, the service principal is called
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
From the returned JSON data, copy the
appId
,password
, andtenant
for future use.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
You've created your Microsoft Entra application and service principal.
The Spark connector uses the following Entra app properties for authentication:
Properties | Option String | Description |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra application (client) identifier. |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra authentication authority. Microsoft Entra Directory (tenant) ID. Optional - defaults to microsoft.com. For more information, see Microsoft Entra authority. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra application key for the client. |
KUSTO_ACCESS_TOKEN | kustoAccessToken | If you already have an accessToken that is created with access to Kusto, that can be used passed to the connector as well for authentication. |
Note
Older API versions (less than 2.0.0) have the following naming: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Kusto privileges
Grant the following privileges on the kusto side based on the Spark operation you wish to perform.
Spark operation | Privileges |
---|---|
Read - Single Mode | Reader |
Read – Force Distributed Mode | Reader |
Write – Queued Mode with CreateTableIfNotExist table create option | Admin |
Write – Queued Mode with FailIfNotExist table create option | Ingestor |
Write – TransactionalMode | Admin |
For more information on principal roles, see role-based access control. For managing security roles, see security roles management.
Spark sink: writing to Kusto
Set up sink parameters:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Write Spark DataFrame to Kusto cluster as batch:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Or use the simplified syntax:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Write streaming data:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark source: reading from Kusto
When reading small amounts of data, define the data query:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Optional: If you provide the transient blob storage (and not Kusto) the blobs are created under the caller's responsibility. This includes provisioning the storage, rotating access keys, and deleting transient artifacts. The KustoBlobStorageUtils module contains helper functions for deleting blobs based on either account and container coordinates and account credentials, or a full SAS URL with write, read, and list permissions. When the corresponding RDD is no longer needed, each transaction stores transient blob artifacts in a separate directory. This directory is captured as part of read-transaction information logs reported on the Spark Driver node.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
In the example above, the Key Vault isn't accessed using the connector interface; a simpler method of using the Databricks secrets is used.
Read from Kusto.
If you provide the transient blob storage, read from Kusto as follows:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
If Kusto provides the transient blob storage, read from Kusto as follows:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)