Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym samouczku utworzysz notatnik Microsoft Fabric, służący do wyodrębniania danych z różnych modeli semantycznych Power BI przy użyciu interfejsu API REST Execute DAX Queries. Deserializujesz odpowiedzi Arrow IPC do DataFrames Pandas, porównujesz i łączysz wyniki modeli, a następnie przyrostowo scalasz wyniki z tabelą Delta w usłudze OneLake.
Ten wzorzec został zaprojektowany dla analityków danych i inżynierów ds. analityki, którzy potrzebują wydajnego wyodrębniania danych z niskim narzutem na analizę.
Dlaczego ten wzorzec działa
W porównaniu z wyodrębnianiem opartym na formacie JSON, Arrow IPC zmniejsza obciążenie CPU i pamięci po stronie klienta, ponieważ unikasz wielokrotnego analizowania formatu JSON i materializacji obiektów. Bufory Arrow można odczytać bezpośrednio do tabelarycznej reprezentacji w pamięci i przekonwertować do pandas przy użyciu mniejszej liczby kroków przekształcania.
Podczas przyrostowego zapisywania zestawów wyników do Delta, unikasz także całkowitego przepisania tabeli. Takie podejście pomaga zmniejszyć zużycie jednostek wydajności (CU), zachowując aktualność bieżących scenariuszy Direct Lake dla niższych poziomów.
Co tworzysz
W jednym notesie Fabric możesz:
- Wykonywanie zapytań o dwa modele semantyczne przy użyciu języka DAX.
- Zmaterializuj każdą odpowiedź jako pandas DataFrame.
- Porównaj lub połącz ramki danych.
- Stopniowe scalanie zmian w tabeli Delta.
- Sprawdź, czy użytkownicy usługi Direct Lake mogą pobrać zaktualizowane dane.
Wymagania wstępne
Obszar roboczy o mocy obliczeniowej Fabric lub Premium.
Co najmniej dwa semantyczne modele, które chcesz porównać lub połączyć.
Uprawnienia do tworzenia i odczytu dla każdego modelu semantycznego.
Notatnik Fabric powiązany z magazynem danych typu lakehouse, w którym można tworzyć i aktualizować tabele Delta.
pakiety Python:
%pip install msal requests pyarrow pandasWłączone ustawienia dzierżawy:
- Interfejs API REST wykonywania zapytań zestawu danych.
- Pozwól podmiotom zabezpieczeń na korzystanie z interfejsów API Power BI, jeśli korzystasz z uwierzytelniania wyłącznie dla aplikacji.
przepływ pracy notesu Fabric
Notatnik wykonuje następujące kroki:
- Uzyskaj token dostępu.
- Wykonywanie języka DAX względem wielu modeli semantycznych.
- Deserializuj odpowiedzi strzałki w ramkach danych biblioteki pandas.
- Normalizacja schematów i porównywanie lub łączenie ramek danych.
- Przyrostowe scalanie wyników do tabeli Delta.
- Zweryfikuj dostępność danych pod kątem użycia usługi Direct Lake.
1 - Uzyskaj token Entra Id dla bieżącego użytkownika
W pierwszej komórce kodu zdefiniuj obiekty docelowe modelu semantycznego i uzyskaj token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 — Wykonywanie zapytań języka DAX w modelach semantycznych
Zdefiniuj funkcję pomocniczą, która wykonuje język DAX i zwraca DataFrame pandas z Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
W następnej komórce kodu uruchom specyficzne dla każdego modelu zapytanie języka DAX i oznacz pochodzenie tagów.
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 — Porównywanie i łączenie ramek danych
Normalizuj kolumny kluczy, a następnie porównaj dane wyjściowe modelu lub połącz je w jeden zestaw analityczny.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 — Przyrostowe scalanie z tabelą Delta
Użyj scalania typu Delta opartego na kolumnach na poziomie szczegółowości biznesowej. Ten wzorzec aktualizuje zmienione wiersze i wstawia nowe wiersze bez ponownego zapisywania pełnej tabeli.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Wskazówka
W przypadku bardzo dużych okien ekstrakcji należy podzielić docelową tabelę Delta według daty i przetwarzać w ograniczonych wycinkach. Takie podejście zwiększa wydajność scalania i pomaga kontrolować użycie jednostek obliczeniowych.
5 — Weryfikowanie gotowości usługi Direct Lake
Upewnij się, że tabela delty została zaktualizowana i można wykonywać zapytania:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Po zaktualizowaniu tabeli delty semantyczne modele usługi Direct Lake odwołujące się do tej tabeli mogą pobierać nowe dane za pomocą normalnego zachowania synchronizacji.
Sugerowany układ komórek notesu Fabric
Użyj tego układu komórki, aby zachować możliwość utrzymania przepływu pracy:
- Komórka Markdown: scenariusz, identyfikatory modeli i cel tabeli.
- komórka Python: importowanie pakietów i pozyskiwanie tokenów.
- Komórka Pythona: pomocnik wykonywania DAX.
- Komórka Pythona: wyodrębnij dane z każdego modelu semantycznego.
- Python komórka: porównaj/połącz pandas DataFrames.
- Python komórka: zapisz przemieszczanie ramki danych na platformie Spark i uruchom < delta
. - W komórce Python: zweryfikuj liczbę wierszy i najnowsze znaczniki czasu wyodrębniania.
Wskazówki dotyczące wydajności
- Zachowaj zastosowanie DAX tylko do wymaganych kolumn i wierszy.
- Użyj
resultsetRowcountLimitoraz filtrów DAX, aby ograniczyć okna wyodrębniania. - Preferuj scalania przyrostowe zamiast pełnego odświeżania.
- Używaj ponownie pojedynczego klienta MSAL i pamięci podręcznej tokenów na sesję notatnika.
- Preferuj użycie Apache Arrow end-to-end do wyodrębniania, aby uniknąć narzutu związanego z analizą JSON w Pythonie.
- Śledź czas trwania wyodrębniania, rozmiar ładunku i czas trwania scalania jako metryki operacyjne.
Troubleshooting
- 401 Brak autoryzacji: zweryfikuj najemcę, poświadczenia klienta i zakres uprawnień.
- HTTP 429: Dodaj ponowną próbę z wykładniczym opóźnieniem i fluktuacją.
- Dryf schematu między modelami: Normalizacja nazw kolumn i typów danych przed scaleniem.
- Duże użycie pamięci w bibliotece pandas: przetwarzanie danych wyjściowych modelu w partiach lub agregowanie w języku DAX przed wyodrębnianiem.
Uwaga / Notatka
Jeśli obiekt wywołujący ma niewystarczające uprawnienia, zapytanie kończy się niepowodzeniem, ale odpowiedź HTTP jest nadal 200 OK. Sprawdź treść odpowiedzi, aby uzyskać szczegółowe informacje o błędzie.
Treści powiązane
- Omówienie interfejsu API wykonywania zapytań języka DAX
- Rozpocznij pracę z interfejsem API REST do wykonywania zapytań DAX
- Tutorial: Tworzenie usługi .NET warstwy średniej przy użyciu interfejsu API REST wykonywania zapytań języka DAX
- Najlepsze rozwiązania dotyczące interfejsu API REST wykonywania zapytań języka DAX