Pozyskiwanie danych za pomocą narzędzia Apache Flink do usługi Azure Data Explorer

Apache Flink to platforma i aparat przetwarzania rozproszonego do obliczeń stanowych za pośrednictwem niezwiązanych i ograniczonych strumieni danych.

Łącznik Flink to projekt open source, który można uruchomić w dowolnym klastrze Flink. Implementuje ujście danych do przenoszenia danych z klastra Flink. Korzystając z łącznika do platformy Apache Flink, możesz tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych, na przykład uczenie maszynowe (ML), wyodrębnianie i ładowanie transformacji (ETL) i usługę Log Analytics.

Z tego artykułu dowiesz się, jak używać łącznika Flink do wysyłania danych z narzędzia Flink do tabeli. Utworzysz tabelę i mapowanie danych, bezpośrednie łącze Flink w celu wysłania danych do tabeli, a następnie zweryfikuj wyniki.

Wymagania wstępne

W przypadku projektów Flink, które używają narzędzia Maven do zarządzania zależnościami, zintegruj ujście podstawowego łącznika Flink dla platformy Azure Data Explorer, dodając go jako zależność:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

W przypadku projektów, które nie używają narzędzia Maven do zarządzania zależnościami, sklonuj repozytorium łącznika usługi Azure Data Explorer Connector for Apache Flink i skompiluj je lokalnie. Takie podejście umożliwia ręczne dodawanie łącznika do lokalnego repozytorium Maven przy użyciu polecenia mvn clean install -DskipTests.

Możesz uwierzytelnić się z poziomu linku Flink przy użyciu aplikacji Tożsamość Microsoft Entra lub tożsamości zarządzanej.

Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.

  1. Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.

    az login
    
  2. Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z zwróconych danych JSON skopiuj wartości appId, passwordi tenant do użycia w przyszłości.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Utworzono aplikację Microsoft Entra i jednostkę usługi.

  1. Udziel użytkownikowi aplikacji uprawnień do bazy danych:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Udziel aplikacji uprawnienia ingestora lub administratora w tabeli. Wymagane uprawnienia zależą od wybranej metody pisania danych. Uprawnienia ingestora są wystarczające dla ujścia 2, podczas gdy funkcja WriteAndSink wymaga uprawnień administratora.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Aby uzyskać więcej informacji na temat autoryzacji, zobacz Kontrola dostępu oparta na rolach usługi Kusto.

Aby zapisać dane z narzędzia Flink:

  1. Zaimportuj wymagane opcje:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Użyj swojej aplikacji lub tożsamości zarządzanej do uwierzytelniania.

    W przypadku uwierzytelniania aplikacji:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    W przypadku uwierzytelniania tożsamości zarządzanej:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Skonfiguruj parametry ujścia, takie jak baza danych i tabela:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Więcej opcji można dodać zgodnie z opisem w poniższej tabeli:

    Opcja Opis Wartość domyślna
    IngestionMappingRef Odwołuje się do istniejącego mapowania pozyskiwania.
    FlushImmediately Natychmiast opróżnia dane i może powodować problemy z wydajnością. Ta metoda nie jest zalecana.
    BatchIntervalMs Określa częstotliwość opróżniania danych. 30 sekund
    Batchsize Ustawia rozmiar partii dla buforowania rekordów przed opróżnieniem. 1000 rekordów
    ClientBatchSizeLimit Określa rozmiar w MB zagregowanych danych przed pozyskiwaniem. 300 MB
    PollForIngestionStatus Jeśli to prawda, łącznik sonduje stan pozyskiwania po opróżnieniu danych. fałsz
    DeliveryGuaranteee Określa semantyka gwarancji dostarczania. Aby osiągnąć dokładnie raz semantyka, użyj polecenia WriteAheadSink. AT_LEAST_ONCE
  2. Zapisuj dane przesyłane strumieniowo przy użyciu jednej z następujących metod:

    • SinkV2: Jest to opcja bezstanowa, która opróżnia dane w punkcie kontrolnym, zapewniając co najmniej raz spójność. Zalecamy tę opcję pozyskiwania danych o dużej ilości.
    • WriteAheadSink: ta metoda emituje dane do usługi KustoSink. Jest on zintegrowany z systemem punktów kontrolnych Flink i oferuje dokładnie jednokrotne gwarancje. Dane są przechowywane w obiekcie AbstractStateBackend i zatwierdzane tylko po zakończeniu punktu kontrolnego.

    W poniższym przykładzie użyto ujścia 2. Aby użyć metody WriteAheadSink, użyj buildWriteAheadSink metody zamiast build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Pełny kod powinien wyglądać mniej więcej tak:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Sprawdź, czy dane są pozyskiwane

Po skonfigurowaniu połączenia dane są wysyłane do tabeli. Możesz sprawdzić, czy dane są pozyskiwane, uruchamiając zapytanie KQL.

  1. Uruchom następujące zapytanie, aby sprawdzić, czy dane są pozyskiwane do tabeli:

    <MyTable>
    | count
    
  2. Uruchom następujące zapytanie, aby wyświetlić dane:

    <MyTable>
    | take 100