ここまでは、Executor インスタンスを作成し、 WorkflowBuilderに直接渡してきました。 この方法は、ワークフロー インスタンスが 1 つだけ必要な単純なシナリオに適しています。 ただし、より複雑な場合は、同じワークフローの複数の分離されたインスタンスを作成する必要があります。 これをサポートするには、各ワークフロー インスタンスが独自の Executor インスタンスのセットを受け取る必要があります。 同じ Executor を再利用すると、内部状態がワークフロー間で共有され、意図しない副作用が発生します。 これを回避するには、executor ファクトリを WorkflowBuilderに登録し、ワークフロー インスタンスごとに新しい Executor インスタンスが作成されるようにします。
ワークフロー ビルダーへのファクトリの登録
もうすぐです。。。
executor ファクトリーをWorkflowBuilderに登録するには、register_executorメソッドを使用できます。 このメソッドは、2 つのパラメーターを受け取ります。Executor のインスタンスを作成するファクトリ関数 ( Executor 型または Executorの派生型) と、ワークフロー構成で使用するファクトリの名前です。
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
ビルダーを使用してワークフローを構築する
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
予想される出力:
Accumulated text length: 22
次に、別のワークフロー インスタンスを作成して実行してみましょう。
Accumulate Executor は、独自の内部状態を持ち、最初のワークフロー インスタンスと状態を共有しないようにする必要があります。
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
予想される出力:
Accumulated text length: 11
エージェント ファクトリを WorkflowBuilderに登録するには、 register_agent メソッドを使用できます。 このメソッドは、( AgentProtocolを実装する型の) エージェントのインスタンスを作成するファクトリ関数と、ワークフロー構成で使用するファクトリの名前という 2 つのパラメーターを受け取ります。
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
新しいワークフロー インスタンスが作成されるたびに、ワークフロー内のエージェントはファクトリ関数によって作成された新しいインスタンスになり、新しいスレッド インスタンスが取得されます。
ワークフロー状態の分離
ワークフロー状態の分離の詳細については、 ワークフロー状態の分離 に関するドキュメントを参照してください。