Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
In questa esercitazione viene creato un notebook di Microsoft Fabric che estrae dati da più modelli semantici di Power BI usando l'API REST Execute DAX Queries. È possibile deserializzare le risposte IPC arrow in dataframe pandas, confrontare e combinare gli output del modello e unire in modo incrementale i risultati in una tabella Delta in OneLake.
Questo modello è progettato per data scientist e ingegneri di analisi che necessitano di estrazione a velocità effettiva elevata con un sovraccarico di analisi basso.
Perché questo modello funziona
Rispetto all'estrazione basata su JSON, Arrow IPC riduce il sovraccarico della CPU e della memoria sul lato client perché si evita l'analisi JSON ripetuta e la materializzazione degli oggetti. È possibile leggere direttamente i buffer Arrow in una rappresentazione tabellare in memoria e convertirli in pandas con un numero minore di passaggi di trasformazione.
Quando si salvano in modo incrementale i set di risultati in Delta, si evitano anche le riscritture complete della tabella. Questo approccio consente di ridurre l'utilizzo delle unità di capacità mantenendo aggiornati gli scenari Direct Lake downstream.
Cosa costruisci
In un unico notebook Fabric:
- Interrogare due modelli semantici con DAX.
- Materializzare ogni risposta come DataFrame di pandas.
- Confrontare o combinare i DataFrame.
- Unire in modo incrementale le modifiche in una tabella Delta.
- Convalidare che i clienti di Direct Lake possano acquisire i dati aggiornati.
Prerequisites
Un'area di lavoro con capacità Fabric o Premium.
Almeno due modelli semantici da confrontare o combinare.
Autorizzazioni di compilazione e lettura per ogni modello semantico.
Un notebook Fabric collegato a una lakehouse in cui è possibile creare e aggiornare tabelle Delta.
Pacchetti Python:
%pip install msal requests pyarrow pandasImpostazioni del tenant abilitate:
- API REST Per l'esecuzione di query del set di dati.
- Consentire ai principali del servizio di utilizzare le API di Power BI se viene utilizzata l'autenticazione basata solo su app.
flusso di notebook Fabric
Il notebook esegue questi passaggi:
- Acquisire un token di accesso.
- Eseguire DAX su più modelli semantici.
- Deserializzare le risposte Arrow in DataFrame di Pandas.
- Normalizzare gli schemi e confrontare o combinare i DataFrame.
- Unire in modo incrementale i risultati in una tabella Delta.
- Convalidare la disponibilità dei dati per l'utilizzo di Direct Lake.
1 - Acquisire un token Entra ID per l'utente corrente
Nella prima cella di codice definire le destinazioni del modello semantico e acquisire un token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Eseguire query DAX tra modelli semantici
Definire un helper che esegue DAX e restituisce un dataframe pandas da Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Nella cella di codice successiva eseguire una query DAX specifica del modello per ogni modello e assegnare tag alla provenienza:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - Confrontare e combinare DataFrame
Normalizzare le colonne chiave, quindi confrontare gli output del modello o combinarli in un singolo set analitico.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Unire in modo incrementale con una tabella Delta
Utilizzare un "Delta merge" basato su colonne chiave di granularità aziendale. Questo modello aggiorna le righe modificate e inserisce nuove righe senza riscrivere la tabella completa.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
Per le finestre di estrazione molto grandi, partizionare la tabella Delta di destinazione in base alla data e al processo in sezioni delimitate. Questo approccio migliora l'efficienza della fusione e aiuta a controllare l'uso del CU.
5 - Convalidare l'idoneità di Direct Lake
Verificare che la tabella Delta sia aggiornata e su cui è possibile eseguire query:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Dopo l'aggiornamento della tabella Delta, i modelli semantici Direct Lake che fanno riferimento a tale tabella possono raccogliere i nuovi dati tramite il normale comportamento di sincronizzazione.
Layout consigliato delle celle del notebook Fabric
Usare questo layout di cella per mantenere il flusso di lavoro gestibile:
- Cella Markdown: scenario, ID modello e tabella di destinazione.
- Python cella: importazioni di pacchetti e acquisizione di token.
- Cella Python: assistente di esecuzione DAX.
- Python cella: estrarre dati da ogni modello semantico.
- Python: confrontare e combinare i DataFrame di pandas.
- Cellula Python: scrivere il DataFrame di staging in Spark ed eseguire Delta
MERGE. - Cella Python: convalidare i conteggi delle righe e i timestamp più recenti di estrazione.
Indicazioni sulle prestazioni
- Mantenere l'ambito DAX solo per le colonne e le righe necessarie.
- Usare
resultsetRowcountLimite filtri DAX per limitare le finestre di estrazione. - Favorisci le fusioni incrementali rispetto alle operazioni di scrittura complete di aggiornamento.
- Riutilizzare un singolo client MSAL e una cache dei token per ogni sessione del notebook.
- Si preferisce l'estrazione end-to-end con Arrow per evitare il sovraccarico di parsing JSON in Python.
- Tenere traccia della durata di estrazione, delle dimensioni del payload e della durata dell'unione come metriche operative.
Troubleshooting
- 401 Non autorizzato: convalidare tenant, credenziali client e ambito.
- HTTP 429: aggiungere nuovi tentativi con backoff esponenziale e jitter.
- Deviazione dello schema tra modelli: normalizzare i nomi delle colonne e i tipi di dati prima dell'unione.
- Utilizzo elevato della memoria in pandas: elaborare i risultati del modello in modalità batch o aggregare in DAX prima dell'estrazione.
Annotazioni
Se il chiamante dispone di autorizzazioni insufficienti, la query ha esito negativo ma la risposta HTTP è ancora 200 OK. Esaminare il contenuto della risposta per dettagli dell'errore.