Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm erstellen Sie ein Microsoft Fabric Notizbuch, das Daten aus mehreren Power BI semantischen Modellen mithilfe der REST-API Execute DAX Queries REST API extrahiert. Sie deserialisieren Pfeil-IPC-Antworten in Pandas DataFrames, vergleichen und kombinieren Modellausgaben und verbinden inkrementelle Ergebnisse in einer Delta-Tabelle in OneLake.
Dieses Muster wurde für Datenwissenschaftler und Analyseingenieure entwickelt, die eine Extraktion mit hohem Durchsatz mit geringem Analyseaufwand benötigen.
Warum dieses Muster funktioniert
Im Vergleich zur JSON-basierten Extraktion reduziert Arrow IPC DEN CPU- und Arbeitsspeicheraufwand auf der Clientseite, da Sie wiederholte JSON-Analyse und Objektmaterialisierung vermeiden. Sie können Pfeilpuffer direkt in eine tabellarische Speicherdarstellung lesen und mit weniger Transformationsschritten in Pandas konvertieren.
Wenn Sie Ergebnismengen inkrementell in Delta speichern, vermeiden Sie auch vollständige Tabelleneuschreibungen. Dieser Ansatz trägt dazu bei, die Auslastung der Kapazitätseinheit (CU) zu reduzieren, während die Direct Lake-Szenarien aktuell gehalten werden.
Was Sie erstellen
In einem Fabric-Notizbuch:
- Abfragen von zwei semantischen Modellen mit DAX.
- Materialisieren Sie jede Antwort als Pandas DataFrame.
- Vergleichen oder kombinieren Sie die DataFrames.
- Änderungen inkrementell in einer Delta-Tabelle zusammenführen.
- Überprüfen Sie, ob Direct Lake-Verbraucher die aktualisierten Daten aufnehmen können.
Voraussetzungen
Ein Fabric-Arbeitsbereich oder ein Arbeitsbereich mit Premiumkapazität.
Mindestens zwei semantische Modelle, die Sie vergleichen oder kombinieren möchten.
Erstellen und Lesen von Berechtigungen für jedes semantische Modell.
Ein Fabric-Notizbuch, das mit einem Lakehouse verbunden ist, in dem Sie Delta-Tabellen erstellen und aktualisieren können.
Python Pakete:
%pip install msal requests pyarrow pandasMandanteneinstellungen aktiviert:
- Dataset Execute Queries REST API.
- Erlauben Sie Dienstprinzipalen die Verwendung von Power BI APIs, wenn Sie die App-only-Authentifizierung verwenden.
Fabric Notizbuchfluss
Das Notizbuch führt die folgenden Schritte aus:
- Abrufen eines Zugriffstokens
- Führen Sie DAX für mehrere semantische Modelle aus.
- Deserialisieren von Pfeilantworten in Pandas DataFrames.
- Normalisieren Sie Schemas, und vergleichen oder kombinieren Sie DataFrames.
- Führt schrittweise Ergebnisse zu einer Delta-Tabelle zusammen.
- Überprüfen sie die Datenverfügbarkeit für den Direct Lake-Verbrauch.
1 – Abrufen eines Entra-ID-Tokens für den aktuellen Benutzer
Definieren Sie in der ersten Codezelle semantische Modellziele, und erwerben Sie ein Token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 : Ausführen von DAX-Abfragen über semantische Modelle hinweg
Definieren Sie ein Hilfsprogramm, das DAX ausführt und einen pandas-DataFrame aus Arrow IPC zurückgibt.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Führen Sie in der nächsten Codezelle eine modellspezifische DAX-Abfrage für jedes Modell und jede Tag-Provenienz aus:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 – Vergleichen und Kombinieren von DataFrames
Normalisieren Sie Schlüsselspalten, vergleichen Sie dann Modellausgaben, oder kombinieren Sie sie in einem einzigen analytischen Satz.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 – Inkrementelle Zusammenführung in eine Delta-Tabelle
Verwenden Sie ein Delta Merge, das auf geschäftsgetreuen Spalten basiert. Dieses Muster aktualisiert geänderte Zeilen und fügt neue Zeilen ein, ohne die vollständige Tabelle neu zu schreiben.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
Partitionieren Sie für sehr große Extraktionsfenster die Zieldelta-Tabelle nach Datum und Prozess in gebundenen Segmenten. Dieser Ansatz verbessert die Zusammenführungseffizienz und hilft bei der Steuerung der CU-Nutzung.
5 – Überprüfen der Bereitschaft von Direct Lake
Vergewissern Sie sich, dass die Delta-Tabelle aktualisiert und abfragbar ist:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Nachdem die Delta-Tabelle aktualisiert wurde, können Direct Lake-Semantikmodelle, die auf diese Tabelle verweisen, die neuen Daten über ein normales Synchronisierungsverhalten abrufen.
Vorgeschlagenes Fabric-Notizbuchzelllayout
Verwenden Sie dieses Zellenlayout, um den Workflow zu verwalten:
- Markdown-Zelle: Szenario, Modell-IDs und Tabellenziel.
- Python Zelle: Paketimporte und Tokenakquisition.
- Python Zelle: DAX-Ausführungshilfsprogramm.
- Python Zelle: Extrahieren von Daten aus jedem semantischen Modell.
- Python Zelle: Pandas DataFrames vergleichen und/oder kombinieren.
- Python-Zelle: Staging-DataFrame in Spark schreiben und Delta
MERGEausführen. - Python Zelle: Überprüfen der Zeilenanzahl und der neuesten Extraktionszeitstempel.
Leistungsleitfaden
- Beschränken Sie den DAX auf ausschließlich erforderliche Spalten und Zeilen.
- Verwenden Sie
resultsetRowcountLimitund DAX-Filter, um Extraktionsfenster einzugrenzen. - Bevorzugen Sie inkrementelle Zusammenführungen gegenüber vollständigen Aktualisierungsschreibvorgängen.
- Verwenden Sie einen einzelnen MSAL-Client- und Tokencache pro Notizbuchsitzung wieder.
- Bevorzugen Sie Arrow End-to-End zur Extraktion, um den Aufwand des JSON-Parsings in Python zu vermeiden.
- Verfolgen Sie die Extraktionsdauer, die Nutzlastgröße und die Zusammenführungsdauer als Betriebsmetriken nach.
Problembehandlung
- 401 Unberechtigt: Überprüfen Sie Mandanten, Anmeldeinformationen des Clients und Berechtigungsumfang.
- HTTP 429: Hinzufügen von Wiederholungen mit exponentiellem Backoff und Jitter.
- Schemaabweichung zwischen Modellen: Normalisieren Sie Spaltennamen und Datentypen vor dem Zusammenführen.
- Große Speichernutzung in Pandas: Modellausgaben in Chargen verarbeiten oder in DAX aggregieren, bevor sie extrahiert werden.
Hinweis
Wenn der Aufrufer über unzureichende Berechtigungen verfügt, schlägt die Abfrage fehl, die HTTP-Antwort ist jedoch noch 200 OKvorhanden. Überprüfen Sie den Antworttext auf Fehlerdetails.