Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Jusqu’à ce stade, nous avons créé des instances d’exécuteur et les transmettons directement au WorkflowBuilder. Cette approche fonctionne bien pour les scénarios simples où vous n’avez besoin que d’une seule instance de workflow. Toutefois, dans des cas plus complexes, vous pouvez créer plusieurs instances isolées du même flux de travail. Pour ce faire, chaque instance de workflow doit recevoir son propre ensemble d’instances d’exécuteur. La réutilisation des mêmes exécuteurs provoque le partage de leur état interne entre les flux de travail, provoquant des effets indésirables. Pour éviter cela, vous pouvez enregistrer des usines d’exécuteurs à l’aide de WorkflowBuilder, en vous assurant que de nouvelles instances d’exécuteur sont créées pour chaque instance de flux de travail.
Inscription de fabriques dans le Générateur de flux de travail
À venir...
Pour inscrire une fabrique d'exécuteurs à l’instance WorkflowBuilder, vous pouvez utiliser la méthode register_executor. Cette méthode prend deux paramètres : la fonction de fabrique qui crée des instances de l’exécuteur (de type Executor ou de dérivation de Executor) et le nom de la fabrique à utiliser dans la configuration du flux de travail.
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
Créer un flux de travail à l’aide du générateur
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
Sortie attendue :
Accumulated text length: 22
Nous allons maintenant créer une autre instance de workflow et l’exécuter. L’exécuteur Accumulate doit avoir son propre état interne et ne pas partager l’état avec la première instance de workflow.
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
Sortie attendue :
Accumulated text length: 11
Pour inscrire une fabrique d’agents à l’instance WorkflowBuilder, vous pouvez utiliser la méthode register_agent. Cette méthode prend deux paramètres : la fonction de fabrique qui crée des instances de l’agent (des types qui implémentent AgentProtocol) et le nom de la fabrique à utiliser dans la configuration du flux de travail.
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
Chaque fois qu’une nouvelle instance de flux de travail est créée, l’agent dans le flux de travail sera une nouvelle instance créée par la fonction de fabrique et obtiendra une nouvelle instance de thread.
Isolation de l’état du flux de travail
Pour en savoir plus sur l’isolation de l’état du flux de travail, reportez-vous à la documentation sur l’isolation de l’état du flux de travail .