Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V tomto kurzu vytvoříte poznámkový blok Microsoft Fabric, který extrahuje data z více sémantických modelů Power BI pomocí rozhraní REST API Execute DAX Queries REST API. Deserializujete Arrow IPC odpovědi do pandas DataFrames, porovnáváte a kombinujete výstupy modelu a postupně integrujete výsledky do Delta tabulky v OneLake.
Tento model je určený pro datové vědce a analytické techniky, kteří potřebují extrakci s vysokou propustností s nízkou režií na analýzu.
Proč tento model funguje
V porovnání s extrahováním založeným na JSON snižuje Arrow IPC režii procesoru a paměti na straně klienta, protože se vyhnete opakovanému parsování JSON a materializaci objektů. Arrow buffery můžete číst přímo do tabulkové reprezentace v paměti a převést na pandas s méně kroky transformace.
Při uchovávání sad výsledků přírůstkově na Delta se také vyhnete přepsání celé tabulky. Tento přístup pomáhá snížit využití jednotek kapacity (CU) a současně udržet aktuálnost následných scénářů Direct Lake.
Co vytváříte
V Fabric poznámkovém bloku:
- Dotazování dvou sémantických modelů pomocí jazyka DAX
- Materializujte každou odpověď jako datový rámec pandas.
- Porovnejte nebo zkombinujte datové rámce.
- Přírůstkově slučovat změny do tabulky Delta
- Ověřte, že uživatelé Direct Lake mohou vyzvednout aktualizovaná data.
Předpoklady
Pracovní prostor kapacity Fabric nebo Premium.
Aspoň dva sémantické modely, které chcete porovnat nebo zkombinovat.
Sestavení a čtení oprávnění pro každý sémantický model
Poznámkový blok Fabric připojený k jezeru, kde můžete vytvářet a aktualizovat tabulky Delta.
Balíčky Pythonu:
%pip install msal requests pyarrow pandasPovolené nastavení tenanta:
- Datová sada spouští dotazy rozhraní REST API.
- Povolit služebním principálům používat Power BI API pokud používáte ověřování pouze pro aplikace.
průběh poznámkového bloku Fabric
Notebook provádí tyto kroky:
- Získání přístupového tokenu
- Spusťte DAX proti více sémantickým modelům.
- Deserializovat šipkové odpovědi do datových rámců pandas.
- Normalizují schémata a porovnávají nebo kombinují datové rámce.
- Postupné sloučení výsledků do tabulky Delta
- Ověřte dostupnost dat pro využití Direct Lake.
1 . Získání tokenu Id Entra pro aktuálního uživatele
V první buňce kódu definujte sémantické cíle modelu a získejte token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2. Provádění dotazů DAX v sémantických modelech
Definujte pomocníka, který spustí DAX a vrátí datový rámec pandas z Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
V další buňce kódu spusťte dotaz DAX specifický pro každý model a označení původu:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3. Porovnání a kombinování datových rámců
Normalizujte klíčové sloupce a pak porovnejte výstupy modelu nebo je zkombinujte do jedné analytické sady.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 – Přírůstkově sloučit do tabulky Delta
Pro sloupce obchodního intervalu použijte slučovací klíč Delta. Tento vzor aktualizuje změněné řádky a vloží nové řádky bez přepsání celé tabulky.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
V případě velmi velkých oken extrakce rozdělte cílovou tabulku Delta podle data a procesu v ohraničených řezech. Tento přístup zlepšuje efektivitu sloučení a pomáhá řídit využití CU.
5. Ověření připravenosti Direct Lake
Ověřte, že je tabulka Delta aktualizovaná a dotazovatelná:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Po aktualizaci tabulky Delta mohou sémantické modely Direct Lake, které odkazují na tuto tabulku, zachytit nová data prostřednictvím normálního chování synchronizace.
Navrhované rozložení buňky poznámkového bloku Fabric
Použijte toto uspořádání buněk, aby byl pracovní postup snadno udržovatelný:
- Buňka Markdownu: scénář, ID modelu a cíl tabulky
- Python buňce: import balíčků a získání tokenů
- Python buňka: Pomocník pro provádění DAX
- Python buňce: extrahujte data z každého sémantického modelu.
- Python buňka: porovnejte nebo zkombinujte pandas DataFrame.
- buňka Python: Zapište přípravný datový rámec do Sparku a spusťte Delta
MERGE. - Python buňka: Ověřte počty řádků a časová razítka nejnovější extrakce.
Pokyny k výkonu
- Omezte DAX pouze na požadované sloupce a řádky.
- Použijte
resultsetRowcountLimita filtry DAX k omezení oken extrakce. - Upřednostněte přírůstkové slučování před úplnými obnovami.
- Znovu použijte jednoho klienta MSAL a mezipaměť tokenů na relaci poznámkového bloku.
- Upřednostňujte využití technologie Arrow pro extrakci, abyste se vyhnuli režijním nákladům na zpracování JSON v Pythonu.
- Sledujte dobu trvání extrakce, velikost datové části a dobu sloučení jako provozní metriky.
Troubleshooting
- 401 Neautorizováno: Ověřte tenanta, přihlašovací údaje klienta a obor.
- HTTP 429: Přidejte opakování s exponenciálním zpožděním a rozptylem.
- Posun schématu mezi modely: Před sloučením normalizujete názvy sloupců a datové typy.
- Velké využití paměti v knihovně pandas: Zpracovávejte výstupy modelu v dávkách nebo agregujte pomocí DAX před extrakcí.
Poznámka:
Pokud volající nemá dostatečná oprávnění, dotaz selže, ale odpověď HTTP je stále 200 OK. Zkontrolujte podrobnosti o chybě v textu odpovědi.