Compartilhar via


Registrar fábricas no Construtor de Fluxos de Trabalho

Até agora, criamos instâncias de executor e as passamos diretamente para o WorkflowBuilder. Essa abordagem funciona bem para cenários simples em que você só precisa de uma única instância de fluxo de trabalho. No entanto, em casos mais complexos, talvez você queira criar várias instâncias isoladas do mesmo fluxo de trabalho. Para dar suporte a isso, cada instância de fluxo de trabalho deve receber seu próprio conjunto de instâncias de executor. Reutilizando os mesmos executores faria com que seu estado interno fosse compartilhado entre fluxos de trabalho, resultando em efeitos colaterais não intencionais. Para evitar isso, você pode registrar fábricas de executores com o WorkflowBuilder, garantindo que novas instâncias de executor sejam criadas para cada instância de fluxo de trabalho.

Registrando fábricas no Construtor de Fluxos de Trabalho

Em breve...

Para registrar uma fábrica de executores no WorkflowBuilder, você pode usar o método register_executor. Esse método usa dois parâmetros: a função de fábrica que cria instâncias do executor (do tipo Executor ou derivação de Executor) e o nome da fábrica a ser usada na configuração do fluxo de trabalho.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

Criar um fluxo de trabalho usando o construtor

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

Resultado esperado:

Accumulated text length: 22

Agora vamos criar outra instância de fluxo de trabalho e executá-la. O Accumulate executor deve ter seu próprio estado interno e não compartilhar o estado com a primeira instância de fluxo de trabalho.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

Resultado esperado:

Accumulated text length: 11

Para registrar uma fábrica de agentes no WorkflowBuilder, você pode usar o método register_agent. Esse método usa dois parâmetros: a função de fábrica que cria instâncias do agente (de tipos que implementam AgentProtocol) e o nome da fábrica a ser usada na configuração do fluxo de trabalho.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

Sempre que uma nova instância de fluxo de trabalho for criada, o agente no fluxo de trabalho será uma nova instância criada pela função de fábrica e obterá uma nova instância de thread.

Isolamento de estado do fluxo de trabalho

Para saber mais sobre o isolamento do estado do fluxo de trabalho, consulte a documentação isolamento de estado do fluxo de trabalho .