Dela via


Återställbara exekverare

Översikt

Utförare i arbetsflöden är ofta tillståndskänsliga , till exempel kan de ackumulera meddelanden, spåra antal svängar eller cachelagra mellanliggande resultat. När ett arbetsflöde återanvänds över flera körningar med delade körinstanser kan överblivna tillstånd från en tidigare körning läcka till efterföljande körningar, vilket orsakar oväntat beteende eller skadade data.

Gränssnittet IResettableExecutor löser detta genom att tillhandahålla ett kontrakt för utförare för att rensa deras interna tillstånd mellan körningar. Arbetsflödeskörningen anropar ResetAsync() automatiskt delade executor-instanser när en körning slutförs, vilket säkerställer en ren skiffer för nästa körning.

Problemet

Överväg en exekverare som samlar in meddelanden under körning av ett arbetsflöde:

internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }
}

Om den här kören delas mellan arbetsflödeskörningar _messages behåller du data från föregående körning. Den andra körningen kommer att visa gamla meddelanden som inte hör hemma där.

Gränssnittet IResettableExecutor

IResettableExecutor definierar en enda metod som arbetsflödeskörningen anropar mellan körningar:

public interface IResettableExecutor
{
    ValueTask ResetAsync();
}

När en exekverare implementerar det här gränssnittet kan programkörningen återställas på ett säkert sätt efter varje körning, vilket gör att arbetsflödet kan återanvändas utan att det finns kvar någon inaktuell information.

Implementera IResettableExecutor

Om du vill göra en tillståndskänslig köråterställningstabell implementerar du gränssnittet och rensar alla föränderliga tillstånd i ResetAsync():

internal sealed partial class AggregationExecutor()
    : Executor("AggregationExecutor"), IResettableExecutor
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }

    public ValueTask ResetAsync()
    {
        this._messages.Clear();
        return default;
    }
}

Ett fullständigt fungerande exempel på ett arbetsflöde som använder återställbara exekverare finns i exemplet WorkflowAsAnAgent.

När ska implementeras

Alla utförare behöver inte implementera IResettableExecutor. Använd den här beslutsguiden:

Scenario Genomföra? Förnuft
Exekutor har ett föränderligt tillstånd (listor, räknare, cacheminnen) och delas över körningar Ja Tillstånd från en körning skulle läcka in i nästa
Exekverare är tillståndslös No Inget att återställa
Executor skapas ny för varje arbetsflöde (via en fabriksmetod) No Varje körning får en ny instans med obefläckat tillstånd
Executor deklareras som delningsbar för tvärgående körningar (declareCrossRunShareable: true) No Delningsbara exekverare för körningar över flera instanser stöder samtidig användning utan att behöva återställas

Varning

Om en delad tillståndskänslig exekutor inte implementerar IResettableExecutor, kastar återanvändning av arbetsflödet en InvalidOperationException:

"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."

Hur runtime-miljön använder det

Arbetsflödeskörningen hanterar återställningslivscykeln automatiskt. Du behöver inte anropa ResetAsync() själv. Sekvensen är:

  1. Ägarskap förvärvat – när en arbetsflödeskörning startar övertar körningen ägarskapet för arbetsflödesinstansen och noterar vilka exekutorer som behöver återställas.
  2. Kör – exekutorer bearbetar meddelanden och kan ackumulera status.
  3. Ägarskap släppt – när körningen slutförs (eller avslutas) släpper körningen ägarskapet och anropar ResetAsync() på alla delade exekveringsinstanser som implementerar IResettableExecutor.
  4. Redo för återanvändning – efter en lyckad återställning kan arbetsflödet användas för en ny körning.

Om en delad köre inte kan återställas (eftersom den inte implementerar gränssnittet) markeras arbetsflödet som icke-återanvändbart och efterföljande körningar kommer att utlösas.

Relation till tillståndsisolering

IResettableExecutor kompletterar hjälpmetodens mönster som beskrivs i Tillståndshantering. De två metoderna har olika behov:

  • Hjälpmetoder (skapa nya instanser per körning) ger de starkaste isoleringsgarantierna och rekommenderas som standardmetod.
  • IResettableExecutor är användbart när du behöver dela exekutorinstanser mellan körningar, till exempel när exekutorkonstruktion är dyrt eller när ett arbetsflöde exponeras som en agent och återanvänds över flera anrop.

Välj den metod som passar bäst för ditt scenario. För de flesta arbetsflöden räcker det med hjälpmetoder. Använd IResettableExecutor när delade instanser är ett avsiktligt designval.

Det här konceptet gäller inte för Python. Skapa nya arbetsflöden och körinstanser för varje oberoende körning för fullständig tillståndsisolering. Se Tillståndshantering för mönster och exempel.

Nästa steg