Sdílet prostřednictvím


Návod: Human-in-the-Loop

Varování

sémantická architektura procesu jádra je experimentální, stále ve vývoji a může se změnit.

Přehled

V předchozích částech jsme vytvořili proces, který nám pomůže automatizovat vytváření dokumentace pro náš nový produkt. Náš proces teď může generovat dokumentaci, která je specifická pro náš produkt, a zajistit, že splňuje náš panel kvality spuštěním kontroly pravopisu a cyklu úprav. V této části tento proces znovu vylepšíme tím, že před publikováním bude muset člověk schválit nebo odmítnout dokumentaci. Flexibilita architektury procesu znamená, že existuje několik způsobů, jak to udělat, ale v tomto příkladu předvedeme integraci s externím systémem pubsub pro žádost o schválení.

Vývojový diagram našeho procesu se vzorem lidské interakce.

Nastavit publikování tak, aby čekalo na schválení

První změnou, kterou musíme v procesu provést, je to, že krok publikování musí počkat na schválení, než dokumentaci publikuje. Jednou z možností je jednoduše přidat druhý parametr pro schválení do funkce PublishDocumentation v PublishDocumentationStep. To funguje, protože funkce KernelFunction v kroku bude vyvolána pouze v případě, že byly zadány všechny požadované parametry.

// A process step to publish documentation
public class PublishDocumentationStep : KernelProcessStep
{
    [KernelFunction]
    public DocumentInfo PublishDocumentation(DocumentInfo document, bool userApproval) // added the userApproval parameter
    {
        // Only publish the documentation if it has been approved
        if (userApproval)
        {
            // For example purposes we just write the generated docs to the console
            Console.WriteLine($"[{nameof(PublishDocumentationStep)}]:\tPublishing product documentation approved by user: \n{document.Title}\n{document.Content}");
        }
        return document;
    }
}

Podpora chování procesu human-in-the-loop v Pythonu bude brzy k dispozici.

S výše uvedeným kódem se funkce PublishDocumentation v PublishDocumentationStep vyvolá pouze v případě, že byla vygenerovaná dokumentace odeslána do parametru document a výsledek schválení byl odeslán do parametru userApproval.

Teď můžeme znovu použít stávající logiku ProofreadStep kroku k dodatečnému generování události do externího systému pubsub, který oznámí schvalovateli, že existuje nová žádost.

// A process step to publish documentation
public class ProofReadDocumentationStep : KernelProcessStep
{
    ...

    if (formattedResponse.MeetsExpectations)
    {
        // Events that are getting piped to steps that will be resumed, like PublishDocumentationStep.OnPublishDocumentation
        // require events to be marked as public so they are persisted and restored correctly
        await context.EmitEventAsync("DocumentationApproved", data: document, visibility: KernelProcessEventVisibility.Public);
    }
    ...
}

Vzhledem k tomu, že chceme publikovat nově vygenerovanou dokumentaci při schválení agentem kontroly pravopisu, schválené dokumenty se zařadí do fronty v kroku publikování. Kromě toho bude člověk upozorněn prostřednictvím externího systému pubsub s aktualizací nejnovějšího dokumentu. Pojďme tok procesu aktualizovat tak, aby odpovídal tomuto novému návrhu.

Podpora chování procesu human-in-the-loop v Pythonu bude brzy k dispozici.

// Create the process builder
ProcessBuilder processBuilder = new("DocumentationGeneration");

// Add the steps
var infoGatheringStep = processBuilder.AddStepFromType<GatherProductInfoStep>();
var docsGenerationStep = processBuilder.AddStepFromType<GenerateDocumentationStepV2>();
var docsProofreadStep = processBuilder.AddStepFromType<ProofreadStep>();
var docsPublishStep = processBuilder.AddStepFromType<PublishDocumentationStep>();

// internal component that allows emitting SK events externally, a list of topic names
// is needed to link them to existing SK events
var proxyStep = processBuilder.AddProxyStep(["RequestUserReview", "PublishDocumentation"]);

// Orchestrate the events
processBuilder
    .OnInputEvent("StartDocumentGeneration")
    .SendEventTo(new(infoGatheringStep));

processBuilder
    .OnInputEvent("UserRejectedDocument")
    .SendEventTo(new(docsGenerationStep, functionName: "ApplySuggestions"));

// When external human approval event comes in, route it to the 'isApproved' parameter of the docsPublishStep
processBuilder
    .OnInputEvent("UserApprovedDocument")
    .SendEventTo(new(docsPublishStep, parameterName: "userApproval"));

// Hooking up the rest of the process steps
infoGatheringStep
    .OnFunctionResult()
    .SendEventTo(new(docsGenerationStep, functionName: "GenerateDocumentation"));

docsGenerationStep
    .OnEvent("DocumentationGenerated")
    .SendEventTo(new(docsProofreadStep));

docsProofreadStep
    .OnEvent("DocumentationRejected")
    .SendEventTo(new(docsGenerationStep, functionName: "ApplySuggestions"));

// When the proofreader approves the documentation, send it to the 'document' parameter of the docsPublishStep
// Additionally, the generated document is emitted externally for user approval using the pre-configured proxyStep
docsProofreadStep
    .OnEvent("DocumentationApproved")
    // [NEW] addition to emit messages externally
    .EmitExternalEvent(proxyStep, "RequestUserReview") // Hooking up existing "DocumentationApproved" to external topic "RequestUserReview"
    .SendEventTo(new(docsPublishStep, parameterName: "document"));

// When event is approved by user, it gets published externally too
docsPublishStep
    .OnFunctionResult()
    // [NEW] addition to emit messages externally
    .EmitExternalEvent(proxyStep, "PublishDocumentation");

var process = processBuilder.Build();
return process;

A konečně, implementace rozhraní IExternalKernelProcessMessageChannel by měla být poskytována, protože je interně používána novým ProxyStep. Toto rozhraní slouží k externímu generování zpráv. Implementace tohoto rozhraní bude záviset na externím systému, který používáte. V tomto příkladu použijeme vlastního klienta, kterého jsme vytvořili k odesílání zpráv do externího systému pubsub.

// Example of potential custom IExternalKernelProcessMessageChannel implementation 
public class MyCloudEventClient : IExternalKernelProcessMessageChannel
{
    private MyCustomClient? _customClient;

    // Example of an implementation for the process
    public async Task EmitExternalEventAsync(string externalTopicEvent, KernelProcessProxyMessage message)
    {
        // logic used for emitting messages externally.
        // Since all topics are received here potentially 
        // some if else/switch logic is needed to map correctly topics with external APIs/endpoints.
        if (this._customClient != null)
        {
            switch (externalTopicEvent) 
            {
                case "RequestUserReview":
                    var requestDocument = message.EventData.ToObject() as DocumentInfo;
                    // As an example only invoking a sample of a custom client with a different endpoint/api route
                    this._customClient.InvokeAsync("REQUEST_USER_REVIEW", requestDocument);
                    return;

                case "PublishDocumentation":
                    var publishedDocument = message.EventData.ToObject() as DocumentInfo;
                    // As an example only invoking a sample of a custom client with a different endpoint/api route
                    this._customClient.InvokeAsync("PUBLISH_DOC_EXTERNALLY", publishedDocument);
                    return;
            }
        }
    }

    public async ValueTask Initialize()
    {
        // logic needed to initialize proxy step, can be used to initialize custom client
        this._customClient = new MyCustomClient("http://localhost:8080");
        this._customClient.Initialize();
    }

    public async ValueTask Uninitialize()
    {
        // Cleanup to be executed when proxy step is uninitialized
        if (this._customClient != null)
        {
            await this._customClient.ShutdownAsync();
        }
    }
}

Nakonec, abychom umožnili procesu ProxyStep správně využít implementaci IExternalKernelProcessMessageChannel, v tomto případě MyCloudEventClient, musíme ji správně propojit.

Při použití místního modulu runtime lze implementovanou třídu předat vyvoláním StartAsync na třídě KernelProcess.

KernelProcess process;
IExternalKernelProcessMessageChannel myExternalMessageChannel = new MyCloudEventClient();
// Start the process with the external message channel
await process.StartAsync(kernel, new KernelProcessEvent 
    {
        Id = inputEvent,
        Data = input,
    },
    myExternalMessageChannel)

Při použití Dapr Runtime je zajištění nastavení nutné provést prostřednictvím injektáže závislostí v úvodním nastavení programu projektu.

var builder = WebApplication.CreateBuilder(args);
...
// depending on the application a singleton or scoped service can be used
// Injecting SK Process custom client IExternalKernelProcessMessageChannel implementation
builder.Services.AddSingleton<IExternalKernelProcessMessageChannel, MyCloudEventClient>();

Podpora chování procesu human-in-the-loop v Pythonu bude brzy k dispozici.

V toku procesu byly provedeny dvě změny:

  • Přidání vstupní události s názvem HumanApprovalResponse, která se bude směrovat na parametr userApproval kroku docsPublishStep.
  • Vzhledem k tomu, že kernelFunction v docsPublishStep nyní má dva parametry, musíme aktualizovat existující trasu a zadat název parametru document.

Spusťte proces jako předtím a všimněte si, že tentokrát, když čtečka kontroly pravopisu schválí vygenerovanou dokumentaci a odešle ji do document parametru docPublishStep kroku, už se tento krok nevyvolá, protože čeká na userApproval parametr. V tomto okamžiku se proces stane nečinným, protože nejsou připravené žádné kroky, které by bylo možné vyvolat, a volání, které jsme provedli k zahájení procesu, se vrátí. Tento proces zůstane v tomto nečinném stavu, dokud náš "člověk v procesu" neschválí nebo neodmítne žádost o publikování. Jakmile k tomu dojde a výsledek se předá zpět do našeho programu, můžeme proces restartovat s výsledkem.

// Restart the process with approval for publishing the documentation.
await process.StartAsync(kernel, new KernelProcessEvent { Id = "UserApprovedDocument", Data = true });

Podpora chování procesu human-in-the-loop v Pythonu bude brzy k dispozici.

Jakmile se proces znovu spustí s UserApprovedDocument, pokračuje tam, kde skončil, a vyvolá docsPublishStep s userApproval nastaveným na true. Naše dokumentace bude publikována. Pokud se událost UserRejectedDocument spustí znovu, proces zahájí funkci ApplySuggestions v kroku docsGenerationStep a proces bude pokračovat jako předtím.

Proces je nyní dokončený a úspěšně jsme do našeho procesu přidali krok zahrnujícího člověka. Tento proces je teď možné použít ke generování dokumentace pro náš produkt, jeho kontrolu pravopisu a publikování, jakmile ho schválí člověk.