Del via


Brug Azure OpenAI til at omdanne whiteboard-skitser til datapipelines

Datafabrikken i Microsoft Fabric leverer dataflytnings- og datatransformationstjenester i skyen, der giver dig mulighed for at løse de mest komplekse datafabrik- og ETL-scenarier og giver dig en moderne dataintegrationsoplevelse til at indtage, forberede og transformere data fra et omfattende sæt datakilder. I Data Factory kan du oprette datapipelines for at bruge avancerede dataorkestreringsfunktioner til at oprette fleksible dataarbejdsprocesser, der opfylder virksomhedens behov.

gpt-4o Med AI-modellen i Azure skubber vi nu grænserne for, hvad du kan gøre med Data Factory, og gør det muligt for dig at oprette dataløsninger ud fra blot et billede.

Hvad skal du bruge for at komme i gang? Bare en Microsoft Fabric-konto og en idé. Her viser vi dig, hvordan du transformerer en whiteboard-ide til en Fabric Data Factory-datapipeline ved hjælp af et billede og gpt-4o.

Forudsætninger

Før du opretter løsningen, skal du sikre dig, at følgende forudsætninger er konfigureret i Azure og Fabric:

  • Microsoft Fabric-aktiveret arbejdsområde.
  • Azure OpenAI-konto med en API-nøgle og en gpt-4o model installeret.
  • Et billede af, hvordan pipelinen skal se ud.

Advarsel!

API-nøgler er følsomme oplysninger, og produktionsnøgler bør altid kun gemmes sikkert i Azure Key Vault eller andre sikre butikker. I dette eksempel bruges der kun en OpenAI-nøgle til demonstrationsformål. I forbindelse med produktionskode kan du overveje at bruge Microsoft Entra-id i stedet for nøglegodkendelse til et mere sikkert miljø, der ikke er afhængige af nøgledeling eller risikerer et sikkerhedsbrud, hvis en nøgle kompromitteres.

Trin 1: Upload dit billede til en Lakehouse

Før du kan analysere billedet, skal du uploade det til din Lakehouse. Log på din Microsoft Fabric-konto, og naviger til dit arbejdsområde. Vælg + Nyt element, og opret et nyt Lakehouse.

Skærmbillede, der viser, hvor du kan oprette et nyt Lakehouse.

Når din Lakehouse er konfigureret, skal du oprette en ny mappe under filer, kaldet billeder, og uploade billedet der.

Skærmbillede, der viser et tegnet billede, der skal konverteres til en pipeline.

Trin 2: Opret notesbogen i dit arbejdsområde

Nu skal vi bare oprette en notesbog for at udføre en Python-kode, der opsummerer og opretter pipelinen i arbejdsområdet.

Opret en ny notesbog i dit arbejdsområde:

Skærmbillede, der viser, hvordan du opretter en ny notesbog i et Data Factory for Fabric-arbejdsområde.

I kodeområdet skal du angive følgende kode, der konfigurerer de påkrævede biblioteker og konfiguration og koder billedet:

# Configuration
AZURE_OPENAI_KEY = "<Your Azure OpenAI key>"
AZURE_OPENAI_GPT4O_ENDPOINT = "<Your Azure OpenAI gpt-4o deployment endpoint>"
IMAGE_PATH = "<Path to your uploaded image file>" # For example, "/lakehouse/default/files/images/pipeline.png"

# Install the OpenAI library
!pip install semantic-link --q 
!pip uninstall --yes openai
!pip install openai
%pip install openai --upgrade

# Imports
import os
import requests
import base64
import json
import time
import pprint
import openai
import sempy.fabric as fabric
import pandas as pd

# Load the image
image_bytes = open(IMAGE_PATH, 'rb').read()
encoded_image = base64.b64encode(image_bytes).decode('ascii')

## Request headers
headers = {
    "Content-Type": "application/json",
    "api-key": AZURE_OPENAI_KEY,
}

Kør denne kodeblok for at konfigurere miljøet.

Trin 3: Bruges gpt-4o til at beskrive pipelinen (valgfrit)

Dette trin er valgfrit, men viser dig, hvor nemt det er at udtrække oplysninger fra billedet, hvilket kan være relevant for dine formål. Hvis du ikke udfører dette trin, kan du stadig generere JSON-pipelinen i næste trin.

Vælg først Rediger i hovedmenuen for notesbogen, og vælg derefter knappen + Tilføj kodecelle under på værktøjslinjen for at tilføje en ny kodeblok efter den forrige.

Skærmbillede, der viser, hvor du kan tilføje en ny kodecelle under den aktuelle i notesbogeditoren.

Føj derefter følgende kode til den nye sektion. Denne kode viser, hvordan gpt-4o billedet kan fortolkes og opsummeres for at forstå dets indhold.

# Summarize the image

## Request payload
payload = {
    "messages": [
    {
        "role": "system",
        "content": [
        {
            "type": "text",
            "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Show list of pipeline activities and how they are connected."
        }
        ]
    },
    {
        "role": "user",
        "content": [
        {
            "type": "image_url",
            "image_url": {
            "url": f"data:image/jpeg;base64,{encoded_image}"
            }
        }
        ]
    }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")

response_json = response.json()

## Show AI response
print(response_json["choices"][0]['message']['content'])

Kør denne kodeblok for at se AI-opsummering af billedet og dets komponenter.

Trin 4: Generér pipelinens JSON

Føj endnu en kodeblok til notesbogen, og tilføj følgende kode. Denne kode analyserer billedet og genererer JSON-pipelinen.

# Analyze the image and generate the pipeline JSON

## Setup new payload
payload = {
    "messages": [
    {
        "role": "system",
        "content": [
        {
            "type": "text",
            "text": "You are an AI assistant that helps an Azure engineer understand an image that likely shows a Data Factory in Microsoft Fabric data pipeline. Succeeded is denoted by a green line, and Fail is denoted by a red line. Generate an ADF v2 pipeline JSON with what you see. Return ONLY the JSON text required, without any leading or trailing markdown denoting a code block."
        }
        ]
    },
    {
        "role": "user",
        "content": [
        {
            "type": "image_url",
            "image_url": {
            "url": f"data:image/jpeg;base64,{encoded_image}"
            }
        }
        ]
    }
    ],
    "temperature": 0.7,
    "top_p": 0.95,
    "max_tokens": 800
}

## Send request
try:
    response = requests.post(AZURE_OPENAI_GPT4O_ENDPOINT, headers=headers, json=payload)
    response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
except requests.RequestException as e:
    raise SystemExit(f"Failed to make the request. Error: {e}")

## Get JSON from request and show
response_json = response.json()
pipeline_json = response_json["choices"][0]['message']['content']
print(pipeline_json)

Kør denne kodeblok for at generere JSON-pipelinen fra billedet.

Trin 4: Opret pipelinen med Fabric REST API'er

Nu, hvor du har hentet pipelinen JSON, kan du oprette den direkte ved hjælp af Fabric REST API'er. Føj endnu en kodeblok til notesbogen, og tilføj følgende kode. Denne kode opretter pipelinen i dit arbejdsområde.

# Convert pipeline JSON to Fabric REST API request

json_data = json.loads(pipeline_json)

# Extract the activities from the JSON
activities = json_data["properties"]["activities"]

# Prepare the data pipeline JSON definition
data = {}
activities_list = []

idx = 0

# Name mapping used to track activity name found in image to dynamically generated name
name_mapping = {}

for activity in activities:
    idx = idx + 1
    activity_name = activity["type"].replace("Activity","")

    objName = f"{activity_name}{idx}"

    # store the name mapping so we can deal with dependency 
    name_mapping[activity["name"]] = objName

    if 'dependsOn' in activity: 
        activity_dependent_list = activity["dependsOn"] 
        
        dependent_activity = ""
        if ( len(activity_dependent_list) > 0 ):
            dependent_activity = activity_dependent_list[0]["activity"]

        match activity_name:
            case "Copy":
                activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
                'typeProperties': { "source": { "datasetSettings": {} },
                "sink": { "datasetSettings": {} } }})
            case "Web":
                activities_list.append({'name': objName, 'type': "Office365Outlook",
                        "dependsOn": [
                            {
                                "activity":  name_mapping[dependent_activity] ,
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ]
                    }
                )
            case "ExecutePipeline":
                activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline",
                    "dependsOn": [
                            {
                                "activity":  name_mapping[dependent_activity] ,
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ]
                    }
                )
            case _:
                continue
    else:
        # simple activities with no dependencies
        match activity_name:
            case "Copy":
                activities_list.append({'name': objName, 'type': "Copy", 'dependsOn': [],
                'typeProperties': { "source": { "datasetSettings": {} } , "sink": { "datasetSettings": {} } }})
            case "SendEmail":
                 activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
            case "Web":
                activities_list.append({'name': "Send mail on success", 'type': "Office365Outlook"})
            case "ExecutePipeline":
                activities_list.append({'name': "execute pipeline 1", 'type': "ExecutePipeline"})
            case _:
                print("NoOp")

# Now that the activities_list is created, assign it to the activities tag in properties
data['properties'] = { "activities": activities_list}

# Convert data from dict to string, then Byte Literal, before doing a Base-64 encoding
data_str = str(data).replace("'",'"')
createPipeline_json = data_str.encode(encoding="utf-8")
createPipeline_Json64 = base64.b64encode(createPipeline_json)

# Create a new data pipeline in Fabric
timestr = time.strftime("%Y%m%d-%H%M%S")
pipelineName = f"Pipeline from image with AI-{timestr}"

payload = {
        "displayName": pipelineName,
        "type": "DataPipeline",
        "definition": {
           "parts": [ 
             { 
              "path": "pipeline-content.json", 
              "payload": createPipeline_Json64, 
              "payloadType": "InlineBase64" 
              }
            ]
        }
}

print(f"Creating pipeline: {pipelineName}")

# Call the Fabric REST API to generate the pipeline
client = fabric.FabricRestClient()
workspaceId = fabric.get_workspace_id()
try:
    response = client.post(f"/v1/workspaces/{workspaceId}/items",json=payload)
    if response.status_code != 201:
        raise FabricHTTPException(response)
except WorkspaceNotFoundException as e:
    print("Workspace is not available or cannot be found.")
except FabricHTTPException as e:
    print(e)
    print("Fabric HTTP Exception. Check that you have the correct Fabrric API endpoints.")

response = client.get(f"/v1/workspaces/{workspaceId}/Datapipelines")
df_items = pd.json_normalize(response.json()['value'])
print("List of pipelines in the workspace:")
df_items

Output bekræfter navnet på den pipeline, der er oprettet, og viser en liste over dit arbejdsområdes pipelines, så du kan validere, at den findes.

Skærmbillede, der viser outputtet fra notesbogen, efter at pipelinen blev oprettet.

Trin 6: Brug din pipeline

Når din pipeline er oprettet, kan du redigere den i dit Fabric-arbejdsområde for at se dit billede implementeret som en pipeline. Du kan vælge hver aktivitet for at konfigurere den, som du ønsker, og derefter køre og overvåge den efter behov.

Skærmbillede, der viser den pipeline, der er genereret af notesbogen med AI.