End-to-end blob ingestion into Azure Data Explorer

Azure Data Explorer is a fast and scalable data exploration service for log and telemetry data. This article gives you an end-to-end example of how to ingest data from Azure Blob storage into Azure Data Explorer.

You'll learn how to programmatically create a resource group, a storage account and container, an event hub, and an Azure Data Explorer cluster and database. You'll also learn how to programmatically configure Azure Data Explorer to ingest data from the new storage account.

Prerequisites

Install packages

This article contains examples in C# and Python. Choose the tab for your preferred language, and install the required packages.

Azure Resource Manager template

In this article, you use an Azure Resource Manager (ARM) template to create a resource group, a storage account and container, an event hub, and an Azure Data Explorer cluster and database. Save the following content in a file with the name template.json. You'll use this file to run the code example.

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "eventHubNamespaceName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub namespace name."
            }
        },
        "eventHubName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub name."
            }
        },
        "storageAccountType": {
            "type": "string",
            "defaultValue": "Standard_LRS",
            "allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
            "metadata": {
                "description": "Storage Account type"
            }
        },
        "storageAccountName": {
            "type": "string",
            "defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the storage account to create"
            }
        },
        "containerName": {
            "type": "string",
            "defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the container in storage account to create"
            }
        },
        "eventHubSku": {
            "type": "string",
            "allowedValues": ["Basic", "Standard"],
            "defaultValue": "Standard",
            "metadata": {
                "description": "Specifies the messaging tier for the Event Hubs namespace."
            }
        },
        "kustoClusterName": {
            "type": "string",
            "defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the cluster to create"
            }
        },
        "kustoDatabaseName": {
            "type": "string",
            "defaultValue": "kustodb",
            "metadata": {
                "description": "Name of the database to create"
            }
        },
        "clusterPrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "clusterPrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForCluster": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForClusterPrincipal": {
            "type": "string",
            "defaultValue": "AllDatabasesViewer",
            "metadata": {
                "description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'"
            }
        },
        "tenantIdForClusterPrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the cluster principal"
            }
        },
        "principalTypeForCluster": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "databasePrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "databasePrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForDatabase": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForDatabasePrincipal": {
            "type": "string",
            "defaultValue": "Admin",
            "metadata": {
                "description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
            }
        },
        "tenantIdForDatabasePrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the database principal"
            }
        },
        "principalTypeForDatabase": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "location": {
            "type": "string",
            "defaultValue": "[resourceGroup().location]",
            "metadata": {
                "description": "Location for all resources."
            }
        }
    },
    "variables": {
    },
    "resources": [{
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces",
            "name": "[parameters('eventHubNamespaceName')]",
            "location": "[parameters('location')]",
            "sku": {
                "name": "[parameters('eventHubSku')]",
                "tier": "[parameters('eventHubSku')]",
                "capacity": 1
            },
            "properties": {
                "isAutoInflateEnabled": false,
                "maximumThroughputUnits": 0
            }
        }, {
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces/eventhubs",
            "name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
            "properties": {
                "messageRetentionInDays": 7,
                "partitionCount": 1
            }
        }, {
            "type": "Microsoft.Storage/storageAccounts",
            "name": "[parameters('storageAccountName')]",
            "location": "[parameters('location')]",
            "apiVersion": "2018-07-01",
            "sku": {
                "name": "[parameters('storageAccountType')]"
            },
            "kind": "StorageV2",
            "resources": [
                {
                    "name": "[concat('default/', parameters('containerName'))]",
                    "type": "blobServices/containers",
                    "apiVersion": "2018-07-01",
                    "dependsOn": [
                        "[parameters('storageAccountName')]"
                    ],
                    "properties": {
                        "publicAccess": "None"
                    }
                }
            ],
            "properties": {}
        }, {
            "name": "[parameters('kustoClusterName')]",
            "type": "Microsoft.Kusto/clusters",
            "sku": {
                "name": "Standard_E8ads_v5",
                "tier": "Standard",
                "capacity": 2
            },
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "tags": {
                "Created By": "GitHub quickstart template"
            }
        }, {
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
            "type": "Microsoft.Kusto/clusters/databases",
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "softDeletePeriodInDays": 365,
                "hotCachePeriodInDays": 31
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForCluster')]",
                "role": "[parameters('roleForClusterPrincipal')]",
                "tenantId": "[parameters('tenantIdForClusterPrincipal')]",
                "principalType": "[parameters('principalTypeForCluster')]"
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForDatabase')]",
                "role": "[parameters('roleForDatabasePrincipal')]",
                "tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
                "principalType": "[parameters('principalTypeForDatabase')]"
            }
        }
    ]
}

Code example

The following code example gives you a step-by-step process that results in data ingestion into Azure Data Explorer.

You first create a resource group. You also create Azure resources such as a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and add principals. You then create an Azure Event Grid subscription, along with a table and column mapping, in the Azure Data Explorer database. Finally, you create the data connection to configure Azure Data Explorer to ingest data from the new storage account.

// Application (service principal) and subscription settings. Replace every placeholder with your own values.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";

// The deployment name doubles as the prefix for the names of the resources created in later steps.
var deploymentName = "e2eexample";
var resourceGroupName = $"{deploymentName}resourcegroup";
var location = "West Europe";

// Obtain credentials for the application; the same credentials are reused by every management client below.
var credentials = await ApplicationTokenProvider.LoginSilentAsync(tenantId, clientId, clientSecret);
var resourceManagementClient = new ResourceManagementClient(credentials) { SubscriptionId = subscriptionId };

Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
await resourceManagementClient.ResourceGroups.CreateOrUpdateAsync(resourceGroupName, new ResourceGroup(location));
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section

// Resource names, each derived from the deployment name so the resources are easy to correlate.
var eventHubName = $"{deploymentName}eventhub";
var eventHubNamespaceName = $"{eventHubName}ns";
var storageAccountName = $"{deploymentName}storage";
var storageContainerName = $"{deploymentName}storagecontainer";
var eventGridSubscriptionName = $"{deploymentName}eventgrid";
var kustoClusterName = $"{deploymentName}kustocluster";
var kustoDatabaseName = $"{deploymentName}kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = $"{deploymentName}kustoeventgridconnection";

// Wraps a value in the single-entry { "value": ... } dictionary used for every template parameter below.
static Dictionary<string, object> TemplateParameter(object value) => new(capacity: 1) { { "value", value } };

// Load the template text, then pair it with the parameter values that have no usable default.
var templateJson = File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8);
var templateParameters = new Dictionary<string, Dictionary<string, object>>
{
    ["eventHubNamespaceName"] = TemplateParameter(eventHubNamespaceName),
    ["eventHubName"] = TemplateParameter(eventHubName),
    ["storageAccountName"] = TemplateParameter(storageAccountName),
    ["containerName"] = TemplateParameter(storageContainerName),
    ["kustoClusterName"] = TemplateParameter(kustoClusterName),
    ["kustoDatabaseName"] = TemplateParameter(kustoDatabaseName),
    ["principalIdForCluster"] = TemplateParameter("xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"), //Application ID
    ["roleForClusterPrincipal"] = TemplateParameter("AllDatabasesAdmin"),
    ["tenantIdForClusterPrincipal"] = TemplateParameter(tenantId),
    ["principalTypeForCluster"] = TemplateParameter("App"),
    ["principalIdForDatabase"] = TemplateParameter("xxxxxxxx@xxxxxxxx.com"), //User Email
    ["roleForDatabasePrincipal"] = TemplateParameter("Admin"),
    ["tenantIdForDatabasePrincipal"] = TemplateParameter(tenantId),
    ["principalTypeForDatabase"] = TemplateParameter("User")
};

// Incremental mode is requested explicitly for this deployment.
var deployment = new Deployment(
    new DeploymentProperties(DeploymentMode.Incremental, template: templateJson, parameters: templateParameters));
await resourceManagementClient.Deployments.CreateOrUpdateAsync(resourceGroupName, deploymentName, deployment);
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");

// Resource IDs of the storage account (the subscription's scope) and the event hub (its destination).
// Both are reused in Step 5 when the data connection is created.
var storageResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}";
var eventHubResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}";

var eventGridManagementClient = new EventGridManagementClient(credentials) { SubscriptionId = subscriptionId };

// Only "blob created" events whose subject starts with the new container's path are forwarded.
var blobCreatedFilter = new EventSubscriptionFilter
{
    SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
    IncludedEventTypes = new List<string> { "Microsoft.Storage.BlobCreated" }
};
var blobCreatedSubscription = new EventSubscription
{
    Destination = new EventHubEventSubscriptionDestination(eventHubResourceId),
    Filter = blobCreatedFilter
};

await eventGridManagementClient.EventSubscriptions.CreateOrUpdateAsync(
    storageResourceId,
    eventGridSubscriptionName,
    blobCreatedSubscription
);
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");

// The cluster URI is built as https://{cluster}.{region}.kusto.windows.net, where the region segment is the
// location in lower case with spaces removed. ToLowerInvariant is used (rather than ToLower) so the host
// name does not depend on the current culture of the machine running the code.
var kustoUri = $"https://{kustoClusterName}.{location.ToLowerInvariant().Replace(" ", "")}.kusto.windows.net";

// Authenticate against the cluster with the same application (client ID and secret) used for the management calls.
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    // Create the target table. Column types are given as .NET type names.
    kustoClient.ExecuteControlCommand(
        CslCommandGenerator.GenerateTableCreateCommand(
            kustoTableName,
            new[]
            {
                Tuple.Create("EventTime", "System.DateTime"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("EventSummary", "System.String"),
            }
        )
    );

    // Create the CSV ingestion mapping: each table column is bound to a CSV field by its zero-based ordinal.
    kustoClient.ExecuteControlCommand(
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            IngestionMappingKind.Csv,
            kustoTableName,
            kustoColumnMappingName,
            new ColumnMapping[]
            {
                new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
                new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
                new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            }
        )
    );
}
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");

var clusterManagementClient = new KustoManagementClient(credentials) { SubscriptionId = subscriptionId };

// The connection ties the storage account and event hub from Step 3 to the table and CSV mapping from Step 4.
// NOTE(review): "$Default" is presumably the event hub consumer group — confirm against the EventGridDataConnection constructor.
var blobIngestionConnection = new EventGridDataConnection(
    storageResourceId,
    eventHubResourceId,
    "$Default",
    location: location,
    tableName: kustoTableName,
    mappingRuleName: kustoColumnMappingName,
    dataFormat: "csv"
);

await clusterManagementClient.DataConnections.CreateOrUpdateAsync(
    resourceGroupName,
    kustoClusterName,
    kustoDatabaseName,
    kustoDataConnectionName,
    blobIngestionConnection
);
Setting Field description
tenantId Your tenant ID. It's also known as a directory ID.
subscriptionId The subscription ID that you use for resource creation.
clientId The client ID of the application that can access resources in your tenant.
clientSecret The client secret of the application that can access resources in your tenant.

Test the code example

  1. Upload a file into the storage account.

    // Connection string of the storage account created by the deployment; replace the placeholders.
    string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net";

    // Two CSV rows: a timestamp, an integer ID, and a summary text.
    var blobContent = @"2007-01-01 00:00:00.0000000,2592,Several trees down
    2007-01-01 00:00:00.0000000,4171,Winter Storm";

    // Resolve a reference to test.csv inside the container, then upload the text as its content.
    CloudBlockBlob testBlob = CloudStorageAccount.Parse(storageConnectionString)
        .CreateCloudBlobClient()
        .GetContainerReference(storageContainerName)
        .GetBlockBlobReference("test.csv");
    await testBlob.UploadTextAsync(blobContent);
    
    Setting Field description
    storageConnectionString The connection string of the programmatically created storage account.
  2. Run a test query in Azure Data Explorer.

    // The cluster URI is built as https://{cluster}.{region}.kusto.windows.net, where the region segment is the
    // location in lower case with spaces removed (culture-invariant, so the host name is stable on any machine).
    // The locals here use query-specific names so they do not clash with the locals declared in Step 4.
    var queryUri = $"https://{kustoClusterName}.{location.ToLowerInvariant().Replace(" ", "")}.kusto.windows.net";
    var queryConnectionStringBuilder = new KustoConnectionStringBuilder(queryUri)
    {
        InitialCatalog = kustoDatabaseName,
        FederatedSecurity = true,
        ApplicationClientId = clientId,
        ApplicationKey = clientSecret,
        Authority = tenantId
    };
    using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(queryConnectionStringBuilder))
    {
        var query = $"{kustoTableName} | take 10";

        // Use the reader exactly as returned: casting it with `as` could yield null and fail on the first Read().
        using (var reader = kustoClient.ExecuteQuery(query))
        {
            // Print the three columns of every row in the first result set.
            while (reader.Read())
            {
                Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
            }
        }
    }
    

Next steps