Dela via


Utlösa Machine Learning-pipelines

GÄLLER FÖR: Python SDK azureml v1

I den här artikeln får du lära dig hur du programmatiskt schemalägger en pipeline som ska köras i Azure. Du kan skapa ett schema baserat på förfluten tid eller filsystemändringar. Tidsbaserade scheman kan användas för att ta hand om rutinuppgifter, till exempel övervakning av dataavvikelser. Ändringsbaserade scheman kan användas för att reagera på oregelbundna eller oförutsägbara ändringar, till exempel nya data som laddas upp eller gamla data som redigeras. När du har lärt dig hur du skapar scheman får du lära dig hur du hämtar och inaktiverar dem. Slutligen får du lära dig hur du använder andra Azure-tjänster, Azure Logic App och Azure Data Factory, för att köra pipelines. En Azure Logic App möjliggör mer komplex utlösande logik eller beteende. Med Azure Data Factory-pipelines kan du anropa en maskininlärningspipeline som en del av en större dataorkestreringspipeline.

Förutsättningar

Utlösa pipelines med Azure Mašinsko učenje SDK för Python

För att schemalägga en pipeline behöver du en referens till din arbetsyta, identifieraren för den publicerade pipelinen och namnet på experimentet där du vill skapa schemat. Du kan hämta dessa värden med följande kod:

import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in  published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

experiment_name = "MyExperiment" 
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" 

Skapa en tidsplan

Om du vill köra en pipeline regelbundet skapar du ett schema. En Schedule associerar en pipeline, ett experiment och en utlösare. Utlösaren kan antingen vara enScheduleRecurrence som beskriver väntetiden mellan jobb eller en Datastore-sökväg som anger en katalog för att hålla utkik efter ändringar. I båda fallen behöver du pipelineidentifieraren och namnet på experimentet där schemat ska skapas.

Längst upp i Python-filen importerar du klasserna Schedule och ScheduleRecurrence :


from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

Skapa ett tidsbaserat schema

Konstruktorn ScheduleRecurrence har ett obligatoriskt frequency argument som måste vara en av följande strängar: "Minute", "Hour", "Day", "Week" eller "Month". Det kräver också ett heltalsargument interval som anger hur många av frequency enheterna som ska förflutit mellan schemastarterna. Med valfria argument kan du vara mer specifik om starttider, enligt beskrivningen i dokumentationen om ScheduleRecurrence SDK.

Skapa en Schedule som börjar ett jobb var 15:e minut:

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
                            description="Based on time",
                            pipeline_id=pipeline_id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)

Skapa ett ändringsbaserat schema

Pipelines som utlöses av filändringar kan vara effektivare än tidsbaserade scheman. När du vill göra något innan en fil ändras, eller när en ny fil läggs till i en datakatalog, kan du förbearbeta filen. Du kan övervaka ändringar i ett datalager eller ändringar i en specifik katalog i datalagringen. Om du övervakar en specifik katalog utlöser ändringar i underkataloger i katalogen inte ett jobb.

Kommentar

Ändringsbaserade scheman stöder endast övervakning av Azure Blob Storage.

Om du vill skapa en filreaktiv Schedulemåste du ange parametern datastore i anropet till Schedule.create. Om du vill övervaka en mapp anger du path_on_datastore argumentet.

Med polling_interval argumentet kan du i minuter ange hur ofta datalagringen ska kontrolleras efter ändringar.

Om pipelinen har konstruerats med en DataPath PipelineParameter kan du ange variabeln till namnet på den ändrade filen genom att ange data_path_parameter_name argumentet.

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                            pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, data_path_parameter_name="input_data")

Valfria argument när du skapar ett schema

Förutom argumenten som beskrevs tidigare kan du ange status argumentet till för "Disabled" att skapa ett inaktivt schema. Slutligen continue_on_step_failure kan du skicka ett booleskt värde som åsidosätter pipelinens standardbeteende för fel.

Visa dina schemalagda pipelines

Gå till Azure Mašinsko učenje i webbläsaren. I avsnittet Slutpunkter i navigeringspanelen väljer du Pipeline-slutpunkter. Detta tar dig till en lista över pipelines som publicerats på arbetsytan.

Sidan Pipelines i AML

På den här sidan kan du se sammanfattningsinformation om alla pipelines på arbetsytan: namn, beskrivningar, status och så vidare. Öka detaljnivån genom att klicka i pipelinen. På den resulterande sidan finns mer information om din pipeline och du kan öka detaljnivån i enskilda jobb.

Inaktivera pipelinen

Om du har en Pipeline som har publicerats, men inte schemalagts, kan du inaktivera den med:

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

Om pipelinen är schemalagd måste du avbryta schemat först. Hämta schemats identifierare från portalen eller genom att köra:

ss = Schedule.list(ws)
for s in ss:
    print(s)

När du har inaktiverat schedule_id kör du:

def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)

Om du sedan kör Schedule.list(ws) igen bör du få en tom lista.

Använda Azure Logic Apps för komplexa utlösare

Mer komplexa utlösarregler eller beteenden kan skapas med hjälp av en Azure Logic App.

Om du vill använda en Azure Logic App för att utlösa en Mašinsko učenje pipeline behöver du REST-slutpunkten för en publicerad Mašinsko učenje pipeline. Skapa och publicera din pipeline. Leta sedan reda på REST-slutpunkten för din PublishedPipeline med hjälp av pipeline-ID:t:

# You can find the pipeline ID in Azure Machine Learning studio

published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint 

Skapa en logikapp i Azure

Skapa nu en Azure Logic App-instans . När logikappen har etablerats använder du de här stegen för att konfigurera en utlösare för din pipeline:

  1. Skapa en systemtilldelad hanterad identitet för att ge appen åtkomst till din Azure Mašinsko učenje-arbetsyta.

  2. Gå till vyn Logikappdesigner och välj mallen Tom logikapp.

    Tom mall

  3. Sök efter blob i designern. Välj utlösaren När en blob läggs till eller ändras (endast egenskaper) och lägg till den här utlösaren i logikappen.

    Lägg till utlösare

  4. Fyll i anslutningsinformationen för det Blob Storage-konto som du vill övervaka för blobtillägg eller ändringar. Välj den container som ska övervakas.

    Välj intervall och frekvens för att söka efter uppdateringar som fungerar åt dig.

    Kommentar

    Den här utlösaren övervakar den valda containern men övervakar inte undermappar.

  5. Lägg till en HTTP-åtgärd som körs när en ny eller modifierad blob identifieras. Välj + Nytt steg och sök sedan efter och välj HTTP-åtgärden.

Sök efter HTTP-åtgärd

Använd följande inställningar för att konfigurera åtgärden:

Inställning Värde
HTTP-åtgärd POST
URI slutpunkten till den publicerade pipelinen som du hittade som en förutsättning
Autentiseringsläge Hanterad identitet
  1. Konfigurera schemat för att ange värdet för alla DataPath PipelineParameters som du kan ha:

    {
      "DataPathAssignments": {
        "input_datapath": {
          "DataStoreName": "<datastore-name>",
          "RelativePath": "@{triggerBody()?['Name']}" 
        }
      },
      "ExperimentName": "MyRestPipeline",
      "ParameterAssignments": {
        "input_string": "sample_string3"
      },
      "RunSource": "SDK"
    }
    

    Använd den DataStoreName du har lagt till på din arbetsyta som en förutsättning.

    HTTP-inställningar

  2. Välj Spara och ditt schema är nu klart.

Viktigt!

Om du använder rollbaserad åtkomstkontroll i Azure (Azure RBAC) för att hantera åtkomst till din pipeline anger du behörigheterna för pipelinescenariot (träning eller bedömning).

Anropa pipelines för maskininlärning från Azure Data Factory-pipelines

I en Azure Data Factory-pipeline kör aktiviteten Mašinsko učenje Execute Pipeline en Azure Mašinsko učenje pipeline. Du hittar den här aktiviteten på datafabrikens redigeringssida under kategorin Mašinsko učenje:

Skärmbild som visar ML-pipelineaktiviteten i Azure Data Factory-redigeringsmiljön

Nästa steg

I den här artikeln använde du Azure Mašinsko učenje SDK för Python för att schemalägga en pipeline på två olika sätt. Ett schema upprepas baserat på förfluten klocktid. De andra schemajobben om en fil ändras på en angiven Datastore eller i en katalog i det arkivet. Du såg hur du använder portalen för att undersöka pipelinen och enskilda jobb. Du har lärt dig hur du inaktiverar ett schema så att pipelinen slutar köras. Slutligen skapade du en Azure Logic App för att utlösa en pipeline.

Mer information finns i: