在本教程中,你将生成一个Microsoft Fabric笔记本,该笔记本使用 Execute DAX 查询 REST API从多个Power BI语义模型中提取数据。 将 Arrow IPC 响应反序列化为 pandas 数据框,比较和合并模型输出,并将结果逐步合并到 OneLake 中的 Delta 表中。
此模式专为需要高吞吐量数据提取且解析开销较低的数据科学家和分析工程师设计。
为什么此模式有效
与基于 JSON 的提取相比,箭头 IPC 减少了客户端的 CPU 和内存开销,因为避免了重复的 JSON 分析和对象具体化。 可以将 Arrow 缓冲区直接读取到表格内存表示形式中,并通过更少的转换步骤转换为 pandas。
以增量方式将结果集保存到 Delta 时,还可以避免完整表重写。 此方法有助于降低容量单位(CU)使用率,同时保持下游 Direct Lake 方案最新。
你所构建的
在一个Fabric笔记本中,你可以:
- 使用 DAX 查询两个语义模型。
- 将每个响应具体化为 pandas 数据帧。
- 比较或合并数据帧。
- 以增量方式将更改合并到 Delta 表中。
- 验证 Direct Lake 使用者是否可以获取更新的数据。
先决条件
一个 Fabric 或高级容量工作区。
至少要比较或合并两个语义模型。
为每个语义模型生成和读取权限。
连接到湖仓的Fabric笔记本,可在其中创建和更新 Delta 表。
Python 包:
%pip install msal requests pyarrow pandas已启用租户设置:
- 数据集执行查询 REST API。
- 如果使用仅限应用的身份验证,允许服务主体使用 Power BI APIs。
Fabric笔记本工作流
笔记本执行以下步骤:
- 获取访问令牌。
- 针对多个语义模型执行 DAX。
- 将箭头响应反序列化为 pandas 数据帧。
- 规范化架构并比较或合并数据帧。
- 以增量方式将结果合并到 Delta 表中。
- 验证 Direct Lake 消耗的数据可用性。
1 - 获取当前用户的 Entra ID 令牌
在第一个代码单元中,定义语义模型目标并获取令牌。
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - 跨语义模型执行 DAX 查询
定义一个辅助函数,该函数运行 DAX 并从 Arrow IPC 返回 pandas 数据帧。
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
在下一个代码单元中,为每个模型和标记来源运行特定于模型的 DAX 查询:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - 比较和合并数据帧
规范化键列,然后比较模型输出,或将它们合并到单个分析集中。
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - 以增量方式合并到 Delta 表
使用以业务粒度列为主键的Delta合并。 此模式更新更改的行并插入新行,而无需重写完整表。
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
小窍门
对于非常大的提取窗口,应按日期对目标 Delta 表进行分区,并在限定的范围内进行处理。 此方法可提高合并效率,并帮助控制 CU 使用情况。
5 - 验证 Direct Lake 就绪情况
确认 Delta 表已更新且可查询:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
更新 Delta 表后,引用该表的 Direct Lake 语义模型可以通过正常的同步行为获取新数据。
建议的Fabric笔记本单元格布局
使用此单元格布局使工作流保持可维护:
- Markdown 单元:场景、模型 ID 和表目标。
- Python单元格:包导入和令牌获取。
- Python单元格:DAX执行辅助工具。
- Python单元格:从每个语义模型中提取数据。
- Python代码单元:比较/合并 pandas 数据框。
- Python单元格:将暂存数据帧写入 Spark 并运行 Delta
MERGE。 - Python单元格:验证行计数和最新的提取时间戳。
性能指南
- 使 DAX 的范围仅限定为所需的列和行。
- 使用
resultsetRowcountLimit和 DAX 筛选器绑定提取窗口。 - 优先选择增量合并而非完全刷新写入。
- 在每个笔记本会话中重复使用单个 MSAL 客户端和令牌缓存。
- 优先使用 Arrow 进行端到端提取,以避免 Python 中的 JSON 解析开销。
- 跟踪提取的持续时间、负载大小和合并的持续时间,作为运营指标。
故障排除
- 401 未授权:验证租户、客户端凭据和范围。
- HTTP 429:添加带有指数退避和抖动的重试机制。
- 模型之间的架构偏差:在合并之前规范化列名称和数据类型。
- pandas 中的大量内存使用:在提取之前,批量处理或在 DAX 中聚合模型输出。
注释
如果调用方的权限不足,查询将失败,但 HTTP 响应仍然是 200 OK。 检查响应正文以了解错误详细信息。