Opas: Suuren volyymin Python-poiminta Fabric-muistikirjoissa

Tässä tutoriaalissa rakennat Microsoft Fabric-muistikirjan, joka poimii dataa useista Power BI semanttisista malleista käyttämällä Execute DAX Queries REST API. Deserialisoit Arrow IPC -vastaukset pandas-dataframeiksi, vertaat ja yhdistät mallin tuloksia ja yhdistät tulokset asteittain Delta-taulukkoon OneLakessa.

Tämä malli on suunniteltu datatieteilijöille ja analytiikkainsinööreille, jotka tarvitsevat suuren läpimenon erottelun pienellä jäsentämiskululla.

Miksi tämä kuvio toimii

Verrattuna JSON-pohjaiseen uuttoon, Arrow IPC vähentää prosessorin ja muistin kuormitusta asiakaspuolella, koska vältetään toistuva JSON-jäsennys ja objektien materialisointi. Voit lukea nuolipuskureita suoraan taulukkomuotoiseen muistikuvaan ja muuntaa pandoiksi vähemmillä muunnosvaiheilla.

Kun säilytät tulosjoukot asteittain Deltaksi, vältät myös täydelliset taulun uudelleenkirjoitukset. Tämä lähestymistapa auttaa vähentämään kapasiteettiyksikön (CU) käyttöä ja pitää alavirran Direct Lake -skenaariot ajan tasalla.

Mitä rakennat

Yhdessä Fabric-muistikirjassa sinä:

  1. Hae kaksi semanttista mallia DAX:lla.
  2. Materialisoi jokainen vastaus pandas DataFrameksi.
  3. Vertaa tai yhdistä datakehykset.
  4. Yhdistä muutokset asteittain Delta-taulukkoon.
  5. Varmista, että Direct Laken käyttäjät voivat saada päivitetyt tiedot.

edellytykset

  • Fabric- tai Premium-kapasiteetin työtila.

  • Vähintään kaksi semanttista mallia, joita haluat vertailla tai yhdistää.

  • Rakennus- ja lukuoikeudet jokaiselle semanttisella mallille.

  • Fabric-muistikirja, joka on kiinnitetty järventaloon, jossa voit luoda ja päivittää Delta-taulukoita.

  • Python-paketit:

    %pip install msal requests pyarrow pandas
    
  • Vuokralaisasetukset käytössä:

    • Dataset suorita kyselyt REST API.
    • Salli palvelupäähenkilöiden käyttää Power BI rajapintoja jos käytät pelkkää sovellustodennusta.

Fabric-muistikirjan kulku

Muistikirja suorittaa seuraavat vaiheet:

  1. Hanki pääsytunnus.
  2. Suorita DAX useita semanttisia malleja vastaan.
  3. Deserialisoi Arrow-vastaukset pandas-datakehyksiin.
  4. Normalisoi skeemat ja vertaile tai yhdistä DataFrameja.
  5. Vähittäinen yhdistämistulokset muodostavat Delta-taulukon.
  6. Varmista datan saatavuus Direct Lake -käyttöä varten.

1 - Hanki Entra Id -token nykyiselle käyttäjälle

Ensimmäisessä koodisolussa määrittele semanttisen mallin kohteet ja hanki token.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - Suorita DAX-kyselyt semanttisten mallien yli

Määrittele apuväline, joka suorittaa DAX:n ja palauttaa pandas DataFramen Arrow IPC:stä.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

Seuraavassa koodisolussa suorita mallikohtainen DAX-kysely jokaiselle mallille ja tunnisteen alkuperälle:

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - Vertaa ja yhdistää datakehyksiä

Normalisoi avainsarakkeet, vertaa sitten mallin tuloksia tai yhdistä ne yhdeksi analyyttiseksi joukoksi.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - Asteittainen yhdistäminen Delta-taulukkoon

Käytä Delta-yhdistämistä, joka on avainettu business-grain -sarakkeisiin. Tämä malli päivittää vaihdetut rivit ja lisää uusia rivejä kirjoittamatta koko taulukkoa uudelleen.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Vinkki

Erittäin suurissa poimintaikkunoissa jaetaan kohdedelta-taulukko päivämäärän mukaan ja prosessoi rajoitetuissa viipaleissa. Tämä lähestymistapa parantaa yhdistämisen tehokkuutta ja auttaa hallitsemaan CU:n käyttöä.

5 - Suoran järven valmiuden vahvistaminen

Vahvista, että Delta-taulukko on päivitetty ja kyselytön:

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Kun Delta-taulukko on päivitetty, Direct Lake -semanttiset mallit, jotka viittaavat kyseiseen taulukkoon, voivat poimia uuden datan normaalin synkronointikäyttäytymisen kautta.

Ehdotettu Fabric-muistikirjan solun asettelu

Käytä tätä soluasettelua työnkulun ylläpitämiseksi:

  1. Markdown-solu: skenaario, mallin tunnukset ja taulukon kohde.
  2. Python-solu: pakettien tuonti ja tokenien hankinta.
  3. Python-solu: DAX-suoritusapuväline.
  4. Python-solu: poimi tiedot jokaisesta semanttisesta mallista.
  5. Python-solu: vertaa/yhdistä pandas DataFrameja.
  6. Python solu: kirjoita staging DataFrame Sparkille ja suorita Delta MERGE.
  7. Python-solu: vahvista rivimäärät ja viimeisimmät poimintaaikaleimat.

Suorituskyvyn ohjaus

  • Pidä DAX rajattuna vain vaadittaviin sarakkeisiin ja riveihin.
  • Käytä resultsetRowcountLimit DAX-suodattimia sitomaan uutto-ikkunoita.
  • Suosi inkrementaalisia yhdistämisiä täysien päivityskirjoitusten sijaan.
  • Käytä yhtä MSAL-asiakasta ja token-välimuistia per muistikirjan istunto.
  • Suosi Arrow-menetelmää päästä päähän -menetelmää poimintaa varten, jotta JSON-jäsennyksen ylikuormitus Python:ssa välttyy.
  • Seuraa poimintakestoa, hyötykuorman kokoa ja yhdistämiskestoa operatiivisina mittareina.

Troubleshooting

  • 401 Luvaton: Vahvista vuokralainen, asiakkaan tunnistetiedot ja laajuus.
  • HTTP 429: Lisää uudelleenyritys eksponentiaalisella peruutuksella ja jitterillä.
  • Skeeman siirtymä mallien välillä: Normalisoi sarakkeennimet ja tietotyypit ennen yhdistämistä.
  • Suuri muistinkulutus pandoissa: Prosessoi mallin tulosteet erissä tai aggregoitu DAX:ssa ennen niiden poimimista.

Muistio

Jos soittajalla ei ole riittäviä oikeuksia, kysely epäonnistuu, mutta HTTP-vastaus on edelleen 200 OK. Tarkista vastekeho virheiden yksityiskohtien varalta.