Tutorial: Ekstraksi Python volume tinggi di buku catatan Fabric

Dalam tutorial ini, Anda membuat notebook Microsoft Fabric yang mengekstrak data dari beberapa model semantik Power BI dengan menggunakan REST API Kueri DAX Execute. Anda mengubah respons Arrow IPC menjadi Pandas DataFrames, membandingkan dan menggabungkan output model, serta secara bertahap menggabungkan hasil ke dalam tabel Delta di OneLake.

Pola ini dirancang untuk ilmuwan data dan insinyur analitik yang membutuhkan ekstraksi throughput tinggi dengan overhead penguraian rendah.

Mengapa pola ini bekerja

Dibandingkan dengan ekstraksi berbasis JSON, Arrow IPC mengurangi overhead CPU dan memori di sisi klien karena Anda menghindari penguraian JSON berulang dan materialisasi objek. Anda dapat membaca buffer Arrow langsung ke representasi tabular dalam memori dan mengonversinya ke Pandas dengan langkah-langkah transformasi yang lebih sedikit.

Saat Anda mempertahankan tataan hasil secara bertahap ke Delta, Anda juga menghindari penulisan ulang tabel penuh. Pendekatan ini membantu mengurangi penggunaan unit kapasitas (CU) sambil menjaga skenario hilir Direct Lake tetap terkini.

Apa yang Anda bangun

Dalam satu buku catatan Fabric, Anda:

  1. Kueri dua model semantik dengan DAX.
  2. Mewujudkan setiap respons sebagai DataFrame pandas.
  3. Bandingkan atau gabungkan DataFrames.
  4. Menggabungkan perubahan secara bertahap ke dalam tabel Delta.
  5. Pastikan bahwa konsumen Direct Lake dapat mengambil data yang diperbarui.

Prasyarat

  • Ruang kerja dengan kapasitas Fabric atau Premium.

  • Setidaknya dua model semantik yang ingin Anda bandingkan atau gabungkan.

  • Buat dan Baca izin pada setiap model semantik.

  • Notebook Fabric terhubung ke sebuah lakehouse tempat Anda bisa membuat dan memperbarui tabel Delta.

  • paket Python:

    %pip install msal requests pyarrow pandas
    
  • Pengaturan penyewa diaktifkan:

    • Himpunan data Jalankan Kueri REST API.
    • Perbolehkan perwakilan layanan untuk menggunakan API Power BI jika Anda menggunakan autentikasi khusus aplikasi.

alur buku catatan Fabric

Notebook melakukan langkah-langkah berikut:

  1. Memperoleh token akses.
  2. Jalankan DAX terhadap beberapa model semantik.
  3. Deserialisasi respons Arrow ke dalam Pandas DataFrames.
  4. Menormalkan skema dan membandingkan atau menggabungkan DataFrames.
  5. Menggabungkan hasil secara bertahap ke dalam tabel Delta.
  6. Memvalidasi ketersediaan data untuk konsumsi Direct Lake.

1 - Memperoleh token Id Entra untuk pengguna saat ini

Di sel kode pertama, tentukan target model semantik dan dapatkan token.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - Menjalankan kueri DAX di seluruh model semantik

Tentukan pembantu yang menjalankan DAX dan mengembalikan DataFrame pandas dari Arrow IPC.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

Di sel kode yang berikutnya, jalankan kueri DAX khusus model untuk setiap model dan asal tag.

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - Bandingkan dan gabungkan DataFrames

Normalisasi kolom kunci, lalu bandingkan output model atau gabungkan ke dalam satu set analitik.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - Menggabungkan secara bertahap ke tabel Delta

Gunakan penggabungan Delta yang ditentukan berdasarkan kolom tingkat bisnis. Pola ini memperbarui baris yang diubah dan menyisipkan baris baru tanpa menulis ulang tabel lengkap.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Tip

Untuk jendela ekstraksi yang sangat besar, bagi tabel Delta target berdasarkan tanggal dan olah dalam irisan terbatas. Pendekatan ini meningkatkan efisiensi penggabungan dan membantu mengontrol penggunaan CU.

5 - Memvalidasi kesiapan Direct Lake

Konfirmasikan bahwa tabel Delta diperbarui dan dapat dikueri:

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Setelah tabel Delta diperbarui, model semantik Direct Lake yang mereferensikan tabel tersebut dapat mengambil data baru melalui perilaku sinkronisasi normal.

Tata letak sel buku catatan Fabric yang disarankan

Gunakan tata letak sel ini untuk menjaga alur kerja tetap dapat dipertahankan:

  1. Sel markdown: skenario, ID model, dan target tabel.
  2. sel Python: impor paket dan akuisisi token.
  3. Sel Python: Pembantu eksekusi DAX.
  4. sel Python: ekstrak data dari setiap model semantik.
  5. sel Python: bandingkan/gabungkan panda DataFrames.
  6. sel Python: tulis penahapan DataFrame ke Spark dan jalankan Delta MERGE.
  7. sel Python: memvalidasi jumlah baris dan tanda waktu ekstraksi terbaru.

Panduan performa

  • Pertahankan DAX dibatasi pada kolom dan baris yang diperlukan saja.
  • Gunakan resultsetRowcountLimit dan filter DAX untuk membatasi jendela ekstraksi.
  • Lebih baik memilih penggabungan inkremental daripada penulisan refresh penuh.
  • Gunakan kembali satu klien MSAL dan cache token per sesi notebook.
  • Pilih Arrow end-to-end untuk ekstraksi guna menghindari overhead penguraian JSON di Python.
  • Lacak durasi ekstraksi, ukuran payload, dan gabungkan durasi sebagai metrik operasional.

Troubleshooting

  • 401 Tidak Diizinkan: Periksa penyewa, kredensial klien, dan cakupan.
  • HTTP 429: Tambahkan percobaan ulang dengan backoff eksponensial dan jitter.
  • Penyimpangan skema antar model: Menormalkan nama kolom dan jenis data sebelum menggabungkan.
  • Penggunaan memori yang besar dalam pandas: Memproses keluaran model dalam batch atau mengagregasi di DAX sebelum ekstraksi.

Note

Jika pemanggil memiliki izin yang tidak mencukup, kueri gagal tetapi respons HTTP masih 200 OK. Periksa isi respons untuk detail kesalahan.