Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Neste tutorial, constróis um caderno de Microsoft Fabric que extrai dados de múltiplos modelos semânticos Power BI usando a API REST Execute DAX Queries. Desserializas as respostas Arrow IPC em DataFrames pandas, comparas e combinas as saídas dos modelos, e fundes incrementalmente os resultados numa tabela Delta no OneLake.
Este padrão foi concebido para cientistas de dados e engenheiros de análise que necessitam de extração de alto rendimento com baixo overhead de parsing.
Porque é que este padrão funciona
Comparado com a extração baseada em JSON, o IPC Arrow reduz a sobrecarga de CPU e memória do lado do cliente porque evita análises JSON repetidas e materialização de objetos. Pode ler buffers Arrow diretamente numa representação tabular em memória e convertê-los para pandas com menos passos de transformação.
Quando persistem-se os conjuntos de resultados incrementalmente em Delta, também evita a reescrita completa de tabelas. Esta abordagem ajuda a reduzir o uso de unidades de capacidade (UC) enquanto mantém os cenários downstream do Direct Lake atualizados.
O que você constrói
Num bloco de notas do Fabric, você:
- Consulta dois modelos semânticos com DAX.
- Materializar cada resposta como um pandas DataFrame.
- Compare ou combine os DataFrames.
- Incorpore alterações incrementalmente numa tabela Delta.
- Valide que os consumidores da Direct Lake possam aceder aos dados atualizados.
Pré-requisitos
Um espaço de trabalho de capacidade Fabric ou Premium.
Pelo menos dois modelos semânticos que queres comparar ou combinar.
Permissões de construção e leitura em cada modelo semântico.
Um caderno Fabric ligado a uma casa de lago onde podes criar e atualizar tabelas Delta.
Pacotes Python:
%pip install msal requests pyarrow pandasDefinições de inquilino ativadas:
- API REST de execução de conjuntos de dados.
- Permita que as entidades de serviço usem as APIs do Power BI se usar autenticação apenas por aplicação.
Fluxo do caderno de sistema Fabric
O caderno executa estes passos:
- Adquira um token de acesso.
- Executar DAX contra múltiplos modelos semânticos.
- Desserializar as respostas do Arrow em DataFrames pandas.
- Normalizar esquemas e comparar ou combinar DataFrames.
- Fundir incrementalmente resultados numa tabela Delta.
- Validar a disponibilidade de dados para o consumo do Direct Lake.
1 - Adquirir um token Entra Id para o utilizador atual
Na primeira célula de código, defina os alvos semânticos do modelo e adquira um token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Executar consultas DAX em modelos semânticos
Defina um assistente que execute DAX e devolva um DataFrame pandas a partir de Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Na célula de código seguinte, execute uma consulta DAX específica para cada modelo e marque a proveniência:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - Comparar e combinar DataFrames
Normalize as colunas-chave e depois compare os resultados dos modelos ou combine-os num único conjunto analítico.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Fusão incremental numa tabela Delta
Use uma fusão Delta baseada em colunas de nível granular de negócio. Este padrão atualiza as linhas alteradas e insere novas linhas sem reescrever a tabela completa.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
Para janelas de extração muito grandes, particione a tabela Delta alvo por data e processe em fatias limitadas. Esta abordagem melhora a eficiência da integração e ajuda a controlar o uso do CU.
5 - Validar a prontidão direta do lago
Confirme que a tabela Delta está atualizada e pode ser consultada:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Após a atualização da tabela Delta, os modelos semânticos Direct Lake que fazem referência a essa tabela podem captar os novos dados através do comportamento normal de sincronização.
Disposição sugerida de células de cadernos Fabric
Use esta disposição de células para manter o fluxo de trabalho sustentável:
- Célula Markdown: cenário, IDs de modelo e alvo da tabela.
- Célula Python: importação de pacotes e aquisição de tokens.
- Célula Python: auxiliar de execução DAX.
- Célula Python: extrair dados de cada modelo semântico.
- Célula Python: comparar/combinar pandas DataFrames.
- Célula Python: escreve o DataFrame de staging para o Spark e executa o Delta
MERGE. - Célula Python: valida a contagem de linhas e os últimos carimbos de extração.
Orientação de desempenho
- Mantenha o DAX restrito apenas às colunas e linhas necessárias.
- Use
resultsetRowcountLimite filtros DAX para limitar as janelas de extração. - Prefere fusões incrementais em vez de escritas completas de atualização.
- Reutilizar um único cliente MSAL e cache de token por sessão do notebook.
- Prefiro Arrow end-to-end para extração, para evitar sobrecarga de análise JSON em Python.
- Acompanhe a duração da extração, o tamanho da carga útil e a duração da fusão como métricas operacionais.
Troubleshooting
- 401 Não autorizado: Validar o inquilino, credenciais do cliente e o âmbito.
- HTTP 429: Adicionar tentativa novamente com backoff exponencial e jitter.
- Deriva de esquema entre modelos: Normalizar os nomes das colunas e os tipos de dados antes da fusão.
- Grande utilização de memória em pandas: Processar as saídas do modelo em lotes ou agregá-las em DAX antes da extração.
Observação
Se o chamador tiver permissões insuficientes, a consulta falha, mas a resposta HTTP continua 200 OKa ser . Inspecione o corpo da resposta para detalhes de erros.