แชร์ผ่าน


รูปแบบในการเพิ่มการรวมข้อมูลด้วย Dataflow Gen2

บทช่วยสอนนี้จะใช้เวลา 15 นาที และอธิบายวิธีการเพิ่มหน่วยข้อมูลลงในเลคเฮ้าส์โดยใช้ Dataflow Gen2

การเพิ่มการรวมข้อมูลในปลายทางข้อมูลจําเป็นต้องมีเทคนิคการโหลดเฉพาะข้อมูลใหม่หรือข้อมูลที่อัปเดตแล้วลงในปลายทางของข้อมูลของคุณ เทคนิคนี้สามารถทําได้โดยใช้คิวรีเพื่อกรองข้อมูลตามปลายทางของข้อมูล บทช่วยสอนนี้แสดงวิธีการสร้างกระแสข้อมูลเพื่อโหลดข้อมูลจากแหล่งข้อมูล OData ลงใน lakehouse และวิธีการเพิ่มคิวรีไปยังกระแสข้อมูลเพื่อกรองข้อมูลตามปลายทางของข้อมูล

ขั้นตอนระดับสูงในบทช่วยสอนนี้มีดังนี้:

  • สร้างกระแสข้อมูลเพื่อโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์
  • เพิ่มคิวรีไปยังกระแสข้อมูลเพื่อกรองข้อมูลตามปลายทางของข้อมูล
  • (ไม่บังคับ) โหลดข้อมูลโดยใช้สมุดบันทึกและไปป์ไลน์

ข้อกำหนดเบื้องต้น

คุณต้องมีพื้นที่ทํางานที่เปิดใช้งาน Microsoft Fabric ถ้าคุณยังไม่มี ให้อ้างอิงถึง สร้างพื้นที่ทํางาน นอกจากนี้ บทช่วยสอนจะถือว่าคุณกําลังใช้มุมมองไดอะแกรมใน Dataflow Gen2 เมื่อต้องการตรวจสอบว่าคุณกําลังใช้มุมมองไดอะแกรมใน ribbon ด้านบนให้ไปที่ มุมมอง และตรวจสอบให้แน่ใจว่า มุมมองไดอะแกรมถูกเลือกไว้

สร้างกระแสข้อมูลเพื่อโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์

ในส่วนนี้ คุณสร้างกระแสข้อมูลเพื่อโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์

  1. สร้างเลคเฮ้าส์ใหม่ในพื้นที่ทํางานของคุณ

    สกรีนช็อตที่แสดงกล่องโต้ตอบสร้างเลคเฮ้าส์

  2. สร้าง กระแสข้อมูล Gen2 ใหม่ในพื้นที่ทํางานของคุณ

    สกรีนช็อตที่แสดงรายการดรอปดาวน์สร้างกระแสข้อมูล

  3. เพิ่มแหล่งข้อมูลใหม่ลงในกระแสข้อมูล เลือกแหล่งข้อมูล OData และใส่ URL ต่อไปนี้: https://services.OData.org/V4/Northwind/Northwind.svc

    สกรีนช็อตที่แสดงกล่องโต้ตอบรับข้อมูล

    สกรีนช็อตที่แสดงตัวเชื่อมต่อ OData

    สกรีนช็อตที่แสดงการตั้งค่า OData

  4. เลือกตาราง Orders และเลือกถัดไป

    สกรีนช็อตที่แสดงกล่องโต้ตอบตารางคําสั่งซื้อที่เลือก

  5. เลือกคอลัมน์ต่อไปนี้เพื่อเก็บ:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    สกรีนช็อตที่แสดงฟังก์ชันเลือกคอลัมน์

    สกรีนช็อตที่แสดงการเลือกตารางคําสั่งซื้อคอลัมน์

  6. เปลี่ยนชนิดข้อมูลของ OrderDate, RequiredDateและ ShippedDate เป็นdatetime

    สกรีนช็อตที่แสดงฟังก์ชันเปลี่ยนชนิดข้อมูล

  7. ตั้งค่าปลายทางข้อมูลไปยังเลคเฮ้าส์ของคุณโดยใช้การตั้งค่าต่อไปนี้:

    • ปลายทางของข้อมูล: Lakehouse
    • เลคเฮ้าส์: เลือกเลคเฮ้าส์ที่คุณสร้างขึ้นในขั้นตอนที่ 1
    • ชื่อตารางใหม่: Orders
    • วิธีอัปเดต: Replace

    สกรีนช็อตแสดงริบบอนของทะเลสาบปลายทางข้อมูล

    สกรีนช็อตที่แสดงตารางลําดับของ Lakehouse ปลายทางข้อมูล

    สกรีนช็อตแสดงการแทนที่การตั้งค่า lakehouse ปลายทางข้อมูล

  8. เลือก ถัดไป และเผยแพร่กระแสข้อมูล

    สกรีนช็อตที่แสดงกล่องโต้ตอบเผยแพร่กระแสข้อมูล

ตอนนี้คุณได้สร้างกระแสข้อมูลเพื่อโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์แล้ว กระแสข้อมูลนี้ถูกใช้ในส่วนถัดไปเพื่อเพิ่มคิวรีไปยังกระแสข้อมูลเพื่อกรองข้อมูลตามปลายทางของข้อมูล หลังจากนั้น คุณสามารถใช้กระแสข้อมูลเพื่อโหลดข้อมูลโดยใช้สมุดบันทึกและไปป์ไลน์ได้

เพิ่มคิวรีไปยังกระแสข้อมูลเพื่อกรองข้อมูลตามปลายทางของข้อมูล

ส่วนนี้เพิ่มแบบสอบถามไปยังกระแสข้อมูลเพื่อกรองข้อมูลตามข้อมูลใน lakehouse ปลายทาง คิวรีรับค่าสูงสุด OrderID ใน lakehouse ที่จุดเริ่มต้นของการรีเฟรชกระแสข้อมูล และใช้ OrderId สูงสุดเพื่อรับคําสั่งซื้อที่มี OrderId ที่สูงขึ้นจากต้นทางไปยังต้นทางเพื่อผนวกเข้ากับปลายทางของข้อมูลของคุณ ซึ่งถือว่ามีการเพิ่มใบสั่งไปยังแหล่งข้อมูลจากน้อยไปหามากของOrderID ถ้าไม่ใช่กรณีนี้ คุณสามารถใช้คอลัมน์อื่นเพื่อกรองข้อมูลได้ ตัวอย่างเช่น คุณสามารถใช้ OrderDate คอลัมน์เพื่อกรองข้อมูลได้

หมายเหตุ

ตัวกรอง OData จะถูกนําไปใช้ภายใน Fabric หลังจากที่ได้รับข้อมูลจากแหล่งข้อมูล อย่างไรก็ตาม สําหรับแหล่งข้อมูลฐานข้อมูลเช่น SQL Server ตัวกรองจะถูกนําไปใช้ในคิวรีที่ส่งไปยังแหล่งข้อมูลหลังบ้าน และจะส่งกลับเฉพาะแถวที่กรองไปยังบริการเท่านั้น

  1. หลังจากรีเฟรชกระแสข้อมูลแล้ว ให้เปิดกระแสข้อมูลที่คุณสร้างไว้ในส่วนก่อนหน้าใหม่อีกครั้ง

    สกรีนช็อตที่แสดงกล่องโต้ตอบเปิดกระแสข้อมูล

  2. สร้างคิวรีใหม่ที่ชื่อว่า IncrementalOrderID และรับข้อมูลจากตาราง Orders ใน lakehouse ที่คุณสร้างขึ้นในส่วนก่อนหน้า

    สกรีนช็อตที่แสดงกล่องโต้ตอบรับข้อมูล

    สกรีนช็อตที่แสดงตัวเชื่อมต่อของ lakehouse

    สกรีนช็อตที่แสดงการรับคําสั่งซื้อของเลคเฮาส์ตาราง

    สกรีนช็อตที่แสดงการเปลี่ยนชื่อฟังก์ชันคิวรี

    สกรีนช็อตที่แสดงคิวรีที่เปลี่ยนชื่อแล้ว

  3. ปิดใช้งานการกําหนดสถานะของคิวรีนี้

    สกรีนช็อตที่แสดงฟังก์ชันการจัดเตรียมปิดใช้งาน

  4. ในการแสดงตัวอย่างข้อมูล คลิกขวาบน OrderID คอลัมน์ และเลือก ดูรายละเอียดแนวลึก

    สกรีนช็อตที่แสดงฟังก์ชันการดูรายละเอียดแนวลึก

  5. จากริบบอน ให้เลือก เครื่องมือรายการ ->สถิติ ->ค่าสูงสุด

    สกรีนช็อตที่แสดงฟังก์ชัน orderid สูงสุดของสถิติ

ตอนนี้คุณมีคิวรีที่ส่งกลับ OrderID สูงสุดในเลคเฮ้าส์ คิวรีนี้ถูกใช้เพื่อกรองข้อมูลจากแหล่งข้อมูล OData ในส่วนถัดไปจะเพิ่มคิวรีไปยังกระแสข้อมูลเพื่อกรองข้อมูลจากแหล่งข้อมูล OData ตาม OrderID สูงสุดใน lakehouse

  1. กลับไปที่คิวรี Orders และเพิ่มขั้นตอนใหม่เพื่อกรองข้อมูล ใช้การตั้งค่าต่อไปนี้:

    • คอลัมน์: OrderID
    • การผ่าตัด: Greater than
    • ค่า: พารามิเตอร์ IncrementalOrderID

    สกรีนช็อตที่แสดง orderid มากกว่าฟังก์ชันตัวกรอง

    สกรีนช็อตที่แสดงการตั้งค่าตัวกรอง

  2. อนุญาตให้รวมข้อมูลจากแหล่งข้อมูล OData และ lakehouse โดยการยืนยันกล่องโต้ตอบต่อไปนี้:

    สกรีนช็อตที่แสดงอนุญาตให้รวมกล่องโต้ตอบข้อมูล

  3. อัปเดตปลายทางของข้อมูลเพื่อใช้การตั้งค่าต่อไปนี้:

    • วิธีอัปเดต: Append

    สกรีนช็อตที่แสดงฟังก์ชันแก้ไขการตั้งค่าเอาต์พุต

    สกรีนช็อตที่แสดงตารางคําสั่งซื้อที่มีอยู่

    สกรีนช็อตที่แสดงการตั้งค่า lakehouse ปลายทางข้อมูลต่อท้าย

  4. เผยแพร่กระแสข้อมูล

    สกรีนช็อตที่แสดงกล่องโต้ตอบเผยแพร่กระแสข้อมูล

กระแสข้อมูลของคุณตอนนี้ประกอบด้วยคิวรีที่กรองข้อมูลจากแหล่งข้อมูล OData โดยยึดตาม OrderID สูงสุดใน lakehouse ซึ่งหมายความว่าจะโหลดเฉพาะข้อมูลใหม่หรือข้อมูลที่อัปเดตแล้วลงในเลคเฮ้าส์ ส่วนถัดไปใช้กระแสข้อมูลเพื่อโหลดข้อมูลโดยใช้สมุดบันทึกและไปป์ไลน์

(ไม่บังคับ) โหลดข้อมูลโดยใช้สมุดบันทึกและไปป์ไลน์

อีกทางหนึ่งคือ คุณสามารถโหลดข้อมูลเฉพาะโดยใช้สมุดบันทึกและไปป์ไลน์ได้ ด้วยรหัส python ที่กําหนดเองในสมุดบันทึก คุณจะลบข้อมูลเก่าออกจากเลคเฮ้าส์ได้ จากนั้น สร้างไปป์ไลน์ที่คุณเรียกใช้สมุดบันทึกก่อนและเรียกใช้กระแสข้อมูลตามลําดับ คุณโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์ใหม่ สมุดบันทึกสนับสนุนหลายภาษา แต่บทช่วยสอนนี้ใช้ PySpark Pyspark คือ Python API สําหรับ Spark และใช้ในบทช่วยสอนนี้เพื่อเรียกใช้คิวรี Spark SQL

  1. สร้างสมุดบันทึกใหม่ในพื้นที่ทํางานของคุณ

    สกรีนช็อตที่แสดงกล่องโต้ตอบสมุดบันทึกใหม่

  2. เพิ่มรหัส PySpark ต่อไปนี้ลงในสมุดบันทึกของคุณ:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. เรียกใช้สมุดบันทึกเพื่อตรวจสอบว่าข้อมูลถูกลบออกจากเลคเฮ้าส์หรือไม่

  4. สร้างไปป์ไลน์ใหม่ในพื้นที่ทํางานของคุณ

    สกรีนช็อตที่แสดงกล่องโต้ตอบไปป์ไลน์ใหม่

  5. เพิ่มกิจกรรมสมุดบันทึกใหม่ไปยังไปป์ไลน์ และเลือกสมุดบันทึกที่คุณสร้างในขั้นตอนก่อนหน้า

    สกรีนช็อตที่แสดงกล่องโต้ตอบเพิ่มกิจกรรมของสมุดบันทึก

    สกรีนช็อตที่แสดงกล่องโต้ตอบเลือกสมุดบันทึก

  6. เพิ่มกิจกรรมกระแสข้อมูลใหม่ไปยังไปป์ไลน์และเลือกกระแสข้อมูลที่คุณสร้างขึ้นในส่วนก่อนหน้า

    สกรีนช็อตที่แสดงกล่องโต้ตอบเพิ่มกิจกรรมของกระแสข้อมูล

    สกรีนช็อตที่แสดงกล่องโต้ตอบเลือกกระแสข้อมูล

  7. เชื่อมโยงกิจกรรมของสมุดบันทึกไปยังกิจกรรมกระแสข้อมูลด้วยทริกเกอร์ความสําเร็จ

    สกรีนช็อตที่แสดงกล่องโต้ตอบกิจกรรมการเชื่อมต่อ

  8. บันทึกและเรียกใช้ไปป์ไลน์

    สกรีนช็อตที่แสดงกล่องโต้ตอบเรียกใช้ไปป์ไลน์

ตอนนี้คุณมีไปป์ไลน์ที่ลบข้อมูลเก่าออกจากเลคเฮ้าส์และโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์ ด้วยการตั้งค่านี้ คุณสามารถโหลดข้อมูลจากแหล่งข้อมูล OData ลงในเลคเฮ้าส์เป็นประจําได้