教學:在 Fabric 筆記本中進行高容量 Python 萃取

在這個教學中,你將建立一個Microsoft Fabric筆記本,利用 Execute DAX Queries REST API ,從多個Power BI語意模型中擷取資料。 你可以將 Arrow 的 IPC 回應反序列化成 pandas DataFrame,比較並合併模型輸出,然後逐步合併結果到 OneLake 的 Delta 表格。

此模式專為需要高吞吐量且解析開銷低的資料科學家和分析工程師設計。

為什麼這個圖案有效

與基於 JSON 的擷取相比,Arrow IPC 在用戶端降低了 CPU 和記憶體負擔,因為你避免了重複的 JSON 解析和物件實體化。 你可以直接將 Arrow 緩衝區讀成記憶體中的表格表示,並以較少的轉換步驟轉換成 panda。

當你將結果集逐步持久化到 Delta 時,也能避免整個資料表重寫。 此方法有助於降低容量單位(CU)使用,同時保持下游直達湖情境的即時更新。

您建置的什麼

在一本 Fabric 筆記本裡,你:

  1. 用 DAX 查詢兩個語意模型。
  2. 將每個回應具體化為 pandas DataFrame。
  3. 比較或合併這些資料幀。
  4. 逐步將變更合併到 Delta 表格。
  5. 確認 Direct Lake 的用戶能取得更新後的資料。

先決條件

  • Fabric 或 Premium 容量的工作空間。

  • 至少要比較或合併兩個語意模型。

  • 對每個語意模型建立並讀取權限。

  • 一本連接在湖邊小屋的 Fabric 筆記本,可以建立和更新 Delta 表格。

  • Python 套件:

    %pip install msal requests pyarrow pandas
    
  • 啟用租戶設定:

    • 資料集執行查詢 REST API
    • 如果你使用僅限應用程式的驗證,允許服務主體使用 Power BI API

Fabric 筆記本工作流程

筆記本執行以下步驟:

  1. 獲取訪問令牌。
  2. 針對多個語意模型執行 DAX。
  3. 將 Arrow 回應反序列化成 pandas DataFrame。
  4. 標準化結構並比較或合併資料框架。
  5. 逐步合併結果成 Delta 表格。
  6. 驗證 Direct Lake 的數據可用性。

1 - 為目前使用者取得 Entra ID 代幣

在第一個程式碼區塊中,定義語意模型目標並取得一個標記。

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - 跨語意模型執行 DAX 查詢

定義一個輔助程式,用來執行 DAX,並從 Arrow IPC 回傳 pandas DataFrame。

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

在下一個程式碼區塊,對每個模型與標籤來源執行模型專屬的 DAX 查詢:

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - 比較與合併資料框架

先正規化關鍵欄位,然後比較模型輸出或將它們合併成單一分析集。

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - 逐步合併到 Delta 表格

使用以商業穀物欄位為關鍵字的 Delta 合併工具。 此模式更新變更的列並插入新列,且不重寫整個表格。

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Tip

對於非常大的擷取視窗,請依日期將目標 Delta 表格分割,並以有界切片進行處理。 此方法提升合併效率並協助控制 CU 使用。

5 - 驗證 Direct Lake 技術的就緒性

確認 Delta 資料表已更新且可查詢:

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

在 Delta 資料表更新後,參考該資料表的 Direct Lake 語意模型可以透過正常的同步行為擷取新資料。

建議的 Fabric 筆記本儲存格配置

使用此儲存格配置以保持工作流程可維護:

  1. Markdown 儲存格:情境、模型 ID 與資料表目標。
  2. Python 儲存格:套件匯入與令牌取得。
  3. Python 儲存格:DAX 執行輔助工具。
  4. Python 儲存格:從每個語意模型中擷取資料。
  5. Python 儲存格:比較/合併 pandas DataFrames。
  6. Python 儲存格:將暫存 DataFrame 寫入 Spark,並執行 Delta MERGE
  7. Python 儲存格:驗證列數和最新的擷取時間戳。

績效指導

  • 讓 DAX 只涵蓋必要的欄位和列。
  • 使用 resultsetRowcountLimit 和 DAX 過濾器來綁定擷取視窗。
  • 偏好增量合併勝過完全刷新寫入。
  • 每個筆記本會話重複使用單一 MSAL 用戶端和令牌快取。
  • 建議用 Arrow 從端到端擷取,以避免 Python 的 JSON 解析開銷。
  • 追蹤提取持續時間、有效載荷大小及合併持續時間作為運營指標。

Troubleshooting

  • 401 未授權:驗證租戶、客戶憑證及範圍。
  • HTTP 429:新增重試機制,並加入指數退避與抖動。
  • 模型間的結構漂移:合併前將欄位名稱和資料型態進行正規化。
  • pandas 的大量記憶體使用:在擷取之前,將模型輸出的處理分批或在 DAX 中聚合。

備註

若呼叫者權限不足,查詢失敗,但 HTTP 回應仍 200 OK為 。 檢查回應本體是否有錯誤細節。