Używanie usługi Azure OpenAI do przekształcania szkiców tablicy w potoki danych
Fabryka danych w usłudze Microsoft Fabric udostępnia usługi przenoszenia i przekształcania danych w skali chmury, które umożliwiają rozwiązywanie najbardziej złożonych scenariuszy fabryki danych i etl oraz umożliwia nowoczesne środowisko integracji danych w celu pozyskiwania, przygotowywania i przekształcania danych z bogatego zestawu źródeł danych. W usłudze Data Factory możesz tworzyć potoki danych, aby korzystać z wbudowanych możliwości aranżacji danych, które pozwalają tworzyć elastyczne przepływy pracy danych spełniające potrzeby przedsiębiorstwa.
Teraz, korzystając z modelu sztucznej gpt-4o
inteligencji na platformie Azure, wypychamy limity tego, co można zrobić za pomocą usługi Data Factory i umożliwiamy tworzenie rozwiązań danych na podstawie tylko obrazu.
Co należy rozpocząć? Wystarczy konto usługi Microsoft Fabric i pomysł. W tym miejscu pokazano, jak przekształcić pomysł tablicy w potok danych usługi Fabric Data Factory przy użyciu tylko obrazu i gpt-4o
.
Wymagania wstępne
Przed utworzeniem rozwiązania upewnij się, że w usłudze Azure i Sieci szkieletowej zostały skonfigurowane następujące wymagania wstępne:
- Obszar roboczy z obsługą usługi Microsoft Fabric.
- Konto usługi Azure OpenAI z kluczem interfejsu API i wdrożonym modelem
gpt-4o
. - Obraz przedstawiający wygląd potoku.
Ostrzeżenie
Klucze interfejsu API są poufnymi informacjami, a klucze produkcyjne powinny być zawsze przechowywane bezpiecznie tylko w usłudze Azure Key Vault lub w innych bezpiecznych magazynach. W tym przykładzie użyto klucza OpenAI tylko do celów demonstracyjnych. W przypadku kodu produkcyjnego rozważ użycie identyfikatora Entra firmy Microsoft zamiast uwierzytelniania klucza dla bezpieczniejszego środowiska, które nie polega na udostępnianiu kluczy lub ryzyku naruszenia zabezpieczeń w przypadku naruszenia zabezpieczeń.
Krok 1. Przekazywanie obrazu do usługi Lakehouse
Zanim będzie można przeanalizować obraz, musisz przekazać go do usługi Lakehouse. Zaloguj się do konta usługi Microsoft Fabric i przejdź do obszaru roboczego. Wybierz pozycję + Nowy element i utwórz nowy Lakehouse.
Po skonfigurowaniu usługi Lakehouse utwórz nowy folder w obszarze plików, nazywanych obrazami i przekaż tam obraz.
Krok 2. Tworzenie notesu w obszarze roboczym
Teraz musimy utworzyć notes, aby wykonać kod języka Python, który podsumowuje i tworzy potok w obszarze roboczym.
Utwórz nowy notes w obszarze roboczym:
W obszarze kodu wprowadź następujący kod, który konfiguruje wymagane biblioteki i konfigurację i koduje obraz:
# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>" # For example, "/lakehouse/default/files/images/pipeline.png"
# Install the OpenAI library
!pip install semantic-link --q
!pip uninstall --yes openai
!pip install openai
%pip install openai --upgrade
# Imports
import os
import requests
import base64
import json
import time
import pprint
import openai
import sempy.fabric as fabric
import pandas as pd
# Load the image
image_bytes = open(IMAGE_PATH, 'rb').read()
encoded_image = base64.b64encode(image_bytes).decode('ascii')
## Request headers
headers = {
"Content-Type": "application/json",
"api-key": AZURE_OPENAI_KEY,
}
Uruchom ten blok kodu, aby skonfigurować środowisko.
Krok 3. Użyj polecenia gpt-4o
, aby opisać potok (opcjonalnie)
Ten krok jest opcjonalny, ale pokazuje, jak proste jest wyodrębnianie szczegółów z obrazu, co może być istotne dla Twoich celów. Jeśli ten krok nie zostanie wykonany, nadal możesz wygenerować kod JSON potoku w następnym kroku.
Najpierw wybierz pozycję Edytuj w menu głównym notesu, a następnie wybierz przycisk + Dodaj kod poniżej na pasku narzędzi, aby dodać nowy blok kodu po poprzednim.
Następnie dodaj następujący kod do nowej sekcji. Ten kod pokazuje, jak gpt-4o
interpretować i podsumowywać obraz, aby zrozumieć jego zawartość.
# Summarize the image
## Request payload
payload = {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Show list of pipeline activities and how they are connected."
}
]
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encoded_image}"
}
}
]
}
],
"temperature": 0.7,
"top_p": 0.95,
"max_tokens": 800
}
## Send request
try:
response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
response.raise_for_status() # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
raise SystemExit(f"Failed to make the request. Error: {e}")
response_json = response.json()
## Show AI response
print(response_json["choices"][0]['message']['content'])
Uruchom ten blok kodu, aby wyświetlić podsumowanie sztucznej inteligencji obrazu i jego składników.
Krok 4. Generowanie kodu JSON potoku
Dodaj kolejny blok kodu do notesu i dodaj następujący kod. Ten kod analizuje obraz i generuje kod JSON potoku.
# Analyze the image and generate the pipeline JSON
## Setup new payload
payload = {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
}
]
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encoded_image}"
}
}
]
}
],
"temperature": 0.7,
"top_p": 0.95,
"max_tokens": 800
}
## Send request
try:
response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
response.raise_for_status() # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
raise SystemExit(f"Failed to make the request. Error: {e}")
## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]['message']['content']
print(pipeline_json)
Uruchom ten blok kodu, aby wygenerować kod JSON potoku na podstawie obrazu.
Krok 4. Tworzenie potoku za pomocą interfejsów API REST sieci szkieletowej
Po uzyskaniu kodu JSON potoku możesz utworzyć go bezpośrednio przy użyciu interfejsów API REST sieci szkieletowej. Dodaj kolejny blok kodu do notesu i dodaj następujący kod. Ten kod tworzy potok w obszarze roboczym.
# Convert pipeline JSON to Fabric REST API request
json_data = json.loads(pipeline_json)
# Extract the activities from the JSON
activities = json_data["properties"]["activities"]
# Prepare the data pipeline JSON definition
data = {}
activities_list = []
idx = 0
# Name mapping used to track activity name found in image to dynamically generated name
name_mapping = {}
for activity in activities:
idx = idx + 1
activity_name = activity["type"].replace("Activity","")
objName = f"{activity_name}{idx}"
# store the name mapping so we can deal with dependency
name_mapping[activity["name"]] = objName
if 'dependsOn' in activity:
activity_dependent_list = activity["dependsOn"]
dependent_activity = ""
if ( len(activity_dependent_list) > 0 ):
dependent_activity = activity_dependent_list[0]["activity"]
match activity_name:
case "Copy":
activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
'typeProperties': { "source": { "datasetSettings": {} },
"sink": { "datasetSettings": {} } }})
case "Web":
activities_list.append({'name': objName, 'type': "Office365Outlook",
"dependsOn": [
{
"activity": name_mapping[dependent_activity] ,
"dependencyConditions": [
"Succeeded"
]
}
]
}
)
case "ExecutePipeline":
activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline",
"dependsOn": [
{
"activity": name_mapping[dependent_activity] ,
"dependencyConditions": [
"Succeeded"
]
}
]
}
)
case _:
continue
else:
# simple activities with no dependencies
match activity_name:
case "Copy":
activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
'typeProperties': { "source": { "datasetSettings": {} } , "sink": { "datasetSettings": {} } }})
case "SendEmail":
activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
case "Web":
activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
case "ExecutePipeline":
activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline"})
case _:
print("NoOp")
# Now that the activities_list is created, assign it to the activities tag in properties
data['properties'] = { "activities": activities_list}
# Convert data from dict to string, then Byte Literal, before doing a Base-64 encoding
data_str = str(data).replace("'",'"')
createPipeline_json = data_str.encode(encoding="utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json)
# Create a new data pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"
payload = {
"displayName": pipelineName,
"type": "DataPipeline",
"definition": {
"parts": [
{
"path": "pipeline-content.json",
"payload": createPipeline_Json64,
"payloadType": "InlineBase64"
}
]
}
}
print(f"Creating pipeline: {pipelineName}")
# Call the Fabric REST API to generate the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
response = client.post(f"/v1/workspaces/{workspaceId}/items",json=payload)
if response.status_code != 201:
raise FabricHTTPException(response)
except WorkspaceNotFoundException as e:
print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
print(e)
print("Fabric HTTP Exception. Check that you have the correct Fabrric API endpoints.")
response = client.get(f"/v1/workspaces/{workspaceId}/Datapipelines")
df_items = pd.json_normalize(response.json()['value'])
print("List of pipelines in the workspace:")
df_items
Dane wyjściowe potwierdzają nazwę utworzonego potoku i wyświetlają listę potoków obszaru roboczego, aby można było sprawdzić jego obecność.
Krok 6. Korzystanie z potoku
Po utworzeniu potoku możesz edytować go w obszarze roboczym Sieć szkieletowa, aby zobaczyć obraz zaimplementowany jako potok. Możesz wybrać każde działanie, aby skonfigurować je zgodnie z wymaganiami, a następnie uruchomić i monitorować je zgodnie z wymaganiami.