Use Azure OpenAI to turn whiteboard sketches into data pipelines
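Data Factory in Microsoft Fabric provides cloud-scale data movement and data transformation services. These let you solve the most complex data factory and ETL scenarios, and they give you a modern data integration experience for ingesting, preparing, and transforming data from a rich set of data sources. In Data Factory, you can create data pipelines that use rich, out-of-the-box data orchestration capabilities to compose flexible data workflows that meet your enterprise needs.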
Now, with the gpt-4o AI model in Azure, we're pushing the limits of what you can do with Data Factory. You can create a data solution from just an image.
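What do you need to get started? Just a Microsoft Fabric account and an idea. Here, we show you how to turn a whiteboard idea into a Fabric Data Factory data pipeline using just a picture and gpt-4o.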
Prerequisites
Before you create the solution, make sure the following prerequisites are set up in Azure and Fabric:
- A Microsoft Fabric-enabled workspace.
- An Azure OpenAI account with an API key and a deployed gpt-4o model.
- An image of what you want your pipeline to look like.
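The code in step 2 expects AZURE_OPENAI_GPT4O_ENDPOINT to hold the full chat completions URL of your gpt-4o deployment, not just the resource endpoint. The sketch below shows how that URL is typically assembled. It assumes the standard https://<resource>.openai.azure.com endpoint format. build_chat_completions_url is a hypothetical helper, and the resource name, deployment name, and api-version are example values that you replace with your own.

```python
from urllib.parse import urlencode


def build_chat_completions_url(resource_name: str, deployment: str, api_version: str) -> str:
    """Build an Azure OpenAI chat completions URL for one model deployment.

    Assumes the standard https://<resource>.openai.azure.com endpoint format;
    choose an api_version that your resource supports.
    """
    base = f"https://{resource_name}.openai.azure.com"
    path = f"/openai/deployments/{deployment}/chat/completions"
    return f"{base}{path}?{urlencode({'api-version': api_version})}"


# Hypothetical example values, for illustration only
print(build_chat_completions_url("my-resource", "gpt-4o", "2024-06-01"))
# https://my-resource.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-06-01
```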
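Warning
API keys are sensitive information. Production keys should only ever be stored securely in Azure Key Vault or another secure store. This sample uses an OpenAI key for demonstration purposes only. For production code, consider using Microsoft Entra ID instead of key authentication. It provides a more secure environment that doesn't rely on sharing keys and doesn't risk a security breach if a key is compromised.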
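If you follow the Microsoft Entra ID recommendation, the request headers carry a bearer token instead of an api-key header. This sketch makes two assumptions: the token is requested for the https://cognitiveservices.azure.com/.default scope, and your identity has been granted access to the Azure OpenAI resource (for example, through the Cognitive Services OpenAI User role). build_auth_headers is a hypothetical helper. A stand-in token provider is used so that the sketch runs without Azure credentials.

```python
from typing import Callable, Dict


def build_auth_headers(get_token: Callable[[], str]) -> Dict[str, str]:
    """Return request headers that use a Microsoft Entra ID bearer token instead of an API key."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_token()}",
    }


# In a real environment, the token provider could come from the azure-identity package:
#   from azure.identity import DefaultAzureCredential
#   credential = DefaultAzureCredential()
#   get_token = lambda: credential.get_token("https://cognitiveservices.azure.com/.default").token
# A stand-in provider is used here so the sketch runs without Azure credentials.
headers = build_auth_headers(lambda: "example-token")
print(headers["Authorization"])  # Bearer example-token
```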
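Step 1: Upload your image to a Lakehouse
Before you can analyze the image, you need to upload it to a Lakehouse. Sign in to your Microsoft Fabric account and go to your workspace. Select + New item and create a new Lakehouse.
After the Lakehouse is set up, create a new folder named images under Files, and then upload your image to it.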
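To confirm the upload from a notebook, you can list the folder's contents. This sketch assumes that the Lakehouse is attached as the notebook's default Lakehouse and that its Files section is mounted at /lakehouse/default/Files. list_images and the set of image suffixes are illustrative choices, not part of Fabric.

```python
from pathlib import Path
from typing import List

# Assumed mount point of the Files/images folder in the notebook's default Lakehouse
IMAGES_DIR = "/lakehouse/default/Files/images"
# Illustrative set of file extensions treated as images
IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".gif", ".webp"}


def list_images(folder: str) -> List[str]:
    """Return the sorted paths of image files directly inside folder (empty if it doesn't exist)."""
    path = Path(folder)
    if not path.is_dir():
        return []
    return sorted(
        str(p) for p in path.iterdir()
        if p.is_file() and p.suffix.lower() in IMAGE_SUFFIXES
    )


print(list_images(IMAGES_DIR))
```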
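Step 2: Create a notebook in your workspace
Now you just need a notebook to run some Python code that summarizes the image and creates the pipeline in your workspace.
Create a new notebook in your workspace.
In the code area, enter the following code. It installs the required library, sets up the configuration, and encodes the image: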
# Install the library that provides sempy.fabric (kept at the top of the cell)
%pip install semantic-link -q

# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
# Full chat completions URL of your deployment, in the form
# https://<resource>.openai.azure.com/openai/deployments/<deployment>/chat/completions?api-version=<api-version>
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>"  # For example, "/lakehouse/default/Files/images/pipeline.png"

# Imports
import base64
import json
import time

import pandas as pd
import requests
import sempy.fabric as fabric

# Load the image and Base64-encode it for the request payload
with open(IMAGE_PATH, "rb") as image_file:
    encoded_image = base64.b64encode(image_file.read()).decode("ascii")

## Request headers
headers = {
    "Content-Type": "application/json",
    "api-key": AZURE_OPENAI_KEY,
}
Run this code block to set up the environment.
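The payloads in the following steps label the image as image/jpeg whatever its actual format. If you want to send a MIME type that matches the file, a small helper can build the data URL for you. The hypothetical image_to_data_url below guesses the type from the file extension with Python's mimetypes module and falls back to image/jpeg.

```python
import base64
import mimetypes


def image_to_data_url(image_bytes: bytes, file_name: str) -> str:
    """Encode image bytes as a data URL, guessing the MIME type from the file name."""
    mime_type, _ = mimetypes.guess_type(file_name)
    if mime_type is None or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"  # fallback, same type the request payloads use
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


# The first four bytes of a PNG file, used as a tiny stand-in image
print(image_to_data_url(b"\x89PNG", "pipeline.png"))  # data:image/png;base64,iVBORw==
```

To use it, pass the raw image bytes and IMAGE_PATH, and use the result as the "url" value in place of the hard-coded data:image/jpeg string.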
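Step 3: Use gpt-4o to describe the pipeline (optional)
This step is optional. It shows how to extract details from the image, which might be relevant for your purposes. If you skip this step, you can still generate the pipeline JSON in the next step.
First, select Edit on the notebook's main menu. Then select the + Add code cell below button on the toolbar to add a new code block after the previous one.
Add the following code to the new cell. It shows how gpt-4o can interpret and summarize the image to understand its contents.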
# Summarize the image
## Request payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Show list of pipeline activities and how they are connected."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload, timeout=120)
    response.raise_for_status()  # Raises an HTTPError if the request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")
response_json = response.json()

## Show AI response
print(response_json["choices"][0]["message"]["content"])
Run this code block to see an AI-generated summary of the image and its components.
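The summary code reads choices[0]["message"]["content"] directly. A more defensive sketch also checks finish_reason. It assumes the standard chat completions response shape. A finish_reason of "length" means the answer stopped at max_tokens (800 in the payload above) and may be incomplete. get_message_content is a hypothetical helper.

```python
from typing import Any, Dict


def get_message_content(response_json: Dict[str, Any]) -> str:
    """Return the assistant's text from a chat completions response.

    Prints a warning when the model stopped because it reached max_tokens,
    which can truncate the answer.
    """
    choices = response_json.get("choices") or []
    if not choices:
        raise ValueError("The response contains no choices.")
    choice = choices[0]
    if choice.get("finish_reason") == "length":
        print("Warning: the response was cut off at max_tokens; consider raising max_tokens.")
    return choice["message"]["content"]


sample = {"choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "Copy -> Web"}}]}
print(get_message_content(sample))  # Copy -> Web
```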
Step 4: Generate the pipeline JSON
Add another code block to the notebook with the following code. It analyzes the image and generates the pipeline JSON.
# Analyze the image and generate the pipeline JSON
## Setup new payload
payload = {
    "messages": [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    }
                }
            ]
        }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload, timeout=120)
    response.raise_for_status()  # Raises an HTTPError if the request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")

## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]["message"]["content"]
print(pipeline_json)
Run this code block to generate the pipeline JSON from the image.
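The system prompt asks for JSON only, but a model can still wrap its answer in a Markdown code fence or return JSON without the expected structure. In either case, json.loads in the next step fails. The hypothetical parse_pipeline_json below is a sketch that strips a surrounding fence and checks for a properties.activities list before you move on.

```python
import json
from typing import Any, Dict

FENCE = "`" * 3  # a Markdown code fence marker


def parse_pipeline_json(text: str) -> Dict[str, Any]:
    """Parse model output as pipeline JSON, tolerating a surrounding Markdown code fence."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE) and cleaned.endswith(FENCE):
        # Drop the opening fence line (which may carry a language tag) and the closing fence
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned[: -len(FENCE)].strip()
    data = json.loads(cleaned)  # json.JSONDecodeError is a subclass of ValueError
    props = data.get("properties") if isinstance(data, dict) else None
    activities = props.get("activities") if isinstance(props, dict) else None
    if not isinstance(activities, list):
        raise ValueError("Expected a 'properties.activities' list in the pipeline JSON.")
    return data


fenced = FENCE + 'json\n{"properties": {"activities": [{"name": "Copy data", "type": "Copy"}]}}\n' + FENCE
print(parse_pipeline_json(fenced)["properties"]["activities"][0]["name"])  # Copy data
```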
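Step 5: Use the Fabric REST API to create the pipeline
Now that you have the pipeline JSON, you can create the pipeline directly with the Fabric REST API. Add another code block to the notebook with the following code. It creates the pipeline in your workspace.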
# Convert pipeline JSON to Fabric REST API request
from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException

json_data = json.loads(pipeline_json)
# Extract the activities from the JSON
activities = json_data["properties"]["activities"]

# Activity types this sample supports, mapped to the Fabric activity type to create.
# Web activities become Office 365 Outlook (send email) activities.
type_mapping = {
    "Copy": "Copy",
    "Web": "Office365Outlook",
    "SendEmail": "Office365Outlook",
    "ExecutePipeline": "ExecutePipeline",
}

# First pass: give each supported activity a unique generated name and record the
# mapping from the name found in the image, so that dependencies can be resolved
name_mapping = {}
for idx, activity in enumerate(activities, start=1):
    activity_name = activity["type"].replace("Activity", "")
    if activity_name in type_mapping:
        name_mapping[activity["name"]] = f"{activity_name}{idx}"
    else:
        print(f"Skipping unsupported activity type: {activity['type']}")

# Second pass: build the Fabric activities. Keep each dependency and its condition
# (for example, Succeeded or Failed) when it points at another supported activity.
activities_list = []
for activity in activities:
    if activity["name"] not in name_mapping:
        continue
    activity_name = activity["type"].replace("Activity", "")
    depends_on = [
        {
            "activity": name_mapping[dep["activity"]],
            "dependencyConditions": dep.get("dependencyConditions", ["Succeeded"]),
        }
        for dep in activity.get("dependsOn", [])
        if dep.get("activity") in name_mapping
    ]
    fabric_activity = {
        "name": name_mapping[activity["name"]],
        "type": type_mapping[activity_name],
        "dependsOn": depends_on,
    }
    if activity_name == "Copy":
        # Source and sink are left empty; configure them later in the pipeline editor
        fabric_activity["typeProperties"] = {
            "source": {"datasetSettings": {}},
            "sink": {"datasetSettings": {}},
        }
    activities_list.append(fabric_activity)

# Wrap the activities in the pipeline definition, serialize it as JSON,
# and Base64-encode it as a string for the InlineBase64 payload
data = {"properties": {"activities": activities_list}}
createPipeline_json = json.dumps(data).encode("utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json).decode("utf-8")

# Create a new data pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"
payload = {
    "displayName": pipelineName,
    "type": "DataPipeline",
    "definition": {
        "parts": [
            {
                "path": "pipeline-content.json",
                "payload": createPipeline_Json64,
                "payloadType": "InlineBase64",
            }
        ]
    },
}
print(f"Creating pipeline: {pipelineName}")

# Call the Fabric REST API to create the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
    response = client.post(f"/v1/workspaces/{workspaceId}/items", json=payload)
    # 201 means created; 202 means creation was accepted as a long-running operation
    if response.status_code not in (201, 202):
        raise FabricHTTPException(response)
except WorkspaceNotFoundException:
    print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
    print(e)
    print("Fabric HTTP Exception. Check that you have the correct Fabric API endpoints.")

# List the data pipelines in the workspace so you can verify the new one
response = client.get(f"/v1/workspaces/{workspaceId}/dataPipelines")
df_items = pd.json_normalize(response.json()["value"])
print("List of pipelines in the workspace:")
df_items
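The output confirms the name of the new pipeline and lists the pipelines in your workspace, so you can verify that the new pipeline is there.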
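The pipeline definition is sent as a single part. Its payload is the Base64-encoded pipeline JSON, with payloadType InlineBase64. The sketch below shows that encoding and its inverse, which helps when you want to check exactly what was sent. encode_definition_part and decode_definition_part are hypothetical helpers.

```python
import base64
import json
from typing import Any, Dict


def encode_definition_part(definition: Dict[str, Any], path: str = "pipeline-content.json") -> Dict[str, str]:
    """Build one InlineBase64 definition part from a pipeline definition dict."""
    raw = json.dumps(definition).encode("utf-8")
    return {"path": path, "payload": base64.b64encode(raw).decode("ascii"), "payloadType": "InlineBase64"}


def decode_definition_part(part: Dict[str, str]) -> Dict[str, Any]:
    """Decode an InlineBase64 definition part back into a dict, for example to inspect it."""
    return json.loads(base64.b64decode(part["payload"]).decode("utf-8"))


definition = {"properties": {"activities": [{"name": "Copy1", "type": "Copy", "dependsOn": []}]}}
part = encode_definition_part(definition)
print(decode_definition_part(part) == definition)  # True
```

Serializing with json.dumps produces valid JSON even when names or descriptions contain apostrophes.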
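Step 6: Use your pipeline
After the pipeline is created, open it in your Fabric workspace to see your image implemented as a pipeline. You can select each activity to configure it as needed, and then run and monitor the pipeline.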
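You can also start a run from the notebook instead of the pipeline editor by calling the Fabric job scheduler REST API. This sketch assumes the on-demand endpoint POST /v1/workspaces/{workspaceId}/items/{itemId}/jobs/instances?jobType=Pipeline and a 202 Accepted response. run_pipeline and the RestClient protocol are hypothetical. You can pass in any client with a compatible post method, such as fabric.FabricRestClient().

```python
from typing import Any, Protocol


class RestClient(Protocol):
    """Anything with a post(path, ...) method, such as sempy's fabric.FabricRestClient()."""

    def post(self, path: str, **kwargs: Any) -> Any: ...


def run_pipeline(client: RestClient, workspace_id: str, pipeline_id: str) -> Any:
    """Request an on-demand run of a data pipeline (assumed job scheduler endpoint)."""
    path = f"/v1/workspaces/{workspace_id}/items/{pipeline_id}/jobs/instances?jobType=Pipeline"
    response = client.post(path)
    # The run is queued asynchronously, so 202 Accepted is treated as success
    if response.status_code != 202:
        raise RuntimeError(f"Run request failed with status {response.status_code}")
    return response
```

For example, after the pipeline-creation code has run, you can use its workspaceId, pipelineName, and df_items variables to look up the new pipeline's ID with df_items.loc[df_items["displayName"] == pipelineName, "id"].iloc[0]. Then call run_pipeline(client, workspaceId, pipeline_id) and follow the run in the Monitor hub.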