通过


操作指南 人机协作

警告

语义内核进程框架 是实验性的,仍在开发中,并且可能会更改。

概述

在前面的部分中,我们构建了一个流程,帮助我们自动创建新产品的文档。 我们的流程现在可以生成特定于我们产品的文档,并通过进行校对和编辑流程来确保其符合我们的质量标准。 在本部分中,我们将再次改进该过程,要求人工批准或拒绝文档,然后再发布文档。 流程框架的灵活性意味着我们可以采取多种方法来执行此作,但在此示例中,我们将演示与外部 pubsub 系统的集成,以便请求批准。

我们流程的流程图,采用了人机协作模式。

需等待审批后发布

我们需要对流程进行的第一个更改是让发布步骤等待审批,然后再发布文档。 一个选项是只向 PublishDocumentation中的 PublishDocumentationStep 函数添加第二个参数以供审批。 这有效,因为仅当提供所有必需的参数时,才会调用步骤中的 KernelFunction。

// A process step to publish documentation
public class PublishDocumentationStep : KernelProcessStep
{
    [KernelFunction]
    public DocumentInfo PublishDocumentation(DocumentInfo document, bool userApproval) // added the userApproval parameter
    {
        // Only publish the documentation if it has been approved
        if (userApproval)
        {
            // For example purposes we just write the generated docs to the console
            Console.WriteLine($"[{nameof(PublishDocumentationStep)}]:\tPublishing product documentation approved by user: \n{document.Title}\n{document.Content}");
        }
        return document;
    }
}

即将推出对 Python 人工干预过程行为的支持。

使用上述代码时,仅当生成的文档发送到 PublishDocumentation 参数并且审批结果已发送到 PublishDocumentationStep 参数时,才会调用 document 中的 userApproval 函数。

现在,我们可以重用ProofreadStep步骤的现有逻辑,另外向我们的外部 pubsub 系统发出事件,以通知人工审批者有新的请求。

// A process step to publish documentation
public class ProofReadDocumentationStep : KernelProcessStep
{
    ...

    if (formattedResponse.MeetsExpectations)
    {
        // Events that are getting piped to steps that will be resumed, like PublishDocumentationStep.OnPublishDocumentation
        // require events to be marked as public so they are persisted and restored correctly
        await context.EmitEventAsync("DocumentationApproved", data: document, visibility: KernelProcessEventVisibility.Public);
    }
    ...
}

因为我们希望在文档获得校对人员批准后发布新生成的文档,因此已批准的文档将在发布阶段排队。 此外,用户将通过外部 pubsub 系统收到通知,其中包含最新文档的更新。 让我们更新流程以匹配此新设计。

即将推出对 Python 人工干预过程行为的支持。

// Create the process builder
ProcessBuilder processBuilder = new("DocumentationGeneration");

// Add the steps
var infoGatheringStep = processBuilder.AddStepFromType<GatherProductInfoStep>();
var docsGenerationStep = processBuilder.AddStepFromType<GenerateDocumentationStepV2>();
var docsProofreadStep = processBuilder.AddStepFromType<ProofreadStep>();
var docsPublishStep = processBuilder.AddStepFromType<PublishDocumentationStep>();

// internal component that allows emitting SK events externally, a list of topic names
// is needed to link them to existing SK events
var proxyStep = processBuilder.AddProxyStep(["RequestUserReview", "PublishDocumentation"]);

// Orchestrate the events
processBuilder
    .OnInputEvent("StartDocumentGeneration")
    .SendEventTo(new(infoGatheringStep));

processBuilder
    .OnInputEvent("UserRejectedDocument")
    .SendEventTo(new(docsGenerationStep, functionName: "ApplySuggestions"));

// When external human approval event comes in, route it to the 'isApproved' parameter of the docsPublishStep
processBuilder
    .OnInputEvent("UserApprovedDocument")
    .SendEventTo(new(docsPublishStep, parameterName: "userApproval"));

// Hooking up the rest of the process steps
infoGatheringStep
    .OnFunctionResult()
    .SendEventTo(new(docsGenerationStep, functionName: "GenerateDocumentation"));

docsGenerationStep
    .OnEvent("DocumentationGenerated")
    .SendEventTo(new(docsProofreadStep));

docsProofreadStep
    .OnEvent("DocumentationRejected")
    .SendEventTo(new(docsGenerationStep, functionName: "ApplySuggestions"));

// When the proofreader approves the documentation, send it to the 'document' parameter of the docsPublishStep
// Additionally, the generated document is emitted externally for user approval using the pre-configured proxyStep
docsProofreadStep
    .OnEvent("DocumentationApproved")
    // [NEW] addition to emit messages externally
    .EmitExternalEvent(proxyStep, "RequestUserReview") // Hooking up existing "DocumentationApproved" to external topic "RequestUserReview"
    .SendEventTo(new(docsPublishStep, parameterName: "document"));

// When event is approved by user, it gets published externally too
docsPublishStep
    .OnFunctionResult()
    // [NEW] addition to emit messages externally
    .EmitExternalEvent(proxyStep, "PublishDocumentation");

var process = processBuilder.Build();
return process;

最后,应提供接口 IExternalKernelProcessMessageChannel 的实现,因为它由新 ProxyStep接口在内部使用。 此接口用于在外部发出消息。 此接口的实现将取决于所使用的外部系统。 在此示例中,我们将使用已创建的自定义客户端将消息发送到外部 pubsub 系统。

// Example of potential custom IExternalKernelProcessMessageChannel implementation 
public class MyCloudEventClient : IExternalKernelProcessMessageChannel
{
    private MyCustomClient? _customClient;

    // Example of an implementation for the process
    public async Task EmitExternalEventAsync(string externalTopicEvent, KernelProcessProxyMessage message)
    {
        // logic used for emitting messages externally.
        // Since all topics are received here potentially 
        // some if else/switch logic is needed to map correctly topics with external APIs/endpoints.
        if (this._customClient != null)
        {
            switch (externalTopicEvent) 
            {
                case "RequestUserReview":
                    var requestDocument = message.EventData.ToObject() as DocumentInfo;
                    // As an example only invoking a sample of a custom client with a different endpoint/api route
                    this._customClient.InvokeAsync("REQUEST_USER_REVIEW", requestDocument);
                    return;

                case "PublishDocumentation":
                    var publishedDocument = message.EventData.ToObject() as DocumentInfo;
                    // As an example only invoking a sample of a custom client with a different endpoint/api route
                    this._customClient.InvokeAsync("PUBLISH_DOC_EXTERNALLY", publishedDocument);
                    return;
            }
        }
    }

    public async ValueTask Initialize()
    {
        // logic needed to initialize proxy step, can be used to initialize custom client
        this._customClient = new MyCustomClient("http://localhost:8080");
        this._customClient.Initialize();
    }

    public async ValueTask Uninitialize()
    {
        // Cleanup to be executed when proxy step is uninitialized
        if (this._customClient != null)
        {
            await this._customClient.ShutdownAsync();
        }
    }
}

最后,为了允许进程 ProxyStep 在这种情况下应用 IExternalKernelProcessMessageChannelMyCloudEventClient 实现,我们需要正确地配置管道。

使用本地运行时时,可以在调用 StartAsync 类时传递实现的 KernelProcess 类。

KernelProcess process;
IExternalKernelProcessMessageChannel myExternalMessageChannel = new MyCloudEventClient();
// Start the process with the external message channel
await process.StartAsync(kernel, new KernelProcessEvent 
    {
        Id = inputEvent,
        Data = input,
    },
    myExternalMessageChannel)

使用 Dapr 运行时时,必须在项目的 Program 安装程序中通过依赖项注入来完成管道。

var builder = WebApplication.CreateBuilder(args);
...
// depending on the application a singleton or scoped service can be used
// Injecting SK Process custom client IExternalKernelProcessMessageChannel implementation
builder.Services.AddSingleton<IExternalKernelProcessMessageChannel, MyCloudEventClient>();

即将推出对 Python 人工干预过程行为的支持。

对流程进行了两项更改:

  • 添加了一个名为 HumanApprovalResponse 的输入事件,该事件将路由到 userApproval 步骤的 docsPublishStep 参数。
  • 由于 docsPublishStep 中的 KernelFunction 现在有两个参数,因此我们需要更新现有路由以指定 document的参数名称。

像之前一样运行该过程并注意到,当校对程序批准生成的文档并将其发送到 document 步骤的 docPublishStep 参数时,该步骤不再被调用,因为它正在等待 userApproval 参数。 此时,进程变为空闲状态,因为没有准备好要调用的步骤,而我们之前发起的启动进程的调用返回。 此过程将保持此空闲状态,直到我们的人机协作进行操作以批准或拒绝发布请求。 一旦发生这种情况,并且结果已经传达回我们的程序,我们就可以用该结果重新启动流程。

// Restart the process with approval for publishing the documentation.
await process.StartAsync(kernel, new KernelProcessEvent { Id = "UserApprovedDocument", Data = true });

即将推出对 Python 人工干预过程行为的支持。

当进程再次启动时,UserApprovedDocument 将从上次中断的位置继续,调用 docsPublishStep 并将 userApproval 设置为 true,然后我们的文档将被发布。 如果再次通过UserRejectedDocument事件启动,流程将在ApplySuggestions步骤启动docsGenerationStep函数,过程将像以前一样继续。

该步骤现已完成,我们已成功地将人为干预步骤添加到我们的流程中。 现在,该过程可用于为产品生成文档,对其进行校对,并在人工批准后发布它。