Zdarzenia
31 mar, 23 - 2 kwi, 23
Największe wydarzenie szkoleniowe usługi Fabric, Power BI i SQL. 31 marca – 2 kwietnia. Użyj kodu FABINSIDER, aby zaoszczędzić $400.
Zarejestruj się już dziśTa przeglądarka nie jest już obsługiwana.
Przejdź na przeglądarkę Microsoft Edge, aby korzystać z najnowszych funkcji, aktualizacji zabezpieczeń i pomocy technicznej.
Apache Flink to platforma i aparat przetwarzania rozproszonego na potrzeby obliczeń stanowych za pośrednictwem niezwiązanych i ograniczonych strumieni danych.
Łącznik Flink to projekt typu open source, który można uruchomić w dowolnym klastrze Flink. Implementuje ujście danych do przenoszenia danych z klastra Flink. Za pomocą łącznika do platformy Apache Flink można tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych, na przykład uczenia maszynowego (ML), wyodrębniania i przekształcania obciążenia (ETL) i usługi Log Analytics.
Z tego artykułu dowiesz się, jak używać łącznika Flink do wysyłania danych z linku Flink do tabeli. Utworzysz tabelę i mapowanie danych, przekieruj Flink, aby wysłać dane do tabeli, a następnie zweryfikujesz wyniki.
W przypadku projektów Flink korzystających z narzędzia Maven do zarządzania zależnościami zintegruj ujście podstawowego łącznika Flink dla usługi Azure Data Explorer , dodając ją jako zależność:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
W przypadku projektów, które nie używają narzędzia Maven do zarządzania zależnościami, sklonuj repozytorium łącznika usługi Azure Data Explorer dla platformy Apache Flink i skompiluj je lokalnie. Takie podejście umożliwia ręczne dodawanie łącznika do lokalnego repozytorium Maven przy użyciu polecenia mvn clean install -DskipTests
.
Możesz uwierzytelnić się za pomocą linku Flink do przy użyciu aplikacji Microsoft Entra ID lub tożsamości zarządzanej.
Ta jednostka usługi będzie tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Później przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.
Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.
az login
Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal
.
az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z zwróconych danych JSON skopiuj wartości appId
, password
i tenant
do użycia w przyszłości.
{
"appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
"displayName": "my-service-principal",
"name": "my-service-principal",
"password": "00001111-aaaa-2222-bbbb-3333cccc4444",
"tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
}
Utworzono aplikację Microsoft Entra i jednostkę usługi.
Przyznaj użytkownikowi aplikacji uprawnienia do bazy danych:
// Grant database user permissions
.add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Przyznaj aplikacji uprawnienia ingestor lub administratora w tabeli. Wymagane uprawnienia zależą od wybranej metody zapisywania danych. Uprawnienia ingestora są wystarczające dla sinkV2, podczas gdy writeAndSink wymaga uprawnień administratora.
// Grant table ingestor permissions (SinkV2)
.add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
// Grant table admin permissions (WriteAheadSink)
.add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Aby uzyskać więcej informacji na temat autoryzacji, zobacz Kusto role-based access control (Kontrola dostępu oparta na rolach w usłudze Kusto).
Aby zapisać dane z Flink:
Zaimportuj wymagane opcje:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
Użyj swojej aplikacji lub tożsamości zarządzanej do uwierzytelniania.
W przypadku uwierzytelniania aplikacji:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
W przypadku uwierzytelniania tożsamości zarządzanej:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setManagedIdentityAppId("<Object ID>")
.setClusterUrl("<Cluster URI>").build();
Skonfiguruj parametry ujścia, takie jak baza danych i tabela:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
Więcej opcji można dodać zgodnie z opisem w poniższej tabeli:
Opcja | opis | Wartość domyślna |
---|---|---|
IngestionMappingRef | Odwołuje się do istniejącego mapowania pozyskiwania. | |
FlushImmediately | Natychmiast opróżnia dane i może powodować problemy z wydajnością. Ta metoda nie jest zalecana. | |
BatchIntervalMs | Określa częstotliwość opróżniania danych. | 30 sekund |
BatchSize | Ustawia rozmiar partii dla buforowania rekordów przed opróżnianiem. | 1 000 rekordów |
ClientBatchSizeLimit | Określa rozmiar w MB zagregowanych danych przed pozyskiwaniem. | 300 MB |
PollForIngestionStatus | Jeśli to prawda, łącznik sonduje stan pozyskiwania po opróżnieniu danych. | fałsz |
DeliveryGuarantee | Określa semantyka gwarancji dostarczania. Aby osiągnąć dokładnie raz semantyka, użyj metody WriteAheadSink. | AT_LEAST_ONCE |
Zapisuj dane przesyłane strumieniowo przy użyciu jednej z następujących metod:
W poniższym przykładzie użyto kodu SinkV2. Aby użyć metody WriteAheadSink, użyj buildWriteAheadSink
metody zamiast build
:
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Pełny kod powinien wyglądać mniej więcej tak:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Po skonfigurowaniu połączenia dane są wysyłane do tabeli. Możesz sprawdzić, czy dane są pozyskiwane, uruchamiając zapytanie KQL.
Uruchom następujące zapytanie, aby sprawdzić, czy dane są pozyskiwane do tabeli:
<MyTable>
| count
Uruchom następujące zapytanie, aby wyświetlić dane:
<MyTable>
| take 100
Zdarzenia
31 mar, 23 - 2 kwi, 23
Największe wydarzenie szkoleniowe usługi Fabric, Power BI i SQL. 31 marca – 2 kwietnia. Użyj kodu FABINSIDER, aby zaoszczędzić $400.
Zarejestruj się już dziś