Σημείωμα
Η πρόσβαση σε αυτήν τη σελίδα απαιτεί εξουσιοδότηση. Μπορείτε να δοκιμάσετε να εισέλθετε ή να αλλάξετε καταλόγους.
Η πρόσβαση σε αυτήν τη σελίδα απαιτεί εξουσιοδότηση. Μπορείτε να δοκιμάσετε να αλλάξετε καταλόγους.
Σε αυτό το εκπαιδευτικό βοήθημα, δημιουργείτε ένα σημειωματάριο Microsoft Fabric που εξάγει δεδομένα από πολλά σημασιολογικά μοντέλα Power BI χρησιμοποιώντας το Execute DAX Queries REST API. Μπορείτε να αποσειριοποιήσετε τις αποκρίσεις IPC του Arrow σε pandas DataFrames, να συγκρίνετε και να συνδυάσετε τις εξόδους του μοντέλου και να συγχωνεύσετε σταδιακά τα αποτελέσματα σε έναν πίνακα Delta στο OneLake.
Αυτό το μοτίβο έχει σχεδιαστεί για επιστήμονες δεδομένων και μηχανικούς ανάλυσης που χρειάζονται εξαγωγή υψηλής απόδοσης με χαμηλή επιβάρυνση ανάλυσης.
Γιατί λειτουργεί αυτό το μοτίβο
Σε σύγκριση με την εξαγωγή που βασίζεται σε JSON, το Arrow IPC μειώνει την επιβάρυνση της CPU και της μνήμης από την πλευρά του πελάτη, επειδή αποφεύγετε την επαναλαμβανόμενη ανάλυση JSON και την υλοποίηση αντικειμένων. Μπορείτε να διαβάσετε τα buffer Arrow απευθείας σε μια αναπαράσταση πίνακα στη μνήμη και να τα μετατρέψετε σε panda με λιγότερα βήματα μετασχηματισμού.
Όταν διατηρείτε τα σύνολα αποτελεσμάτων σταδιακά σε Delta, αποφεύγετε επίσης τις πλήρεις επανεγγραφές πινάκων. Αυτή η προσέγγιση συμβάλλει στη μείωση της χρήσης της μονάδας χωρητικότητας (CU), διατηρώντας παράλληλα ενημερωμένα τα κατάντη σενάρια Direct Lake.
Τι χτίζετε
Σε ένα σημειωματάριο Fabric:
- Υποβάλετε ερώτημα σε δύο σημασιολογικά μοντέλα με το DAX.
- Υλοποιήστε κάθε απάντηση ως pandas DataFrame.
- Συγκρίνετε ή συνδυάστε τα πλαίσια δεδομένων.
- Σταδιακή συγχώνευση αλλαγών σε έναν πίνακα Delta.
- Επικυρώστε ότι οι καταναλωτές του Direct Lake μπορούν να παραλάβουν τα ενημερωμένα δεδομένα.
Προαπαιτούμενα στοιχεία
Ένας χώρος εργασίας Fabric ή Premium εκχωρημένων πόρων.
Τουλάχιστον δύο σημασιολογικά μοντέλα που θέλετε να συγκρίνετε ή να συνδυάσετε.
Δικαιώματα δόμησης και ανάγνωσης σε κάθε μοντέλο σημασιολογίας.
Ένα σημειωματάριο Fabric συνδεδεμένο σε μια λίμνη όπου μπορείτε να δημιουργήσετε και να ενημερώσετε πίνακες Delta.
Python πακέτα:
%pip install msal requests pyarrow pandasΕνεργοποιημένες ρυθμίσεις μισθωτή:
- Σύνολο δεδομένων Εκτέλεση ερωτημάτων REST API.
- Επιτρέψτε στις κύριες υπηρεσίες να χρησιμοποιούν API Power BI εάν χρησιμοποιείτε έλεγχο ταυτότητας μόνο για εφαρμογές.
Fabric ροή φορητού υπολογιστή
Ο φορητός υπολογιστής εκτελεί τα εξής βήματα:
- Αποκτήστε ένα διακριτικό πρόσβασης.
- Εκτελέστε DAX σε σχέση με πολλά σημασιολογικά μοντέλα.
- Αποσειριοποιήστε τις απαντήσεις του Arrow σε pandas DataFrames.
- Κανονικοποιήστε σχήματα και συγκρίνετε ή συνδυάστε DataFrames.
- Επαυξητική συγχώνευση αποτελεσμάτων σε έναν πίνακα Delta.
- Επικυρώστε τη διαθεσιμότητα δεδομένων για άμεση κατανάλωση λίμνης.
1 - Αποκτήστε ένα διακριτικό Entra Id για τον τρέχοντα χρήστη
Στο πρώτο κελί κώδικα, ορίστε στόχους σημασιολογικού μοντέλου και αποκτήστε ένα διακριτικό.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Εκτέλεση ερωτημάτων DAX σε σημασιολογικά μοντέλα
Ορίστε έναν βοηθό που εκτελεί DAX και επιστρέφει ένα pandas DataFrame από το Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Στο επόμενο κελί κώδικα, εκτελέστε ένα ερώτημα DAX για κάθε μοντέλο και προέλευση ετικέτας:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - Συγκρίνετε και συνδυάστε DataFrames
Κανονικοποιήστε τις στήλες κλειδιών και, στη συνέχεια, συγκρίνετε τα αποτελέσματα του μοντέλου ή συνδυάστε τα σε ένα ενιαίο σύνολο ανάλυσης.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Τμηματική συγχώνευση σε πίνακα Delta
Χρησιμοποιήστε μια συγχώνευση Delta με κλειδί σε στήλες επιχειρηματικής λεπτομέρειας. Αυτό το μοτίβο ενημερώνει τις αλλαγμένες γραμμές και εισάγει νέες γραμμές χωρίς να ξαναγράψει τον πλήρη πίνακα.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Συμβουλή
Για πολύ μεγάλα παράθυρα εξαγωγής, διαμερίστε τον πίνακα Delta προορισμού κατά ημερομηνία και επεξεργαστείτε σε οριοθετημένα slice. Αυτή η προσέγγιση βελτιώνει την αποτελεσματικότητα της συγχώνευσης και βοηθά στον έλεγχο της χρήσης CU.
5 - Επικύρωση ετοιμότητας Direct Lake
Επιβεβαιώστε ότι ο πίνακας Delta είναι ενημερωμένος και με δυνατότητα αναζήτησης:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Μετά την ενημέρωση του πίνακα Delta, τα σημασιολογικά μοντέλα Direct Lake που αναφέρονται σε αυτόν τον πίνακα μπορούν να λάβουν τα νέα δεδομένα μέσω κανονικής συμπεριφοράς συγχρονισμού.
Προτεινόμενη διάταξη κελιού σημειωματαρίου Fabric
Χρησιμοποιήστε αυτήν τη διάταξη κελιού για να διατηρήσετε τη ροή εργασίας διατηρήσιμη:
- Κελί Markdown: σενάριο, αναγνωριστικά μοντέλου και προορισμός πίνακα.
- Κελί Python: εισαγωγές πακέτων και απόκτηση διακριτικών.
- Κελί Python: Βοηθός εκτέλεσης DAX.
- Κελί Python: εξαγωγή δεδομένων από κάθε σημασιολογικό μοντέλο.
- Κελί Python: σύγκριση/συνδυασμός pandas DataFrames.
- Python κελί: γράψτε το DataFrame προεργασίας στο Spark και εκτελέστε το Delta
MERGE. - Κελί Python: επικύρωση πλήθους σειρών και πιο πρόσφατων χρονικών σημάνσεων εξαγωγής.
Οδηγίες απόδοσης
- Διατηρήστε το εύρος DAX μόνο στις απαιτούμενες στήλες και γραμμές.
- Χρησιμοποιήστε
resultsetRowcountLimitφίλτρα DAX για να συνδέσετε παράθυρα εξαγωγής. - Προτιμήστε τις επαυξητικές συγχωνεύσεις έναντι των εγγραφών πλήρους ανανέωσης.
- Χρησιμοποιήστε ξανά ένα πρόγραμμα-πελάτη MSAL και ένα cache διακριτικού ανά περίοδο λειτουργίας σημειωματάριου.
- Προτιμήστε το βέλος από άκρο σε άκρο για εξαγωγή για να αποφύγετε την επιβάρυνση ανάλυσης JSON στην Python.
- Παρακολουθήστε τη διάρκεια εξαγωγής, το μέγεθος ωφέλιμου φορτίου και τη διάρκεια συγχώνευσης ως λειτουργικές μετρήσεις.
Troubleshooting
- 401 Μη εξουσιοδοτημένο: Επικύρωση μισθωτή, διαπιστευτηρίων προγράμματος-πελάτη και εμβέλειας.
- HTTP 429: Προσθέστε επανάληψη με εκθετική οπισθοδρόμηση και διακύμανση καθυστέρησης.
- Μετατόπιση σχήματος μεταξύ μοντέλων: Κανονικοποιήστε τα ονόματα στηλών και τους τύπους δεδομένων πριν από τη συγχώνευση.
- Χρήση μεγάλης μνήμης σε panda: Επεξεργασία εξόδων μοντέλου σε δέσμες ή συνάθροιση σε DAX πριν από την εξαγωγή.
Σημείωση
Εάν ο καλών δεν έχει επαρκή δικαιώματα, το ερώτημα αποτυγχάνει, αλλά η απόκριση HTTP εξακολουθεί να 200 OKείναι . Επιθεωρήστε το σώμα απόκρισης για λεπτομέρειες σφάλματος.