Condividi tramite


Registrare Factory in Generatore Flussi di Lavoro

Fino a questo punto, abbiamo continuato a creare istanze dell'executor e le abbiamo passate direttamente a WorkflowBuilder. Questo approccio funziona bene per scenari semplici in cui è necessaria solo una singola istanza del flusso di lavoro. Tuttavia, in casi più complessi è possibile creare più istanze isolate dello stesso flusso di lavoro. A tale scopo, ogni istanza del flusso di lavoro deve ricevere il proprio set di istanze di executor. Il riutilizzo degli stessi executor causerebbe la condivisione dello stato interno tra flussi di lavoro, causando effetti collaterali imprevisti. Per evitare questo problema, è possibile registrare le factory executor con il componente WorkflowBuilder, assicurandosi che vengano create nuove istanze dell'executor per ogni istanza del flusso di lavoro.

Registrazione delle factory nel Generatore di flussi di lavoro

Prossimamente...

Per registrare una factory executor in WorkflowBuilder, è possibile usare il metodo register_executor. Questo metodo accetta due parametri: la funzione factory che crea istanze dell'executor (di tipo Executor o derivazione di Executor) e il nome della factory da usare nella configurazione del flusso di lavoro.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

Creare un flusso di lavoro usando il generatore

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

Output previsto:

Accumulated text length: 22

Creare ora un'altra istanza del flusso di lavoro ed eseguirla. L'executor Accumulate deve avere il proprio stato interno e non condividere lo stato con la prima istanza del flusso di lavoro.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

Output previsto:

Accumulated text length: 11

Per registrare una factory di agenti in WorkflowBuilder, è possibile usare il metodo register_agent. Questo metodo accetta due parametri: la funzione factory che crea istanze dell'agente (di tipi che implementano AgentProtocol) e il nome della factory da usare nella configurazione del flusso di lavoro.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

Ogni volta che viene creata una nuova istanza del flusso di lavoro, l'agente nel flusso di lavoro sarà una nuova istanza creata dalla funzione factory e otterrà una nuova istanza del thread.

Isolamento dello stato del flusso di lavoro

Per altre informazioni sull'isolamento dello stato del flusso di lavoro, vedere la documentazione relativa all'isolamento dello stato del flusso di lavoro.