このチュートリアルでは、Execute DAX Queries REST API を使用して、複数のPower BIセマンティック モデルからデータを抽出するMicrosoft Fabric ノートブックを構築します。 Arrow IPC 応答を pandas DataFrame に逆シリアル化し、モデルの出力を比較・結合し、OneLake の Delta テーブルに結果を逐次的にマージします。
このパターンは、解析オーバーヘッドが少ない高スループットの抽出を必要とするデータ サイエンティストや分析エンジニア向けに設計されています。
このパターンが機能する理由
JSON ベースの抽出と比較して、Arrow IPC は、JSON 解析とオブジェクトの具体化を繰り返さないため、クライアント側の CPU とメモリのオーバーヘッドを削減します。 Arrow バッファーをメモリ内の表形式表現に直接読み込み、変換手順を減らして Pandas に変換できます。
結果セットをDeltaに段階的に保持する場合は、テーブル全体の書き換えを回避できます。 このアプローチは、ダウンストリーム Direct Lake のシナリオを最新の状態に保ちながら、容量ユニット (CU) の使用量を減らすのに役立ちます。
構築するもの
1 つのFabric ノートブックでは、次の手順を実行します。
- DAX を使用して 2 つのセマンティック モデルにクエリを実行します。
- 各応答を pandas DataFrame として具体化します。
- DataFrame を比較または結合します。
- 変更を差分テーブルに段階的にマージします。
- Direct Lake コンシューマーが更新されたデータを取得できることを検証します。
[前提条件]
Fabricまたは Premium 容量ワークスペース。
比較または結合する 2 つ以上のセマンティック モデル。
各セマンティック モデルに対するビルドと読み取りのアクセス許可。
Delta テーブルを作成および更新できる、レイクハウスにアタッチされたFabric ノートブック。
Python パッケージ:
%pip install msal requests pyarrow pandasテナント設定が有効:
- データセット実行クエリ REST API。
- アプリ専用認証を使用する場合は、サービス プリンシパルに Power BI API の使用を許可します。
Fabricノートブックフロー
ノートブックは次の手順を実行します。
- アクセス トークンを取得します。
- 複数のセマンティック モデルに対して DAX を実行します。
- Arrow の応答を pandas DataFrames にデシリアライズします。
- スキーマを正規化し、DataFrames を比較または結合します。
- 増分的に結果をDeltaテーブルにマージします。
- Direct Lake の使用に関するデータの可用性を検証します。
1 - 現在のユーザーの Entra Id トークンを取得する
最初のコード セルで、セマンティック モデルターゲットを定義し、トークンを取得します。
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - セマンティック モデル間で DAX クエリを実行する
DAX を実行し、Arrow IPC から pandas DataFrame を返すヘルパーを定義します。
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
次のコード セルで、各モデルとタグの実績に対してモデル固有の DAX クエリを実行します。
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - データフレームの比較と結合
キー列を正規化し、モデルの出力を比較するか、それらを 1 つの分析セットに結合します。
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - デルタテーブルへの増分マージ
ビジネスグレイン列に関連付けたデルタマージを使用します。 このパターンでは、変更された行が更新され、テーブル全体が書き換えられずに新しい行が挿入されます。
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
抽出ウィンドウが非常に大きい場合は、ターゲットの Delta テーブルを日付でパーティション分割し、境界付きスライスで処理します。 この方法により、マージの効率が向上し、CU の使用を制御できます。
5 - Direct Lake の準備状態を検証する
Delta テーブルが更新され、クエリ可能であることを確認します。
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Delta テーブルが更新されると、そのテーブルを参照する Direct Lake セマンティック モデルは、通常の同期動作によって新しいデータを取得できます。
Fabric ノートブックのセルレイアウト推奨案
このセル レイアウトを使用して、ワークフローを保守可能な状態に保ちます。
- Markdown のセル: シナリオ、モデル ID、テーブル ターゲット。
- Python セル: パッケージのインポートとトークンの取得。
- Python セル: DAX 実行ヘルパー。
- Python セル: 各セマンティック モデルからデータを抽出します。
- Pythonセル: pandas DataFrames を比較/結合します。
- Python セル: ステージング DataFrame を Spark に書き込み、Delta
MERGEを実行します。 - Pythonセル: 行数と最新の抽出タイムスタンプを検証します。
操作ガイド
- DAX のスコープは、必要な列と行のみにしてください。
- 抽出ウィンドウをバインドするには、
resultsetRowcountLimitフィルターと DAX フィルターを使用します。 - 完全な更新書き込みよりも増分マージを優先します。
- ノートブック セッションごとに 1 つの MSAL クライアントとトークン キャッシュを再利用します。
- Pythonで JSON 解析のオーバーヘッドを回避するには、抽出に矢印をエンド ツー エンドで使用します。
- 抽出期間、ペイロード サイズ、マージ期間を操作メトリックとして追跡します。
Troubleshooting
- 401 未承認: テナント、クライアント資格情報、およびスコープを検証します。
- HTTP 429: 指数バックオフとジッターを使用して再試行を追加します。
- モデル間のスキーマ の誤差: マージ前に列名とデータ型を正規化します。
- Pandas での大量のメモリ使用量: モデルの出力をバッチで処理するか、DAX で集計してから抽出します。
注
呼び出し元にアクセス許可が不足している場合、クエリは失敗しますが、HTTP 応答は引き続き 200 OK が返されます。 エラーの詳細については、応答本文を調べます。