รับข้อมูลจาก Apache Flink

Apache Flink เป็นเฟรมเวิร์ก และกลไกการประมวลผลแบบกระจายสําหรับการคํานวณแบบสเตทฟูลผ่านสตรีมข้อมูลที่ไม่มีขอบเขตและผูกไว้

ตัวเชื่อมต่อ Flink เป็นโครงการโอเพนซอร์ส (Open Source)ที่สามารถทํางานบนคลัสเตอร์ Flink ใด ๆ ได้ ซึ่งใช้การจมข้อมูลสําหรับการย้ายข้อมูลจากคลัสเตอร์ Flink การใช้ตัวเชื่อมต่อไปยัง Apache Flink คุณสามารถสร้างแอปพลิเคชันที่รวดเร็วและสามารถปรับขนาดได้โดยกําหนดเป้าหมายสถานการณ์ที่ขับเคลื่อนด้วยข้อมูล เช่น การเรียนรู้ของเครื่อง (ML), Extract-Transform-Load (ETL) และ Log Analytics

ในบทความนี้ คุณจะได้เรียนรู้วิธีการใช้ตัวเชื่อมต่อ Flink เพื่อส่งข้อมูลจาก Flink ไปยังตารางของคุณ คุณสร้างตารางและการแมปข้อมูล โดยตรง Flink เพื่อส่งข้อมูลลงในตาราง จากนั้นตรวจสอบผลลัพธ์

ข้อกำหนดเบื้องต้น

สําหรับโครงการ Flink ที่ใช้ Maven เพื่อจัดการการขึ้นต่อกัน รวม Flink Connector Core Sink สําหรับ Azure Data Explorer โดยเพิ่มเป็นการขึ้นต่อกัน:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

สําหรับโครงการที่ไม่ได้ใช้ Maven เพื่อจัดการการขึ้นต่อกัน ให้ โคลนที่เก็บสําหรับตัวเชื่อมต่อ Azure Data Explorer สําหรับ Apache Flink และสร้างภายในเครื่อง วิธีนี้ช่วยให้คุณสามารถเพิ่มตัวเชื่อมต่อไปยังที่เก็บ Maven ภายในเครื่องของคุณด้วยตนเองโดยใช้คําสั่งmvn clean install -DskipTests

รับรองความถูกต้อง

คุณสามารถรับรองความถูกต้องจาก Flink เพื่อใช้แอปพลิเคชัน Microsoft Entra ID

บริการหลักนี้จะเป็นข้อมูลประจําตัวที่ใช้โดยตัวเชื่อมต่อเพื่อเขียนข้อมูลตารางของคุณใน Kusto คุณจะให้สิทธิ์สําหรับบริการหลักนี้เพื่อเข้าถึงทรัพยากร Kusto ในภายหลัง

  1. ลงชื่อเข้าใช้การสมัครใช้งาน Azure ของคุณผ่านทาง Azure CLI จากนั้นรับรองความถูกต้องในเบราว์เซอร์

    az login
    
  2. เลือกการสมัครใช้งานเพื่อโฮสต์โครงร่างสําคัญ ขั้นตอนนี้จําเป็นเมื่อคุณมีการสมัครใช้งานหลายรายการ

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. สร้างบริการหลัก ในตัวอย่างนี้ โครงร่างสําคัญของบริการเรียกว่าmy-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. จากข้อมูล JSON ที่ส่งกลับ ให้appIdpasswordคัดลอก และ tenant สําหรับการใช้งานในอนาคต

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

คุณได้สร้างแอปพลิเคชัน Microsoft Entra และบริการหลักของคุณแล้ว

  1. ให้สิทธิ์ผู้ใช้แอปพลิเคชันในฐานข้อมูล:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. ให้สิทธิ์การนําเข้าหรือผู้ดูแลระบบของแอปพลิเคชันบนตาราง สิทธิ์ที่จําเป็นขึ้นอยู่กับวิธีการเขียนข้อมูลที่เลือก สิทธิ์การนําเข้าเพียงพอสําหรับ SinkV2 ในขณะที่ WriteAndSink จําเป็นต้องมีสิทธิ์ระดับผู้ดูแลระบบ

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

สําหรับข้อมูลเพิ่มเติมเกี่ยวกับการรับรองความถูกต้อง ดู ควบคุมการเข้าถึงตามบทบาท Kusto

เมื่อต้องการเขียนข้อมูลจาก Flink:

  1. นําเข้าตัวเลือกที่จําเป็น:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. ใช้แอปพลิเคชันของคุณเพื่อรับรองความถูกต้อง

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. กําหนดค่าพารามิเตอร์ของที่เก็บ เช่น ฐานข้อมูลและตาราง:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    คุณสามารถเพิ่มตัวเลือกเพิ่มเติม ตามที่อธิบายไว้ในตารางต่อไปนี้:

    ตัวเลือก คำอธิบาย ค่าเริ่มต้น
    IngestionMappingRef อ้างอิงการแมปการนําเข้าที่มีอยู่
    ล้างค่าทั้งหมด ล้างข้อมูลทันทีและอาจทําให้เกิดปัญหาด้านประสิทธิภาพการทํางาน ไม่แนะนําให้ใช้วิธีการนี้
    BatchIntervalMs ควบคุมความถี่ในการล้างข้อมูล 30 วินาที
    ขนาดชุด ตั้งค่าขนาดของชุดงานสําหรับการบัฟเฟอร์เรกคอร์ดก่อนล้างข้อมูล 1,000 เรกคอร์ด.
    ClientBatchSizeLimit ระบุขนาดเป็น MB ของข้อมูลรวมก่อนการนําเข้า 300 MB
    PollForIngestionStatus ถ้าเป็นจริง ตัวเชื่อมต่อจะทําแบบสํารวจสําหรับสถานะการนําเข้าข้อมูลหลังจากการล้างข้อมูล เท็จ
    DeliveryGuarantee กําหนดตรรกะการรับประกันการจัดส่ง เพื่อให้บรรลุผลอย่างถูกต้องเมื่อมีการความหมาย ให้ใช้ WriteAheadSink AT_LEAST_ONCE
  2. เขียนข้อมูลการสตรีมด้วยหนึ่งในวิธีต่อไปนี้:

    • SinkV2: นี่คือตัวเลือกที่ไม่สะดุดซึ่งล้างข้อมูลบนจุดตรวจสอบเพื่อให้มั่นใจว่ามีความสอดคล้องอย่างน้อยหนึ่งครั้ง เราขอแนะนําให้ใช้ตัวเลือกนี้ในการนําเข้าข้อมูลปริมาณมาก
    • WriteAheadSink: วิธีนี้จะปล่อยข้อมูลไปยัง KustoSink มันรวมเข้ากับระบบตรวจสอบ Flink และนําเสนอการรับประกันทันที ข้อมูลถูกเก็บไว้ใน AbstractStateBackend และยืนยันเฉพาะหลังจากที่จุดตรวจสอบเสร็จสมบูรณ์แล้ว

    ตัวอย่างต่อไปนี้ใช้ SinkV2 หากต้องการใช้ WriteAheadSink ให้ใช้ buildWriteAheadSink วิธีการ แทน build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

โค้ดที่สมบูรณ์ควรมีลักษณะดังนี้:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

ตรวจสอบว่ามีการนําเข้าข้อมูล

เมื่อกําหนดค่าการเชื่อมต่อแล้ว ข้อมูลจะถูกส่งไปยังตารางของคุณ คุณสามารถตรวจสอบว่ามีการนําเข้าข้อมูลโดยการเรียกใช้คิวรี KQL

  1. เรียกใช้คิวรีต่อไปนี้เพื่อตรวจสอบว่าข้อมูลถูกนําเข้าลงในตาราง:

    <MyTable>
    | count
    
  2. เรียกใช้คิวรีต่อไปนี้เพื่อดูข้อมูล:

    <MyTable>
    | take 100