Brug Azure OpenAI til at omdanne whiteboard-skitser til datapipelines
Datafabrikken i Microsoft Fabric leverer dataflytnings- og datatransformationstjenester i skyen, der giver dig mulighed for at løse de mest komplekse datafabrik- og ETL-scenarier og giver dig en moderne dataintegrationsoplevelse til at indtage, forberede og transformere data fra et omfattende sæt datakilder. I Data Factory kan du oprette datapipelines for at bruge avancerede dataorkestreringsfunktioner til at oprette fleksible dataarbejdsprocesser, der opfylder virksomhedens behov.
gpt-4o
Med AI-modellen i Azure skubber vi nu grænserne for, hvad du kan gøre med Data Factory, og gør det muligt for dig at oprette dataløsninger ud fra blot et billede.
Hvad skal du bruge for at komme i gang? Bare en Microsoft Fabric-konto og en idé. Her viser vi dig, hvordan du transformerer en whiteboard-ide til en Fabric Data Factory-datapipeline ved hjælp af et billede og gpt-4o
.
Forudsætninger
Før du opretter løsningen, skal du sikre dig, at følgende forudsætninger er konfigureret i Azure og Fabric:
- Microsoft Fabric-aktiveret arbejdsområde.
- Azure OpenAI-konto med en API-nøgle og en
gpt-4o
model installeret. - Et billede af, hvordan pipelinen skal se ud.
Advarsel!
API-nøgler er følsomme oplysninger, og produktionsnøgler bør altid kun gemmes sikkert i Azure Key Vault eller andre sikre butikker. I dette eksempel bruges der kun en OpenAI-nøgle til demonstrationsformål. I forbindelse med produktionskode kan du overveje at bruge Microsoft Entra-id i stedet for nøglegodkendelse til et mere sikkert miljø, der ikke er afhængige af nøgledeling eller risikerer et sikkerhedsbrud, hvis en nøgle kompromitteres.
Trin 1: Upload dit billede til en Lakehouse
Før du kan analysere billedet, skal du uploade det til din Lakehouse. Log på din Microsoft Fabric-konto, og naviger til dit arbejdsområde. Vælg + Nyt element, og opret et nyt Lakehouse.
Når din Lakehouse er konfigureret, skal du oprette en ny mappe under filer, kaldet billeder, og uploade billedet der.
Trin 2: Opret notesbogen i dit arbejdsområde
Nu skal vi bare oprette en notesbog for at udføre en Python-kode, der opsummerer og opretter pipelinen i arbejdsområdet.
Opret en ny notesbog i dit arbejdsområde:
I kodeområdet skal du angive følgende kode, der konfigurerer de påkrævede biblioteker og konfiguration og koder billedet:
# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>" # For example, "/lakehouse/default/files/images/pipeline.png"
# Install the OpenAI library
!pip install semantic-link --q
!pip uninstall --yes openai
!pip install openai
%pip install openai --upgrade
# Imports
import os
import requests
import base64
import json
import time
import pprint
import openai
import sempy.fabric as fabric
import pandas as pd
# Load the image
image_bytes = open(IMAGE_PATH, 'rb').read()
encoded_image = base64.b64encode(image_bytes).decode('ascii')
## Request headers
headers = {
"Content-Type": "application/json",
"api-key": AZURE_OPENAI_KEY,
}
Kør denne kodeblok for at konfigurere miljøet.
Trin 3: Bruges gpt-4o
til at beskrive pipelinen (valgfrit)
Dette trin er valgfrit, men viser dig, hvor nemt det er at udtrække oplysninger fra billedet, hvilket kan være relevant for dine formål. Hvis du ikke udfører dette trin, kan du stadig generere JSON-pipelinen i næste trin.
Vælg først Rediger i hovedmenuen for notesbogen, og vælg derefter knappen + Tilføj kodecelle under på værktøjslinjen for at tilføje en ny kodeblok efter den forrige.
Føj derefter følgende kode til den nye sektion. Denne kode viser, hvordan gpt-4o
billedet kan fortolkes og opsummeres for at forstå dets indhold.
# Summarize the image
## Request payload
payload = {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Show list of pipeline activities and how they are connected."
}
]
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encoded_image}"
}
}
]
}
],
"temperature": 0.7,
"top_p": 0.95,
"max_tokens": 800
}
## Send request
try:
response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
response.raise_for_status() # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
raise SystemExit(f"Failed to make the request. Error: {e}")
response_json = response.json()
## Show AI response
print(response_json["choices"][0]['message']['content'])
Kør denne kodeblok for at se AI-opsummering af billedet og dets komponenter.
Trin 4: Generér pipelinens JSON
Føj endnu en kodeblok til notesbogen, og tilføj følgende kode. Denne kode analyserer billedet og genererer JSON-pipelinen.
# Analyze the image and generate the pipeline JSON
## Setup new payload
payload = {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
}
]
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encoded_image}"
}
}
]
}
],
"temperature": 0.7,
"top_p": 0.95,
"max_tokens": 800
}
## Send request
try:
response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
response.raise_for_status() # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
raise SystemExit(f"Failed to make the request. Error: {e}")
## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]['message']['content']
print(pipeline_json)
Kør denne kodeblok for at generere JSON-pipelinen fra billedet.
Trin 4: Opret pipelinen med Fabric REST API'er
Nu, hvor du har hentet pipelinen JSON, kan du oprette den direkte ved hjælp af Fabric REST API'er. Føj endnu en kodeblok til notesbogen, og tilføj følgende kode. Denne kode opretter pipelinen i dit arbejdsområde.
# Convert pipeline JSON to Fabric REST API request
json_data = json.loads(pipeline_json)
# Extract the activities from the JSON
activities = json_data["properties"]["activities"]
# Prepare the data pipeline JSON definition
data = {}
activities_list = []
idx = 0
# Name mapping used to track activity name found in image to dynamically generated name
name_mapping = {}
for activity in activities:
idx = idx + 1
activity_name = activity["type"].replace("Activity","")
objName = f"{activity_name}{idx}"
# store the name mapping so we can deal with dependency
name_mapping[activity["name"]] = objName
if 'dependsOn' in activity:
activity_dependent_list = activity["dependsOn"]
dependent_activity = ""
if ( len(activity_dependent_list) > 0 ):
dependent_activity = activity_dependent_list[0]["activity"]
match activity_name:
case "Copy":
activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
'typeProperties': { "source": { "datasetSettings": {} },
"sink": { "datasetSettings": {} } }})
case "Web":
activities_list.append({'name': objName, 'type': "Office365Outlook",
"dependsOn": [
{
"activity": name_mapping[dependent_activity] ,
"dependencyConditions": [
"Succeeded"
]
}
]
}
)
case "ExecutePipeline":
activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline",
"dependsOn": [
{
"activity": name_mapping[dependent_activity] ,
"dependencyConditions": [
"Succeeded"
]
}
]
}
)
case _:
continue
else:
# simple activities with no dependencies
match activity_name:
case "Copy":
activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
'typeProperties': { "source": { "datasetSettings": {} } , "sink": { "datasetSettings": {} } }})
case "SendEmail":
activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
case "Web":
activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
case "ExecutePipeline":
activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline"})
case _:
print("NoOp")
# Now that the activities_list is created, assign it to the activities tag in properties
data['properties'] = { "activities": activities_list}
# Convert data from dict to string, then Byte Literal, before doing a Base-64 encoding
data_str = str(data).replace("'",'"')
createPipeline_json = data_str.encode(encoding="utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json)
# Create a new data pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"
payload = {
"displayName": pipelineName,
"type": "DataPipeline",
"definition": {
"parts": [
{
"path": "pipeline-content.json",
"payload": createPipeline_Json64,
"payloadType": "InlineBase64"
}
]
}
}
print(f"Creating pipeline: {pipelineName}")
# Call the Fabric REST API to generate the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
response = client.post(f"/v1/workspaces/{workspaceId}/items",json=payload)
if response.status_code != 201:
raise FabricHTTPException(response)
except WorkspaceNotFoundException as e:
print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
print(e)
print("Fabric HTTP Exception. Check that you have the correct Fabrric API endpoints.")
response = client.get(f"/v1/workspaces/{workspaceId}/Datapipelines")
df_items = pd.json_normalize(response.json()['value'])
print("List of pipelines in the workspace:")
df_items
Output bekræfter navnet på den pipeline, der er oprettet, og viser en liste over dit arbejdsområdes pipelines, så du kan validere, at den findes.
Trin 6: Brug din pipeline
Når din pipeline er oprettet, kan du redigere den i dit Fabric-arbejdsområde for at se dit billede implementeret som en pipeline. Du kan vælge hver aktivitet for at konfigurere den, som du ønsker, og derefter køre og overvåge den efter behov.