Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
Apache Flink er et struktur- og distribueret behandlingsprogram til tilstandsfulde beregninger over ubundne og afgrænsede datastrømme.
Flink-connectoren er et åben kildekode projekt, der kan køre på en hvilken som helst Flink-klynge. Den implementerer datavask til flytning af data fra en Flink-klynge. Ved hjælp af connectoren til Apache Flink kan du bygge hurtige og skalerbare programmer, der er målrettet til datadrevne scenarier, f.eks. machine learning (ML), Extract-Transform-Load (ETL) og Log Analytics.
I denne artikel får du mere at vide om, hvordan du bruger Flink-connectoren til at sende data fra Flink til din tabel. Du opretter en tabel og datatilknytning, direkte Flink for at sende data til tabellen og validerer derefter resultaterne.
Forudsætninger
- En Azure Data Explorer-klynge og -database. Opret en klynge og database eller en KQL-database i Realtidsintelligens i Microsoft Fabric.
- En destinationstabel i databasen. Se Opret en tabel i Azure Data Explorer eller Opret en tabel i Realtidsintelligens
- En Apache Flink-klynge. Opret en klynge.
- Maven 3.x
Hent Flink-connectoren
For Flink-projekter, der bruger Maven til at administrere afhængigheder, skal du integrere Flink Connector Core Sink for Azure Data Explorer ved at tilføje den som en afhængighed:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
For projekter, der ikke bruger Maven til at administrere afhængigheder, skal du klone lageret for Azure Data Explorer Connector til Apache Flink og bygge det lokalt. Denne fremgangsmåde giver dig mulighed for manuelt at føje connectoren til dit lokale Maven-lager ved hjælp af kommandoen mvn clean install -DskipTests
.
Godkend
Du kan godkende fra Flink til at bruge et Microsoft Entra ID-program.
Denne tjenesteprincipal er den identitet, der bruges af connectoren til at skrive data i tabellen i Kusto. Du skal senere tildele tilladelser til denne tjenesteprincipal for at få adgang til Kusto-ressourcer.
Log på dit Azure-abonnement via Kommandolinjegrænsefladen i Azure. Godkend derefter i browseren.
az login
Vælg det abonnement, der skal være vært for hovedstolen. Dette trin er nødvendigt, når du har flere abonnementer.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Opret tjenesteprincipalen. I dette eksempel kaldes
my-service-principal
tjenesteprincipalen .az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Fra de returnerede JSON-data skal du kopiere
appId
,password
ogtenant
til fremtidig brug.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Du har oprettet dit Microsoft Entra-program og din tjenesteprincipal.
Tildel programbrugertilladelser til databasen:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Tildel programmet enten ingestor- eller administratortilladelser til tabellen. De påkrævede tilladelser afhænger af den valgte metode til dataskrivning. Ingestor-tilladelser er tilstrækkelige til SinkV2, mens WriteAndSink kræver administratortilladelser.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Du kan få flere oplysninger om godkendelse i Kusto-rollebaseret adgangskontrol.
Skriv data fra Flink
Sådan skriver du data fra Flink:
Importér de nødvendige indstillinger:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Brug dit program til at godkende.
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Konfigurer vaskeparametrene, f.eks. database og tabel:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Du kan tilføje flere indstillinger som beskrevet i følgende tabel:
Indstilling Beskrivelse Standardværdi IngestionMappingRef Refererer til en eksisterende tilknytning af indtagelse. FlushImmediately Rydder data med det samme og kan medføre problemer med ydeevnen. Denne metode anbefales ikke. Batchintervalmer Styrer, hvor ofte data tømmes. 30 sekunder BatchSize Angiver batchstørrelsen for bufferlagring af poster, før der ryddes. 1.000 poster ClientBatchSizeLimit Angiver størrelsen i MB af aggregerede data før indtagelse. 300 MB PollForIngestionStatus Hvis sand, poller connectoren for at få status for indtagelse efter rydning af data. false DeliveryGuarantee Bestemmer semantik for leveringsgaranti. Hvis du vil opnå præcis én gang semantik, skal du bruge WriteAheadSink. AT_LEAST_ONCE Skriv streamingdata med en af følgende metoder:
- SinkV2: Dette er en tilstandsløs indstilling, der rydder data på kontrolpunktet og sikrer mindst én gang konsistens. Vi anbefaler denne indstilling til dataindtagelse i store mængder.
- WriteAheadSink: Denne metode udsender data til en KustoSink. Det er integreret med Flinks kontrolpunktsystem og giver præcis én gang garantier. Data gemmes i en AbstractStateBackend og bekræftes først, når et kontrolpunkt er fuldført.
I følgende eksempel bruges SinkV2. Hvis du vil bruge WriteAheadSink, skal du bruge
buildWriteAheadSink
metoden i stedet forbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Den komplette kode bør se nogenlunde sådan ud:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Kontrollér, at dataene er indtaget
Når forbindelsen er konfigureret, sendes data til tabellen. Du kan bekræfte, at dataene indtages ved at køre en KQL-forespørgsel.
Kør følgende forespørgsel for at kontrollere, at data er indtaget i tabellen:
<MyTable> | count
Kør følgende forespørgsel for at få vist dataene:
<MyTable> | take 100