Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In deze zelfstudie bouwt u een Microsoft Fabric notebook waarmee gegevens uit meerdere Power BI semantische modellen worden geëxtraheerd met behulp van de Execute DAX Query's REST API. U deserialiseert pijl-IPC-antwoorden in pandas DataFrames, vergelijkt en combineert modeluitvoer en voegt incrementeel resultaten samen in een Delta-tabel in OneLake.
Dit patroon is ontworpen voor gegevenswetenschappers en analysetechnici die hoge doorvoerextractie met een lage parseeroverhead nodig hebben.
Waarom dit patroon werkt
Vergeleken met extractie op basis van JSON vermindert Arrow IPC cpu- en geheugenoverhead aan de clientzijde, omdat u herhaalde JSON-parsering en object materialisatie vermijdt. U kunt pijlbuffers rechtstreeks lezen in een tabellaire weergave in het geheugen en converteren naar pandas met minder transformatiestappen.
Wanneer u resultatensets incrementeel op Delta plaatst, voorkomt u ook herschrijven van volledige tabellen. Deze aanpak helpt het gebruik van capaciteitseenheden (CU) te verminderen terwijl downstream Direct Lake-scenario's actueel blijven.
Wat u bouwt
In één Fabric notebook kunt u het volgende doen:
- Voer een query uit op twee semantische modellen met DAX.
- Materialiseer elke reactie als een Pandas DataFrame.
- Vergelijk of combineer de DataFrames.
- Incrementeel wijzigingen samenvoegen in een Delta-tabel.
- Controleer of Direct Lake-consumenten de bijgewerkte gegevens kunnen ophalen.
Prerequisites
Een werkruimte met capaciteit voor Fabric of Premium.
Ten minste twee semantische modellen die u wilt vergelijken of combineren.
Bouw- en leesmachtigingen voor elk semantisch model.
Een Fabric notebook dat is gekoppeld aan een lakehouse waar u Delta-tabellen kunt maken en bijwerken.
Python-pakketten:
%pip install msal requests pyarrow pandasTenantinstellingen ingeschakeld:
- De gegevensset voert query's uit met de REST API.
- Sta de service-principals toe om Power BI API's te gebruiken wanneer u app-verificatie alleen gebruikt.
Fabric notebookworkflow
In het notebook worden de volgende stappen uitgevoerd:
- Schaf een toegangstoken aan.
- DAX uitvoeren op meerdere semantische modellen.
- Deserialiseer pijlreacties in pandas DataFrames.
- Schema's normaliseren en DataFrames vergelijken of combineren.
- Incrementeel resultaten samenvoegen in een Delta-tabel.
- Valideer de beschikbaarheid van gegevens voor Direct Lake-verbruik.
1 - Een Entra Id-token verkrijgen voor de huidige gebruiker
Definieer in de eerste codecel semantische modeldoelen en verwerf een token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Uitvoeren van DAX-queries op semantische modellen
Definieer een helper die DAX uitvoert en een Pandas DataFrame retourneert van Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Voer in de volgende codecel een modelspecifieke DAX-query uit voor elk model en de herkomst van tags:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - DataFrames vergelijken en combineren
Normaliseer sleutelkolommen en vergelijk vervolgens modeluitvoer of combineer ze in één analytische set.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Incrementeel samenvoegen met een Delta-tabel
Gebruik een Delta-samenvoeging die is gebaseerd op kolommen op bedrijfsniveau. Met dit patroon worden gewijzigde rijen bijgewerkt en worden nieuwe rijen ingevoegd zonder de volledige tabel opnieuw te schrijven.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
Voor zeer grote extractievensters partitioneert u de doel-Delta-tabel op datum en verwerk deze in afgebakende segmenten. Deze aanpak verbetert de samenvoegefficiëntie en helpt het CU-gebruik te beheren.
5 - Direct Lake-gereedheid valideren
Controleer of de Delta-tabel is bijgewerkt en opvraagbaar is.
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Nadat de Delta-tabel is bijgewerkt, kunnen direct Lake-semantische modellen die verwijzen naar die tabel de nieuwe gegevens ophalen via normaal synchronisatiegedrag.
Voorgestelde Fabric celindeling voor notitieblokken
Gebruik deze celindeling om de werkstroom onderhoudbaar te houden:
- Markdown-cel: scenario, model-id's en tabeldoel.
- Python cel: pakketimport en tokenverwerving.
- Python cel: helper voor DAX-uitvoering.
- Python cel: gegevens extraheren uit elk semantisch model.
- Python-cel: pandas DataFrames vergelijken/combineren.
- Python cel: schrijf faseringsgegevensframe naar Spark en voer Delta
MERGEuit. - Python cel: aantal rijen en laatste tijdstempels voor extractie valideren.
Richtlijnen voor prestaties
- Houd DAX beperkt tot alleen vereiste kolommen en rijen.
- Gebruik
resultsetRowcountLimiten DAX-filters om extractievensters te begrenzen. - Voorkeur geven aan incrementele samenvoegingen ten opzichte van schrijfbewerkingen voor volledige vernieuwing.
- Gebruik één MSAL-client en -tokencache per notebooksessie opnieuw.
- Geef de voorkeur aan Arrow end-to-end voor extractie om JSON-parseringsoverhead in Python te voorkomen.
- Duur van extractie, nettoladingsgrootte en samenvoegduur bijhouden als operationele metrische gegevens.
Troubleshooting
- 401 Niet geautoriseerd: tenant, clientreferenties en bereik valideren.
- HTTP 429: Voeg opnieuw proberen toe met exponentieel uitstel en jitter.
- Schemadrift tussen modellen: kolomnamen en gegevenstypen normaliseren voordat u samenvoegt.
- Groot geheugengebruik in pandas: Verwerk de uitvoer van het procesmodel in batches of aggregeer in DAX vóór extractie.
Opmerking
Als de aanroeper onvoldoende machtigingen heeft, mislukt de query, maar is het HTTP-antwoord nog steeds 200 OK. Controleer de hoofdtekst van het antwoord op foutdetails.