Поделиться через


Перезапускаемые исполнители

Обзор

Исполнители в рабочих процессах часто имеют состояние — например, они могут накапливать сообщения, отслеживать количество ходов или кэшировать промежуточные результаты. Когда рабочий процесс повторно используется в нескольких запусках с общими экземплярами исполнителя, остаточное состояние от предыдущего запуска может просочиться в последующие запуски, вызывая непредвиденное поведение или повреждение данных.

Интерфейс IResettableExecutor решает это, обеспечивая контракт для исполнителей для очистки их внутреннего состояния между запусками. Выполняющая среда рабочего процесса автоматически вызывает ResetAsync() на общих экземплярах исполнителя при завершении выполнения, обеспечивая проверенную готовность для следующего запуска.

Проблема

Рассмотрим исполнителя, который собирает сообщения во время выполнения рабочего процесса:

internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }
}

Если этот исполнитель совместно используется в рабочих процессах, _messages сохраняет данные из предыдущего запуска. Во втором запуске будут отображаться устаревшие сообщения, не относящиеся к данному процессу.

Интерфейс IResettableExecutor

IResettableExecutor определяет один метод, который среда выполнения рабочего процесса вызывает между запусками:

public interface IResettableExecutor
{
    ValueTask ResetAsync();
}

Когда исполнитель реализует этот интерфейс, среда выполнения может безопасно сбросить его после каждого запуска, что позволяет рабочему процессу повторно использовать его без устаревших состояний.

Реализация IResettableExecutor

Чтобы сделать исполнителя с состоянием обнуляемым, реализуйте интерфейс и очистите все изменяемое состояние в ResetAsync():

internal sealed partial class AggregationExecutor()
    : Executor("AggregationExecutor"), IResettableExecutor
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }

    public ValueTask ResetAsync()
    {
        this._messages.Clear();
        return default;
    }
}

Для полного примера рабочего процесса, использующего сбрасываемые исполнители, см. пример WorkflowAsAnAgent.

Когда следует реализовать

Не всем исполнителям необходимо реализовать IResettableExecutor. Используйте это руководство по принятию решений:

Сценарий Реализовать? Причина
Исполнитель имеет изменяемое состояние (списки, счетчики, кэш) и совместно используется между запусками. Да Состояние одного запуска может переходить в следующий
Исполнитель без состояния Нет Нечего сбрасывать
Исполнитель создается заново для каждого рабочего процесса (с помощью метода фабрики) Нет Каждый запуск получает новый экземпляр с чистым состоянием
Исполнитель объявляется как доступный для совместного использования между потоками (declareCrossRunShareable: true) Нет Исполнители с возможностью совместного использования между запусками поддерживают одновременное использование без необходимости сброса.

Предупреждение

Если общий исполнитель с отслеживанием состояния не реализует IResettableExecutor, повторное использование рабочего процесса вызывает исключение InvalidOperationException:

"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."

Как среда выполнения это использует

Среда выполнения рабочего процесса автоматически управляет жизненным циклом сброса. Вам не нужно вызывать ResetAsync() самостоятельно. Последовательность:

  1. Когда владение получено — при запуске рабочего процесса среда выполнения берет на себя владение экземпляром рабочего процесса и фиксирует, какие исполнители требуют сброса.
  2. Выполнение выполняется — исполнители обрабатывают сообщения и могут накапливать состояние.
  3. Владение освобождено — когда выполнение завершается (или удаляется), среда выполнения освобождает владение и вызывает метод ResetAsync() на всех общих экземплярах исполнителя, реализующих IResettableExecutor.
  4. Готово к повторному использованию — после успешного сброса рабочий процесс можно использовать для нового запуска.

Если любой общий исполнитель не может сбросить (так как он не реализует интерфейс), рабочий процесс помечается как неиспользуемый повторно и последующие запуски будут вызываться.

Связь с изоляцией состояния

IResettableExecutor дополняет шаблон вспомогательного метода, описанный в разделе "Управление состоянием". Два подхода служат разным потребностям:

  • Вспомогательные методы (создание новых экземпляров на каждый запуск) обеспечивают самые надежные гарантии изоляции и рекомендуется в качестве подхода по умолчанию.
  • IResettableExecutor полезно, если необходимо совместно использовать экземпляры исполнителя между запусками — например, когда создание исполнителя затратно или когда рабочий процесс выступает в роли агента и повторно используется для нескольких вызовов.

Выберите подход, который лучше всего подходит для вашего сценария. Для большинства рабочих процессов вспомогательные методы достаточны. Используйте IResettableExecutor, когда совместное использование экземпляров является преднамеренным выбором в дизайне.

Эта концепция не применяется к Python. Для полной изоляции состояния создайте новые экземпляры рабочего процесса и исполнителя для каждого независимого запуска. См. раздел "Управление состоянием" для шаблонов и примеров.

Дальнейшие действия