Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Neste tutorial, você criará um bloco de anotações Microsoft Fabric que extrai dados de vários modelos semânticos Power BI usando a Execute DAX Queries REST API. Você desserializa as respostas de Arrow IPC em DataFrames do pandas, compara e combina saídas de modelos, e mescla resultados incrementalmente em uma tabela Delta no OneLake.
Esse padrão foi projetado para cientistas de dados e engenheiros de análise que precisam de extração de alta taxa de transferência com baixa sobrecarga de análise.
Por que esse padrão funciona
Em comparação com a extração baseada em JSON, o IPC do Arrow reduz a sobrecarga de CPU e memória no lado do cliente, pois evita a análise repetida de JSON e a materialização de objetos. Você pode ler buffers de Arrow diretamente em uma representação tabular na memória e converter para pandas com menos etapas de transformação.
Ao persistir os conjuntos de resultados incrementalmente no Delta, você também evita reescritas completas da tabela. Essa abordagem ajuda a reduzir o uso de unidades de capacidade (CU) enquanto mantém os cenários "downstream" do Direct Lake atuais.
O que você cria
Em um bloco de anotações Fabric, você:
- Consulte dois modelos semânticos com DAX.
- Materialize cada resposta como um pandas DataFrame.
- Compare ou combine os DataFrames.
- Mesclar incrementalmente alterações em uma tabela Delta.
- Valide se os consumidores do Direct Lake podem pegar os dados atualizados.
Pré-requisitos
Um espaço de trabalho de capacidade Fabric ou Premium.
Pelo menos dois modelos semânticos que você deseja comparar ou combinar.
Permissões de compilação e leitura em cada modelo semântico.
Um bloco de anotações Fabric anexado a uma lakehouse onde você pode criar e atualizar tabelas Delta.
Pacotes do Python:
%pip install msal requests pyarrow pandasConfigurações de locatário habilitadas:
- API REST executar consultas do conjunto de dados.
- Permitir que as entidades de serviço usem as APIs do Power BI se você usar a autenticação apenas de aplicativo.
Fabric fluxo do bloco de anotações
O notebook executa estas etapas:
- Adquirir um token de acesso.
- Execute o DAX em vários modelos semânticos.
- Desserializar respostas do Arrow para DataFrames do pandas.
- Normalize esquemas e compare ou combine DataFrames.
- Mesclar resultados incrementalmente em uma tabela Delta.
- Valide a disponibilidade de dados para o consumo do Direct Lake.
1 – Adquirir um token de ID do Entra para o usuário atual
Na primeira célula de código, defina destinos de modelo semântico e adquira um token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 – Executar consultas DAX em modelos semânticos
Defina um auxiliar que executa DAX e retorna um DataFrame pandas do Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Na próxima célula de código, execute uma consulta DAX específica do modelo para cada modelo e procedência de marca:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 – Comparar e combinar DataFrames
Normalize as colunas de chave e compare as saídas do modelo ou combine-as em um único conjunto analítico.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 – Mesclar incrementalmente em uma tabela Delta
Use uma mesclagem Delta com chave em colunas de nível empresarial. Esse padrão atualiza linhas alteradas e insere novas linhas sem reescrever a tabela completa.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Dica
Para janelas de extração muito grandes, particione a tabela Delta de destino por data e processo em fatias limitadas. Essa abordagem melhora a eficiência da mesclagem e ajuda a controlar o uso das CUs.
5 – Validar a prontidão do Direct Lake
Confirme se a tabela Delta está atualizada e consultável:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Depois que a tabela Delta for atualizada, os modelos semânticos do Direct Lake que fazem referência a essa tabela poderão captar os novos dados por meio do comportamento normal de sincronização.
Layout de célula do bloco de anotações Fabric sugerido
Use este layout de célula para manter o fluxo de trabalho mantenedível:
- Célula Markdown: cenário, IDs de modelo e destino de tabela.
- Célula de Python: importações de pacotes e aquisição de token.
- Célula de Python: auxiliar para execução DAX.
- Célula Python: extrair dados de cada modelo semântico.
- Python célula: comparar/combinar DataFrames pandas.
- Python célula: escreva DataFrame de preparo no Spark e execute Delta
MERGE. - Célula Python: validar contagem de linhas e timestamps de extração mais recentes.
Diretrizes de desempenho
- Mantenha o DAX limitado apenas às colunas e linhas necessárias.
- Use
resultsetRowcountLimite filtros DAX para delimitar janelas de extração. - Prefira mesclagens incrementais em vez de regravações completas.
- Reutilize um único cache de token e cliente MSAL por sessão de notebook.
- Prefira usar o Apache Arrow para a extração de dados de ponta a ponta, evitando assim a sobrecarga de processamento de JSON em Python.
- Acompanhe a duração da extração, o tamanho da carga e a duração da mesclagem como métricas operacionais.
Solução de problemas
- 401 Não autorizado: validar o locatário, as credenciais do cliente e o escopo.
- HTTP 429: Adicionar repetição com backoff exponencial e jitter.
- Desvio de esquema entre modelos: Normalizar nomes de colunas e tipos de dados antes da mesclagem.
- Uso de memória grande no pandas: processe saídas de modelo em lotes ou agregação no DAX antes da extração.
Note
Se o chamador tiver permissões insuficientes, a consulta falhará, mas a resposta HTTP ainda será 200 OK. Inspecione o corpo da resposta buscando detalhes sobre o erro.