Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, Execute DAX Sorguları REST API kullanarak birden çok Power BI anlam modelindeki verileri ayıklayan bir Microsoft Fabric not defteri oluşturursunuz. Ok IPC yanıtlarını pandas DataFrames'te seri durumdan çıkarabilirsiniz, model çıkışlarını karşılaştırıp birleştirir ve sonuçları artımlı olarak OneLake'deki bir Delta tablosuyla birleştirirsiniz.
Bu kalıp, düşük ayrıştırma maliyetiyle yüksek hızlı veri çıkarımına ihtiyacı olan veri bilim uzmanları ve analiz mühendisleri için tasarlanmıştır.
Bu desen neden çalışır?
JSON tabanlı ayıklamaya kıyasla, tekrarlanan JSON ayrıştırmaları ve nesne gerçekleştirme işlemlerinden kaçındığınız için Arrow IPC, istemci tarafında CPU ve bellek üzerindeki yükü azaltır. Arrow arabelleklerini doğrudan tablo şeklinde bellek içi bir gösterime okuyabilir ve daha az dönüştürme adımıyla pandas'a çevirebilirsiniz.
Sonuç kümelerini artımlı olarak Delta olarak kalıcı hale eklediğinizde, tam tablo yeniden yazma işlemlerinden de kaçınırsınız. Bu yaklaşım, aşağı akış Direct Lake senaryolarını güncel tutarken kapasite birimi (CU) kullanımını azaltmaya yardımcı olur.
Ne inşa ettiğiniz
Bir Fabric not defterinde şunları yapın:
- DAX ile iki anlam modeli sorgulama.
- Her bir yanıtı pandas DataFrame olarak dönüştür.
- DataFrame'leri karşılaştırın veya birleştirin.
- Değişiklikleri artımlı olarak Delta tablosuna birleştirin.
- Direct Lake tüketicilerinin güncelleştirilmiş verileri alabildiğini doğrulayın.
Prerequisites
Fabric veya Premium kapsamlı çalışma alanı.
Karşılaştırmak veya birleştirmek istediğiniz en az iki anlam modeli.
Her bir anlam modeli için derleme ve okuma izinleri.
Delta tabloları oluşturabileceğiniz ve güncelleştirebileceğiniz bir lakehouse'a (göl evi) bağlı Fabric not defteri.
Python paketleri:
%pip install msal requests pyarrow pandasKiracı ayarları etkin:
- Veri Kümesi Sorguları REST API'si
- Yalnızca uygulama kimlik doğrulaması kullanıyorsanız hizmet sorumlularının Power BI API'leri kullanmasına izin verme.
Not defteri akışını Fabric
Not defteri şu adımları gerçekleştirir:
- Erişim belirteci edinin.
- DAX'i birden çok anlam modeline karşı yürütür.
- Arrow yanıtlarını pandas DataFrames'e çözümleme.
- Şemaları normalleştirin ve DataFrame'leri karşılaştırın veya birleştirin.
- Sonuçları artımlı olarak Delta tablosuyla birleştirin.
- Direct Lake tüketimi için veri kullanılabilirliğini doğrulayın.
1 - Geçerli kullanıcı için bir Entra Id belirteci alma
İlk kod hücresinde anlamsal model hedeflerini tanımlayın ve bir belirteç alın.
import notebookutils # available in every Fabric notebook runtime
# Power BI resource URI — must match this exact value
PBI_RESOURCE = "https://analysis.windows.net/powerbi/api"
# Acquire an Entra Id token for the current user (or workspace identity)
# using the notebook's built-in credential provider.
access_token = notebookutils.credentials.getToken(PBI_RESOURCE)
if access_token is None:
raise RuntimeError(f"Token acquisition failed")
2 - Anlamsal modeller arasında DAX sorguları yürütme
DAX çalıştıran ve Arrow IPC'den pandas DataFrame döndüren bir yardımcı tanımlayın.
import io
import pandas as pd
import pyarrow as pa
from datetime import datetime, timezone
def execute_dax_to_pandas(workspace_id: str, dataset_id: str, query: str) -> pd.DataFrame:
url = (
f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
f"/datasets/{dataset_id}/executeDaxQueries"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
body = {
"query": query,
"resultsetRowcountLimit": 500000
}
response = requests.post(url, headers=headers, json=body, timeout=180)
response.raise_for_status()
reader = pa.ipc.open_stream(io.BytesIO(response.content))
table = reader.read_all()
return table.to_pandas()
Sonraki kod hücresinde, her model için modele özgü bir DAX sorgusu ve etiket kökeni için bir sorgu çalıştırın:
dax_query = """
EVALUATE
SUMMARIZECOLUMNS(
'Date'[Date],
'Product'[ProductKey],
"NetSales", [Net Sales],
"Units", [Units]
)
"""
models = [
{
"name": "YOUR_FIRST_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_1",
"dataset_id": "YOUR_DATASET_ID_1"
},
{
"name": "YOUR_SECOND_SEMANTIC_MODEL",
"workspace_id": "YOUR_WORKSPACE_ID_2",
"dataset_id": "YOUR_DATASET_ID_2"
}
]
frames = []
for m in models:
df = execute_dax_to_pandas(m["workspace_id"], m["dataset_id"], dax_query)
df["model_name"] = m["name"]
df["extract_utc"] = datetime.now(timezone.utc)
frames.append(df)
print(f"Extracted {len(frames)} DataFrames.")
3 - DataFrame'leri karşılaştırma ve birleştirme
Anahtar sütunları normalleştirin, ardından model çıkışlarını karşılaştırın veya bunları tek bir analiz kümesinde birleştirin.
for i, df in enumerate(frames):
df["Date"] = pd.to_datetime(df["Date"], utc=True)
df["ProductKey"] = df["ProductKey"].astype("int64")
frames[i] = df
combined_df = pd.concat(frames, ignore_index=True)
# Example comparison: variance between models by date and product
comparison_df = (
combined_df
.pivot_table(
index=["Date", "ProductKey"],
columns="model_name",
values="NetSales",
aggfunc="sum"
)
.reset_index()
)
if "sales_model" in comparison_df and "inventory_model" in comparison_df:
comparison_df["netsales_delta"] = (
comparison_df["sales_model"] - comparison_df["inventory_model"]
)
display(comparison_df.head(20))
4 - Delta tablosuna artımlı olarak birleştirme
İş-tanecik sütunlarında anahtar kullanılarak bir Delta birleştirme kullanın. Bu düzen değiştirilen satırları güncelleştirir ve tablonun tamamını yeniden yazmadan yeni satırlar ekler.
# In Fabric notebooks, Spark is available by default.
spark_df = spark.createDataFrame(combined_df)
spark_df.createOrReplaceTempView("stg_semantic_extract")
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.analytics.semantic_extract_delta
USING DELTA
AS
SELECT * FROM stg_semantic_extract WHERE 1 = 0
""")
spark.sql("""
MERGE INTO lakehouse.analytics.semantic_extract_delta AS tgt
USING stg_semantic_extract AS src
ON tgt.Date = src.Date
AND tgt.ProductKey = src.ProductKey
AND tgt.model_name = src.model_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Tip
Çok büyük ayıklama pencereleri için hedef Delta tablosunu tarihe ve sınırlanmış dilimlerde işleme göre bölümleyin. Bu yaklaşım birleştirme verimliliğini artırır ve CU kullanımını denetlemeye yardımcı olur.
5 - Direct Lake hazırlığını doğrulama
Delta tablosunun güncelleştirildiğini ve sorgulanabilir olduğunu onaylayın:
spark.sql("""
SELECT model_name, COUNT(*) AS row_count, MAX(extract_utc) AS latest_extract
FROM lakehouse.analytics.semantic_extract_delta
GROUP BY model_name
""").show(truncate=False)
Delta tablosu güncelleştirildikten sonra, bu tabloya başvuran Direct Lake semantik modelleri normal eşitleme davranışı aracılığıyla yeni verileri alabilir.
Önerilen Fabric notebook hücre yerleşimi
İş akışının sürdürülebilir kalmasını sağlamak için bu hücre düzenini kullanın:
- Markdown hücresi: senaryo, model ID'leri ve tablo hedefi.
- Python hücresi: paketlerin içe aktarılması ve belirteç elde etme.
- Python hücre: DAX yürütme yardımcısı.
- Python hücre: her anlamsal modelden veri ayıklama.
- Python hücresi: pandas veri çerçevelerini karşılaştırın/birleştirin.
- Python hücresi: Hazırlık aşaması DataFrame'i Spark'a yazın ve Delta
MERGEçalıştırın. - Python hücresi: Satır sayılarını ve en son ayıklama zaman damgalarını doğrulayın.
Performans kılavuzu
- DAX'i yalnızca gerekli sütunlar ve satırlarla sınırlandırın.
- Ayıklama pencerelerini sınırlamak için
resultsetRowcountLimitve DAX filtrelerini kullanın. - Artan birleştirmeleri tam yenileştirme yazma işlemlerine tercih edin.
- Not defteri oturumu başına tek bir MSAL istemci ve belirteç önbelleğini yeniden kullanın.
- Python'da JSON işleme yükünden kaçınmak için verilerin ayıklanmasında uçtan uca Arrow kullanmayı tercih edin.
- İşlem ölçümleri olarak ayıklama süresini, yük boyutunu ve birleştirme süresini izleyin.
Troubleshooting
- 401 Yetkisiz: Kiracıyı, istemci kimlik bilgilerini ve kapsamı doğrulayın.
- HTTP 429: Üstel geri alma ve titreme ile yeniden deneme ekleyin.
- Modeller arasında şema kayma: Birleştirmeden önce sütun adlarını ve veri türlerini normalleştirin.
- Pandas'ta büyük bellek kullanımı: Ayıklamadan önce model çıktılarını toplu olarak işleyin veya DAX'ta birleştirin.
Uyarı
Çağıranın izinleri yetersizse sorgu başarısız olur ancak HTTP yanıtı hala 200 OKolur. Hata ayrıntıları için yanıt gövdesini inceleyin.