자습서: Fabric Notebook에서 대용량 Python 추출

이 자습서에서는 Execute DAX 쿼리 REST API 사용하여 여러 Power BI 의미 체계 모델에서 데이터를 추출하는 Microsoft Fabric Notebook을 빌드합니다. Arrow IPC 응답을 pandas DataFrames로 역직렬화하고, 모델 출력을 비교 및 결합한 후, 결과를 OneLake의 Delta 표에 증분 방식으로 병합합니다.

이 패턴은 낮은 구문 분석 오버헤드로 처리량이 높은 추출이 필요한 데이터 과학자 및 분석 엔지니어를 위해 설계되었습니다.

이 패턴이 작동하는 이유

JSON 기반 추출에 비해 화살표 IPC는 반복되는 JSON 구문 분석 및 개체 구체화를 방지하므로 클라이언트 쪽에서 CPU 및 메모리 오버헤드를 줄입니다. 화살표 버퍼를 테이블 형식 메모리 내 표현으로 직접 읽고 변환 단계가 적은 pandas로 변환할 수 있습니다.

결과 집합을 델타로 증분 방식으로 유지하면 전체 테이블 다시 쓰기도 방지할 수 있습니다. 이 방법은 다운스트림 Direct Lake 시나리오를 최신 상태로 유지하면서 CU(용량 단위) 사용량을 줄이는 데 도움이 됩니다.

당신이 구축한 것

여러분은 한 개의 Fabric Notebook에서 다음을 수행합니다.

  1. DAX를 사용하여 두 의미 체계 모델을 쿼리합니다.
  2. 각 응답을 pandas DataFrame으로 구체화합니다.
  3. 데이터 프레임을 비교하거나 결합합니다.
  4. 변경 내용을 델타 테이블에 증분 방식으로 병합합니다.
  5. Direct Lake 소비자가 업데이트된 데이터를 선택할 수 있는지 확인합니다.

사전 요구 사항

  • Fabric 또는 프리미엄 용량의 워크스페이스입니다.

  • 비교하거나 결합하려는 의미 체계 모델이 두 개 이상 있습니다.

  • 각 의미 체계 모델에 대한 빌드 및 읽기 권한

  • Fabric Notebook은 델타 테이블을 만들고 업데이트할 수 있는 레이크하우스(데이터 저장소 및 분석 플랫폼)에 연결되어 있습니다.

  • Python 패키지:

    %pip install msal requests pyarrow pandas
    
  • 테넌트 설정 활성화됨:

    • 데이터 세트 쿼리 실행 REST API.
    • 앱 전용 인증을 사용하는 경우 서비스 주체가 Power BI API를 사용하도록 허용합니다.

흐름: Fabric 노트북

Notebook은 다음 단계를 수행합니다.

  1. 액세스 토큰을 획득합니다.
  2. 여러 의미 체계 모델에 대해 DAX를 실행합니다.
  3. 화살표 응답을 pandas DataFrames로 역직렬화합니다.
  4. 스키마를 정규화하고 DataFrame을 비교하거나 결합합니다.
  5. 결과를 델타 테이블에 증분 방식으로 병합합니다.
  6. Direct Lake 사용량에 대한 데이터 가용성의 유효성을 검사합니다.

1 - 현재 사용자의 Entra ID 토큰 획득

첫 번째 코드 셀에서 의미 체계 모델 대상을 정의하고 토큰을 획득합니다.

import notebookutils  # available in every Fabric notebook runtime

# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"

# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
    raise RuntimeError(f"Token acquisition failed")

2 - 의미 체계 모델에서 DAX 쿼리 실행

DAX를 실행하고 화살표 IPC에서 pandas DataFrame을 반환하는 도우미를 정의합니다.

import io
import pandas as pd
import pyarrow as pa

from datetime import datetime, timezone

def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/executeDaxQueries"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    body = {
        "query": query,
        "resultsetRowcountLimit": 500000
    }

    response = requests.post(url, headers=headers, json=body, timeout=180)
    response.raise_for_status()

    reader = pa.ipc.open_stream(io.BytesIO(response.content))
    table = reader.read_all()
    return table.to_pandas()

다음 코드 셀에서 각 모델에 대해 모델별 DAX 쿼리를 실행하고 출처에 태그를 지정합니다.

dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
    'Date'[Date],
    'Product'[ProductKey],
    "NetSales", [Net Sales],
    "Units", [Units]
)
"""

models = [
    {
        "name": "YOUR_FIRST_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_1",
        "dataset_id": "YOUR_DATASET_ID_1"
    },
    {
        "name": "YOUR_SECOND_SEMANTIC_MODEL",
        "workspace_id": "YOUR_WORKSPACE_ID_2",
        "dataset_id": "YOUR_DATASET_ID_2"
    }
]

frames = []
for m in models:
    df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
    df["model_name"] = m["name"]
    df["extract_utc"] = datetime.now(timezone.utc)
    frames.append(df)

print(f"Extracted {len(frames)} DataFrames.")

3 - DataFrame 비교 및 결합

키 열을 정규화한 다음 모델 출력을 비교하거나 단일 분석 집합으로 결합합니다.

for i, df in enumerate(frames):
    df["Date"] = pd.to_datetime(df["Date"], utc=True)
    df["ProductKey"] = df["ProductKey"].astype("int64")
    frames[i] = df

combined_df = pd.concat(frames, ignore_index=True)

# Example comparison: variance between models by date and product
comparison_df = (
    combined_df
    .pivot_table(
        index=["Date", "ProductKey"],
        columns="model_name",
        values="NetSales",
        aggfunc="sum"
    )
    .reset_index()
)

if "sales_model" in comparison_df and "inventory_model" in comparison_df:
    comparison_df["netsales_delta"] = (
        comparison_df["sales_model"] - comparison_df["inventory_model"]
    )

display(comparison_df.head(20))

4 - 델타 테이블에 점진적 병합

비즈니스 조직 열에 대한 델타 병합 키를 사용합니다. 이 패턴은 변경된 행을 업데이트하고 전체 테이블을 다시 작성하지 않고 새 행을 삽입합니다.

# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")

spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")

spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON  tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Tip

매우 큰 추출 창의 경우 대상 델타 테이블을 날짜별로 분할하고 제한된 조각에서 처리합니다. 이 방법은 병합 효율성을 향상시키고 CU 사용량을 제어하는 데 도움이 됩니다.

5 - Direct Lake 준비 상태 확인

Delta 테이블이 업데이트되고 쿼리 가능한지 확인합니다.

spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)

델타 테이블이 업데이트된 후 해당 테이블을 참조하는 Direct Lake 의미 체계 모델은 정상적인 동기화 동작을 통해 새 데이터를 선택할 수 있습니다.

권장 Fabric 노트북 셀 레이아웃 설계

이 셀 레이아웃을 사용하여 워크플로를 유지 관리할 수 있습니다.

  1. Markdown 셀: 시나리오, 모델 ID 및 테이블 대상입니다.
  2. Python 셀: 패키지 가져오기 및 토큰 획득
  3. Python 셀: DAX 실행 도우미입니다.
  4. Python 셀: 각 의미 체계 모델에서 데이터를 추출합니다.
  5. Python 셀: pandas DataFrames 비교/결합
  6. Python 셀: 스테이징 DataFrame을 Spark에 쓰고 Delta MERGE 실행합니다.
  7. Python 셀: 행 수 및 최신 추출 타임스탬프의 유효성을 검사합니다.

성능 지침

  • DAX 범위를 필요한 열 및 행으로만 유지합니다.
  • resultsetRowcountLimit 및 DAX 필터를 사용하여 추출 창을 한정합니다.
  • 전체 새로 고침 쓰기보다 점진적 병합을 선호합니다.
  • Notebook 세션당 단일 MSAL 클라이언트 및 토큰 캐시를 다시 사용합니다.
  • Python JSON 구문 분석 오버헤드를 방지하려면 추출을 위해 화살표 엔드투엔드 사용을 선호합니다.
  • 추출 기간, 페이로드 크기 및 병합 기간을 작업 메트릭으로 추적합니다.

Troubleshooting

  • 401 권한 없음: 테넌트, 클라이언트 자격 증명 및 범위의 유효성을 검사합니다.
  • HTTP 429: 지수 백오프 및 지터를 사용하여 재시도를 추가합니다.
  • 모델 간의 스키마 드리프트: 병합하기 전에 열 이름과 데이터 형식을 정규화합니다.
  • pandas의 대용량 메모리 사용: 추출 전에 DAX에서 배치 처리하거나 집계하여 모델 출력을 처리하십시오.

메모

호출자에게 권한이 부족한 경우 쿼리는 실패하지만 HTTP 응답은 여전히 200 OK있습니다. 응답 본문에서 오류 세부 정보를 검사합니다.