Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este tutorial, creas un cuaderno de Microsoft Fabric que extrae datos de varios modelos semánticos de Power BI mediante la API REST Execute DAX Queries. Deserializa las respuestas de IPC de Arrow en DataFrames de pandas, compara y fusiona las salidas del modelo y fusiona los resultados de forma incremental en una tabla Delta en OneLake.
Este patrón está diseñado para científicos de datos e ingenieros de análisis que necesitan extracción de alto rendimiento con una sobrecarga de análisis baja.
¿Por qué funciona este patrón?
En comparación con la extracción basada en JSON, Arrow IPC reduce la sobrecarga de CPU y memoria en el lado cliente, ya que se evita el análisis repetido de JSON y la materialización de objetos. Puede leer búferes de flecha directamente en una representación tabular en memoria y convertir en pandas con menos pasos de transformación.
Al conservar los conjuntos de resultados incrementalmente en Delta, también se evitan reescrituras de tablas completas. Este enfoque ayuda a reducir el uso de la unidad de capacidad (CU) al tiempo que mantiene actuales los escenarios posteriores de Direct Lake.
Lo que construyes
En un cuaderno de Fabric, puedes:
- Consulte dos modelos semánticos con DAX.
- Materialice cada respuesta como DataFrame de pandas.
- Compare o combine los dataframes.
- Combinar incrementalmente los cambios en una tabla Delta.
- Compruebe que los consumidores de Direct Lake pueden recoger los datos actualizados.
Prerrequisitos
Un espacio de trabajo de capacidad Fabric o Premium.
Al menos dos modelos semánticos que desea comparar o combinar.
Permisos de compilación y lectura en cada modelo semántico.
Un cuaderno de Fabric conectado a un lago donde puede crear y actualizar tablas delta.
Paquetes de Python:
%pip install msal requests pyarrow pandasConfiguración de inquilino habilitada:
- Api REST de ejecución de consultas de conjunto de datos.
- Permitir que las entidades de servicio usen las API de Power BI si utiliza la autenticación exclusiva para aplicaciones.
flujo de bloc de notas de Fabric
El cuaderno realiza estos pasos:
- Obtener un token de acceso.
- Ejecute DAX en varios modelos semánticos.
- Deserialize las respuestas de Arrow en pandas DataFrames.
- Normalice esquemas y compare o combine DataFrames.
- Combine incrementalmente los resultados en una tabla Delta.
- Valide la disponibilidad de datos para el consumo en Direct Lake.
1 - Adquirir un token de ID de Entra para el usuario actual
En la primera celda de código, defina los destinos del modelo semántico y adquiera un token.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2: Ejecución de consultas DAX en modelos semánticos
Defina una función auxiliar que ejecute DAX y devuelva un DataFrame de Pandas desde Arrow IPC.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
En la celda de código siguiente, ejecute una consulta DAX específica para cada modelo y etiquete la procedencia:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - Comparar y combinar dataFrames
Normalice las columnas de clave y, a continuación, compare las salidas del modelo o combínelas en un único conjunto analítico.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Fusionar incrementalmente en una tabla Delta
Utilice una combinación Delta basada en columnas de grano empresarial. Este patrón actualiza las filas modificadas e inserta nuevas filas sin volver a escribir la tabla completa.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Sugerencia
Para ventanas de extracción muy grandes, particione la tabla Delta de destino por fecha y proceso en segmentos enlazados. Este enfoque mejora la eficacia de la combinación y ayuda a controlar el uso de CU.
5: Validación de la preparación de Direct Lake
Confirme que la tabla Delta está actualizada y es consultable.
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Una vez actualizada la tabla Delta, los modelos semánticos de Direct Lake que hacen referencia a esa tabla pueden recoger los nuevos datos a través del comportamiento de sincronización normal.
Diseño sugerido de celda del cuaderno Fabric
Usa este diseño de celda para mantener el flujo de trabajo gestionable.
- Celda Markdown: escenario, identificadores de modelo y objetivo de la tabla.
- Python celda: importación de paquetes y adquisición de tokens.
- Python celda: asistente de ejecución DAX.
- Celda de Python: extraer datos de cada modelo semántico.
- Celda de Python: comparar/combinarlas DataFrames de Pandas.
- Celda de Python: escribe el DataFrame provisional en Spark y ejecuta Delta
MERGE. - Celda en Python: validar el número de filas y las marcas temporales de extracción más recientes.
Guía de rendimiento
- Mantenga el ámbito de DAX solo en las columnas y filas necesarias.
- Use
resultsetRowcountLimity filtros DAX para delimitar ventanas de extracción. - Favorece las fusiones incrementales sobre las actualizaciones completas.
- Reutilizar un solo cliente MSAL y caché de tokens por sesión de notebook.
- Se prefiere usar Arrow de extremo a extremo para la extracción, para evitar la sobrecarga de análisis JSON en Python.
- Realice un seguimiento de la duración de la extracción, el tamaño de carga y la duración de la combinación como métricas operativas.
Solución de problemas
- 401 No autorizado: valide el inquilino, las credenciales de cliente y el ámbito.
- HTTP 429: agregar reintento con retroceso exponencial y vibración.
- Desfase de esquema entre modelos: normalice los nombres de columna y los tipos de datos antes de la combinación.
- Gran uso de memoria en pandas: Procese las salidas del modelo en lotes o agréguelo en DAX antes de la extracción.
Nota:
Si el autor de la llamada no tiene permisos suficientes, se produce un error en la consulta, pero la respuesta HTTP sigue siendo 200 OK. Inspeccione el cuerpo de la respuesta para obtener los detalles del error.