Udostępnij za pośrednictwem


Pozyskiwanie danych za pomocą funkcji Apache Flink do usługi Azure Data Explorer

Apache Flink to platforma i aparat przetwarzania rozproszonego na potrzeby obliczeń stanowych za pośrednictwem niezwiązanych i ograniczonych strumieni danych.

Łącznik Flink to projekt typu open source, który można uruchomić w dowolnym klastrze Flink. Implementuje ujście danych do przenoszenia danych z klastra Flink. Za pomocą łącznika do platformy Apache Flink można tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych, na przykład uczenia maszynowego (ML), wyodrębniania i przekształcania obciążenia (ETL) i usługi Log Analytics.

Z tego artykułu dowiesz się, jak używać łącznika Flink do wysyłania danych z linku Flink do tabeli. Utworzysz tabelę i mapowanie danych, przekieruj Flink, aby wysłać dane do tabeli, a następnie zweryfikujesz wyniki.

Wymagania wstępne

W przypadku projektów Flink korzystających z narzędzia Maven do zarządzania zależnościami zintegruj ujście podstawowego łącznika Flink dla usługi Azure Data Explorer , dodając ją jako zależność:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

W przypadku projektów, które nie używają narzędzia Maven do zarządzania zależnościami, sklonuj repozytorium łącznika usługi Azure Data Explorer dla platformy Apache Flink i skompiluj je lokalnie. Takie podejście umożliwia ręczne dodawanie łącznika do lokalnego repozytorium Maven przy użyciu polecenia mvn clean install -DskipTests.

Możesz uwierzytelnić się za pomocą linku Flink do przy użyciu aplikacji Microsoft Entra ID lub tożsamości zarządzanej.

Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.

  1. Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.

    az login
    
  2. Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z zwróconych danych JSON skopiuj wartości appId, passwordi tenant do użycia w przyszłości.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Utworzono aplikację Microsoft Entra i jednostkę usługi.

  1. Przyznaj użytkownikowi aplikacji uprawnienia do bazy danych:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Przyznaj aplikacji uprawnienia ingestor lub administratora w tabeli. Wymagane uprawnienia zależą od wybranej metody zapisywania danych. Uprawnienia ingestora są wystarczające dla sinkV2, podczas gdy writeAndSink wymaga uprawnień administratora.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Aby uzyskać więcej informacji na temat autoryzacji, zobacz Kusto role-based access control (Kontrola dostępu oparta na rolach w usłudze Kusto).

Aby zapisać dane z Flink:

  1. Zaimportuj wymagane opcje:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Użyj swojej aplikacji lub tożsamości zarządzanej do uwierzytelniania.

    W przypadku uwierzytelniania aplikacji:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    W przypadku uwierzytelniania tożsamości zarządzanej:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Skonfiguruj parametry ujścia, takie jak baza danych i tabela:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Więcej opcji można dodać zgodnie z opisem w poniższej tabeli:

    Opcja opis Wartość domyślna
    IngestionMappingRef Odwołuje się do istniejącego mapowania pozyskiwania.
    FlushImmediately Natychmiast opróżnia dane i może powodować problemy z wydajnością. Ta metoda nie jest zalecana.
    BatchIntervalMs Określa częstotliwość opróżniania danych. 30 sekund
    BatchSize Ustawia rozmiar partii dla buforowania rekordów przed opróżnianiem. 1 000 rekordów
    ClientBatchSizeLimit Określa rozmiar w MB zagregowanych danych przed pozyskiwaniem. 300 MB
    PollForIngestionStatus Jeśli to prawda, łącznik sonduje stan pozyskiwania po opróżnieniu danych. fałsz
    DeliveryGuarantee Określa semantyka gwarancji dostarczania. Aby osiągnąć dokładnie raz semantyka, użyj metody WriteAheadSink. AT_LEAST_ONCE
  2. Zapisuj dane przesyłane strumieniowo przy użyciu jednej z następujących metod:

    • SinkV2: jest to opcja bezstanowa, która opróżnia dane w punkcie kontrolnym, zapewniając co najmniej raz spójność. Zalecamy tę opcję w przypadku pozyskiwania dużych ilości danych.
    • WriteAheadSink: ta metoda emituje dane do aplikacji KustoSink. Jest zintegrowany z systemem tworzenia punktów kontrolnych Flink i oferuje dokładnie jednokrotne gwarancje. Dane są przechowywane w obiekcie AbstractStateBackend i zatwierdzane tylko po zakończeniu punktu kontrolnego.

    W poniższym przykładzie użyto kodu SinkV2. Aby użyć metody WriteAheadSink, użyj buildWriteAheadSink metody zamiast build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Pełny kod powinien wyglądać mniej więcej tak:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Sprawdź, czy dane są pozyskiwane

Po skonfigurowaniu połączenia dane są wysyłane do tabeli. Możesz sprawdzić, czy dane są pozyskiwane, uruchamiając zapytanie KQL.

  1. Uruchom następujące zapytanie, aby sprawdzić, czy dane są pozyskiwane do tabeli:

    <MyTable>
    | count
    
  2. Uruchom następujące zapytanie, aby wyświetlić dane:

    <MyTable>
    | take 100