Tutorial: Extração em grande volume de Python em notebooks Fabric

Neste tutorial, você criará um bloco de anotações Microsoft Fabric que extrai dados de vários modelos semânticos Power BI usando a Execute DAX Queries REST API. Você desserializa as respostas de Arrow IPC em DataFrames do pandas, compara e combina saídas de modelos, e mescla resultados incrementalmente em uma tabela Delta no OneLake.

Esse padrão foi projetado para cientistas de dados e engenheiros de análise que precisam de extração de alta taxa de transferência com baixa sobrecarga de análise.

Por que esse padrão funciona

Em comparação com a extração baseada em JSON, o IPC do Arrow reduz a sobrecarga de CPU e memória no lado do cliente, pois evita a análise repetida de JSON e a materialização de objetos. Você pode ler buffers de Arrow diretamente em uma representação tabular na memória e converter para pandas com menos etapas de transformação.

Ao persistir os conjuntos de resultados incrementalmente no Delta, você também evita reescritas completas da tabela. Essa abordagem ajuda a reduzir o uso de unidades de capacidade (CU) enquanto mantém os cenários "downstream" do Direct Lake atuais.

O que você cria

Em um bloco de anotações Fabric, você:

  1. Consulte dois modelos semânticos com DAX.
  2. Materialize cada resposta como um pandas DataFrame.
  3. Compare ou combine os DataFrames.
  4. Mesclar incrementalmente alterações em uma tabela Delta.
  5. Valide se os consumidores do Direct Lake podem pegar os dados atualizados.

Pré-requisitos

  • Um espaço de trabalho de capacidade Fabric ou Premium.

  • Pelo menos dois modelos semânticos que você deseja comparar ou combinar.

  • Permissões de compilação e leitura em cada modelo semântico.

  • Um bloco de anotações Fabric anexado a uma lakehouse onde você pode criar e atualizar tabelas Delta.

  • Pacotes do Python:

    %pip install msal requests pyarrow pandas
    
  • Configurações de locatário habilitadas:

    • API REST executar consultas do conjunto de dados.
    • Permitir que as entidades de serviço usem as APIs do Power BI se você usar a autenticação apenas de aplicativo.

Fabric fluxo do bloco de anotações

O notebook executa estas etapas:

  1. Adquirir um token de acesso.
  2. Execute o DAX em vários modelos semânticos.
  3. Desserializar respostas do Arrow para DataFrames do pandas.
  4. Normalize esquemas e compare ou combine DataFrames.
  5. Mesclar resultados incrementalmente em uma tabela Delta.
  6. Valide a disponibilidade de dados para o consumo do Direct Lake.

1 – Adquirir um token de ID do Entra para o usuário atual

Na primeira célula de código, defina destinos de modelo semântico e adquira um token.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 – Executar consultas DAX em modelos semânticos

Defina um auxiliar que executa DAX e retorna um DataFrame pandas do Arrow IPC.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

Na próxima célula de código, execute uma consulta DAX específica do modelo para cada modelo e procedência de marca:

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 – Comparar e combinar DataFrames

Normalize as colunas de chave e compare as saídas do modelo ou combine-as em um único conjunto analítico.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 – Mesclar incrementalmente em uma tabela Delta

Use uma mesclagem Delta com chave em colunas de nível empresarial. Esse padrão atualiza linhas alteradas e insere novas linhas sem reescrever a tabela completa.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Dica

Para janelas de extração muito grandes, particione a tabela Delta de destino por data e processo em fatias limitadas. Essa abordagem melhora a eficiência da mesclagem e ajuda a controlar o uso das CUs.

5 – Validar a prontidão do Direct Lake

Confirme se a tabela Delta está atualizada e consultável:

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

Depois que a tabela Delta for atualizada, os modelos semânticos do Direct Lake que fazem referência a essa tabela poderão captar os novos dados por meio do comportamento normal de sincronização.

Layout de célula do bloco de anotações Fabric sugerido

Use este layout de célula para manter o fluxo de trabalho mantenedível:

  1. Célula Markdown: cenário, IDs de modelo e destino de tabela.
  2. Célula de Python: importações de pacotes e aquisição de token.
  3. Célula de Python: auxiliar para execução DAX.
  4. Célula Python: extrair dados de cada modelo semântico.
  5. Python célula: comparar/combinar DataFrames pandas.
  6. Python célula: escreva DataFrame de preparo no Spark e execute Delta MERGE.
  7. Célula Python: validar contagem de linhas e timestamps de extração mais recentes.

Diretrizes de desempenho

  • Mantenha o DAX limitado apenas às colunas e linhas necessárias.
  • Use resultsetRowcountLimit e filtros DAX para delimitar janelas de extração.
  • Prefira mesclagens incrementais em vez de regravações completas.
  • Reutilize um único cache de token e cliente MSAL por sessão de notebook.
  • Prefira usar o Apache Arrow para a extração de dados de ponta a ponta, evitando assim a sobrecarga de processamento de JSON em Python.
  • Acompanhe a duração da extração, o tamanho da carga e a duração da mesclagem como métricas operacionais.

Solução de problemas

  • 401 Não autorizado: validar o locatário, as credenciais do cliente e o escopo.
  • HTTP 429: Adicionar repetição com backoff exponencial e jitter.
  • Desvio de esquema entre modelos: Normalizar nomes de colunas e tipos de dados antes da mesclagem.
  • Uso de memória grande no pandas: processe saídas de modelo em lotes ou agregação no DAX antes da extração.

Note

Se o chamador tiver permissões insuficientes, a consulta falhará, mas a resposta HTTP ainda será 200 OK. Inspecione o corpo da resposta buscando detalhes sobre o erro.