到目前為止,我們一直在創建執行者實例,並直接傳給 WorkflowBuilder。 這種方法對於只需要單一工作流程實例的簡單情境來說效果很好。 然而,在更複雜的情況下,你可能想建立多個相同工作流程的獨立實例。 為了支援此點,每個工作流程實例必須擁有自己的執行實例集。 重複使用相同的執行器會導致其內部狀態在不同工作流程間被共享,產生意想不到的副作用。 為避免這種情況,你可以使用 WorkflowBuilder 註冊執行器工廠,確保為每個工作流程實例創建新的執行器實例。
將工廠註冊到 Workflow Builder
即將推出...
要將執行器工廠註冊到WorkflowBuilder,可以使用register_executor方法。 此方法需要兩個參數:一個是建立型別 Executor 的執行器實例或其 Executor 衍生類別的工廠函數,另一個是在工作流程配置中要使用的工廠名稱。
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
用建構工具建立工作流程
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
預期輸出:
Accumulated text length: 22
現在讓我們建立另一個工作流程實例並執行它。
Accumulate執行器應該有自己的內部狀態,而不是與第一個工作流程實例共享該狀態。
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
預期輸出:
Accumulated text length: 11
要將代理工廠註冊到 WorkflowBuilder,可以使用這個 register_agent 方法。 此方法需要兩個參數:建立代理實例(實作 AgentProtocol型別)的工廠函數,以及工作流程配置中要使用的工廠名稱。
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
每當建立新的工作流程實例時,工作流程中的代理會是工廠函式建立的新實例,並會獲得新的執行緒實例。
工作流程狀態隔離
欲了解更多關於工作流程狀態隔離的資訊,請參閱 工作流程狀態隔離 文件。