Pozyskiwanie danych za pomocą narzędzia Apache Flink do usługi Azure Data Explorer
Apache Flink to platforma i aparat przetwarzania rozproszonego do obliczeń stanowych za pośrednictwem niezwiązanych i ograniczonych strumieni danych.
Łącznik Flink to projekt open source, który można uruchomić w dowolnym klastrze Flink. Implementuje ujście danych do przenoszenia danych z klastra Flink. Korzystając z łącznika do platformy Apache Flink, możesz tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych, na przykład uczenie maszynowe (ML), wyodrębnianie i ładowanie transformacji (ETL) i usługę Log Analytics.
Z tego artykułu dowiesz się, jak używać łącznika Flink do wysyłania danych z narzędzia Flink do tabeli. Utworzysz tabelę i mapowanie danych, bezpośrednie łącze Flink w celu wysłania danych do tabeli, a następnie zweryfikuj wyniki.
Wymagania wstępne
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danychlubbazę danych KQL w usłudze Real-Time Analytics w usłudze Microsoft Fabric.
- Tabela docelowa w bazie danych. Zobacz Tworzenie tabeli w usłudze Azure Data Explorer lub Tworzenie tabeli w usłudze Real-Time Analytics
- Klaster Apache Flink. Utwórz klaster.
- Maven 3.x
Pobieranie łącznika Flink
W przypadku projektów Flink, które używają narzędzia Maven do zarządzania zależnościami, zintegruj ujście podstawowego łącznika Flink dla platformy Azure Data Explorer, dodając go jako zależność:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
W przypadku projektów, które nie używają narzędzia Maven do zarządzania zależnościami, sklonuj repozytorium łącznika usługi Azure Data Explorer Connector for Apache Flink i skompiluj je lokalnie. Takie podejście umożliwia ręczne dodawanie łącznika do lokalnego repozytorium Maven przy użyciu polecenia mvn clean install -DskipTests
.
Możesz uwierzytelnić się z poziomu linku Flink przy użyciu aplikacji Tożsamość Microsoft Entra lub tożsamości zarządzanej.
Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.
Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.
az login
Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z zwróconych danych JSON skopiuj wartości
appId
,password
itenant
do użycia w przyszłości.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Utworzono aplikację Microsoft Entra i jednostkę usługi.
Udziel użytkownikowi aplikacji uprawnień do bazy danych:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Udziel aplikacji uprawnienia ingestora lub administratora w tabeli. Wymagane uprawnienia zależą od wybranej metody pisania danych. Uprawnienia ingestora są wystarczające dla ujścia 2, podczas gdy funkcja WriteAndSink wymaga uprawnień administratora.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Aby uzyskać więcej informacji na temat autoryzacji, zobacz Kontrola dostępu oparta na rolach usługi Kusto.
Zapisywanie danych z narzędzia Flink
Aby zapisać dane z narzędzia Flink:
Zaimportuj wymagane opcje:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Użyj swojej aplikacji lub tożsamości zarządzanej do uwierzytelniania.
W przypadku uwierzytelniania aplikacji:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
W przypadku uwierzytelniania tożsamości zarządzanej:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
Skonfiguruj parametry ujścia, takie jak baza danych i tabela:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Więcej opcji można dodać zgodnie z opisem w poniższej tabeli:
Opcja Opis Wartość domyślna IngestionMappingRef Odwołuje się do istniejącego mapowania pozyskiwania. FlushImmediately Natychmiast opróżnia dane i może powodować problemy z wydajnością. Ta metoda nie jest zalecana. BatchIntervalMs Określa częstotliwość opróżniania danych. 30 sekund Batchsize Ustawia rozmiar partii dla buforowania rekordów przed opróżnieniem. 1000 rekordów ClientBatchSizeLimit Określa rozmiar w MB zagregowanych danych przed pozyskiwaniem. 300 MB PollForIngestionStatus Jeśli to prawda, łącznik sonduje stan pozyskiwania po opróżnieniu danych. fałsz DeliveryGuaranteee Określa semantyka gwarancji dostarczania. Aby osiągnąć dokładnie raz semantyka, użyj polecenia WriteAheadSink. AT_LEAST_ONCE Zapisuj dane przesyłane strumieniowo przy użyciu jednej z następujących metod:
- SinkV2: Jest to opcja bezstanowa, która opróżnia dane w punkcie kontrolnym, zapewniając co najmniej raz spójność. Zalecamy tę opcję pozyskiwania danych o dużej ilości.
- WriteAheadSink: ta metoda emituje dane do usługi KustoSink. Jest on zintegrowany z systemem punktów kontrolnych Flink i oferuje dokładnie jednokrotne gwarancje. Dane są przechowywane w obiekcie AbstractStateBackend i zatwierdzane tylko po zakończeniu punktu kontrolnego.
W poniższym przykładzie użyto ujścia 2. Aby użyć metody WriteAheadSink, użyj
buildWriteAheadSink
metody zamiastbuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
Pełny kod powinien wyglądać mniej więcej tak:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Sprawdź, czy dane są pozyskiwane
Po skonfigurowaniu połączenia dane są wysyłane do tabeli. Możesz sprawdzić, czy dane są pozyskiwane, uruchamiając zapytanie KQL.
Uruchom następujące zapytanie, aby sprawdzić, czy dane są pozyskiwane do tabeli:
<MyTable> | count
Uruchom następujące zapytanie, aby wyświetlić dane:
<MyTable> | take 100
Zawartość pokrewna
Opinia
https://aka.ms/ContentUserFeedback.
Dostępne już wkrótce: W 2024 r. będziemy stopniowo wycofywać zgłoszenia z serwisu GitHub jako mechanizm przesyłania opinii na temat zawartości i zastępować go nowym systemem opinii. Aby uzyskać więcej informacji, sprawdź:Prześlij i wyświetl opinię dla