チュートリアル: Fabric ノートブックでの大量のPython抽出

このチュートリアルでは、Execute DAX Queries REST API を使用して、複数のPower BIセマンティック モデルからデータを抽出するMicrosoft Fabric ノートブックを構築します。 Arrow IPC 応答を pandas DataFrame に逆シリアル化し、モデルの出力を比較・結合し、OneLake の Delta テーブルに結果を逐次的にマージします。

このパターンは、解析オーバーヘッドが少ない高スループットの抽出を必要とするデータ サイエンティストや分析エンジニア向けに設計されています。

このパターンが機能する理由

JSON ベースの抽出と比較して、Arrow IPC は、JSON 解析とオブジェクトの具体化を繰り返さないため、クライアント側の CPU とメモリのオーバーヘッドを削減します。 Arrow バッファーをメモリ内の表形式表現に直接読み込み、変換手順を減らして Pandas に変換できます。

結果セットをDeltaに段階的に保持する場合は、テーブル全体の書き換えを回避できます。 このアプローチは、ダウンストリーム Direct Lake のシナリオを最新の状態に保ちながら、容量ユニット (CU) の使用量を減らすのに役立ちます。

構築するもの

1 つのFabric ノートブックでは、次の手順を実行します。

  1. DAX を使用して 2 つのセマンティック モデルにクエリを実行します。
  2. 各応答を pandas DataFrame として具体化します。
  3. DataFrame を比較または結合します。
  4. 変更を差分テーブルに段階的にマージします。
  5. Direct Lake コンシューマーが更新されたデータを取得できることを検証します。

[前提条件]

  • Fabricまたは Premium 容量ワークスペース。

  • 比較または結合する 2 つ以上のセマンティック モデル。

  • 各セマンティック モデルに対するビルドと読み取りのアクセス許可。

  • Delta テーブルを作成および更新できる、レイクハウスにアタッチされたFabric ノートブック。

  • Python パッケージ:

    %pip install msal requests pyarrow pandas
    
  • テナント設定が有効:

    • データセット実行クエリ REST API
    • アプリ専用認証を使用する場合は、サービス プリンシパルに Power BI API の使用を許可します。

Fabricノートブックフロー

ノートブックは次の手順を実行します。

  1. アクセス トークンを取得します。
  2. 複数のセマンティック モデルに対して DAX を実行します。
  3. Arrow の応答を pandas DataFrames にデシリアライズします。
  4. スキーマを正規化し、DataFrames を比較または結合します。
  5. 増分的に結果をDeltaテーブルにマージします。
  6. Direct Lake の使用に関するデータの可用性を検証します。

1 - 現在のユーザーの Entra Id トークンを取得する

最初のコード セルで、セマンティック モデルターゲットを定義し、トークンを取得します。

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - セマンティック モデル間で DAX クエリを実行する

DAX を実行し、Arrow IPC から pandas DataFrame を返すヘルパーを定義します。

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

次のコード セルで、各モデルとタグの実績に対してモデル固有の DAX クエリを実行します。

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - データフレームの比較と結合

キー列を正規化し、モデルの出力を比較するか、それらを 1 つの分析セットに結合します。

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - デルタテーブルへの増分マージ

ビジネスグレイン列に関連付けたデルタマージを使用します。 このパターンでは、変更された行が更新され、テーブル全体が書き換えられずに新しい行が挿入されます。

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Tip

抽出ウィンドウが非常に大きい場合は、ターゲットの Delta テーブルを日付でパーティション分割し、境界付きスライスで処理します。 この方法により、マージの効率が向上し、CU の使用を制御できます。

5 - Direct Lake の準備状態を検証する

Delta テーブルが更新され、クエリ可能であることを確認します。

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Delta テーブルが更新されると、そのテーブルを参照する Direct Lake セマンティック モデルは、通常の同期動作によって新しいデータを取得できます。

Fabric ノートブックのセルレイアウト推奨案

このセル レイアウトを使用して、ワークフローを保守可能な状態に保ちます。

  1. Markdown のセル: シナリオ、モデル ID、テーブル ターゲット。
  2. Python セル: パッケージのインポートとトークンの取得。
  3. Python セル: DAX 実行ヘルパー。
  4. Python セル: 各セマンティック モデルからデータを抽出します。
  5. Pythonセル: pandas DataFrames を比較/結合します。
  6. Python セル: ステージング DataFrame を Spark に書き込み、Delta MERGE を実行します。
  7. Pythonセル: 行数と最新の抽出タイムスタンプを検証します。

操作ガイド

  • DAX のスコープは、必要な列と行のみにしてください。
  • 抽出ウィンドウをバインドするには、 resultsetRowcountLimit フィルターと DAX フィルターを使用します。
  • 完全な更新書き込みよりも増分マージを優先します。
  • ノートブック セッションごとに 1 つの MSAL クライアントとトークン キャッシュを再利用します。
  • Pythonで JSON 解析のオーバーヘッドを回避するには、抽出に矢印をエンド ツー エンドで使用します。
  • 抽出期間、ペイロード サイズ、マージ期間を操作メトリックとして追跡します。

Troubleshooting

  • 401 未承認: テナント、クライアント資格情報、およびスコープを検証します。
  • HTTP 429: 指数バックオフとジッターを使用して再試行を追加します。
  • モデル間のスキーマ の誤差: マージ前に列名とデータ型を正規化します。
  • Pandas での大量のメモリ使用量: モデルの出力をバッチで処理するか、DAX で集計してから抽出します。

呼び出し元にアクセス許可が不足している場合、クエリは失敗しますが、HTTP 応答は引き続き 200 OK が返されます。 エラーの詳細については、応答本文を調べます。