Sdílet prostřednictvím


Registrace továren do Tvůrce pracovních postupů

Do této chvíle jsme vytvořili exekutor instance a předali je přímo do objektu WorkflowBuilder. Tento přístup funguje dobře pro jednoduché scénáře, ve kterých potřebujete pouze jednu instanci pracovního postupu. V složitějších případech však můžete chtít vytvořit více izolovaných instancí stejného pracovního postupu. Pro zajištění podpory musí každá workflow instance obdržet vlastní sadu instancí exekutora. Opakované používání stejných exekutorů způsobí, že jejich vnitřní stav se bude sdílet napříč pracovními postupy, což vede k nežádoucím vedlejším účinkům. Chcete-li tomu zabránit, můžete zaregistrovat továrny exekutorů pomocí WorkflowBuilder, aby bylo zajištěno, že nové instance exekutora budou vytvořeny pro každou instanci pracovního workflow.

Registrace továren do Tvůrce pracovních postupů

Již brzy...

K registraci exekutoru do objektu WorkflowBuilder, můžete použít metodu register_executor . Tato metoda má dva parametry: funkce továrny, která vytváří instance exekutoru (typu Executor nebo odvození Executor) a název továrny, která se má použít v konfiguraci pracovního postupu.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

Vytvoření pracovního postupu pomocí tvůrce

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

Očekávaný výstup:

Accumulated text length: 22

Teď vytvoříme další instanci pracovního postupu a spustíme ji. Accumulate Exekutor by měl mít svůj vlastní interní stav a nesdílel stav s první instancí pracovního postupu.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

Očekávaný výstup:

Accumulated text length: 11

K registraci objektu pro vytváření agentů do služby WorkflowBuildermůžete použít metodu register_agent . Tato metoda má dva parametry: funkce továrny, která vytváří instance agenta (typů, které implementují AgentProtocol) a název továrny, která se má použít v konfiguraci pracovního postupu.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

Při každém vytvoření nové instance pracovního postupu bude agent v pracovním postupu novou instancí vytvořenou tovární funkcí a získá instanci nového vlákna.

Izolace stavu pracovního postupu

Další informace o izolaci stavu pracovního postupu najdete v dokumentaci k izolaci stavu pracovního postupu .