หมายเหตุ
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลอง ลงชื่อเข้าใช้หรือเปลี่ยนไดเรกทอรีได้
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลองเปลี่ยนไดเรกทอรีได้
ในบทช่วยสอนนี้ คุณสร้างสมุดบันทึก Microsoft Fabric ที่แยกข้อมูลจากแบบจําลองความหมาย Power BI หลายแบบโดยใช้ ดําเนินการคิวรี DAX REST API คุณ deserialize การตอบสนอง IPC ของ Arrow ลงใน DataFrames ของแพนด้า เปรียบเทียบและรวมผลลัพธ์ของแบบจําลอง และผสานผลลัพธ์ลงในตารางเดลต้าใน OneLake ทีละน้อย
รูปแบบนี้ออกแบบมาสําหรับนักวิทยาศาสตร์ข้อมูลและวิศวกรวิเคราะห์ที่ต้องการการแยกข้อมูลปริมาณงานสูงโดยมีค่าใช้จ่ายในการแยกวิเคราะห์ต่ํา
ทําไมรูปแบบนี้ถึงได้ผล
เมื่อเทียบกับการแยกข้อมูลตาม JSON แล้ว Arrow IPC จะลดค่าใช้จ่ายของ CPU และหน่วยความจําในฝั่งไคลเอ็นต์ เนื่องจากคุณหลีกเลี่ยงการแยกวิเคราะห์ JSON ซ้ําๆ และการทําให้วัตถุเป็นรูปธรรม คุณสามารถอ่านบัฟเฟอร์ลูกศรลงในการแสดงในหน่วยความจําแบบตารางได้โดยตรง และแปลงเป็นแพนด้าด้วยขั้นตอนการแปลงที่น้อยลง
เมื่อคุณคงชุดผลลัพธ์ไว้เป็นเดลต้าทีละน้อย คุณจะหลีกเลี่ยงการเขียนตารางใหม่แบบเต็มด้วย วิธีการนี้ช่วยลดการใช้หน่วยความจุ (CU) ในขณะที่รักษาสถานการณ์ Direct Lake ดาวน์สตรีมให้เป็นปัจจุบัน
สิ่งที่คุณสร้าง
ในสมุดบันทึก Fabric หนึ่งเล่ม คุณ:
- คิวรีแบบจําลองความหมายสองแบบด้วย DAX
- ทําให้การตอบสนองแต่ละครั้งเป็นรูปธรรมเป็น DataFrame ของแพนด้า
- เปรียบเทียบหรือรวม DataFrames
- ผสานการเปลี่ยนแปลงลงในตารางเดลต้าทีละน้อย
- ตรวจสอบว่าผู้บริโภค Direct Lake สามารถรับข้อมูลที่อัปเดตได้
ข้อกำหนดเบื้องต้น
พื้นที่ทํางานความจุ Fabric หรือ Premium
โมเดลความหมายอย่างน้อยสองแบบที่คุณต้องการเปรียบเทียบหรือรวมกัน
สิทธิ์ในการสร้างและอ่านในแต่ละแบบจําลองความหมาย
สมุดบันทึก Fabric ที่แนบมากับเลคเฮาส์ที่คุณสามารถสร้างและอัปเดตตารางเดลต้าได้
แพ็คเกจ Python:
%pip install msal requests pyarrow pandasเปิดใช้งานการตั้งค่าผู้เช่า:
- ชุดข้อมูลดําเนินการคิวรี REST API
- อนุญาตให้บริการหลักใช้ Power BI APIs หากคุณใช้การรับรองความถูกต้องเฉพาะแอป
การไหลของสมุดบันทึก Fabric
สมุดบันทึกทําตามขั้นตอนเหล่านี้:
- รับโทเค็นเพื่อการเข้าถึง
- ดําเนินการ DAX กับโมเดลความหมายหลายแบบ
- Deserialize การตอบสนองของลูกศรลงใน DataFrames ของแพนด้า
- ปรับ Schema ให้เป็นมาตรฐานและเปรียบเทียบหรือรวม DataFrames
- ผสานผลลัพธ์ทีละน้อยลงในตารางเดลต้า
- ตรวจสอบความพร้อมใช้งานของข้อมูลสําหรับการใช้ Direct Lake
1 - รับโทเค็น Entra Id สําหรับผู้ใช้ปัจจุบัน
ในเซลล์โค้ดแรก ให้กําหนดเป้าหมายแบบจําลองความหมายและรับโทเค็น
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - ดําเนินการคิวรี DAX ในแบบจําลองความหมาย
กําหนดตัวช่วยที่เรียกใช้ DAX และส่งกลับ DataFrame แพนด้าจาก Arrow IPC
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
ในเซลล์โค้ดถัดไป ให้เรียกใช้คิวรี DAX เฉพาะแบบจําลองสําหรับแต่ละแบบจําลองและที่มาของแท็ก:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - เปรียบเทียบและรวม DataFrames
ปรับคอลัมน์หลักให้เป็นมาตรฐาน จากนั้นเปรียบเทียบผลลัพธ์ของแบบจําลองหรือรวมเป็นชุดการวิเคราะห์ชุดเดียว
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - ผสานเข้ากับตารางเดลต้าทีละน้อย
ใช้การผสานเดลต้าที่คีย์บนคอลัมน์เกรนธุรกิจ รูปแบบนี้จะอัปเดตแถวที่เปลี่ยนแปลงและแทรกแถวใหม่โดยไม่ต้องเขียนตารางแบบเต็มใหม่
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
สําหรับหน้าต่างการแยกขนาดใหญ่มาก ให้แบ่งตารางเดลต้าเป้าหมายตามวันที่และประมวลผลในส่วนที่มีขอบเขต วิธีการนี้ช่วยเพิ่มประสิทธิภาพการผสานและช่วยควบคุมการใช้ CU
5 - ตรวจสอบความพร้อมของ Direct Lake
ยืนยันว่าตารางเดลต้าได้รับการอัปเดตและสามารถสืบค้นได้:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
หลังจากอัปเดตตารางเดลต้า แบบจําลองความหมาย Direct Lake ที่อ้างอิงตารางนั้นสามารถรับข้อมูลใหม่ผ่านพฤติกรรมการซิงโครไนส์ปกติ
เค้าโครงเซลล์โน้ตบุ๊ค Fabric ที่แนะนํา
ใช้เค้าโครงเซลล์นี้เพื่อให้เวิร์กโฟลว์สามารถรักษาไว้ได้
- เซลล์มาร์กดาวน์: สถานการณ์ รหัสแบบจําลอง และเป้าหมายตาราง
- เซลล์ Python: การนําเข้าแพ็คเกจและการรับโทเค็น
- เซลล์ Python: ตัวช่วยการดําเนินการ DAX
- เซลล์ Python: ดึงข้อมูลจากแต่ละโมเดลความหมาย
- เซลล์ Python: เปรียบเทียบ/รวมแพนด้า DataFrames
- Python เซลล์: เขียน DataFrame การจัดเตรียมไปยัง Spark และเรียกใช้ Delta
MERGE - เซลล์ Python: ตรวจสอบจํานวนแถวและการประทับเวลาการแยกล่าสุด
คําแนะนําด้านประสิทธิภาพ
- กําหนดขอบเขต DAX ไว้ที่คอลัมน์และแถวที่จําเป็นเท่านั้น
- ใช้
resultsetRowcountLimitตัวกรอง DAX เพื่อผูกหน้าต่างการแยกข้อมูล - สนับสนุนการผสานที่เพิ่มขึ้นมากกว่าการเขียนการรีเฟรชแบบเต็ม
- นําไคลเอ็นต์ MSAL เดียวและแคชโทเค็นต่อเซสชันโน้ตบุ๊กมาใช้ซ้ํา
- ชอบลูกศรแบบ end-to-end สําหรับการแยกเพื่อหลีกเลี่ยงค่าใช้จ่ายในการแยกวิเคราะห์ JSON ใน Python
- ติดตามระยะเวลาการแยก ขนาดเพย์โหลด และระยะเวลาการผสานเป็นตัวชี้วัดการดําเนินงาน
แก้ไข ปัญหา
- 401 ไม่ได้รับอนุญาต: ตรวจสอบความถูกต้องของผู้เช่า ข้อมูลประจําตัวไคลเอ็นต์ และขอบเขต
- HTTP 429: เพิ่มการลองใหม่ด้วยการย้อนกลับแบบเอ็กซ์โพเนนเชียลและความกระวนกระวายใจ
- การดริฟท์สคีมาระหว่างโมเดล: ปรับชื่อคอลัมน์และชนิดข้อมูลให้เป็นมาตรฐานก่อนผสาน
- การใช้หน่วยความจําขนาดใหญ่ใน pandas: ประมวลผลเอาต์พุตแบบจําลองเป็นชุดงานหรือรวมใน DAX ก่อนการแยก
Note
หากผู้เรียกมีสิทธิ์ไม่เพียงพอ คิวรีจะล้มเหลว แต่การตอบสนอง HTTP ยังคง 200 OKเป็น ตรวจสอบรายละเอียดข้อผิดพลาดของเนื้อหาการตอบกลับ