Mata in data med Apache Flink i Azure Data Explorer
Apache Flink är ett ramverk och en distribuerad bearbetningsmotor för tillståndskänsliga beräkningar över obundna och begränsade dataströmmar.
Flink-anslutningsprogrammet är ett öppen källkod projekt som kan köras på valfritt Flink-kluster. Den implementerar datamottagare för att flytta data från ett Flink-kluster. Med anslutningsappen till Apache Flink kan du skapa snabba och skalbara program som riktar sig mot datadrivna scenarier, till exempel maskininlärning (ML), Extract-Transform-Load (ETL) och Log Analytics.
I den här artikeln får du lära dig hur du använder Flink-anslutningsappen för att skicka data från Flink till tabellen. Du skapar en tabell och datamappning, dirigerar Flink för att skicka data till tabellen och validerar sedan resultaten.
Förutsättningar
- Ett Azure Data Explorer-kluster och en databas. Skapa ett kluster och en databaseller en KQL-databas i Real-Time Analytics i Microsoft Fabric.
- En måltabell i databasen. Se Skapa en tabell i Azure Data Explorer eller Skapa en tabell i Real-Time Analytics
- Ett Apache Flink-kluster. Skapa ett kluster.
- Maven 3.x
Hämta Flink-anslutningsappen
För Flink-projekt som använder Maven för att hantera beroenden integrerar du Flink Connector Core Sink for Azure Data Explorer genom att lägga till den som ett beroende:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
För projekt som inte använder Maven för att hantera beroenden klonar du lagringsplatsen för Azure Data Explorer Connector för Apache Flink och skapar den lokalt. Med den här metoden kan du manuellt lägga till anslutningsappen till din lokala Maven-lagringsplats med kommandot mvn clean install -DskipTests
.
Du kan autentisera från Flink till att använda antingen ett Microsoft Entra ID program eller en hanterad identitet.
Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Senare beviljar du behörigheter för tjänstens huvudnamn för åtkomst till Kusto-resurser.
Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.
az login
Välj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Skapa tjänstens huvudnamn. I det här exemplet kallas
my-service-principal
tjänstens huvudnamn .az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Från de returnerade JSON-data kopierar du
appId
,password
ochtenant
för framtida användning.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.
Bevilja programanvändarbehörigheter för databasen:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Bevilja programmet antingen ingestor- eller administratörsbehörigheter i tabellen. De behörigheter som krävs beror på den valda metoden för dataskrivning. Ingestor-behörigheter är tillräckliga för SinkV2, medan WriteAndSink kräver administratörsbehörigheter.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Mer information om auktorisering finns i Kusto rollbaserad åtkomstkontroll.
Skriva data från Flink
Så här skriver du data från Flink:
Importera de alternativ som krävs:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Använd ditt program eller din hanterade identitet för att autentisera.
För programautentisering:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
För hanterad identitetsautentisering:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Konfigurera mottagarparametrarna, till exempel databas och tabell:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Du kan lägga till fler alternativ enligt beskrivningen i följande tabell:
Alternativ Beskrivning Standardvärde IngestionMappingRef Refererar till en befintlig inmatningsmappning. FlushImmediately Rensar data omedelbart och kan orsaka prestandaproblem. Den här metoden rekommenderas inte. BatchIntervalMs Styr hur ofta data rensas. 30 sekunder BatchSize Anger batchstorleken för buffring av poster före tömning. 1 000 poster ClientBatchSizeLimit Anger storleken i MB för aggregerade data före inmatning. 300 MB PollForIngestionStatus Om det är sant söker anslutningsappen efter inmatningsstatus efter dataspolning. falskt DeliveryGuarantee Avgör semantik för leveransgaranti. Om du vill uppnå exakt en gångs semantik använder du WriteAheadSink. AT_LEAST_ONCE Skriva strömmande data med någon av följande metoder:
- SinkV2: Det här är ett tillståndslöst alternativ som rensar data vid kontrollpunkten, vilket säkerställer minst en gång konsekvens. Vi rekommenderar det här alternativet för datainmatning med stora volymer.
- WriteAheadSink: Den här metoden genererar data till en KustoSink. Det är integrerat med Flinks kontrollpunktssystem och erbjuder exakt en gång garantier. Data lagras i en AbstractStateBackend och checkas in först när en kontrollpunkt har slutförts.
I följande exempel används SinkV2. Om du vill använda WriteAheadSink använder du
buildWriteAheadSink
metoden i stället förbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Den fullständiga koden bör se ut ungefär så här:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Kontrollera att data matas in
När anslutningen har konfigurerats skickas data till tabellen. Du kan kontrollera att data matas in genom att köra en KQL-fråga.
Kör följande fråga för att kontrollera att data matas in i tabellen:
<MyTable> | count
Kör följande fråga för att visa data:
<MyTable> | take 100