หมายเหตุ
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลอง ลงชื่อเข้าใช้หรือเปลี่ยนไดเรกทอรีได้
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลองเปลี่ยนไดเรกทอรีได้
การสตรีมที่มีโครงสร้างเป็นเครื่องมือประมวลผลสตรีมที่ปรับขนาดได้และทนต่อข้อบกพร่องที่สร้างขึ้นบน Spark Spark จะดูแลการเรียกใช้การดําเนินการสตรีมมิ่งแบบเพิ่มหน่วยและต่อเนื่องขณะที่ข้อมูลยังคงมาถึง
การสตรีมที่มีโครงสร้างจะพร้อมใช้งานใน Spark 2.2 ตั้งแต่นั้นมา จึงเป็นแนวทางที่แนะนําสําหรับการสตรีมข้อมูล หลักการพื้นฐานที่อยู่เบื้องหลังสตรีมที่มีโครงสร้างคือการปฏิบัติต่อสตรีมข้อมูลสดเป็นตารางที่มีการผนวกข้อมูลใหม่อย่างต่อเนื่องเสมอ เช่น แถวใหม่ในตาราง มีแหล่งข้อมูลไฟล์การสตรีมในตัวที่กําหนดไว้สองถึงสามแหล่ง เช่น CSV, JSON, ORC, Parquet และการสนับสนุนในตัวสําหรับบริการรับส่งข้อความ เช่น Kafka และ Event Hubs
บทความนี้มีข้อมูลเชิงลึกเกี่ยวกับวิธีการปรับการประมวลผลและการนําเข้าเหตุการณ์ให้เหมาะสมผ่านการสตรีมที่มีโครงสร้างของ Spark ในสภาพแวดล้อมการผลิตที่มีอัตราความเร็วสูง วิธีการที่แนะนําได้แก่:
- การปรับอัตราความเร็วของการสตรีมข้อมูลให้เหมาะสม
- การปรับการดําเนินการเขียนให้เหมาะสมในตาราง Delta และ
- ชุดงานเหตุการณ์
ข้อกําหนดงาน Spark และสมุดบันทึก Spark
สมุดบันทึก Spark เป็นเครื่องมือที่ยอดเยี่ยมสําหรับการตรวจสอบแนวคิดและการทดลองเพื่อรับข้อมูลเชิงลึกจากข้อมูลหรือรหัสของคุณ สมุดบันทึกถูกใช้อย่างแพร่หลายในการเตรียมข้อมูล การแสดงผลข้อมูล ด้วยภาพ การเรียนรู้ของเครื่อง และสถานการณ์ข้อมูลขนาดใหญ่อื่นๆ ข้อกําหนดงาน Spark เป็นงานที่เน้นโค้ดที่ไม่ใช่เชิงโต้ตอบที่ทํางานบนคลัสเตอร์ Spark เป็นระยะเวลานาน ข้อกําหนดงาน Spark มีความคงทนและความพร้อมใช้งาน
สมุดบันทึก Spark เป็นแหล่งข้อมูลที่ยอดเยี่ยมในการทดสอบตรรกะของโค้ดและจัดการกับข้อกําหนดทางธุรกิจทั้งหมด อย่างไรก็ตาม เพื่อให้สามารถเรียกใช้ได้ต่อไปในสถานการณ์การผลิต ข้อกําหนดงาน Spark ที่เปิดใช้งานนโยบายลองใหม่เป็นโซลูชันที่ดีที่สุด
ทําซ้ํานโยบายสําหรับข้อกําหนดงาน Spark
ใน Microsoft Fabric ผู้ใช้สามารถตั้งค่านโยบายการลองใหม่สําหรับงาน Spark Job Definition ได้ แม้ว่าสคริปต์ในงานอาจไม่มีที่สิ้นสุด แต่โครงสร้างพื้นฐานที่เรียกใช้สคริปต์อาจเกิดปัญหาซึ่งต้องใช้การหยุดงาน หรืองานอาจถูกตัดออกเนื่องจากความต้องการในการติดตั้งโครงสร้างพื้นฐาน นโยบายการลองใหม่อนุญาตให้ผู้ใช้สามารถตั้งค่ากฎสําหรับการรีสตาร์ทงานโดยอัตโนมัติหากหยุดเนื่องจากปัญหาพื้นฐานใด ๆ พารามิเตอร์ระบุความถี่ในการรีสตาร์ตงาน สูงสุดถึงการลองใหม่ที่ไม่จํากัด และการตั้งค่าเวลาระหว่างการลองใหม่ ด้วยวิธีนี้ ผู้ใช้สามารถตรวจสอบให้แน่ใจว่างาน Spark Job Definition ของพวกเขายังคงทํางานได้อย่างต่อเนื่องโดยไม่มีที่สิ้นสุดจนกว่าผู้ใช้จะตัดสินใจที่จะหยุดงานเหล่านั้น
แหล่งข้อมูลการสตรีม
การตั้งค่าการสตรีมด้วย Event Hubs จําเป็นต้องมีการกําหนดค่าพื้นฐาน ซึ่งรวมถึงชื่อ namespace Event Hubs ชื่อฮับ ชื่อคีย์การเข้าถึงที่ใช้ร่วมกัน และกลุ่มผู้บริโภค กลุ่มผู้บริโภคคือมุมมองของฮับเหตุการณ์ทั้งหมด ซึ่งช่วยให้แอปพลิเคชันที่ใช้หลายตัวมีมุมมองของเหตุการณ์ที่แยกต่างหากและอ่านสตรีมได้อย่างอิสระตามจังหวะของพวกเขาเองและด้วยค่าชดเชย
พาร์ติชันเป็นส่วนสําคัญของความสามารถในการจัดการข้อมูลที่มีปริมาณสูง ตัวประมวลผลเดียวมีความจุที่จํากัดสําหรับการจัดการเหตุการณ์ต่อวินาที ในขณะที่ตัวประมวลผลหลายตัวสามารถทํางานได้ดีขึ้นเมื่อดําเนินการแบบคู่ขนาน พาร์ติชันช่วยให้สามารถประมวลผลเหตุการณ์จํานวนมากพร้อมกันได้
หากมีการใช้พาร์ติชันมากเกินไปกับอัตราการนําเข้าต่ํา ตัวอ่านพาร์ติชันจะจัดการกับส่วนเล็ก ๆ ของข้อมูลนี้ทําให้เกิดการประมวลผลที่ไม่ใช่ตามความเหมาะสม จํานวนพาร์ติชันที่เหมาะสมขึ้นอยู่กับอัตราการประมวลผลที่ต้องการโดยตรง ถ้าคุณต้องการปรับขนาดการประมวลผลเหตุการณ์ของคุณ พิจารณาเพิ่มพาร์ติชันเพิ่มเติม ไม่มีขีดจํากัดปริมาณงานเฉพาะบนพาร์ติชัน อย่างไรก็ตาม ปริมาณงานรวมใน namespace ของคุณจะถูกจํากัดด้วยจํานวนหน่วยอัตราความเร็ว เมื่อคุณเพิ่มจํานวนหน่วยปริมาณงานในเนมสเปซของคุณ คุณอาจต้องการพาร์ติชันเพิ่มเติมเพื่อให้ผู้อ่านพร้อมกันสามารถรับอัตราความเร็วสูงสุดได้
คําแนะนําคือการตรวจสอบและทดสอบจํานวนพาร์ติชันที่ดีที่สุดสําหรับสถานการณ์อัตราความเร็วของคุณ แต่เป็นเรื่องปกติที่จะเห็นสถานการณ์ที่มีอัตราความเร็วสูงโดยใช้พาร์ติชัน 32 พาร์ติชันหรือมากกว่า
ตัวเชื่อมต่อ Azure Event Hubs สําหรับ Apache Spark (azure-event-hubs-spark) ขอแนะนําให้เชื่อมต่อแอปพลิเคชัน Spark กับ Azure Event Hubs
เลคเฮ้าส์เป็นอ่างสตรีมมิ่ง
Delta Lake เป็นเลเยอร์การจัดเก็บโอเพนซอร์สที่มอบ ACID (อะตอม ความสม่ําเสมอ การแยก และความทนทาน) อยู่ด้านบนของโซลูชันที่จัดเก็บข้อมูลทะเลสาบ นอกจากนี้ Delta Lake ยังสนับสนุนการจัดการเมตาดาต้าที่ปรับขนาดได้ วิวัฒนาการสคีมา การเดินทางเวลา (การกําหนดเวอร์ชันข้อมูล) รูปแบบเปิดและคุณสมบัติอื่น ๆ
ใน Fabric วิศวกรข้อมูล ing มีการใช้ Delta Lake เพื่อ:
- upsert (แทรก/อัปเดต) ได้อย่างง่ายดาย และลบข้อมูลโดยใช้ Spark SQL
- กระชับข้อมูลเพื่อลดเวลาที่ใช้ในการคิวรีข้อมูล
- ดูสถานะของตารางก่อนและหลังดําเนินการ
- ดึงข้อมูลประวัติของการดําเนินการที่ดําเนินการในตาราง
Delta ถูกเพิ่มเป็นหนึ่งในเอาต์พุตที่เป็นไปได้รูปแบบอ่างล้างจานที่ใช้ใน writeStream สําหรับข้อมูลเพิ่มเติมเกี่ยวกับอ่างล้างจานผลลัพธ์ที่มีอยู่ โปรดดู คําแนะนําการเขียนโปรแกรมการสตรีมแบบมีโครงสร้างของ Spark
ตัวอย่างต่อไปนี้สาธิตวิธีการสตรีมข้อมูลลงใน Delta Lake
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
เกี่ยวกับส่วนย่อยของโค้ดในตัวอย่าง:
- format() เป็นคําแนะนําที่กําหนดรูปแบบเอาต์พุตของข้อมูล
- outputMode() กําหนดวิธีการเขียนแถวใหม่ในการสตรีม (นั่นคือ ผนวก เขียนทับ)
- toTable() เก็บรักษาข้อมูลที่ถูกสตรีมลงในตาราง Delta ที่สร้างขึ้นโดยใช้ค่าที่ส่งผ่านเป็นพารามิเตอร์
การปรับการเขียน Delta ให้เหมาะสม
การแบ่งพาร์ติชันข้อมูลเป็นส่วนสําคัญในการสร้างโซลูชันการสตรีมที่มีประสิทธิภาพ: การแบ่งพาร์ติชันจะปรับปรุงวิธีการจัดระเบียบข้อมูล และยังช่วยปรับปรุงปริมาณงานอีกด้วย ไฟล์ต่างๆ จะถูกแยกย่อยได้ง่ายหลังจากการดําเนินการ Delta ทําให้มีไฟล์ขนาดเล็กเกินไป และแฟ้มที่มีขนาดใหญ่เกินไปก็เป็นปัญหาเนื่องจากใช้เวลานานในการเขียนลงในดิสก์ ความท้าทายในการแบ่งพาร์ติชันข้อมูลคือการค้นหาความสมดุลที่เหมาะสมซึ่งส่งผลให้มีขนาดไฟล์ที่เหมาะสม Spark สนับสนุนการแบ่งพาร์ติชันในหน่วยความจําและบนดิสก์ ข้อมูลที่แบ่งพาร์ติชันอย่างถูกต้องสามารถให้ประสิทธิภาพการทํางานที่ดีที่สุดเมื่อยืนยันข้อมูลไปยัง Delta Lake และทําการคิวรีข้อมูลจาก Delta Lake
- เมื่อแบ่งพาร์ติชันข้อมูลบนดิสก์ คุณสามารถเลือกวิธีการแบ่งพาร์ติชันข้อมูลตามคอลัมน์โดยใช้ partitionBy() partitionBy() เป็นฟังก์ชันที่ใช้ในการแบ่งพาร์ติชันแบบจําลองความหมายขนาดใหญ่เป็นไฟล์ที่มีขนาดเล็กลงโดยยึดตามหนึ่งหรือหลายคอลัมน์ที่ระบุขณะเขียนลงในดิสก์ การแบ่งพาร์ติชันคือวิธีในการปรับปรุงประสิทธิภาพการทํางานของคิวรีเมื่อทํางานกับแบบจําลองความหมายขนาดใหญ่ หลีกเลี่ยงการเลือกคอลัมน์ที่สร้างพาร์ติชันที่เล็กเกินไปหรือใหญ่เกินไป กําหนดพาร์ติชันตามชุดของคอลัมน์ที่มีคาร์ดินาลลิตี้ที่ดีและแยกข้อมูลลงในไฟล์ที่มีขนาดที่เหมาะสม
- การแบ่งพาร์ติชันข้อมูลในหน่วยความจําสามารถทําได้โดยใช้ การแปลง repartition() หรือ coalesce() กระจายข้อมูลบนโหนดผู้ปฏิบัติงานหลายรายและสร้างงานหลายอย่างที่สามารถอ่านและประมวลผลข้อมูลควบคู่ไปกับข้อมูลพื้นฐานของชุดข้อมูลแบบกระจายที่ยืดหยุ่น (RDD) ซึ่งช่วยให้การแบ่งแบบจําลองความหมายออกเป็นพาร์ติชันเชิงตรรกะซึ่งสามารถคํานวณได้ในโหนดที่แตกต่างกันของคลัสเตอร์
- repartition() ใช้เพื่อเพิ่มหรือลดจํานวนพาร์ติชันในหน่วยความจํา การแบ่งพาร์ติชันจะจัดตําแหน่งข้อมูลทั้งหมดบนเครือข่ายและปรับสมดุลทั่วทั้งพาร์ติชันทั้งหมด
- coalesce() ใช้เพื่อลดจํานวนพาร์ติชันได้อย่างมีประสิทธิภาพเท่านั้น นั่นคือ repartition() เวอร์ชันที่ปรับให้เหมาะสมซึ่งการเคลื่อนไหวของข้อมูลในพาร์ติชันทั้งหมดจะต่ํากว่าโดยใช้ coalesce()
การรวมวิธีการแบ่งพาร์ติชันทั้งสองคือโซลูชันที่ดีในสถานการณ์ที่มีอัตราความเร็วสูง repartition() สร้างจํานวนพาร์ติชันที่ระบุในหน่วยความจํา ในขณะที่ partitionBy() จะเขียนไฟล์ไปยังดิสก์สําหรับแต่ละพาร์ติชันหน่วยความจําและคอลัมน์การแบ่งพาร์ติชัน ตัวอย่างต่อไปนี้แสดงการใช้งานของกลยุทธ์การแบ่งพาร์ติชันทั้งสองในงาน Spark เดียวกัน: ข้อมูลถูกแยกออกเป็นพาร์ติชัน 48 ในหน่วยความจําก่อน (สมมติว่าเรามีแกน CPU ทั้งหมด 48 แกน) จากนั้นแบ่งพาร์ติชันบนดิสก์โดยยึดตามสองคอลัมน์ที่มีอยู่ในส่วนข้อมูล
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
การเขียนที่ปรับให้เหมาะสม
ตัวเลือกอื่นในการปรับการเขียนให้เหมาะสมกับ Delta Lake คือการใช้ การเขียนที่ปรับให้เหมาะสม การเขียนที่ปรับให้เหมาะสมเป็นคุณลักษณะทางเลือกที่ปรับปรุงวิธีเขียนข้อมูลลงในตาราง Delta Spark จะผสานหรือแยกพาร์ติชันก่อนที่จะเขียนข้อมูล เพิ่มปริมาณงานของข้อมูลที่กําลังเขียนลงในดิสก์ อย่างไรก็ตามการดําเนินการนี้จะเกิดขึ้นแบบ shuffle เต็มรูปแบบ ดังนั้นสําหรับปริมาณงานบางอย่างอาจทําให้เกิดประสิทธิภาพลดลง งานที่ใช้ coalesce() และ/หรือ partition() กับพาร์ติชันข้อมูลบนดิสก์สามารถ refactored เพื่อเริ่มใช้การเขียนที่เหมาะสมแทน
โค้ดต่อไปนี้เป็นตัวอย่างของการใช้ การเขียนที่ปรับให้เหมาะสมแล้ว โปรดทราบว่า พาร์ติชัน By() ยังคงใช้งานอยู่
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
ชุดงานเหตุการณ์
เพื่อลดจํานวนการดําเนินงานเพื่อปรับปรุงเวลาที่ใช้ในการรวบรวมข้อมูลลงใน Delta lake การชุดงานเหตุการณ์เป็นอีกทางเลือกหนึ่งที่ใช้ได้จริง
ทริกเกอร์จะกําหนดว่าควรดําเนินการคิวรีการสตรีมบ่อยเพียงใด (ทริกเกอร์) และส่งข้อมูลใหม่ออก การตั้งค่าการตั้งค่ากําหนดระยะเวลาการประมวลผลเป็นระยะสําหรับ microbatches การสะสมข้อมูลและชุดงานเหตุการณ์ลงในไม่กี่การดําเนินการที่มีอยู่แทนที่จะเขียนลงในดิสก์ตลอดเวลา
ตัวอย่างต่อไปนี้แสดงคิวรีการสตรีมที่เหตุการณ์ถูกประมวลผลเป็นระยะ ๆ ในช่วงเวลาหนึ่งนาที
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
ข้อดีของการรวมการรวมชุดเหตุการณ์ในการดําเนินการเขียนตาราง Delta คือการสร้างไฟล์ Delta ขนาดใหญ่ที่มีข้อมูลเพิ่มเติมในการหลีกเลี่ยงไฟล์ขนาดเล็ก คุณควรวิเคราะห์ปริมาณข้อมูลที่กําลังนําเข้าและค้นหาเวลาการประมวลผลที่ดีที่สุดเพื่อปรับขนาดของไฟล์ Parquet ที่สร้างขึ้นโดยไลบรารี Delta ให้เหมาะสม
การตรวจสอบ
Spark 3.1 และเวอร์ชันที่สูงกว่ามี UI การสตรีมที่มีโครงสร้างภายในซึ่งประกอบด้วยเมตริกการสตรีมต่อไปนี้:
- อัตราการป้อนข้อมูล
- อัตรากระบวนการ
- แถวอินพุต
- ระยะเวลาของชุดงาน
- ระยะเวลาการดําเนินการ
เนื้อหาที่เกี่ยวข้อง
- รับข้อมูลการสตรีมลงใน lakehouse และการเข้าถึงด้วยจุดสิ้นสุดการวิเคราะห์ SQL