Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Informazioni generali
Gli executor nei flussi di lavoro sono spesso con stato, ad esempio possono accumulare messaggi, tenere traccia dei conteggi dei turni o memorizzare nella cache i risultati intermedi. Quando un flusso di lavoro viene riutilizzato in più esecuzioni con istanze dell'executor condivise, lo stato residuo di un'esecuzione precedente può trasmettersi nelle esecuzioni successive, causando un comportamento imprevisto o una corruzione dei dati.
L'interfaccia IResettableExecutor risolve questo problema fornendo un contratto agli esecutori per cancellare il loro stato interno tra un'esecuzione e l'altra. Il runtime del flusso di lavoro chiama ResetAsync() automaticamente le istanze dell'executor condiviso al termine di un'esecuzione, assicurando un slate pulito per l'esecuzione successiva.
Il problema
Si consideri un executor che raccoglie messaggi durante un'esecuzione del flusso di lavoro:
internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
private readonly List<string> _messages = [];
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this._messages.Add(message);
// Process aggregated messages...
}
}
Se questo executor viene condiviso tra le esecuzioni del flusso di lavoro, _messages conserva i dati dell'esecuzione precedente. La seconda esecuzione visualizzerà messaggi obsoleti che non le appartengono.
Interfaccia IResettableExecutor
IResettableExecutor definisce un singolo metodo che il runtime del flusso di lavoro chiama tra le esecuzioni:
public interface IResettableExecutor
{
ValueTask ResetAsync();
}
Quando un esecutore implementa questa interfaccia, l'ambiente di runtime può reimpostarlo in modo sicuro dopo ogni esecuzione, consentendo il riutilizzo del flusso di lavoro senza stato obsoleto.
Implementazione di IResettableExecutor
Per rendere un executor con stato reimpostabile, implementare l'interfaccia e cancellare tutto lo stato modificabile in ResetAsync().
internal sealed partial class AggregationExecutor()
: Executor("AggregationExecutor"), IResettableExecutor
{
private readonly List<string> _messages = [];
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this._messages.Add(message);
// Process aggregated messages...
}
public ValueTask ResetAsync()
{
this._messages.Clear();
return default;
}
}
Per un esempio di lavoro completo di un flusso di lavoro che usa executor reimpostabili, vedere l'esempio WorkflowAsAnAgent.
Quando implementare
Non tutti gli executor devono implementare IResettableExecutor. Usare questa guida decisionale:
| Scenario | Implementare? | Ragione |
|---|---|---|
| L'executor ha uno stato modificabile (elenchi, contatori, cache) e viene condiviso tra le esecuzioni | Sì | Lo stato di una corsa potrebbe fluire nella successiva |
| Executor è senza stato | No | Niente da reimpostare |
| L'executor viene creato nuovo per ogni flusso di lavoro (tramite un metodo factory) | No | Ogni esecuzione ottiene una nuova istanza con stato pulito |
L'executor viene dichiarato come condivisibile tra esecuzioni (declareCrossRunShareable: true) |
No | Gli executor condivisibili a esecuzione incrociata supportano l'uso simultaneo senza reimpostare |
Avviso
Se un executor con stato condiviso non implementa IResettableExecutor, il riutilizzo del flusso di lavoro genera un'eccezione InvalidOperationException:
"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."
Modalità di utilizzo del runtime
Il runtime del flusso di lavoro gestisce automaticamente il ciclo di vita di reimpostazione. Non è necessario chiamare ResetAsync(). La sequenza è:
- Proprietà acquisita : all'avvio di un'esecuzione del flusso di lavoro, il runtime assume la proprietà dell'istanza del flusso di lavoro e indica quali executor devono essere reimpostati.
- Esecuzione eseguita : gli executor elaborano i messaggi e possono accumulare lo stato.
-
Proprietà rilasciata : quando l'esecuzione viene completata (o eliminata), il runtime rilascia la proprietà e chiama
ResetAsync()su tutte le istanze dell'executor condiviso che implementanoIResettableExecutor. - Pronto per il riutilizzo : dopo una reimpostazione corretta, il flusso di lavoro può essere usato per una nuova esecuzione.
Se un executor condiviso non viene reimpostato (perché non implementa l'interfaccia), il flusso di lavoro viene contrassegnato come non riutilizzabile e le esecuzioni successive genereranno.
Relazione con l'isolamento dello stato
IResettableExecutor integra il modello di metodo ausiliario descritto in Gestione stato. I due approcci servono esigenze diverse:
- I metodi helper (creazione di istanze nuove per esecuzione) offrono le garanzie di isolamento più avanzate e sono consigliati come approccio predefinito.
-
IResettableExecutorè utile quando è necessario condividere le istanze dell'executor tra le esecuzioni, ad esempio quando la costruzione dell'executor è costosa o quando un flusso di lavoro viene esposto come agente e riutilizzato tra più chiamate.
Scegliere l'approccio più adatto allo scenario. Per la maggior parte dei flussi di lavoro, i metodi helper sono sufficienti. Usare IResettableExecutor quando la condivisione di istanze è una scelta progettuale intenzionale.
Questo concetto non si applica a Python. Per l'isolamento completo dello stato, creare nuove istanze di flusso di lavoro ed executor per ogni esecuzione indipendente. Vedere Gestione dello stato per modelli ed esempi.