Merk
Tilgang til denne siden krever autorisasjon. Du kan prøve å logge på eller endre kataloger.
Tilgang til denne siden krever autorisasjon. Du kan prøve å endre kataloger.
I denne veiledningen bygger du en Microsoft Fabric notebook som henter data fra flere Power BI semantiske modeller ved å bruke Execute DAX Queries REST API. Du deserialiserer Arrow IPC-svar til pandas DataFrames, sammenligner og kombinerer modellutdata, og slår gradvis sammen resultater i en Delta-tabell i OneLake.
Dette mønsteret er designet for dataforskere og analyseingeniører som trenger ekstraksjon med høy gjennomstrømning og lav parsing-overhead.
Hvorfor dette mønsteret fungerer
Sammenlignet med JSON-basert ekstraksjon reduserer Arrow IPC CPU- og minneoverhead på klientsiden fordi du unngår gjentatt JSON-parsing og objektmaterialisering. Du kan lese Arrow-buffere direkte inn i en tabellbasert minnerepresentasjon og konvertere til pandas med færre transformasjonssteg.
Når du beholder resultatsett inkrementelt til Delta, unngår du også fullstendige tabellomskrivinger. Denne tilnærmingen bidrar til å redusere bruken av kapasitetsenheter (CU) samtidig som nedstrøms Direct Lake-scenarier holdes oppdaterte.
Hva du bygger
I en Fabric-notatbok gjør du:
- Søk i to semantiske modeller med DAX.
- Materialiser hvert svar som en pandas DataFrame.
- Sammenlign eller kombiner DataFrames.
- Slå inn endringer gradvis i en Delta-tabell.
- Valider at Direct Lake-forbrukere kan hente de oppdaterte dataene.
Forutsetninger
Et arbeidsområde med Fabric- eller Premium-kapasitet.
Minst to semantiske modeller du vil sammenligne eller kombinere.
Bygge- og lesetillatelser på hver semantisk modell.
En Fabric-notatbok festet til et hytte ved innsjøen hvor du kan lage og oppdatere Delta-tabeller.
Python-pakker:
%pip install msal requests pyarrow pandasLeietakerinnstillinger aktivert:
- Dataset Execute Queries REST API.
- Tillat tjenesteprinsipper å bruke Power BI APIer hvis du bruker kun app-autentisering.
Fabric notebook-flyt
Notatboken utfører disse trinnene:
- Få tak i en tilgangstoken.
- Kjør DAX mot flere semantiske modeller.
- Deserialiser Arrow-svar til pandas DataFrames.
- Normaliser skjemaer og sammenlign eller kombiner DataFrames.
- Inkrementell sammenslåing av resultater til en Delta-tabell.
- Valider datatilgjengelighet for Direct Lake-forbruk.
1 - Skaff en Entra Id-token for nåværende bruker
I den første kodecellen definerer du semantiske modellmål og skaffer en token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Kjør DAX-spørringer på tvers av semantiske modeller
Definer en hjelper som kjører DAX og returnerer en pandas DataFrame fra Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
I neste kodecelle, kjør en modell-spesifikk DAX-spørring for hver modell og tag-proveniens:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - Sammenlign og kombiner DataFrames
Normaliser nøkkelkolonner, og sammenlign deretter modellutdata eller kombiner dem til et enkelt analytisk sett.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Inkrementell sammenslåing til en Delta-tabell
Bruk en Delta-sammenslåing basert på business-grain-kolonner. Dette mønsteret oppdaterer endrede rader og setter inn nye rader uten å skrive hele tabellen på nytt.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tips
For svært store ekstraksjonsvinduer, partisjoner mål-Delta-tabellen etter dato og prosesser i begrensede snitt. Denne tilnærmingen forbedrer sammenslåingseffektiviteten og bidrar til å kontrollere CU-bruken.
5 - Valider Direct Lake-beredskapen
Bekreft at Delta-tabellen er oppdatert og kan spørres:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Etter at Delta-tabellen er oppdatert, kan Direct Lake-semantiske modeller som refererer til den tabellen plukke opp nye data gjennom normal synkroniseringsatferd.
Foreslått Fabric-notatbokcelleoppsett
Bruk dette celleoppsettet for å holde arbeidsflyten vedlikeholdbar:
- Markdown-celle: scenario, modell-ID-er og tabellmål.
- Python-celle: pakkeimport og tokeninnsamling.
- Python-celle: Hjelpemiddel for DAX-kjøring.
- Python-celle: hent ut data fra hver semantisk modell.
- Python-celle: sammenlign/kombiner pandas DataFrames.
- Python cell: skriv staging-DataFrame til Spark og kjør Delta
MERGE. - Python-celle: valider radantall og siste tidsstempler for utvinning.
Ytelsesveiledning
- Hold DAX begrenset til kun nødvendige kolonner og rader.
- Bruk
resultsetRowcountLimitDAX-filtre for å begrense utvinningsvinduer. - Foretrekk inkrementelle sammenslåinger fremfor full oppdateringsskriving.
- Gjenbruk én enkelt MSAL-klient og tokencache per notatbokøkt.
- Foretrekk Arrow end-to-end for utvinning for å unngå JSON-parse-overhead i Python.
- Spor utvinningsvarighet, nyttelaststørrelse og sammenslåingsvarighet som operasjonelle måleparametere.
Troubleshooting
- 401 Uautorisert: Valider leietaker, klientlegitimasjon og omfang.
- HTTP 429: Legg til retry med eksponentiell backoff og jitter.
- Skjemadrift mellom modeller: Normaliser kolonnenavn og datatyper før sammenslåing.
- Stort minnebruk i pandas: Prosessmodellen gir ut i batcher eller aggregater i DAX før utvinning.
Note
Hvis kalleren har utilstrekkelige tillatelser, mislykkes spørringen, men HTTP-svaret er fortsatt 200 OK. Sjekk responsdelen for feildetaljer.