Dela via


Använda Azure OpenAI för att omvandla whiteboardskisser till Apache Airflow DAG:er

Apache Airflow-jobb i Microsoft Fabric ger molnbaserad upplevelse för datatekniker och dataforskare, med funktioner som omedelbar körningsetablering, molnbaserad redigering, dynamisk autoskalning, intelligent autopaus och förbättrad säkerhet. Det är en fullständigt hanterad tjänst som gör att du kan skapa, schemalägga och övervaka Apache Airflow-arbetsflöden i molnet utan att behöva bekymra dig om underliggande infrastruktur.

Nu, med gpt-4o AI-modellen i Azure, tänjer vi på gränserna för vad du kan göra med Apache Airflow-jobb och gör det möjligt för dig att skapa Apache Airflow DAG:er från bara din whiteboard-skissidé. Den här funktionen är användbar för datatekniker och dataforskare som snabbt vill skapa prototyper och visualisera sina dataarbetsflöden.

I den här artikeln skapar du ett änden-till-änden-arbetsflöde som laddar ned skissen lagrad i din Lakehouse, använder gpt-4o för att omvandla den till en Apache Airflow DAG och laddar in den i Apache Airflowjobb för körning.

Förutsättningar

Innan du skapar lösningen måste du se till att följande förutsättningar har konfigurerats i Azure och Fabric:

  • Aktivera Apache Airflow-jobb i klientorganisationen.

    Notera

    Eftersom Apache Airflow-jobbet är i förhandsversionstillstånd måste du aktivera det via klientadministratören. Om du redan ser Apache Airflow-jobb kanske klientadministratören redan har aktiverat det.

    1. Gå till administratörsportalen –> Klientinställningar –> under Microsoft Fabric –> expandera avsnittet "Användare kan skapa och använda Apache Airflow-jobb (förhandsversion)".

    2. Välj Använd. Skärmbild för att aktivera Apache Airflow i klientorganisationen.

  • Ett Azure OpenAI- konto med en API-nyckel och en distribuerad gpt-4o-modell.

  • Skapa en Microsoft Entra-ID-app om du inte har någon.

  • Lägg till tjänstens huvudanvändare som "Deltagare" i din Microsoft Fabric-arbetsyta. Skärmbild för att lägga till tjänsthuvud som bidragsgivare.

  • Skapa "Apache Airflow-jobbet" på arbetsytan.

  • Ett diagram över hur du vill att Apache Airflow DAG ska se ut eller spara den angivna bilden i steg 1 till den lokala datorn.

  • Lägg till följande Python-paket i requirements.txt som finns i din Apache Airflow-jobbmiljö.

    azure-storage-file-datalake
    Pillow
    

Steg 1: Ladda upp avbildningen till Fabric Lakehouse

Innan du kan analysera bilden måste du ladda upp den till Lakehouse. Skärmbild som representerar DAG-diagram över Apache Airflow.

  1. Ladda upp filen från den lokala datorn till mappen Files i Lakehouse. Skärmbild som representerar filuppladdning till Fabric Lakehouse.

  2. Kopiera lagringskontonamnet för din Fabric Lakehouse. Det används i Apache Airflow-anslutningen för att autentisera med Lakehouse. Skärmbild som representerar Namnet på Fabric Lakehouse.

Steg 2: Konfigurera miljövariabler för autentisering med Lakehouse och Azure OpenAI.

Obs! Den här självstudien baseras på Airflow version 2.6.3.

Skärmbild för att lägga till miljövariabler i apache airflow-jobbet.

Autentiseringsuppgifter för Lakehouse Rest-API:er.

Vi ska använda Lakehouse Rest API:er för att ladda ned avbildningen från Lakehouse. Om du vill autentisera med Lakehouse Rest API:er måste du ange följande miljövariabler i Apache Airflow-jobbet.

  • FABRIC_CLIENT_ID: Klient-ID för Microsoft Entra-ID-appen.
  • FABRIC_CLIENT_SECRET: Klienthemligheten för Microsoft Entra-ID-appen.
  • FABRIC_TENANT_ID: Hyres-ID för Microsoft Entra ID-appen.

Autentiseringsuppgifter för Azure OpenAI

Vi använder distributionen av gpt-4o-modellen i Azure OpenAI för att analysera skissen av pipelinen och konvertera den till en Apache Airflow DAG. Om du vill ansluta till Azure OpenAI-API:et lagrar du API-nyckeln och slutpunkten i miljövariabler:

  • OPENAI_API_KEY: Ange din Azure OpenAI API-nyckel.
  • OPENAI_API_ENDPOINT: Ange slutpunkts-URL:en för din distribuerade gpt-4o modell. Till exempel https://ai-contosoai6211465843515213.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-02-15-preview.

Steg 3: Skapa en Apache Airflow DAG för att generera DAG:er från skisser

När alla förutsättningar är klara är du redo att konfigurera arbetsflödet för Azure OpenAI DAG Generator.

Så här fungerar Azure OpenAI DAG Generator

  1. Ladda ned skissen från lakehouse: Avbildningen är kodad i base64-format och skickas till Azure OpenAI.
  2. Generera DAG Code med Azure OpenAI: Arbetsflödet använder gpt-4o-modellen för att generera DAG-koden från skissen och den angivna systemprompten.
  3. Azure OpenAI tolkar indatabilden och systemprompten och genererar python-kod som representerar en Apache Airflow DAG. Svaret innehåller den här koden som en del av API-utdata.
  4. Den genererade DAG-koden hämtas från API-svaret och skrivs till en Python-fil i katalogen dags. Innan du använder filen konfigurerar du de anslutningar som krävs av operatorerna i Apache Airflow och filen är omedelbart redo att användas i Apache Airflow Jobs-gränssnittet.

Kod för Azure OpenAI DAG Generator

Följ nu stegen för att implementera arbetsflödet:

  1. Skapa en fil med namnet openapi_dag_generator.py i katalogen dags i Apache Airflow-projektet.
  2. Lägg till följande kod i filen. Ersätt yourStorageAccountName, workspace_name och file_path med de faktiska värdena och spara filen.
    import io
    import os
    import json
    import base64
    import requests
    from PIL import Image
    from pendulum import datetime
    
    from airflow.models import Variable
    from airflow.models.param import Param
    from airflow.decorators import dag, task
    from airflow.models.baseoperator import chain
    from azure.storage.filedatalake import DataLakeServiceClient
    from azure.identity import ClientSecretCredential
    
    
    @dag(
        start_date=datetime(2023, 11, 1),
        schedule=None,
        catchup=False,
        params={
            "system_prompt": Param(
                'You are an AI assistant that helps to write an Apache Airflow DAG code by understanding an image that shows an Apache Airflow DAG containing airflow tasks, task descriptions, parameters, trigger rules and edge labels.\
                You have to priortize the Apache Airflow provider operators over Apache Airflow core operators if they resonates more with task description.\
                Use the most appropriate Apache Airflow operators as per the task description\
                To give the label to the DAG edge use the Label from the airflow.utils.edgemodifier class\
                You can use Dummy operators for start and end tasks. \
                Return apache airflow dag code in a valid json format following the format:```json{ "dag": "value should be Apache Airflow DAG code"}```',
                type="string",
                title="Give a prompt to the Airflow Expert.",
                description="Enter what you would like to ask Apache Airflow Expert.",
                min_length=1,
                max_length=500,
            ),
            "seed": Param(42, type="integer"),
            "temperature": Param(0.1, type="number"),
            "top_p": Param(0.95, type="number"),
            "max_tokens": Param(800, type="integer"),
        },
    )
    
    def OpenAI_Dag_Generator():
        """
        A DAG that generates an Apache Airflow DAG code using `gpt-4o` OpenAI model based on a diagram image
        stored in Azure Blob Storage. The generated DAG is saved in the `dags` folder for execution.
        """
    
        @task
        def fetch_image_from_lakehouse(workspace_name: str, file_path: str):
            """
            Downloads an image from Fabric Lakehouse and encodes it as a Base64 string.
    
            :param workspace_name: Name of the workspace where your Lakehouse is located.
            :param file_path: Relative file path stored in the Fabric Lakehouse.
            :return: Dictionary containing the encoded image as a Base64 string.
            """
            account_url = f"https://{yourStorageAccountName}.dfs.fabric.microsoft.com"
            client_id = os.getenv("FABRIC_CLIENT_ID")
            client_secret = os.getenv("FABRIC_CLIENT_SECRET")
            tenant_id = os.getenv("FABRIC_TENANT_ID")
    
            tokenCredential = ClientSecretCredential(
                tenant_id=tenant_id,
                client_id=client_id,
                client_secret=client_secret
            )
    
            lakehouse_client = DataLakeServiceClient(
                account_url,
                credential=tokenCredential
            )
    
            blob_data = lakehouse_client.get_file_client(workspace_name, file_path).download_file().readall()
    
            image = Image.open(io.BytesIO(blob_data))
    
            # Encode image as Base64
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('ascii')
    
            return {"encoded_image": encoded_image}
    
    
        @task
        def generate_dag_code_from_openai(image_from_blob: dict, system_prompt: str, **context):
            """
            Sends the encoded image to the OpenAI gpt-4o model to generate an Apache Airflow DAG code.
    
            :param encoded_image: Dictionary containing the Base64-encoded image.
            :param system_prompt: Prompt to ask the OpenAI model to generate the DAG code.
            :return: Dictionary containing the generated DAG code as a string.
            """
    
            azureAI_api_key = os.getenv("OPENAI_API_KEY")
            azureAI_endpoint = os.getenv("OPENAI_API_ENDPOINT")
    
            image = image_from_blob["encoded_image"]
    
            headers = {
                "Content-Type": "application/json",
                "api-key": azureAI_api_key,
            }
    
            payload = {
                "messages": [
                {
                    "role": "system",
                    "content": [
                    {
                        "type": "text",
                        "text": system_prompt
                    }
                    ]
                },
                {
                    "role": "user",
                    "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                        "url": f"data:image/jpeg;base64,{image}"
                        }
                    }
                    ]
                }
                ],
                "seed": context["params"]["seed"],
                "temperature": context["params"]["temperature"],
                "top_p": context["params"]["top_p"],
                "max_tokens": context["params"]["max_tokens"]
            }
    
            response = requests.post(azureAI_endpoint, headers=headers, json=payload)
            response.raise_for_status()  
    
            # Get JSON from request and show
            response_json = response.json()
    
    
            # Check if 'choices' and 'message' are present in the response
            if 'choices' in response_json and len(response_json['choices']) > 0:
                content = response_json['choices'][0]['message']['content']
    
                start_index = content.find('```json')
                end_index = content.rfind("```")
    
                # Validate JSON block delimiters
                if start_index == -1 or end_index == -1:
                    raise ValueError("JSON block delimiters (```json ... ```) not found in the content.")
    
                # Extract and parse the JSON string
                extracted_json_str = content[start_index + 7:end_index].strip()
                if not extracted_json_str:
                    raise ValueError("Extracted JSON string is empty.")
    
                # Convert to a Python dictionary
                dag_json = json.loads(extracted_json_str)
                dag_code = dag_json.get("dag")
                if not dag_code:
                    raise ValueError("'dag' key not found in the extracted JSON.")
    
                return {"dag_code": dag_code}
    
            return response_json
    
        @task
        def save_dag(xcom_dag_code: dict):
            """
            Saves the generated DAG code to a Python file in the `dags` directory.
            """
            try:
                with open("dags/openai_dag.py", "w") as f:
                    f.write(xcom_dag_code["dag_code"])
    
                print("DAG code saved successfully.")
            except Exception as e:
                raise ValueError(f"Error saving DAG code: {str(e)}")
    
        chain(
            save_dag(
                generate_dag_code_from_openai(
                    fetch_image_from_lakehouse(
                        workspace_name="airflow-dag-images", # Your Fabric Workspace
                        file_path="lakehouse_ai.Lakehouse/Files/airflow-dag-diagram.png" # Path to the image file located in the Lakehouse
                    ),
                    "{{ params.system_prompt }}"
                )
            )
        )
    
    OpenAI_Dag_Generator()
    

Steg 4: Utlösa DAG från Apache Airflow-användargränssnittet

  1. Klicka på Monitor Airflow.
  2. Gå till DAG-fliken och leta reda på OpenAI_Dag_Generator DAG. Klicka på den.
  3. Klicka på uppspelningsknappen och välj Trigger DAG w/ config. Skärmbild som visar hur du utlöser DAG med hjälp av konfiguration.
  4. Du presenteras med ett formulär som visar DAG-parametrar. Vi tillhandahåller en standardsystemprompt, frö, temperatur, top_p och max_tokens. Du kan ändra dessa värden efter behov. Skärmbild representerar DAG-parametrar.
  5. Klicka på knappen Trigger för att starta.
  6. Efter den lyckade DAG-körningen skulle du se en ny DAG genererad med filnamnet openai_dag.py i mappen dags för Apache Airflow Job.

Steg 5: Gör dig redo att köra den nyligen genererade DAG

  1. Öppna användargränssnittet för Apache Airflow-jobb.
  2. Den nyligen genererade DAG sparas i mappen DAG:er som openai_dag.py. Skärmbild som representerar ny dag som genererats med OpenAI dag generator.
  3. Öppna DAG-filen för att granska koden. Du kan redigera den efter behov och konfigurera nödvändiga anslutningar för operatorerna.
  4. När anslutningarna har angetts kan du utlösa DAG för att köra arbetsflödet. Skärmbild representerar resulterande DAG från OpenAI.

Slutsats

Utforska fler användningsfall genom att ändra systemprompten eller indataskissen. Den här lösningen visar sömlös integrering av Azure OpenAI- och Apache Airflow-jobb för att snabbt omvandla idéer till funktionella arbetsflöden. Genom att automatisera processen för att skapa DAG kan du spara betydande tid och fokusera på aktiviteter med högre värde.

snabbstart: Skapa ett Apache Airflow-jobbAktivera Azure Key Vault som hemlig serverdel