Tutoriel : Extraction à haute volumétrie Python dans les notebooks Fabric

Dans ce tutoriel, vous créez un notebook Microsoft Fabric qui extrait des données de plusieurs modèles sémantiques Power BI à l’aide de l’API REST des requêtes DAX Execute DAX . Vous désérialisez les réponses Arrow IPC dans des DataFrames Pandas, comparez et combinez les sorties du modèle, et fusionnez de façon incrémentielle les résultats dans une table Delta dans OneLake.

Ce modèle est conçu pour les scientifiques des données et les ingénieurs d’analytique qui ont besoin d’une extraction à débit élevé avec une surcharge d’analyse faible.

Pourquoi ce modèle fonctionne

Par rapport à l’extraction basée sur JSON, arrow IPC réduit la surcharge processeur et mémoire côté client, car vous évitez l’analyse JSON répétée et la matérialisation d’objets. Vous pouvez lire les tampons Arrow directement dans une représentation tabulaire en mémoire et les convertir en un DataFrame pandas avec moins d'étapes de transformation.

Lorsque vous conservez les jeux de résultats de manière incrémentielle sur Delta, vous évitez également les réécritures de table complètes. Cette approche permet de réduire l’utilisation de l’unité de capacité tout en conservant les scénarios Direct Lake en aval actuels.

Ce que vous construisez

Dans un bloc-notes Fabric, vous devez :

  1. Interrogez deux modèles sémantiques avec DAX.
  2. Convertissez chaque réponse en un DataFrame de pandas.
  3. Comparez ou combinez les DataFrames.
  4. Fusionnez de façon incrémentielle les modifications dans une table Delta.
  5. Vérifiez que les consommateurs Direct Lake peuvent récupérer les données mises à jour.

Prerequisites

  • Un espace de travail de capacité Fabric ou Premium.

  • Au moins deux modèles sémantiques que vous souhaitez comparer ou combiner.

  • Générer et lire des autorisations sur chaque modèle sémantique.

  • Un bloc-notes Fabric attaché à un lakehouse où vous pouvez créer et mettre à jour des tables Delta.

  • paquets Python:

    %pip install msal requests pyarrow pandas
    
  • Paramètres du locataire activés :

    • API REST pour l'exécution des requêtes sur le jeu de données.
    • Autorisez les principaux de service à utiliser les API Power BI si vous utilisez une authentification uniquement par application.

flux de bloc-notes Fabric

Le notebook effectue les étapes suivantes :

  1. Obtenez un jeton d’accès.
  2. Exécutez DAX sur plusieurs modèles sémantiques.
  3. Désérialiser les réponses Arrow en DataFrames pandas.
  4. Normalisez les schémas et comparez ou combinez des DataFrames.
  5. Fusionnez de façon incrémentielle les résultats dans une table Delta.
  6. Valider la disponibilité des données pour la consommation Direct Lake.

1 - Acquérir un jeton Entra ID pour l’utilisateur actuel

Dans la première cellule de code, définissez des cibles de modèle sémantique et obtenez un jeton.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - Exécuter des requêtes DAX sur des modèles sémantiques

Définissez un outil qui exécute DAX et retourne un DataFrame pandas à partir de l'IPC Arrow.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

Dans la cellule de code suivante, exécutez une requête DAX spécifique au modèle pour chaque modèle et chaque provenance de balise :

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - Comparer et combiner des DataFrames

Normalisez les colonnes clés, puis comparez les sorties du modèle ou combinez-les dans un seul jeu analytique.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - Fusion incrémentielle vers une table Delta

Utilisez une fusion Delta basée sur les colonnes clés du modèle d'entreprise. Ce modèle met à jour les lignes modifiées et insère de nouvelles lignes sans réécrire la table complète.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Conseil / Astuce

Pour les fenêtres d’extraction très volumineuses, partitionnez la table Delta cible par date et traitez les tranches délimitées. Cette approche améliore l’efficacité du processus de fusion et permet de contrôler l’utilisation de CU.

5 - Valider l'état de préparation de Direct Lake

Vérifiez que la table Delta est mise à jour et interrogeable :

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Une fois la table Delta mise à jour, les modèles sémantiques Direct Lake qui référencent cette table peuvent récupérer les nouvelles données par le biais d’un comportement de synchronisation normal.

Disposition de cellule de bloc-notes suggérée Fabric

Utilisez cette disposition de cellule pour maintenir le flux de travail à jour :

  1. Cellule Markdown : scénario, ID de modèle et cible de table.
  2. Python : importations des modules et acquisition des jetons.
  3. cellule Python : assistance d’exécution DAX.
  4. Python cellule : extrayez les données de chaque modèle sémantique.
  5. Python cellule : comparer/combiner des DataFrames pandas.
  6. cellule Python : écrivez un DataFrame intermédiaire dans Spark et exécutez Delta MERGE.
  7. Cellule Python : valider le nombre de lignes et les horodatages d’extraction les plus récents.

Guide de performances

  • Gardez l'étendue de DAX uniquement aux colonnes et lignes requises.
  • Utilisez resultsetRowcountLimit et les filtres DAX pour limiter les fenêtres d’extraction.
  • Privilégiez les fusions incrémentielles par rapport aux écritures d’actualisation complètes.
  • Réutilisez un client MSAL unique et un cache de jetons par session de notebook.
  • Préférez la flèche de bout en bout pour l’extraction afin d’éviter la surcharge d’analyse JSON dans Python.
  • Effectuez le suivi de la durée d’extraction, de la taille de la charge utile et de la durée de fusion en tant que métriques opérationnelles.

Résolution des problèmes

  • 401 Non autorisé : valider le locataire, les informations d’identification du client et l’étendue.
  • HTTP 429 : Ajoutez une nouvelle tentative avec un retrait exponentiel et un jitter.
  • Dérive de schéma entre les modèles : normalisez les noms de colonnes et les types de données avant la fusion.
  • Utilisation importante de la mémoire dans pandas : traitez les sorties du modèle par lots ou regroupez-les dans DAX avant l'extraction.

Note

Si l’appelant dispose d’autorisations insuffisantes, la requête échoue mais la réponse HTTP est toujours 200 OK. Inspectez le corps de la réponse pour obtenir des détails sur l’erreur.