이 자습서에서는 Execute DAX 쿼리 REST API 사용하여 여러 Power BI 의미 체계 모델에서 데이터를 추출하는 Microsoft Fabric Notebook을 빌드합니다. Arrow IPC 응답을 pandas DataFrames로 역직렬화하고, 모델 출력을 비교 및 결합한 후, 결과를 OneLake의 Delta 표에 증분 방식으로 병합합니다.
이 패턴은 낮은 구문 분석 오버헤드로 처리량이 높은 추출이 필요한 데이터 과학자 및 분석 엔지니어를 위해 설계되었습니다.
이 패턴이 작동하는 이유
JSON 기반 추출에 비해 화살표 IPC는 반복되는 JSON 구문 분석 및 개체 구체화를 방지하므로 클라이언트 쪽에서 CPU 및 메모리 오버헤드를 줄입니다. 화살표 버퍼를 테이블 형식 메모리 내 표현으로 직접 읽고 변환 단계가 적은 pandas로 변환할 수 있습니다.
결과 집합을 델타로 증분 방식으로 유지하면 전체 테이블 다시 쓰기도 방지할 수 있습니다. 이 방법은 다운스트림 Direct Lake 시나리오를 최신 상태로 유지하면서 CU(용량 단위) 사용량을 줄이는 데 도움이 됩니다.
당신이 구축한 것
여러분은 한 개의 Fabric Notebook에서 다음을 수행합니다.
- DAX를 사용하여 두 의미 체계 모델을 쿼리합니다.
- 각 응답을 pandas DataFrame으로 구체화합니다.
- 데이터 프레임을 비교하거나 결합합니다.
- 변경 내용을 델타 테이블에 증분 방식으로 병합합니다.
- Direct Lake 소비자가 업데이트된 데이터를 선택할 수 있는지 확인합니다.
사전 요구 사항
Fabric 또는 프리미엄 용량의 워크스페이스입니다.
비교하거나 결합하려는 의미 체계 모델이 두 개 이상 있습니다.
각 의미 체계 모델에 대한 빌드 및 읽기 권한
Fabric Notebook은 델타 테이블을 만들고 업데이트할 수 있는 레이크하우스(데이터 저장소 및 분석 플랫폼)에 연결되어 있습니다.
Python 패키지:
%pip install msal requests pyarrow pandas테넌트 설정 활성화됨:
- 데이터 세트 쿼리 실행 REST API.
- 앱 전용 인증을 사용하는 경우 서비스 주체가 Power BI API를 사용하도록 허용합니다.
흐름: Fabric 노트북
Notebook은 다음 단계를 수행합니다.
- 액세스 토큰을 획득합니다.
- 여러 의미 체계 모델에 대해 DAX를 실행합니다.
- 화살표 응답을 pandas DataFrames로 역직렬화합니다.
- 스키마를 정규화하고 DataFrame을 비교하거나 결합합니다.
- 결과를 델타 테이블에 증분 방식으로 병합합니다.
- Direct Lake 사용량에 대한 데이터 가용성의 유효성을 검사합니다.
1 - 현재 사용자의 Entra ID 토큰 획득
첫 번째 코드 셀에서 의미 체계 모델 대상을 정의하고 토큰을 획득합니다.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - 의미 체계 모델에서 DAX 쿼리 실행
DAX를 실행하고 화살표 IPC에서 pandas DataFrame을 반환하는 도우미를 정의합니다.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
다음 코드 셀에서 각 모델에 대해 모델별 DAX 쿼리를 실행하고 출처에 태그를 지정합니다.
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - DataFrame 비교 및 결합
키 열을 정규화한 다음 모델 출력을 비교하거나 단일 분석 집합으로 결합합니다.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - 델타 테이블에 점진적 병합
비즈니스 조직 열에 대한 델타 병합 키를 사용합니다. 이 패턴은 변경된 행을 업데이트하고 전체 테이블을 다시 작성하지 않고 새 행을 삽입합니다.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
매우 큰 추출 창의 경우 대상 델타 테이블을 날짜별로 분할하고 제한된 조각에서 처리합니다. 이 방법은 병합 효율성을 향상시키고 CU 사용량을 제어하는 데 도움이 됩니다.
5 - Direct Lake 준비 상태 확인
Delta 테이블이 업데이트되고 쿼리 가능한지 확인합니다.
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
델타 테이블이 업데이트된 후 해당 테이블을 참조하는 Direct Lake 의미 체계 모델은 정상적인 동기화 동작을 통해 새 데이터를 선택할 수 있습니다.
권장 Fabric 노트북 셀 레이아웃 설계
이 셀 레이아웃을 사용하여 워크플로를 유지 관리할 수 있습니다.
- Markdown 셀: 시나리오, 모델 ID 및 테이블 대상입니다.
- Python 셀: 패키지 가져오기 및 토큰 획득
- Python 셀: DAX 실행 도우미입니다.
- Python 셀: 각 의미 체계 모델에서 데이터를 추출합니다.
- Python 셀: pandas DataFrames 비교/결합
- Python 셀: 스테이징 DataFrame을 Spark에 쓰고 Delta
MERGE실행합니다. - Python 셀: 행 수 및 최신 추출 타임스탬프의 유효성을 검사합니다.
성능 지침
- DAX 범위를 필요한 열 및 행으로만 유지합니다.
-
resultsetRowcountLimit및 DAX 필터를 사용하여 추출 창을 한정합니다. - 전체 새로 고침 쓰기보다 점진적 병합을 선호합니다.
- Notebook 세션당 단일 MSAL 클라이언트 및 토큰 캐시를 다시 사용합니다.
- Python JSON 구문 분석 오버헤드를 방지하려면 추출을 위해 화살표 엔드투엔드 사용을 선호합니다.
- 추출 기간, 페이로드 크기 및 병합 기간을 작업 메트릭으로 추적합니다.
Troubleshooting
- 401 권한 없음: 테넌트, 클라이언트 자격 증명 및 범위의 유효성을 검사합니다.
- HTTP 429: 지수 백오프 및 지터를 사용하여 재시도를 추가합니다.
- 모델 간의 스키마 드리프트: 병합하기 전에 열 이름과 데이터 형식을 정규화합니다.
- pandas의 대용량 메모리 사용: 추출 전에 DAX에서 배치 처리하거나 집계하여 모델 출력을 처리하십시오.
메모
호출자에게 권한이 부족한 경우 쿼리는 실패하지만 HTTP 응답은 여전히 200 OK있습니다. 응답 본문에서 오류 세부 정보를 검사합니다.
관련 콘텐츠
- DAX 쿼리 실행 API 이해
- DAX 쿼리 실행 REST API 시작
Tutorial: DAX 쿼리 실행 REST API - DAX 쿼리 실행 REST API에 대한 모범 사례