Delen via


Gegevens opnemen met Apache Flink in Azure Data Explorer

Apache Flink is een framework en gedistribueerde verwerkingsengine voor stateful berekeningen voor niet-gebonden en gebonden gegevensstromen.

De Flink-connector is een open source-project dat kan worden uitgevoerd op elk Flink-cluster. Het implementeert gegevenssink voor het verplaatsen van gegevens uit een Flink-cluster. Met de connector voor Apache Flink kunt u snelle en schaalbare toepassingen bouwen die zijn gericht op gegevensgestuurde scenario's, bijvoorbeeld machine learning (ML), Extract-Transform-Load (ETL) en Log Analytics.

In dit artikel leert u hoe u de Flink-connector gebruikt om gegevens van Flink naar uw tabel te verzenden. U maakt een tabel- en gegevenstoewijzing, stuurt Flink de opdracht om gegevens naar de tabel te verzenden en valideert vervolgens de resultaten.

Vereisten

Voor Flink-projecten die Gebruikmaken van Maven om afhankelijkheden te beheren, integreert u de Flink Connector Core Sink for Azure Data Explorer door deze toe te voegen als een afhankelijkheid:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Voor projecten die niet gebruikmaken van Maven voor het beheren van afhankelijkheden, kloont u de opslagplaats voor de Azure Data Explorer Connector voor Apache Flink en bouwt u deze lokaal. Met deze benadering kunt u de connector handmatig toevoegen aan uw lokale Maven-opslagplaats met behulp van de opdracht mvn clean install -DskipTests.

U kunt verifiëren vanuit Flink met behulp van een Microsoft Entra ID-toepassing of een beheerde identiteit.

Deze service-principal is de identiteit die door de connector wordt gebruikt voor het schrijven van gegevens in uw tabel in Kusto. U verleent deze service-principal later machtigingen voor toegang tot Kusto-resources.

  1. Meld u aan bij uw Azure-abonnement via Azure CLI. Verifieer vervolgens in de browser.

    az login
    
  2. Kies het abonnement om de principal te hosten. Deze stap is nodig wanneer u meerdere abonnementen hebt.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Maak de service-principal. In dit voorbeeld wordt de service-principal genoemd my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieer vanuit de geretourneerde JSON-gegevens de appId, passworden tenant voor toekomstig gebruik.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

U hebt uw Microsoft Entra toepassing en service-principal gemaakt.

  1. Ververleent de toepassing gebruikersmachtigingen voor de database:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Ververleent de toepassing ingestor- of beheerdersmachtigingen voor de tabel. De vereiste machtigingen zijn afhankelijk van de gekozen methode voor het schrijven van gegevens. Ingestor-machtigingen zijn voldoende voor SinkV2, terwijl WriteAndSink beheerdersmachtigingen vereist.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Zie Op rollen gebaseerd toegangsbeheer van Kusto voor meer informatie over autorisatie.

Gegevens schrijven vanuit Flink:

  1. Importeer de vereiste opties:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Gebruik uw toepassing of beheerde identiteit om te verifiëren.

    Voor toepassingsverificatie:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Voor verificatie van beheerde identiteit:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configureer de sinkparameters, zoals database en tabel:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    U kunt meer opties toevoegen, zoals beschreven in de volgende tabel:

    Optie Beschrijving Standaardwaarde
    IngestionMappingRef Verwijst naar een bestaande opnametoewijzing.
    FlushImmediately Gegevens worden onmiddellijk gewist en kunnen prestatieproblemen veroorzaken. Deze methode wordt niet aanbevolen.
    BatchIntervalMs Hiermee bepaalt u hoe vaak gegevens worden leeggemaakt. 30 seconden
    BatchSize Hiermee stelt u de batchgrootte in voor het bufferen van records vóór het leegmaken. 1000 records
    ClientBatchSizeLimit Hiermee geeft u de grootte in MB van geaggregeerde gegevens vóór opname. 300 MB
    PollForIngestionStatus Als dit waar is, controleert de connector naar de opnamestatus na het leegmaken van gegevens. onjuist
    DeliveryGuarantee Bepaalt de semantiek van de leveringsgarantie. Gebruik WriteAheadSink om exact één keer semantiek te bereiken. AT_LEAST_ONCE
  2. Schrijf streaminggegevens met een van de volgende methoden:

    • SinkV2: Dit is een staatloze optie waarmee gegevens op het controlepunt worden leeggemaakt, waardoor ten minste één keer consistentie wordt gegarandeerd. We raden deze optie aan voor gegevensopname met een hoog volume.
    • WriteAheadSink: Met deze methode worden gegevens verzonden naar een KustoSink. Het is geïntegreerd met het checkpointing systeem van Flink en biedt precies één keer garanties. Gegevens worden opgeslagen in een AbstractStateBackend en pas doorgevoerd nadat een controlepunt is voltooid.

    In het volgende voorbeeld wordt SinkV2 gebruikt. Als u WriteAheadSink wilt gebruiken, gebruikt u de buildWriteAheadSink methode in plaats van build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

De volledige code moet er ongeveer als volgt uitzien:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Controleren of gegevens worden opgenomen

Zodra de verbinding is geconfigureerd, worden gegevens naar de tabel verzonden. U kunt controleren of de gegevens worden opgenomen door een KQL-query uit te voeren.

  1. Voer de volgende query uit om te controleren of gegevens worden opgenomen in de tabel:

    <MyTable>
    | count
    
  2. Voer de volgende query uit om de gegevens weer te geven:

    <MyTable>
    | take 100