Dela via


Mata in data med Apache Flink i Azure Data Explorer

Apache Flink är ett ramverk och en distribuerad bearbetningsmotor för tillståndskänsliga beräkningar över obundna och begränsade dataströmmar.

Flink-anslutningsprogrammet är ett öppen källkod projekt som kan köras på valfritt Flink-kluster. Den implementerar datamottagare för att flytta data från ett Flink-kluster. Med anslutningsappen till Apache Flink kan du skapa snabba och skalbara program som riktar sig mot datadrivna scenarier, till exempel maskininlärning (ML), Extract-Transform-Load (ETL) och Log Analytics.

I den här artikeln får du lära dig hur du använder Flink-anslutningsappen för att skicka data från Flink till tabellen. Du skapar en tabell och datamappning, dirigerar Flink för att skicka data till tabellen och validerar sedan resultaten.

Förutsättningar

För Flink-projekt som använder Maven för att hantera beroenden integrerar du Flink Connector Core Sink for Azure Data Explorer genom att lägga till den som ett beroende:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

För projekt som inte använder Maven för att hantera beroenden klonar du lagringsplatsen för Azure Data Explorer Connector för Apache Flink och skapar den lokalt. Med den här metoden kan du manuellt lägga till anslutningsappen till din lokala Maven-lagringsplats med kommandot mvn clean install -DskipTests.

Du kan autentisera från Flink till att använda antingen ett Microsoft Entra ID program eller en hanterad identitet.

Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Senare beviljar du behörigheter för tjänstens huvudnamn för åtkomst till Kusto-resurser.

  1. Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.

    az login
    
  2. Välj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Skapa tjänstens huvudnamn. I det här exemplet kallas my-service-principaltjänstens huvudnamn .

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Från de returnerade JSON-data kopierar du appId, passwordoch tenant för framtida användning.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.

  1. Bevilja programanvändarbehörigheter för databasen:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Bevilja programmet antingen ingestor- eller administratörsbehörigheter i tabellen. De behörigheter som krävs beror på den valda metoden för dataskrivning. Ingestor-behörigheter är tillräckliga för SinkV2, medan WriteAndSink kräver administratörsbehörigheter.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Mer information om auktorisering finns i Kusto rollbaserad åtkomstkontroll.

Så här skriver du data från Flink:

  1. Importera de alternativ som krävs:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Använd ditt program eller din hanterade identitet för att autentisera.

    För programautentisering:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    För hanterad identitetsautentisering:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurera mottagarparametrarna, till exempel databas och tabell:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Du kan lägga till fler alternativ enligt beskrivningen i följande tabell:

    Alternativ Beskrivning Standardvärde
    IngestionMappingRef Refererar till en befintlig inmatningsmappning.
    FlushImmediately Rensar data omedelbart och kan orsaka prestandaproblem. Den här metoden rekommenderas inte.
    BatchIntervalMs Styr hur ofta data rensas. 30 sekunder
    BatchSize Anger batchstorleken för buffring av poster före tömning. 1 000 poster
    ClientBatchSizeLimit Anger storleken i MB för aggregerade data före inmatning. 300 MB
    PollForIngestionStatus Om det är sant söker anslutningsappen efter inmatningsstatus efter dataspolning. falskt
    DeliveryGuarantee Avgör semantik för leveransgaranti. Om du vill uppnå exakt en gångs semantik använder du WriteAheadSink. AT_LEAST_ONCE
  2. Skriva strömmande data med någon av följande metoder:

    • SinkV2: Det här är ett tillståndslöst alternativ som rensar data vid kontrollpunkten, vilket säkerställer minst en gång konsekvens. Vi rekommenderar det här alternativet för datainmatning med stora volymer.
    • WriteAheadSink: Den här metoden genererar data till en KustoSink. Det är integrerat med Flinks kontrollpunktssystem och erbjuder exakt en gång garantier. Data lagras i en AbstractStateBackend och checkas in först när en kontrollpunkt har slutförts.

    I följande exempel används SinkV2. Om du vill använda WriteAheadSink använder du buildWriteAheadSink metoden i stället för build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Den fullständiga koden bör se ut ungefär så här:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Kontrollera att data matas in

När anslutningen har konfigurerats skickas data till tabellen. Du kan kontrollera att data matas in genom att köra en KQL-fråga.

  1. Kör följande fråga för att kontrollera att data matas in i tabellen:

    <MyTable>
    | count
    
  2. Kör följande fråga för att visa data:

    <MyTable>
    | take 100