Använda Azure OpenAI för att omvandla whiteboardskisser till Apache Airflow DAG:er
Apache Airflow-jobb i Microsoft Fabric ger molnbaserad upplevelse för datatekniker och dataforskare, med funktioner som omedelbar körningsetablering, molnbaserad redigering, dynamisk autoskalning, intelligent autopaus och förbättrad säkerhet. Det är en fullständigt hanterad tjänst som gör att du kan skapa, schemalägga och övervaka Apache Airflow-arbetsflöden i molnet utan att behöva bekymra dig om underliggande infrastruktur.
Nu, med gpt-4o
AI-modellen i Azure, tänjer vi på gränserna för vad du kan göra med Apache Airflow-jobb och gör det möjligt för dig att skapa Apache Airflow DAG:er från bara din whiteboard-skissidé. Den här funktionen är användbar för datatekniker och dataforskare som snabbt vill skapa prototyper och visualisera sina dataarbetsflöden.
I den här artikeln skapar du ett änden-till-änden-arbetsflöde som laddar ned skissen lagrad i din Lakehouse, använder gpt-4o
för att omvandla den till en Apache Airflow DAG och laddar in den i Apache Airflowjobb för körning.
Förutsättningar
Innan du skapar lösningen måste du se till att följande förutsättningar har konfigurerats i Azure och Fabric:
Aktivera Apache Airflow-jobb i klientorganisationen.
Notera
Eftersom Apache Airflow-jobbet är i förhandsversionstillstånd måste du aktivera det via klientadministratören. Om du redan ser Apache Airflow-jobb kanske klientadministratören redan har aktiverat det.
Ett Azure OpenAI- konto med en API-nyckel och en distribuerad gpt-4o-modell.
Skapa en Microsoft Entra-ID-app om du inte har någon.
Lägg till tjänstens huvudanvändare som "Deltagare" i din Microsoft Fabric-arbetsyta.
Ett diagram över hur du vill att Apache Airflow DAG ska se ut eller spara den angivna bilden i steg 1 till den lokala datorn.
Lägg till följande Python-paket i
requirements.txt
som finns i din Apache Airflow-jobbmiljö.azure-storage-file-datalake Pillow
Steg 1: Ladda upp avbildningen till Fabric Lakehouse
Innan du kan analysera bilden måste du ladda upp den till Lakehouse.
Ladda upp filen från den lokala datorn till mappen
Files
i Lakehouse.Kopiera lagringskontonamnet för din Fabric Lakehouse. Det används i Apache Airflow-anslutningen för att autentisera med Lakehouse.
Steg 2: Konfigurera miljövariabler för autentisering med Lakehouse och Azure OpenAI.
Obs! Den här självstudien baseras på Airflow version 2.6.3.
Autentiseringsuppgifter för Lakehouse Rest-API:er.
Vi ska använda Lakehouse Rest API:er för att ladda ned avbildningen från Lakehouse. Om du vill autentisera med Lakehouse Rest API:er måste du ange följande miljövariabler i Apache Airflow-jobbet.
-
FABRIC_CLIENT_ID
: Klient-ID för Microsoft Entra-ID-appen. -
FABRIC_CLIENT_SECRET
: Klienthemligheten för Microsoft Entra-ID-appen. -
FABRIC_TENANT_ID
: Hyres-ID för Microsoft Entra ID-appen.
Autentiseringsuppgifter för Azure OpenAI
Vi använder distributionen av gpt-4o
-modellen i Azure OpenAI för att analysera skissen av pipelinen och konvertera den till en Apache Airflow DAG. Om du vill ansluta till Azure OpenAI-API:et lagrar du API-nyckeln och slutpunkten i miljövariabler:
-
OPENAI_API_KEY
: Ange din Azure OpenAI API-nyckel. -
OPENAI_API_ENDPOINT
: Ange slutpunkts-URL:en för din distribueradegpt-4o
modell. Till exempelhttps://ai-contosoai6211465843515213.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-02-15-preview
.
Steg 3: Skapa en Apache Airflow DAG för att generera DAG:er från skisser
När alla förutsättningar är klara är du redo att konfigurera arbetsflödet för Azure OpenAI DAG Generator.
Så här fungerar Azure OpenAI DAG Generator
- Ladda ned skissen från lakehouse: Avbildningen är kodad i base64-format och skickas till Azure OpenAI.
- Generera DAG Code med Azure OpenAI: Arbetsflödet använder
gpt-4o
-modellen för att generera DAG-koden från skissen och den angivna systemprompten. - Azure OpenAI tolkar indatabilden och systemprompten och genererar python-kod som representerar en Apache Airflow DAG. Svaret innehåller den här koden som en del av API-utdata.
- Den genererade DAG-koden hämtas från API-svaret och skrivs till en Python-fil i katalogen
dags
. Innan du använder filen konfigurerar du de anslutningar som krävs av operatorerna i Apache Airflow och filen är omedelbart redo att användas i Apache Airflow Jobs-gränssnittet.
Kod för Azure OpenAI DAG Generator
Följ nu stegen för att implementera arbetsflödet:
- Skapa en fil med namnet openapi_dag_generator.py i katalogen dags i Apache Airflow-projektet.
- Lägg till följande kod i filen. Ersätt
yourStorageAccountName
,workspace_name
ochfile_path
med de faktiska värdena och spara filen.import io import os import json import base64 import requests from PIL import Image from pendulum import datetime from airflow.models import Variable from airflow.models.param import Param from airflow.decorators import dag, task from airflow.models.baseoperator import chain from azure.storage.filedatalake import DataLakeServiceClient from azure.identity import ClientSecretCredential @dag( start_date=datetime(2023, 11, 1), schedule=None, catchup=False, params={ "system_prompt": Param( 'You are an AI assistant that helps to write an Apache Airflow DAG code by understanding an image that shows an Apache Airflow DAG containing airflow tasks, task descriptions, parameters, trigger rules and edge labels.\ You have to priortize the Apache Airflow provider operators over Apache Airflow core operators if they resonates more with task description.\ Use the most appropriate Apache Airflow operators as per the task description\ To give the label to the DAG edge use the Label from the airflow.utils.edgemodifier class\ You can use Dummy operators for start and end tasks. \ Return apache airflow dag code in a valid json format following the format:```json{ "dag": "value should be Apache Airflow DAG code"}```', type="string", title="Give a prompt to the Airflow Expert.", description="Enter what you would like to ask Apache Airflow Expert.", min_length=1, max_length=500, ), "seed": Param(42, type="integer"), "temperature": Param(0.1, type="number"), "top_p": Param(0.95, type="number"), "max_tokens": Param(800, type="integer"), }, ) def OpenAI_Dag_Generator(): """ A DAG that generates an Apache Airflow DAG code using `gpt-4o` OpenAI model based on a diagram image stored in Azure Blob Storage. The generated DAG is saved in the `dags` folder for execution. """ @task def fetch_image_from_lakehouse(workspace_name: str, file_path: str): """ Downloads an image from Fabric Lakehouse and encodes it as a Base64 string. :param workspace_name: Name of the workspace where your Lakehouse is located. :param file_path: Relative file path stored in the Fabric Lakehouse. :return: Dictionary containing the encoded image as a Base64 string. """ account_url = f"https://{yourStorageAccountName}.dfs.fabric.microsoft.com" client_id = os.getenv("FABRIC_CLIENT_ID") client_secret = os.getenv("FABRIC_CLIENT_SECRET") tenant_id = os.getenv("FABRIC_TENANT_ID") tokenCredential = ClientSecretCredential( tenant_id=tenant_id, client_id=client_id, client_secret=client_secret ) lakehouse_client = DataLakeServiceClient( account_url, credential=tokenCredential ) blob_data = lakehouse_client.get_file_client(workspace_name, file_path).download_file().readall() image = Image.open(io.BytesIO(blob_data)) # Encode image as Base64 buffered = io.BytesIO() image.save(buffered, format="PNG") encoded_image = base64.b64encode(buffered.getvalue()).decode('ascii') return {"encoded_image": encoded_image} @task def generate_dag_code_from_openai(image_from_blob: dict, system_prompt: str, **context): """ Sends the encoded image to the OpenAI gpt-4o model to generate an Apache Airflow DAG code. :param encoded_image: Dictionary containing the Base64-encoded image. :param system_prompt: Prompt to ask the OpenAI model to generate the DAG code. :return: Dictionary containing the generated DAG code as a string. """ azureAI_api_key = os.getenv("OPENAI_API_KEY") azureAI_endpoint = os.getenv("OPENAI_API_ENDPOINT") image = image_from_blob["encoded_image"] headers = { "Content-Type": "application/json", "api-key": azureAI_api_key, } payload = { "messages": [ { "role": "system", "content": [ { "type": "text", "text": system_prompt } ] }, { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image}" } } ] } ], "seed": context["params"]["seed"], "temperature": context["params"]["temperature"], "top_p": context["params"]["top_p"], "max_tokens": context["params"]["max_tokens"] } response = requests.post(azureAI_endpoint, headers=headers, json=payload) response.raise_for_status() # Get JSON from request and show response_json = response.json() # Check if 'choices' and 'message' are present in the response if 'choices' in response_json and len(response_json['choices']) > 0: content = response_json['choices'][0]['message']['content'] start_index = content.find('```json') end_index = content.rfind("```") # Validate JSON block delimiters if start_index == -1 or end_index == -1: raise ValueError("JSON block delimiters (```json ... ```) not found in the content.") # Extract and parse the JSON string extracted_json_str = content[start_index + 7:end_index].strip() if not extracted_json_str: raise ValueError("Extracted JSON string is empty.") # Convert to a Python dictionary dag_json = json.loads(extracted_json_str) dag_code = dag_json.get("dag") if not dag_code: raise ValueError("'dag' key not found in the extracted JSON.") return {"dag_code": dag_code} return response_json @task def save_dag(xcom_dag_code: dict): """ Saves the generated DAG code to a Python file in the `dags` directory. """ try: with open("dags/openai_dag.py", "w") as f: f.write(xcom_dag_code["dag_code"]) print("DAG code saved successfully.") except Exception as e: raise ValueError(f"Error saving DAG code: {str(e)}") chain( save_dag( generate_dag_code_from_openai( fetch_image_from_lakehouse( workspace_name="airflow-dag-images", # Your Fabric Workspace file_path="lakehouse_ai.Lakehouse/Files/airflow-dag-diagram.png" # Path to the image file located in the Lakehouse ), "{{ params.system_prompt }}" ) ) ) OpenAI_Dag_Generator()
Steg 4: Utlösa DAG från Apache Airflow-användargränssnittet
- Klicka på
Monitor Airflow
. - Gå till DAG-fliken och leta reda på
OpenAI_Dag_Generator
DAG. Klicka på den. - Klicka på uppspelningsknappen och välj
Trigger DAG w/ config
. - Du presenteras med ett formulär som visar DAG-parametrar. Vi tillhandahåller en standardsystemprompt, frö, temperatur, top_p och max_tokens. Du kan ändra dessa värden efter behov.
- Klicka på knappen
Trigger
för att starta. - Efter den lyckade DAG-körningen skulle du se en ny DAG genererad med filnamnet
openai_dag.py
i mappendags
för Apache Airflow Job.
Steg 5: Gör dig redo att köra den nyligen genererade DAG
- Öppna användargränssnittet för Apache Airflow-jobb.
- Den nyligen genererade DAG sparas i mappen DAG:er som
openai_dag.py
. - Öppna DAG-filen för att granska koden. Du kan redigera den efter behov och konfigurera nödvändiga anslutningar för operatorerna.
- När anslutningarna har angetts kan du utlösa DAG för att köra arbetsflödet.
Slutsats
Utforska fler användningsfall genom att ändra systemprompten eller indataskissen. Den här lösningen visar sömlös integrering av Azure OpenAI- och Apache Airflow-jobb för att snabbt omvandla idéer till funktionella arbetsflöden. Genom att automatisera processen för att skapa DAG kan du spara betydande tid och fokusera på aktiviteter med högre värde.
Relaterat innehåll
snabbstart: Skapa ett Apache Airflow-jobbAktivera Azure Key Vault som hemlig serverdel