Aracılığıyla paylaş


Fabrikaları İş Akışı Oluşturucusu'na kaydetme

Bu noktaya kadar executor örnekleri oluşturup bunları doğrudan WorkflowBuilder öğesine geçirdik. Bu yaklaşım, yalnızca tek bir iş akışı örneğine ihtiyacınız olan basit senaryolar için uygundur. Ancak, daha karmaşık durumlarda aynı iş akışının birden çok, yalıtılmış örneğini oluşturmak isteyebilirsiniz. Bunu desteklemek için her iş akışı örneğinin kendi yürütücü örnekleri kümesini alması gerekir. Aynı yürütücülerin yeniden kullanılabilmesi, iç durumlarının iş akışları arasında paylaşılmasına neden olur ve istenmeyen yan etkilere neden olur. Bunu önlemek için, WorkflowBuilder ile yürütücü fabrikalarını kaydederek her iş akışı örneği için yeni yürütücü örneklerinin oluşturulmasını sağlayabilirsiniz.

Fabrikaları workflow builder'a kaydetme

Çok yakında...

Yürütücü fabrikasını WorkflowBuilder öğesine kaydetmek için register_executor yöntemini kullanabilirsiniz. Bu yöntem iki parametre alır: yürütücü örneklerini oluşturan fabrika işlevi (türü Executor veya türetme Executor) ve iş akışı yapılandırmasında kullanılacak fabrikanın adı.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

Oluşturucuyu kullanarak iş akışı oluşturma

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

Beklenen çıkış:

Accumulated text length: 22

Şimdi başka bir iş akışı örneği oluşturup çalıştıralım. Yürütücü kendi Accumulate iç durumuna sahip olmalı ve durumu ilk iş akışı örneğiyle paylaşmamalıdır.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

Beklenen çıkış:

Accumulated text length: 11

Bir aracı fabrikasını WorkflowBuilder öğesine kaydetmek için register_agent yöntemini kullanabilirsiniz. Bu yöntem iki parametre alır: aracının örneklerini oluşturan fabrika işlevi (uygulayan AgentProtocoltürlerden) ve iş akışı yapılandırmasında kullanılacak fabrikanın adı.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

Her yeni iş akışı örneği oluşturulduğunda, iş akışındaki aracı, fabrika fonksiyonu tarafından oluşturulan yeni bir örnek olacak ve yeni bir iş parçacığı örneği alacaktır.

İş Akışı Durumu Yalıtımı

İş akışı durumu yalıtımı hakkında daha fazla bilgi edinmek için İş Akışı Durumu Yalıtımı belgelerine bakın.