Ingest data with Apache Flink into Azure Data Explorer

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

The Flink connector is an open source project that can run on any Flink cluster. It implements a data sink for moving data from a Flink cluster into an Azure Data Explorer table. Using the Apache Flink connector, you can build fast and scalable applications that target data-driven scenarios, for example machine learning (ML), Extract-Transform-Load (ETL), and Log Analytics.

In this article, you learn how to use the Flink connector to send data from Flink to your table. You create a table and data mapping, direct Flink to send data into the table, and then validate the results.

Prerequisites

For Flink projects that use Maven to manage dependencies, integrate the Flink Connector Core Sink For Azure Data Explorer by adding it as a dependency:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

For projects that don't use Maven to manage dependencies, clone the repository for the Azure Data Explorer Connector for Apache Flink and build it locally. This approach allows you to manually add the connector to your local Maven repository using the command mvn clean install -DskipTests.

You can authenticate from Flink to your Azure Data Explorer cluster by using either a Microsoft Entra ID application or a managed identity.

This service principal is the identity used by the connector to write data to your table in Kusto. You'll later grant permissions for this service principal to access Kusto resources.

  1. Sign in to your Azure subscription via Azure CLI. Then authenticate in the browser.

    az login
    
  2. Choose the subscription to host the principal. This step is needed when you have multiple subscriptions.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Create the service principal. In this example, the service principal is called my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. From the returned JSON data, copy the appId, password, and tenant for future use.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

You've created your Microsoft Entra application and service principal.

  1. Grant the application user permissions on the database:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Grant the application either ingestor or admin permissions on the table. The required permissions depend on the chosen data writing method. Ingestor permissions are sufficient for SinkV2, while WriteAheadSink requires admin permissions.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

For more information on authorization, see Kusto role-based access control.

To write data from Flink:

  1. Import the required classes:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Use your application or managed identity to authenticate.

    For application authentication:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
        .setAppId("<Application ID>")
        .setAppKey("<Application key>")
        .setTenantId("<Tenant ID>")
        .setClusterUrl("<Cluster URI>")
        .build();
    

    For managed identity authentication:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
        .setManagedIdentityAppId("<Object ID>")
        .setClusterUrl("<Cluster URI>")
        .build();
    
  3. Configure the sink parameters such as database and table:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    You can add more options, as described in the following table (an example that sets several of them follows the complete code later in this article):

    Option | Description | Default Value
    IngestionMappingRef | References an existing ingestion mapping. |
    FlushImmediately | Flushes data immediately; may cause performance issues. This method isn't recommended. |
    BatchIntervalMs | Controls how often data is flushed. | 30 seconds
    BatchSize | Sets the batch size for buffering records before flushing. | 1,000 records
    ClientBatchSizeLimit | Specifies the size in MB of aggregated data before ingestion. | 300 MB
    PollForIngestionStatus | If true, the connector polls for ingestion status after data flush. | false
    DeliveryGuarantee | Determines delivery guarantee semantics. To achieve exactly-once semantics, use WriteAheadSink. | AT_LEAST_ONCE
  4. Write streaming data with one of the following methods:

    • SinkV2: This is a stateless option that flushes data on checkpoint, ensuring at-least-once consistency. We recommend this option for high-volume data ingestion.
    • WriteAheadSink: This method emits data to a KustoSink. It's integrated with Flink's checkpointing system and offers exactly-once guarantees. Data is stored in an AbstractStateBackend and committed only after a checkpoint completes.

    The following example uses SinkV2. To use WriteAheadSink, use the buildWriteAheadSink method instead of build, as shown in the sketch after this procedure:

    KustoWriteSink.builder()
        .setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions)
        .build("<Flink source datastream>" /* Flink source data stream, for example messages dequeued from Kafka */,
            2 /* Parallelism to use */);
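
The SinkV2 and WriteAheadSink variants differ only in the final builder call. The following sketch shows the WriteAheadSink form. It assumes that buildWriteAheadSink accepts the same arguments as build (the source data stream and a parallelism value); verify the signature against the connector version you use.

// Sketch of the WriteAheadSink variant. buildWriteAheadSink is the method named above,
// but its argument list is assumed here to mirror build(<source stream>, <parallelism>).
KustoWriteSink.builder()
    .setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions)
    // Integrates with Flink checkpointing for exactly-once delivery; requires table admin permissions.
    .buildWriteAheadSink("<Flink source datastream>", 2 /* Parallelism to use */);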
    

The complete code should look something like this:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>")
    .build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>")
    .withTable("<Table name>")
    .build();

KustoWriteSink.builder()
    .setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions)
    .build("<Flink source datastream>" /* Flink source data stream, for example messages dequeued from Kafka */,
        2 /* Parallelism to use */);
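
The examples above set only the database and table on KustoWriteOptions. The optional settings listed in the table earlier in this article are configured on the same builder. The following sketch illustrates the idea; the setter names (withBatchIntervalMs, withBatchSize, withPollForIngestionStatus) are assumptions inferred from the option names, so check the KustoWriteOptions builder in your connector version for the exact methods.

// Sketch only: the setter names below are inferred from the option names in the table above
// and might not match the actual KustoWriteOptions builder API.
KustoWriteOptions tunedWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>")
    .withTable("<Table name>")
    .withBatchIntervalMs(30000)        // assumed setter: flush roughly every 30 seconds
    .withBatchSize(1000)               // assumed setter: buffer up to 1,000 records before flushing
    .withPollForIngestionStatus(true)  // assumed setter: poll for ingestion status after each flush
    .build();

Pass the resulting options object to setWriteOptions in place of kustoWriteOptions.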

Verify that data is ingested

Once the connection is configured, data is sent to your table. You can verify that the data is ingested by running a KQL query.

  1. Run the following query to verify that data is ingested into the table:

    <MyTable>
    | count
    
  2. Run the following query to view the data:

    <MyTable>
    | take 100