Bagikan melalui


Menyerap data dengan Apache Flink ke Azure Data Explorer

Apache Flink adalah kerangka kerja dan mesin pemrosesan terdistribusi untuk komputasi stateful melalui aliran data yang tidak terbatas dan terikat.

Konektor Flink adalah proyek sumber terbuka yang dapat berjalan pada kluster Flink apa pun. Ini mengimplementasikan sink data untuk memindahkan data dari kluster Flink. Dengan menggunakan konektor ke Apache Flink, Anda dapat membangun aplikasi yang cepat dan dapat diskalakan yang menargetkan skenario berbasis data, misalnya, pembelajaran mesin (ML), Extract-Transform-Load (ETL), dan Log Analytics.

Dalam artikel ini, Anda mempelajari cara menggunakan konektor Flink untuk mengirim data dari Flink ke tabel Anda. Anda membuat tabel dan pemetaan data, mengarahkan Flink untuk mengirim data ke dalam tabel, lalu memvalidasi hasilnya.

Prasyarat

Untuk proyek Flink yang menggunakan Maven untuk mengelola dependensi, integrasikan Flink Connector Core Sink For Azure Data Explorer dengan menambahkannya sebagai dependensi:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Untuk proyek yang tidak menggunakan Maven untuk mengelola dependensi, kloning repositori untuk Konektor Azure Data Explorer untuk Apache Flink dan buat secara lokal. Pendekatan ini memungkinkan Anda untuk menambahkan konektor secara manual ke repositori Maven lokal Anda menggunakan perintah mvn clean install -DskipTests.

Anda dapat mengautentikasi dari Flink untuk menggunakan aplikasi Microsoft Entra ID atau identitas terkelola.

Perwakilan layanan ini akan menjadi identitas yang digunakan oleh konektor untuk menulis data tabel Anda di Kusto. Anda nantinya akan memberikan izin bagi perwakilan layanan ini untuk mengakses sumber daya Kusto.

  1. Masuk ke langganan Azure Anda melalui Azure CLI. Kemudian autentikasi di browser.

    az login
    
  2. Pilih langganan untuk menghosting perwakilan. Langkah ini diperlukan saat Anda memiliki beberapa langganan.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Buat perwakilan layanan. Dalam contoh ini, perwakilan layanan disebut my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Dari data JSON yang dikembalikan, salin appId, password, dan tenant untuk digunakan di masa mendatang.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Anda telah membuat aplikasi Microsoft Entra dan perwakilan layanan.

  1. Berikan izin pengguna aplikasi pada database:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Berikan aplikasi izin ingestor atau admin pada tabel. Izin yang diperlukan bergantung pada metode penulisan data yang dipilih. Izin ingestor cukup untuk SinkV2, sementara WriteAndSink memerlukan izin admin.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Untuk informasi selengkapnya tentang otorisasi, lihat Kontrol akses berbasis peran Kusto.

Untuk menulis data dari Flink:

  1. Impor opsi yang diperlukan:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Gunakan aplikasi atau identitas terkelola Anda untuk Mengautentikasi.

    Untuk autentikasi aplikasi:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Untuk autentikasi identitas terkelola:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurasikan parameter sink seperti database dan tabel:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Anda bisa menambahkan opsi lainnya, seperti yang dijelaskan dalam tabel berikut ini:

    Opsi Deskripsi Nilai Default
    IngestionMappingRef Mereferensikan pemetaan penyerapan yang ada.
    FlushImmediately Menghapus data dengan segera, dan dapat menyebabkan masalah performa. Metode ini tidak disarankan.
    BatchIntervalMs Mengontrol seberapa sering data dihapus. 30 detik
    BatchSize Mengatur ukuran batch untuk rekaman buffering sebelum pembilasan. 1.000 rekaman
    ClientBatchSizeLimit Menentukan ukuran dalam MB data agregat sebelum penyerapan. 300 MB
    PollForIngestionStatus Jika true, konektor melakukan polling untuk status penyerapan setelah flush data. salah
    DeliveryGuarantee Menentukan semantik jaminan pengiriman. Untuk mencapai semantik tepat sekali, gunakan WriteAheadSink. AT_LEAST_ONCE
  2. Tulis data streaming dengan salah satu metode berikut:

    • SinkV2: Ini adalah opsi stateless yang menghapus data pada titik pemeriksaan, memastikan setidaknya sekali konsistensi. Kami merekomendasikan opsi ini untuk penyerapan data volume tinggi.
    • WriteAheadSink: Metode ini memancarkan data ke KustoSink. Ini terintegrasi dengan sistem titik pemeriksaan Flink dan menawarkan jaminan sekali persis. Data disimpan di AbstractStateBackend dan dilakukan hanya setelah titik pemeriksaan selesai.

    Contoh berikut menggunakan SinkV2. Untuk menggunakan WriteAheadSink, gunakan metode alih-alih buildWriteAheadSinkbuild:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Kode lengkap akan terlihat seperti ini:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Verifikasi bahwa data diserap

Setelah koneksi dikonfigurasi, data dikirim ke tabel Anda. Anda dapat memverifikasi bahwa data diserap dengan menjalankan kueri KQL.

  1. Jalankan kueri berikut untuk memverifikasi bahwa data diserap ke dalam tabel:

    <MyTable>
    | count
    
  2. Jalankan kueri berikut untuk menampilkan data:

    <MyTable>
    | take 100