다음을 통해 공유


워크플로 작성기에서 팩터리 등록

지금까지 우리는 실행기 인스턴스를 생성하고 이를 직접 WorkflowBuilder로 전달했습니다. 이 방법은 단일 워크플로 인스턴스만 필요한 간단한 시나리오에 적합합니다. 그러나 더 복잡한 경우 동일한 워크플로의 격리된 여러 인스턴스를 만들 수 있습니다. 이를 지원하려면 각 워크플로 인스턴스가 자체 실행기 인스턴스 집합을 받아야 합니다. 동일한 실행기를 다시 사용하면 내부 상태가 워크플로 간에 공유되어 의도하지 않은 부작용이 발생합니다. 이를 방지하기 위해 각 워크플로 인스턴스에 WorkflowBuilder대해 새 실행기 인스턴스가 생성되도록 실행기 팩터리를 등록할 수 있습니다.

워크플로 작성기에서 팩터리 등록

곧 출시 예정...

실행기 팩터리를 WorkflowBuilder에 등록하려면 register_executor 메서드를 사용할 수 있습니다. 이 메서드는 실행기의 인스턴스(형식 Executor 또는 파생 Executor)를 만드는 팩터리 함수와 워크플로 구성에 사용할 팩터리의 이름이라는 두 개의 매개 변수를 사용합니다.

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

작성기를 사용하여 워크플로 빌드

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

예상 출력:

Accumulated text length: 22

이제 다른 워크플로 인스턴스를 만들고 실행해 보겠습니다. Accumulate 실행기는 자체 내부 상태를 가져야 하며 첫 번째 워크플로 인스턴스와 상태를 공유하지 않아야 합니다.

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

예상 출력:

Accumulated text length: 11

에이전트 팩토리를 WorkflowBuilder에 등록하려면 register_agent 메서드를 사용할 수 있습니다. 이 메서드는 두 개의 매개 변수를 사용합니다. 즉, 에이전트의 인스턴스를 만드는 팩터리 함수(구현 AgentProtocol하는 형식) 및 워크플로 구성에 사용할 팩터리의 이름입니다.

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

새 워크플로 인스턴스를 만들 때마다 워크플로의 에이전트는 팩터리 함수에서 만든 새 인스턴스가 되고 새 스레드 인스턴스를 가져옵니다.

워크플로 상태 격리

워크플로 상태 격리에 대한 자세한 내용은 워크플로 상태 격리 설명서를 참조하세요.