本文說明如何使用 Lakehouse 作為範例資料來源,在 Microsoft Fabric 中設定資料代理程式。 我們會先建立並填入 Lakehouse,然後建立 Fabric 資料代理程式,並將 Lakehouse 新增至其中。 如果你已經有一個 Power BI 語意模型(具備必要的讀寫權限)、倉庫、KQL 資料庫或本體論,請依照相同步驟選擇該來源。 雖然本逐步解說將使用湖倉,但其他來源的範例相同,僅資料來源的選擇有所不同。
這很重要
這項功能目前處於預覽階段。
先決條件
- 已啟用 Microsoft Fabric 的付費 F2 或更高 Fabric 容量,或每個容量的 Power BI Premium (P1 或更高) 容量
- 已啟用 Fabric 資料代理租戶設定。
- 已啟用 AI 的跨地理位置處理。
- 已啟用 AI 的跨地理位置儲存。
- 至少要有一種,且有資料:倉庫、湖屋、一個或多個 Power BI 語意模型、KQL 資料庫,或本體論。
- Power BI 語義模型透過 XMLA 端點租用戶切換 啟用於 Power BI 語義模型數據來源。
這很重要
請確定已在 Power BI 系統管理入口網站中啟用獨立 Copilot 體驗 (租用戶設定 > Copilot > 獨立 Copilot 體驗)。 如果未啟用,即使其他 Copilot 租用戶開關已開啟,您也無法在 Copilot 案例中使用資料代理程式。 如需詳細資訊,請參閱 Power BI 租用戶設定中的 Copilot。
使用 AdventureWorksLH 建立湖泊屋
首先,建立資料湖倉,並填入所需的資料。
如果您在湖倉或倉儲中已經有 AdventureWorksLH 的實例,您可以略過此步驟。 如果沒有,您可以使用 Fabric 筆記本中的下列指示,將數據填入湖倉中。
在您要在其中建立網狀架構資料代理程式的工作區中建立新的筆記本。
在 探索器窗格的左側,選取 新增資料來源。 此選項可讓您新增現有的 Lakehouse 或建立新的 Lakehouse。 為了清晰起見,請建立新的「Lakehouse」,並給它命名。
在頂端儲存格中,新增下列程式碼片段:
import pandas as pd from tqdm.auto import tqdm base = "https://synapseaisolutionsa.z13.web.core.windows.net/data/AdventureWorks" # load list of tables df_tables = pd.read_csv(f"{base}/adventureworks.csv", names=["table"]) for table in (pbar := tqdm(df_tables['table'].values)): pbar.set_description(f"Uploading {table} to lakehouse") # download df = pd.read_parquet(f"{base}/{table}.parquet") # save as lakehouse table spark.createDataFrame(df).write.mode('overwrite').saveAsTable(table)選擇 全部執行。
幾分鐘後,資料湖倉會填入必要的數據。
謹慎
筆記本在繼續執行(例如,由於不慎陷入無限迴圈或不斷輪詢)時,可能會持續消耗 Fabric 容量。 資料載入完成之後,請停止任何作用中的儲存格,並結束筆記本工作階段 (筆記本工具列 > 停止工作階段) (如果您不再需要它)。 避免在沒有設定逾時的情況下添加長時間運行的迴圈。
建立網狀架構資料代理程式
若要建立新的網狀架構資料代理程式,請流覽至您的工作區,然後選取 [+ 新增項目] 按鈕,如下列螢幕擷取畫面所示:
在 [所有項目] 索引標籤中,搜尋 [網狀架構資料代理程式] 以找出適當的選項。 選取之後,提示會要求您提供網狀架構資料代理程式的名稱,如下列螢幕擷取畫面所示:
輸入名稱之後,請繼續進行下列步驟,讓網狀架構資料代理程式符合您的特定需求。
選取資料
選取您在上一個步驟中建立的 Lakehouse,然後選取 [ 新增],如下列螢幕快照所示:
將湖倉新增為資料來源之後,Fabric 資料代理程式頁面左側的 Explorer 窗格會顯示湖倉名稱。 選擇 Lakehouse 以檢視所有可用的表格。 使用複選框來選取您想要提供給 AI 使用的數據表。 在此案例中,請選取下列數據表:
dimcustomerdimdatedimgeographydimproductdimproductcategorydimpromotiondimresellerdimsalesterritoryfactinternetsalesfactresellersales
提供指示
若要新增指示,請選取 [資料代理程式指示 ] 按鈕以開啟右側的指示窗格。 您可以新增下列指示。
AdventureWorksLH 資料來源包含來自三個資料表的資訊:
-
dimcustomer,以取得詳細的客戶人口統計和連絡資訊 -
dimdate,適用於日期相關資料 - 例如行事曆和會計資訊 -
dimgeography,以取得地理詳細數據,包括城市名稱和國家/地區代碼。
使用此數據源進行涉及客戶詳細數據、以時間為基礎的事件和地理位置的查詢和分析。
提供範例
若要新增範例查詢,請選取 [ 範例查詢] 按鈕,以開啟右側的範例查詢窗格。 此窗格提供為所有支持的數據源新增或編輯範例查詢的選項。 針對每個數據源,您可以選取 [新增或編輯範例查詢] 輸入相關範例,如下列螢幕快照所示:
在這裡,您應該為您所建立的 Lakehouse 數據源新增範例查詢。
Question: Calculate the average percentage increase in sales amount for repeat purchases for every zipcode. Repeat purchase is a purchase subsequent to the first purchase (the average should always be computed relative to the first purchase)
SELECT AVG((s.SalesAmount - first_purchase.SalesAmount) / first_purchase.SalesAmount * 100) AS AvgPercentageIncrease
FROM factinternetsales s
INNER JOIN dimcustomer c ON s.CustomerKey = c.CustomerKey
INNER JOIN dimgeography g ON c.GeographyKey = g.GeographyKey
INNER JOIN (
SELECT *
FROM (
SELECT
CustomerKey,
SalesAmount,
OrderDate,
ROW_NUMBER() OVER (PARTITION BY CustomerKey ORDER BY OrderDate) AS RowNumber
FROM factinternetsales
) AS t
WHERE RowNumber = 1
) first_purchase ON s.CustomerKey = first_purchase.CustomerKey
WHERE s.OrderDate > first_purchase.OrderDate
GROUP BY g.PostalCode;
Question: Show the monthly total and year-to-date total sales. Order by year and month.
SELECT
Year,
Month,
MonthlySales,
SUM(MonthlySales) OVER (PARTITION BY Year ORDER BY Year, Month ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS CumulativeTotal
FROM (
SELECT
YEAR(OrderDate) AS Year,
MONTH(OrderDate) AS Month,
SUM(SalesAmount) AS MonthlySales
FROM factinternetsales
GROUP BY YEAR(OrderDate), MONTH(OrderDate)
) AS t
備註
Power BI 語意模型數據源目前不支援新增範例查詢/問題組。
測試及修改網狀架構資料代理程式
既然您已設定網狀架構資料代理程式、新增網狀架構資料代理程式指示,並提供資料湖存放庫的範例查詢,您可以藉由詢問問題和接收答案來與其互動。 當您繼續測試時,您可以新增更多範例,並精簡指示,以進一步改善網狀資料代理程式的效能。 與您的同事共同作業以收集意見反應,並根據其輸入,確保提供的範例查詢和指示符合他們想要詢問的問題類型。
發佈網狀架構數據代理程式
驗證網狀架構資料代理程式的效能之後,您可能會決定發佈它,以便與想要對資料進行問答的同事共用。 在此情況下,請選取 [發佈],如本螢幕擷取畫面所示:
發佈資料代理程式視窗隨即開啟,如下列螢幕擷取畫面所示:
在此方塊中,選取 [發佈] 以發佈網狀架構資料代理程式。 隨即出現網狀架構資料代理程式的已發佈 URL,如下列螢幕擷取畫面所示:
在 Power BI 中使用 Copilot 中的 Fabric 數據代理程式
您可以在發佈 Fabric 數據代理程序之後,使用 Power BI 中的 Copilot 與 Fabric 數據代理程式互動。 在 Power BI 中使用 Copilot 時,您可以直接取用數據代理程式和其他專案(例如報表或語意模型),而不需要在它們之間切換。
選取左側瀏覽窗格中的 [Copilot ] 按鈕,以在Power BI 中開啟 Copilot。 接下來,在底部的文本框中選取 新增項目以取得更好的結果,以新增數據代理程式。 在開啟的視窗中選取 [數據代理程式 ]。 您只能看到您有權存取的數據代理程式。 選擇您想要的數據代理程式,然後選取 [ 確認]。 此範例示範如何使用單一數據代理程式,但您可以新增更多專案,例如其他數據代理程式、報表或語意模型。 下列螢幕快照說明單一數據代理程式的步驟:
既然您已將數據代理程式新增至 Power BI 中的 Copilot,您可以詢問與 Fabric 數據代理程式相關的任何問題,如下列螢幕快照所示:
以程式設計方式使用網狀架構資料代理程式
您可以在網狀架構筆記本內以程式設計方式使用網狀架構資料代理程式。 若要判斷 Fabric 數據代理程式是否有已發佈的 URL 值,請選取 [ 設定],如下列螢幕快照所示:
發佈 Fabric 數據代理程式之前,它沒有已發佈的 URL 值,如下列螢幕快照所示:
如果您之前尚未發佈 Fabric 數據代理程式,您可以依照先前步驟中的指示加以發佈。 然後,您可以複製已發佈的網址,並在 Fabric 筆記本中使用該網址。 如此一來,您可以在網狀架構筆記本中呼叫網狀架構資料代理程式 API,以查詢網狀架構資料代理程式。 將此程式碼片段中複製的網址貼上。 然後將問題替換成與您的 Fabric 資料代理程式相關的任意查詢。 此範例使用 \<generic published URL value\> 作為網址。
這很重要
以程式設計方式呼叫資料代理程式時,請實作:
- 輪詢逾時(請參閱下面的範例)以避免無限循環。
- 最小輪詢頻率(從 2-5 秒開始;僅在需要時增加)。
- 完成後清除已建立的執行緒或資源。
- 完成後,筆記本會話將關閉,以釋放容量給 Fabric。
備註
如果這些確切版本已過時,請將版本釘選 (openai , synapsemlpandas, tqdm, ) 調整為 Fabric 執行階段的最新驗證版本。
%pip install "openai==1.70.0"
%pip install "synapseml==1.0.5" # Required for synapse.ml.mlflow (update version as needed)
%pip install pandas tqdm # Skip if already available in the Fabric runtime
import typing as t
import time
import uuid
# OpenAI SDK internals
from openai import OpenAI
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
# SynapseML helper for env config
from synapse.ml.mlflow import get_mlflow_env_config
# Removed unused imports: requests, json, pprint, APIStatusError, SynapseTokenProvider
base_url = "https://<generic published base URL value>"
question = "What data sources do you have access to?"
configs = get_mlflow_env_config()
# Create OpenAI Client
class FabricOpenAI(OpenAI):
def __init__(
self,
api_version: str ="2024-05-01-preview",
**kwargs: t.Any,
) -> None:
self.api_version = api_version
default_query = kwargs.pop("default_query", {})
default_query["api-version"] = self.api_version
super().__init__(
api_key="",
base_url=base_url,
default_query=default_query,
**kwargs,
)
def _prepare_options(self, options: FinalRequestOptions) -> None:
headers: dict[str, str | Omit] = (
{**options.headers} if is_given(options.headers) else {}
)
options.headers = headers
headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
if "Accept" not in headers:
headers["Accept"] = "application/json"
if "ActivityId" not in headers:
correlation_id = str(uuid.uuid4())
headers["ActivityId"] = correlation_id
return super()._prepare_options(options)
# Pretty printing helper
def pretty_print(messages):
print("---Conversation---")
for m in messages:
print(f"{m.role}: {m.content[0].text.value}")
print()
fabric_client = FabricOpenAI()
# Create assistant
assistant = fabric_client.beta.assistants.create(model="not used")
# Create thread
thread = fabric_client.beta.threads.create()
# Create message on thread
message = fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
# Create run
run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)
# Wait for run to complete (avoid indefinite loop)
terminal_states = {"completed", "failed", "cancelled", "requires_action"}
poll_interval = 2
timeout_seconds = 300 # Adjust based on expected workload
start_time = time.time()
while run.status not in terminal_states:
if time.time() - start_time > timeout_seconds:
raise TimeoutError(f"Run polling exceeded {timeout_seconds} seconds (last status={run.status})")
run = fabric_client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=run.id,
)
print(run.status)
time.sleep(poll_interval)
if run.status != "completed":
print(f"Run finished with status: {run.status}")
# Print messages
response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
pretty_print(response)
# Delete thread
fabric_client.beta.threads.delete(thread_id=thread.id)