หมายเหตุ
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลอง ลงชื่อเข้าใช้หรือเปลี่ยนไดเรกทอรีได้
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลองเปลี่ยนไดเรกทอรีได้
Apache Flink เป็นเฟรมเวิร์ก และกลไกการประมวลผลแบบกระจายสําหรับการคํานวณแบบสเตทฟูลผ่านสตรีมข้อมูลที่ไม่มีขอบเขตและผูกไว้
ตัวเชื่อมต่อ Flink เป็นโครงการโอเพนซอร์ส (Open Source)ที่สามารถทํางานบนคลัสเตอร์ Flink ใด ๆ ได้ ซึ่งใช้การจมข้อมูลสําหรับการย้ายข้อมูลจากคลัสเตอร์ Flink การใช้ตัวเชื่อมต่อไปยัง Apache Flink คุณสามารถสร้างแอปพลิเคชันที่รวดเร็วและสามารถปรับขนาดได้โดยกําหนดเป้าหมายสถานการณ์ที่ขับเคลื่อนด้วยข้อมูล เช่น การเรียนรู้ของเครื่อง (ML), Extract-Transform-Load (ETL) และ Log Analytics
ในบทความนี้ คุณจะได้เรียนรู้วิธีการใช้ตัวเชื่อมต่อ Flink เพื่อส่งข้อมูลจาก Flink ไปยังตารางของคุณ คุณสร้างตารางและการแมปข้อมูล โดยตรง Flink เพื่อส่งข้อมูลลงในตาราง จากนั้นตรวจสอบผลลัพธ์
ข้อกำหนดเบื้องต้น
- คลัสเตอร์ Azure Data Explorer และฐานข้อมูล สร้างคลัสเตอร์และฐานข้อมูลหรือฐานข้อมูล KQL ในตัวแสดงเวลาจริงใน Microsoft Fabric
- ตารางเป้าหมายในฐานข้อมูลของคุณ ดู สร้างตารางใน Azure Data Explorer หรือ สร้างตารางในตัวแสดงเวลาจริง
- คลัสเตอร์ Apache Flink สร้างคลัสเตอร์
- Maven 3.x
รับตัวเชื่อมต่อ Flink
สําหรับโครงการ Flink ที่ใช้ Maven เพื่อจัดการการขึ้นต่อกัน รวม Flink Connector Core Sink สําหรับ Azure Data Explorer โดยเพิ่มเป็นการขึ้นต่อกัน:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
สําหรับโครงการที่ไม่ได้ใช้ Maven เพื่อจัดการการขึ้นต่อกัน ให้ โคลนที่เก็บสําหรับตัวเชื่อมต่อ Azure Data Explorer สําหรับ Apache Flink และสร้างภายในเครื่อง วิธีนี้ช่วยให้คุณสามารถเพิ่มตัวเชื่อมต่อไปยังที่เก็บ Maven ภายในเครื่องของคุณด้วยตนเองโดยใช้คําสั่งmvn clean install -DskipTests
รับรองความถูกต้อง
คุณสามารถรับรองความถูกต้องจาก Flink เพื่อใช้แอปพลิเคชัน Microsoft Entra ID
บริการหลักนี้จะเป็นข้อมูลประจําตัวที่ใช้โดยตัวเชื่อมต่อเพื่อเขียนข้อมูลตารางของคุณใน Kusto คุณจะให้สิทธิ์สําหรับบริการหลักนี้เพื่อเข้าถึงทรัพยากร Kusto ในภายหลัง
ลงชื่อเข้าใช้การสมัครใช้งาน Azure ของคุณผ่านทาง Azure CLI จากนั้นรับรองความถูกต้องในเบราว์เซอร์
az loginเลือกการสมัครใช้งานเพื่อโฮสต์โครงร่างสําคัญ ขั้นตอนนี้จําเป็นเมื่อคุณมีการสมัครใช้งานหลายรายการ
az account set --subscription YOUR_SUBSCRIPTION_GUIDสร้างบริการหลัก ในตัวอย่างนี้ โครงร่างสําคัญของบริการเรียกว่า
my-service-principalaz ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}จากข้อมูล JSON ที่ส่งกลับ ให้
appIdpasswordคัดลอก และtenantสําหรับการใช้งานในอนาคต{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
คุณได้สร้างแอปพลิเคชัน Microsoft Entra และบริการหลักของคุณแล้ว
ให้สิทธิ์ผู้ใช้แอปพลิเคชันในฐานข้อมูล:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')ให้สิทธิ์การนําเข้าหรือผู้ดูแลระบบของแอปพลิเคชันบนตาราง สิทธิ์ที่จําเป็นขึ้นอยู่กับวิธีการเขียนข้อมูลที่เลือก สิทธิ์การนําเข้าเพียงพอสําหรับ SinkV2 ในขณะที่ WriteAndSink จําเป็นต้องมีสิทธิ์ระดับผู้ดูแลระบบ
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
สําหรับข้อมูลเพิ่มเติมเกี่ยวกับการรับรองความถูกต้อง ดู ควบคุมการเข้าถึงตามบทบาท Kusto
เขียนข้อมูลจาก Flink
เมื่อต้องการเขียนข้อมูลจาก Flink:
นําเข้าตัวเลือกที่จําเป็น:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;ใช้แอปพลิเคชันของคุณเพื่อรับรองความถูกต้อง
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
กําหนดค่าพารามิเตอร์ของที่เก็บ เช่น ฐานข้อมูลและตาราง:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();คุณสามารถเพิ่มตัวเลือกเพิ่มเติม ตามที่อธิบายไว้ในตารางต่อไปนี้:
ตัวเลือก คำอธิบาย ค่าเริ่มต้น IngestionMappingRef อ้างอิงการแมปการนําเข้าที่มีอยู่ ล้างค่าทั้งหมด ล้างข้อมูลทันทีและอาจทําให้เกิดปัญหาด้านประสิทธิภาพการทํางาน ไม่แนะนําให้ใช้วิธีการนี้ BatchIntervalMs ควบคุมความถี่ในการล้างข้อมูล 30 วินาที ขนาดชุด ตั้งค่าขนาดของชุดงานสําหรับการบัฟเฟอร์เรกคอร์ดก่อนล้างข้อมูล 1,000 เรกคอร์ด. ClientBatchSizeLimit ระบุขนาดเป็น MB ของข้อมูลรวมก่อนการนําเข้า 300 MB PollForIngestionStatus ถ้าเป็นจริง ตัวเชื่อมต่อจะทําแบบสํารวจสําหรับสถานะการนําเข้าข้อมูลหลังจากการล้างข้อมูล เท็จ DeliveryGuarantee กําหนดตรรกะการรับประกันการจัดส่ง เพื่อให้บรรลุผลอย่างถูกต้องเมื่อมีการความหมาย ให้ใช้ WriteAheadSink AT_LEAST_ONCE เขียนข้อมูลการสตรีมด้วยหนึ่งในวิธีต่อไปนี้:
- SinkV2: นี่คือตัวเลือกที่ไม่สะดุดซึ่งล้างข้อมูลบนจุดตรวจสอบเพื่อให้มั่นใจว่ามีความสอดคล้องอย่างน้อยหนึ่งครั้ง เราขอแนะนําให้ใช้ตัวเลือกนี้ในการนําเข้าข้อมูลปริมาณมาก
- WriteAheadSink: วิธีนี้จะปล่อยข้อมูลไปยัง KustoSink มันรวมเข้ากับระบบตรวจสอบ Flink และนําเสนอการรับประกันทันที ข้อมูลถูกเก็บไว้ใน AbstractStateBackend และยืนยันเฉพาะหลังจากที่จุดตรวจสอบเสร็จสมบูรณ์แล้ว
ตัวอย่างต่อไปนี้ใช้ SinkV2 หากต้องการใช้ WriteAheadSink ให้ใช้
buildWriteAheadSinkวิธีการ แทนbuild:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
โค้ดที่สมบูรณ์ควรมีลักษณะดังนี้:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
ตรวจสอบว่ามีการนําเข้าข้อมูล
เมื่อกําหนดค่าการเชื่อมต่อแล้ว ข้อมูลจะถูกส่งไปยังตารางของคุณ คุณสามารถตรวจสอบว่ามีการนําเข้าข้อมูลโดยการเรียกใช้คิวรี KQL
เรียกใช้คิวรีต่อไปนี้เพื่อตรวจสอบว่าข้อมูลถูกนําเข้าลงในตาราง:
<MyTable> | countเรียกใช้คิวรีต่อไปนี้เพื่อดูข้อมูล:
<MyTable> | take 100