Tutorial: Extração de alto volume em Python usando notebooks Fabric

Neste tutorial, constróis um caderno de Microsoft Fabric que extrai dados de múltiplos modelos semânticos Power BI usando a API REST Execute DAX Queries. Desserializas as respostas Arrow IPC em DataFrames pandas, comparas e combinas as saídas dos modelos, e fundes incrementalmente os resultados numa tabela Delta no OneLake.

Este padrão foi concebido para cientistas de dados e engenheiros de análise que necessitam de extração de alto rendimento com baixo overhead de parsing.

Porque é que este padrão funciona

Comparado com a extração baseada em JSON, o IPC Arrow reduz a sobrecarga de CPU e memória do lado do cliente porque evita análises JSON repetidas e materialização de objetos. Pode ler buffers Arrow diretamente numa representação tabular em memória e convertê-los para pandas com menos passos de transformação.

Quando persistem-se os conjuntos de resultados incrementalmente em Delta, também evita a reescrita completa de tabelas. Esta abordagem ajuda a reduzir o uso de unidades de capacidade (UC) enquanto mantém os cenários downstream do Direct Lake atualizados.

O que você constrói

Num bloco de notas do Fabric, você:

  1. Consulta dois modelos semânticos com DAX.
  2. Materializar cada resposta como um pandas DataFrame.
  3. Compare ou combine os DataFrames.
  4. Incorpore alterações incrementalmente numa tabela Delta.
  5. Valide que os consumidores da Direct Lake possam aceder aos dados atualizados.

Pré-requisitos

  • Um espaço de trabalho de capacidade Fabric ou Premium.

  • Pelo menos dois modelos semânticos que queres comparar ou combinar.

  • Permissões de construção e leitura em cada modelo semântico.

  • Um caderno Fabric ligado a uma casa de lago onde podes criar e atualizar tabelas Delta.

  • Pacotes Python:

    %pip install msal requests pyarrow pandas
    
  • Definições de inquilino ativadas:

    • API REST de execução de conjuntos de dados.
    • Permita que as entidades de serviço usem as APIs do Power BI se usar autenticação apenas por aplicação.

Fluxo do caderno de sistema Fabric

O caderno executa estes passos:

  1. Adquira um token de acesso.
  2. Executar DAX contra múltiplos modelos semânticos.
  3. Desserializar as respostas do Arrow em DataFrames pandas.
  4. Normalizar esquemas e comparar ou combinar DataFrames.
  5. Fundir incrementalmente resultados numa tabela Delta.
  6. Validar a disponibilidade de dados para o consumo do Direct Lake.

1 - Adquirir um token Entra Id para o utilizador atual

Na primeira célula de código, defina os alvos semânticos do modelo e adquira um token.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - Executar consultas DAX em modelos semânticos

Defina um assistente que execute DAX e devolva um DataFrame pandas a partir de Arrow IPC.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

Na célula de código seguinte, execute uma consulta DAX específica para cada modelo e marque a proveniência:

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - Comparar e combinar DataFrames

Normalize as colunas-chave e depois compare os resultados dos modelos ou combine-os num único conjunto analítico.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - Fusão incremental numa tabela Delta

Use uma fusão Delta baseada em colunas de nível granular de negócio. Este padrão atualiza as linhas alteradas e insere novas linhas sem reescrever a tabela completa.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Tip

Para janelas de extração muito grandes, particione a tabela Delta alvo por data e processe em fatias limitadas. Esta abordagem melhora a eficiência da integração e ajuda a controlar o uso do CU.

5 - Validar a prontidão direta do lago

Confirme que a tabela Delta está atualizada e pode ser consultada:

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Após a atualização da tabela Delta, os modelos semânticos Direct Lake que fazem referência a essa tabela podem captar os novos dados através do comportamento normal de sincronização.

Disposição sugerida de células de cadernos Fabric

Use esta disposição de células para manter o fluxo de trabalho sustentável:

  1. Célula Markdown: cenário, IDs de modelo e alvo da tabela.
  2. Célula Python: importação de pacotes e aquisição de tokens.
  3. Célula Python: auxiliar de execução DAX.
  4. Célula Python: extrair dados de cada modelo semântico.
  5. Célula Python: comparar/combinar pandas DataFrames.
  6. Célula Python: escreve o DataFrame de staging para o Spark e executa o Delta MERGE.
  7. Célula Python: valida a contagem de linhas e os últimos carimbos de extração.

Orientação de desempenho

  • Mantenha o DAX restrito apenas às colunas e linhas necessárias.
  • Use resultsetRowcountLimit e filtros DAX para limitar as janelas de extração.
  • Prefere fusões incrementais em vez de escritas completas de atualização.
  • Reutilizar um único cliente MSAL e cache de token por sessão do notebook.
  • Prefiro Arrow end-to-end para extração, para evitar sobrecarga de análise JSON em Python.
  • Acompanhe a duração da extração, o tamanho da carga útil e a duração da fusão como métricas operacionais.

Troubleshooting

  • 401 Não autorizado: Validar o inquilino, credenciais do cliente e o âmbito.
  • HTTP 429: Adicionar tentativa novamente com backoff exponencial e jitter.
  • Deriva de esquema entre modelos: Normalizar os nomes das colunas e os tipos de dados antes da fusão.
  • Grande utilização de memória em pandas: Processar as saídas do modelo em lotes ou agregá-las em DAX antes da extração.

Observação

Se o chamador tiver permissões insuficientes, a consulta falha, mas a resposta HTTP continua 200 OKa ser . Inspecione o corpo da resposta para detalhes de erros.