ProcessBuilder Class

A builder for a process.

Note: This class is marked as 'experimental' and may change in the future.

ProcessBuilder extends ProcessStepBuilder; the constructor initializes the underlying ProcessStepBuilder with a step class type and name.

Constructor

ProcessBuilder()

Methods

add_step

Register a step type with optional constructor arguments.

add_step_from_process

Adds a step from the given process.

build

Builds the KernelProcess.

build_step

Builds the process step.

link_to

Links to the given event ID.

on_input_event

Creates a new ProcessEdgeBuilder for the input event.

resolve_function_target

Resolves the function target.

where_input_event_is

Filters the input event.

add_step

Register a step type with optional constructor arguments.

add_step(step_type: type[TStep], name: str | None = None, initial_state: TState | None = None, factory_function: Callable | None = None, **kwargs) -> ProcessStepBuilder

Parameters

Name Description
step_type
Required

The step type.

name

The name of the step. Defaults to None.

Default value: None
initial_state

The initial state of the step. Defaults to None.

Default value: None
factory_function

The factory function: a callable used to create the step instance when the step has complex dependencies that cannot be JSON-serialized or deserialized. Defaults to None.

Default value: None
kwargs

Additional keyword arguments.

Returns

Type Description
ProcessStepBuilder

The process step builder.
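As a rough illustration of the signature above, the following sketch registers one step by type and a second one through a factory function. `GreetStep`, `make_greet_step`, and the names `demo`, `greeter`, and `greeter_from_factory` are hypothetical. The import paths are an assumption about the `semantic_kernel` package layout, and the imports are guarded so the snippet does nothing when the package is not installed. How the runtime later calls the factory is outside the scope of this sketch.

```python
try:
    from semantic_kernel.functions import kernel_function
    from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
    from semantic_kernel.processes.process_builder import ProcessBuilder

    SK_AVAILABLE = True
except ImportError:
    # semantic-kernel is not installed; the rest of the sketch is skipped.
    SK_AVAILABLE = False

if SK_AVAILABLE:

    class GreetStep(KernelProcessStep):
        """Hypothetical step exposing a single kernel function."""

        @kernel_function
        async def greet(self):
            print("Hello from GreetStep")

    def make_greet_step():
        """Hypothetical factory; a real one might wire in clients or other dependencies."""
        return GreetStep()

    process = ProcessBuilder(name="demo")

    # Register by type; add_step returns a ProcessStepBuilder for further wiring.
    greeter = process.add_step(GreetStep, name="greeter")

    # Register the same type again, this time supplying a factory function.
    from_factory = process.add_step(
        GreetStep, name="greeter_from_factory", factory_function=make_greet_step
    )

    # Inspect the documented attributes that add_step is expected to populate.
    print(greeter.name, len(process.steps), len(process.factories))
```

The returned builder is what later calls use as a routing target, so it is usually worth keeping a reference to it.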

add_step_from_process

Adds a step from the given process.

add_step_from_process(kernel_process: ProcessBuilder) -> ProcessBuilder

Parameters

Name Description
kernel_process
Required
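
A minimal sketch of nesting one process inside another, under the same assumptions about import paths as elsewhere in the `semantic_kernel` package. `InnerStep`, `inner`, and `outer` are hypothetical names. The sketch only reads the documented `steps` and `has_parent_process` attributes; what values they hold after the call is an assumption about the implementation, so the snippet prints them rather than asserting a specific result.

```python
try:
    from semantic_kernel.functions import kernel_function
    from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
    from semantic_kernel.processes.process_builder import ProcessBuilder

    SK_AVAILABLE = True
except ImportError:
    # semantic-kernel is not installed; the rest of the sketch is skipped.
    SK_AVAILABLE = False

if SK_AVAILABLE:

    class InnerStep(KernelProcessStep):
        """Hypothetical step that lives in the nested process."""

        @kernel_function
        async def do_work(self):
            print("inner step ran")

    # Build the process that will be nested.
    inner = ProcessBuilder(name="inner")
    inner.add_step(InnerStep)

    # Add the whole inner process as a single step of the outer process.
    outer = ProcessBuilder(name="outer")
    nested = outer.add_step_from_process(inner)

    print(type(nested).__name__, len(outer.steps), inner.has_parent_process)
```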

build

Builds the KernelProcess.

build() -> KernelProcess

build_step

Builds the process step.

build_step() -> KernelProcessStepInfo

link_to

Links to the given event ID.

link_to(event_id: str, edge_builder: ProcessStepEdgeBuilder) -> None

Parameters

Name Description
event_id
Required
edge_builder
Required

on_input_event

Creates a new ProcessEdgeBuilder for the input event.

on_input_event(event_id: str | Enum) -> ProcessEdgeBuilder

Parameters

Name Description
event_id
Required
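
The sketch below routes an external input event to a step and then calls `build()`. `send_event_to` is a method of the returned `ProcessEdgeBuilder` and is not documented on this page, so its use here is an assumption based on the library's getting-started samples. `DemoEvents` and `GreetStep` are hypothetical, and the imports are guarded so the snippet does nothing when `semantic_kernel` is not installed.

```python
from enum import Enum

try:
    from semantic_kernel.functions import kernel_function
    from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
    from semantic_kernel.processes.process_builder import ProcessBuilder

    SK_AVAILABLE = True
except ImportError:
    # semantic-kernel is not installed; the rest of the sketch is skipped.
    SK_AVAILABLE = False

if SK_AVAILABLE:

    class DemoEvents(Enum):
        """Hypothetical event IDs; on_input_event accepts str or Enum."""

        START = "start"

    class GreetStep(KernelProcessStep):
        """Hypothetical step exposing a single kernel function."""

        @kernel_function
        async def greet(self):
            print("Hello from GreetStep")

    process = ProcessBuilder(name="demo")
    greeter = process.add_step(GreetStep)

    # Route the external START event to the step (assumed ProcessEdgeBuilder API).
    process.on_input_event(DemoEvents.START).send_event_to(target=greeter)

    # Produce the KernelProcess described by the builder.
    kernel_process = process.build()
    print(type(kernel_process).__name__, len(kernel_process.steps))
```

Running the built process requires a runtime and a kernel, which this page does not cover.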

resolve_function_target

Resolves the function target.

resolve_function_target(function_name: str | None, parameter_name: str | None) -> KernelProcessFunctionTarget

Parameters

Name Description
function_name
Required
parameter_name
Required

where_input_event_is

Filters the input event.

where_input_event_is(event_id: str | Enum) -> ProcessFunctionTargetBuilder

Parameters

Name Description
event_id
Required

Attributes

edges

edges: dict[str, list[Any]]

entry_steps

entry_steps: list[ProcessStepBuilder]

event_namespace

event_namespace: str

external_event_target_map

external_event_target_map: dict[str, ProcessFunctionTargetBuilder]

factories

factories: dict[str, Callable]

function_type

function_type: type[TStep] | None

functions_dict

functions_dict: dict[str, 'KernelFunctionMetadata']

has_parent_process

has_parent_process: bool

id

id: str | None

initial_state

initial_state: TState | None

is_experimental

is_experimental = True

name

name: str

stage_status

stage_status = 'experimental'

steps

steps: list[ProcessStepBuilder]