Compartir a través de


Registro de factorías en el Generador de flujos de trabajo

Hasta este punto, hemos estado creando instancias del executor y pasándolas directamente a WorkflowBuilder. Este enfoque funciona bien para escenarios sencillos en los que solo se necesita una sola instancia de flujo de trabajo. Sin embargo, en casos más complejos puede que desee crear varias instancias aisladas del mismo flujo de trabajo. Para admitir esto, cada instancia de flujo de trabajo debe recibir su propio conjunto de instancias del ejecutor. La reutilización de los mismos ejecutores provocaría que su estado interno se comparta entre flujos de trabajo, lo que da lugar a efectos secundarios no deseados. Para evitar esto, puede registrar fábricas de ejecutores con WorkflowBuilder, asegurándose de que se crean nuevas instancias del ejecutor para cada instancia de flujo de trabajo.

Registro de factorías en el Generador de flujos de trabajo

Próximamente...

Para registrar un generador de ejecutores en WorkflowBuilder, puede usar el método register_executor. Este método toma dos parámetros: la función de fábrica que crea instancias del ejecutor (de tipo Executor o derivación de Executor) y el nombre de la fábrica que se va a usar en la configuración del flujo de trabajo.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

Creación de un flujo de trabajo mediante el generador

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

Resultado esperado:

Accumulated text length: 22

Ahora vamos a crear otra instancia de flujo de trabajo y ejecutarla. El Accumulate ejecutor debe tener su propio estado interno y no compartir el estado con la primera instancia de flujo de trabajo.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

Resultado esperado:

Accumulated text length: 11

Para registrar una fábrica de agentes en WorkflowBuilder, puede usar el método register_agent. Este método toma dos parámetros: la función de fábrica que crea instancias del agente (de tipos que implementan AgentProtocol) y el nombre de la fábrica que se va a usar en la configuración del flujo de trabajo.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

Cada vez que se crea una nueva instancia de flujo de trabajo, el agente en el flujo de trabajo será una nueva instancia creada por la función de fábrica y obtendrá una nueva instancia de hilo.

Aislamiento del estado del flujo de trabajo

Para obtener más información sobre el aislamiento de estado de flujo de trabajo, consulte la documentación aislamiento de estado de flujo de trabajo.