Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Eddig végrehajtó példányokat hoztunk létre, és közvetlenül a WorkflowBuilder adtuk át őket. Ez a megközelítés jól működik olyan egyszerű helyzetekben, ahol csak egyetlen munkafolyamat-példányra van szükség. Összetettebb esetekben azonban előfordulhat, hogy ugyanannak a munkafolyamatnak több, elkülönített példányát szeretné létrehozni. Ennek támogatásához minden munkafolyamat-példánynak saját végrehajtópéldány-készletet kell kapnia. Ha ugyanazokat a végrehajtókat újrahasználja, az azt eredményezi, hogy a belső állapotuk meg van osztva a munkafolyamatok között, ami nem kívánt mellékhatásokat eredményez. Ennek elkerülése érdekében regisztrálhat végrehajtó gyárakat a WorkflowBuilder segítségével, biztosítva, hogy új végrehajtó példányok jöjjenek létre minden munkafolyamat-példány számára.
Gyárak regisztrálása a Workflow Builderbe
Hamarosan...
A végrehajtó-előállító regisztrálásához a WorkflowBuilder-re a register_executor metódust használhatja. Ez a módszer két paramétert használ: a végrehajtó példányait létrehozó gyári függvényt (típusa Executor vagy származtatása Executor) és a munkafolyamat-konfigurációban használni kívánt gyár nevét.
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
Munkafolyamat létrehozása a szerkesztővel
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
Várt kimenet:
Accumulated text length: 22
Most hozzunk létre egy másik munkafolyamat-példányt, és futtassuk. A Accumulate végrehajtónak saját belső állapottal kell rendelkeznie, és nem szabad megosztania az állapotot az első munkafolyamat-példánysal.
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
Várt kimenet:
Accumulated text length: 11
Az ügynök-gyár regisztrálásához a WorkflowBuilder módszert használhatja a register_agent metódust. Ez a módszer két paramétert használ: a gyári függvényt, amely létrehozza az ügynök példányait (a implementálandó AgentProtocoltípusok közül) és a munkafolyamat-konfigurációban használni kívánt gyár nevét.
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
Minden alkalommal, amikor új munkafolyamat-példány jön létre, a munkafolyamatban szereplő agent egy új példánya fog létrejönni a gyári függvény által, és új szálpéldányt kap majd.
Munkafolyamat állapotának elkülönítése
A munkafolyamat állapotelkülönítésével kapcsolatos további információkért tekintse meg a munkafolyamatállapot-elkülönítés dokumentációját.