Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Até agora, temos criado instâncias de executor e passado-as diretamente para o WorkflowBuilder. Esta abordagem funciona bem em cenários simples onde só precisa de uma única instância de fluxo de trabalho. No entanto, em casos mais complexos, pode querer criar múltiplas instâncias isoladas do mesmo fluxo de trabalho. Para suportar isto, cada instância de workflow deve receber o seu próprio conjunto de instâncias executores. Reutilizar os mesmos executores faria com que o seu estado interno fosse partilhado entre fluxos de trabalho, resultando em efeitos secundários indesejados. Para evitar isto, pode registar fábricas executoras com o WorkflowBuilder, garantindo que novas instâncias executoras são criadas para cada instância de workflow.
Registar Fábricas no Construidor de Fluxos de Trabalho
Brevemente...
Para registar uma fábrica de executores no WorkflowBuilder, pode usar o método register_executor. Este método utiliza dois parâmetros: a função de fábrica que cria instâncias do executor (do tipo Executor ou derivação de Executor) e o nome da fábrica a ser usada na configuração do fluxo de trabalho.
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
Crie um fluxo de trabalho usando o construtor
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
Resultados esperados:
Accumulated text length: 22
Agora vamos criar outra instância de workflow e executá-la. O Accumulate executor deve ter o seu próprio estado interno e não partilhar o estado com a primeira instância do workflow.
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
Resultados esperados:
Accumulated text length: 11
Para registar uma fábrica de agentes no WorkflowBuilder, pode usar o método register_agent. Este método assume dois parâmetros: a função fábrica que cria instâncias do agente (dos tipos que implementam AgentProtocol) e o nome da fábrica a ser usada na configuração do fluxo de trabalho.
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
Cada vez que uma nova instância de workflow é criada, o agente no workflow será uma nova instância criada pela função de fábrica, e receberá uma nova instância de thread.
Isolamento de Estado do Fluxo de Trabalho
Para saber mais sobre o isolamento do estado do fluxo de trabalho, consulte a documentação do Isolamento do Estado do Fluxo de Trabalho .