Create an Event Grid data connection for Azure Data Explorer with SDKs
In this article, you learn how to ingest blobs from your storage account into Azure Data Explorer using an Event Grid data connection. You'll create an Event Grid data connection that sets an Azure Event Grid subscription. The Event Grid subscription routes events from your storage account to Azure Data Explorer via an Azure Event Hubs.
To learn how to create the connection in the Azure portal or with an ARM template, see Create an Event Grid data connection.
For general information about ingesting into Azure Data Explorer from Event Grid, see Connect to Event Grid.
Note
To achieve the best performance with the Event Grid connection, set the rawSizeBytes
ingestion property via the blob metadata. For more information, see ingestion properties.
For code samples based on previous SDK versions, see the archived article.
Prerequisites
- An Azure subscription. Create a free Azure account.
- An Azure Data Explorer cluster and database. Create a cluster and database.
- A destination table. Create a table or use an existing table.
- An ingestion mapping for the table.
- A storage account. An Event Grid notification subscription can be set on Azure Storage accounts for
BlobStorage
,StorageV2
, or Data Lake Storage Gen2. - Have the Event Grid resource provider registered.
Create an Event Grid data connection
In this section, you'll establish a connection between Event Grid and your Azure Data Explorer table.
Install the Microsoft.Azure.Management.Kusto NuGet package.
Create a Microsoft Entra application principal to use for authentication. You'll need the directory (tenant) ID, application ID, and client secret.
Run the following code.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID var clientSecret = "PlaceholderClientSecret"; //Client Secret var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret); var resourceManagementClient = new ArmClient(credentials, subscriptionId); var resourceGroupName = "testrg"; //The cluster and database that are created as part of the Prerequisites var clusterName = "mykustocluster"; var databaseName = "mykustodatabase"; var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync(); var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value; var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value; var database = (await cluster.GetKustoDatabaseAsync(databaseName)).Value; var dataConnections = database.GetKustoDataConnections(); var eventGridConnectionName = "myeventgridconnect"; //The event hub and storage account that are created as part of the Prerequisites var eventHubResourceId = new ResourceIdentifier("/subscriptions/<storageAccountSubscriptionId>/resourceGroups/<storageAccountResourceGroupName>/providers/Microsoft.Storage/storageAccounts/<storageAccountName>"); var storageAccountResourceId = new ResourceIdentifier("/subscriptions/<eventHubSubscriptionId>/resourceGroups/<eventHubResourceGroupName>/providers/Microsoft.EventHub/namespaces/<eventHubNamespaceName>/eventhubs/<eventHubName>"); var consumerGroup = "$Default"; var location = AzureLocation.CentralUS; //The table and column mapping are created as part of the Prerequisites var tableName = "StormEvents"; var mappingRuleName = "StormEvents_CSV_Mapping"; var dataFormat = KustoEventGridDataFormat.Csv; var blobStorageEventType = BlobStorageEventType.MicrosoftStorageBlobCreated; var databaseRouting = KustoDatabaseRouting.Multi; var eventGridConnectionData = new KustoEventGridDataConnection { StorageAccountResourceId = storageAccountResourceId, EventHubResourceId = eventHubResourceId, ConsumerGroup = consumerGroup, TableName = tableName, Location = location, MappingRuleName = mappingRuleName, DataFormat = dataFormat, BlobStorageEventType = blobStorageEventType, DatabaseRouting = databaseRouting }; await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, eventGridConnectionName, eventGridConnectionData);
Setting Suggested value Field description tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Your tenant ID. Also known as directory ID. subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx The subscription ID that you use for resource creation. clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx The client ID of the application that can access resources in your tenant. clientSecret PlaceholderClientSecret The client secret of the application that can access resources in your tenant. resourceGroupName testrg The name of the resource group containing your cluster. clusterName mykustocluster The name of your cluster. databaseName mykustodatabase The name of the target database in your cluster. eventGridConnectionName myeventgridconnect The desired name of your data connection. tableName StormEvents The name of the target table in the target database. mappingRuleName StormEvents_CSV_Mapping The name of your column mapping related to the target table. dataFormat csv The data format of the message. eventHubResourceId Resource ID The resource ID of your event hub where the Event Grid is configured to send events. storageAccountResourceId Resource ID The resource ID of your storage account that holds the data for ingestion. consumerGroup $Default The consumer group of your event hub. location Central US The location of the data connection resource. blobStorageEventType Microsoft.Storage.BlobCreated The type of event that triggers ingestion. Supported events are: Microsoft.Storage.BlobCreated or Microsoft.Storage.BlobRenamed. Blob renaming is supported only for ADLSv2 storage. databaseRouting Multi or Single The database routing for the connection. If you set the value to Single, the data connection will be routed to a single database in the cluster as specified in the databaseName setting. If you set the value to Multi, you can override the default target database using the Database ingestion property. For more information, see Events routing.
Use the Event Grid data connection
This section shows how to trigger ingestion from Azure Blob Storage or Azure Data Lake Gen 2 to your cluster following blob creation or blob renaming.
Select the relevant tab based on the type of storage SDK used to upload blobs.
The following code sample uses the Azure Blob Storage SDK to upload a file to Azure Blob Storage. The upload triggers the Event Grid data connection, which ingests the data into Azure Data Explorer.
var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();
Note
Azure Data Explorer won't delete the blobs post ingestion. Retain the blobs for three to five days by using Azure Blob storage lifecycle to manage blob deletion.
Note
Triggering ingestion following a CopyBlob
operation is not supported for storage accounts that have the hierarchical namespace feature enabled on them.
Important
We highly discourage generating Storage Events from custom code and sending them to Event Hubs. If you choose to do so, make sure that the events produced strictly adhere to the appropriate Storage Events schema and JSON format specifications.
Remove an Event Grid data connection
To remove the Event Grid connection, run the following command:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);