LocalStep Class
A local step that is part of a local process.
Note: This class is marked as 'experimental' and may change in the future.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Constructor
LocalStep(*, kernel: Kernel, step_info: KernelProcessStepInfo, outgoing_event_queue: Queue[LocalEvent] = None, initialize_task: bool | None = False, event_namespace: str, step_state: KernelProcessStepState, inputs: dict[str, dict[str, Any | None]] = None, initial_inputs: dict[str, dict[str, Any | None]] = None, functions: dict[str, KernelFunction] = None, output_edges: dict[str, list[KernelProcessEdge]] = None, parent_process_id: str | None = None, init_lock: Lock = None, factories: dict[str, Callable])
Keyword-Only Parameters
| Name | Description |
|---|---|
| kernel | Required |
| step_info | Required |
| outgoing_event_queue | Default value: None |
| initialize_task | Default value: False |
| event_namespace | Required |
| step_state | Required |
| inputs | Default value: None |
| initial_inputs | Default value: None |
| functions | Default value: None |
| output_edges | Default value: None |
| parent_process_id | Default value: None |
| init_lock | Default value: None |
| factories | Required |
Methods
| Name | Description |
|---|---|
| emit_event | Emits an event from the step. |
| emit_local_event | Emits an event from the step. |
| get_all_events | Retrieves all events that have been emitted by this step in the previous superstep. |
| get_edge_for_event | Retrieves all edges that are associated with the provided event Id. |
| handle_message | Handles a LocalMessage that has been sent to the step. |
| initialize_step | Initializes the step. |
| invoke_function | Invokes the function. |
| parse_initial_configuration | Parses the initial configuration of the step. |
| scoped_event | Generates a scoped event for the step. |
| scoped_event_from_kernel_process | Generates a scoped event for the step from a KernelProcessEvent. |
| to_kernel_process_step_info | Extracts the current state of the step and returns it as a KernelProcessStepInfo. |
emit_event
Emits an event from the step.
async emit_event(process_event: KernelProcessEvent)
Parameters
| Name | Description |
|---|---|
| process_event | Required |
emit_local_event
Emits an event from the step.
async emit_local_event(local_event: LocalEvent)
Parameters
| Name | Description |
|---|---|
| local_event | Required |
get_all_events
Retrieves all events that have been emitted by this step in the previous superstep.
get_all_events() -> list[LocalEvent]
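As an illustration of collecting queued events, here is a minimal sketch that drains a standard-library queue.Queue, the kind of container suggested by the outgoing_event_queue attribute. The helper name drain_events is hypothetical and plain strings stand in for LocalEvent instances; the actual LocalStep implementation may differ.

```python
from queue import Empty, Queue


def drain_events(outgoing: Queue) -> list:
    """Remove and return every item currently waiting in the queue.

    Hypothetical helper illustrating the idea behind get_all_events;
    it is not the library's implementation.
    """
    events = []
    while True:
        try:
            events.append(outgoing.get_nowait())
        except Empty:
            # The queue is empty, so everything emitted so far is collected.
            return events


# Plain strings stand in for LocalEvent instances.
pending: Queue = Queue()
pending.put("StepA.started")
pending.put("StepA.finished")

drained = drain_events(pending)
```

After the call, the queue is empty, so a second call returns an empty list.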
get_edge_for_event
Retrieves all edges that are associated with the provided event Id.
get_edge_for_event(event_id: str) -> list[KernelProcessEdge]
Parameters
| Name | Description |
|---|---|
| event_id | Required |
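The output_edges attribute is typed as dict[str, list[KernelProcessEdge]], which suggests a lookup keyed by event ID. The sketch below shows that idea with plain strings in place of KernelProcessEdge objects. The function name edges_for_event is hypothetical, and returning an empty list for an unknown event ID is an assumption rather than documented behavior.

```python
def edges_for_event(output_edges: dict[str, list[str]], event_id: str) -> list[str]:
    """Return the edges registered for event_id, or an empty list.

    Hypothetical stand-in for get_edge_for_event; edges are plain strings
    here instead of KernelProcessEdge objects.
    """
    # Assumption: an unknown event ID yields no edges rather than an error.
    return output_edges.get(event_id, [])


output_edges = {
    "StepA.finished": ["edge-to-StepB", "edge-to-StepC"],
}

known = edges_for_event(output_edges, "StepA.finished")
unknown = edges_for_event(output_edges, "StepA.failed")
```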
handle_message
Handles a LocalMessage that has been sent to the step.
async handle_message(message: LocalMessage)
Parameters
| Name | Description |
|---|---|
| message | Required |
initialize_step
Initializes the step.
async initialize_step()
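The class exposes an initialize_task flag and an init_lock attribute, which is consistent with one-time initialization guarded by a lock. The following sketch shows that general pattern using asyncio.Lock. The SketchStep class, the ensure_initialized method, and the init_calls counter are all hypothetical, and whether LocalStep follows exactly this pattern is an assumption.

```python
import asyncio


class SketchStep:
    """Hypothetical stand-in for a step with one-time async initialization."""

    def __init__(self) -> None:
        self.initialize_task = False     # mirrors the documented flag name
        self.init_lock = asyncio.Lock()  # assumption: an asyncio lock
        self.init_calls = 0              # counts how often setup actually ran

    async def initialize_step(self) -> None:
        await asyncio.sleep(0)           # placeholder for real setup work
        self.init_calls += 1

    async def ensure_initialized(self) -> None:
        # Fast path: skip the lock once initialization has completed.
        if self.initialize_task:
            return
        async with self.init_lock:
            # Re-check inside the lock; another caller may have finished first.
            if not self.initialize_task:
                await self.initialize_step()
                self.initialize_task = True


async def main() -> int:
    step = SketchStep()  # created inside the running event loop
    # Five concurrent callers race to initialize the same step.
    await asyncio.gather(*(step.ensure_initialized() for _ in range(5)))
    return step.init_calls


init_calls = asyncio.run(main())
```

Checking the flag again after acquiring the lock is what keeps concurrent callers from running the setup more than once.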
invoke_function
Invokes the function.
async invoke_function(function: KernelFunction, kernel: Kernel, arguments: dict[str, Any])
Parameters
| Name | Description |
|---|---|
| function | Required |
| kernel | Required |
| arguments | Required |
parse_initial_configuration
Parses the initial configuration of the step.
parse_initial_configuration(data: Any) -> Any
Parameters
| Name | Description |
|---|---|
| data | Required |
scoped_event
Generates a scoped event for the step.
scoped_event(local_event: LocalEvent) -> LocalEvent
Parameters
| Name | Description |
|---|---|
| local_event | Required |
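To make the idea of a scoped event concrete, the sketch below tags an event with a namespace so that its qualified ID identifies the step that emitted it. SketchEvent, qualified_id, and scope_event are hypothetical names, and the namespace.event_id format is an assumption for illustration; the real LocalEvent type and scoping rules may differ.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for LocalEvent."""

    event_id: str
    namespace: str = ""

    @property
    def qualified_id(self) -> str:
        # Prefix the ID with the namespace when one has been assigned.
        if self.namespace:
            return f"{self.namespace}.{self.event_id}"
        return self.event_id


def scope_event(event: SketchEvent, event_namespace: str) -> SketchEvent:
    """Return a copy of the event tagged with the step's namespace."""
    return replace(event, namespace=event_namespace)


raw = SketchEvent(event_id="finished")
scoped = scope_event(raw, event_namespace="StepA_1234")
```

Here the original event is left unchanged and a scoped copy is returned, which is a design choice of this sketch only.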
scoped_event_from_kernel_process
Generates a scoped event for the step from a KernelProcessEvent.
scoped_event_from_kernel_process(process_event: KernelProcessEvent) -> LocalEvent
Parameters
| Name | Description |
|---|---|
| process_event | Required |
to_kernel_process_step_info
Extracts the current state of the step and returns it as a KernelProcessStepInfo.
async to_kernel_process_step_info() -> KernelProcessStepInfo
Attributes
id
Gets the ID of the step.
name
Gets the name of the step.
event_namespace
event_namespace: str
factories
factories: dict[str, Callable]
functions
functions: dict[str, KernelFunction]
init_lock
init_lock: Lock
initial_inputs
initial_inputs: dict[str, dict[str, Any | None]]
initialize_task
initialize_task: bool | None
inputs
inputs: dict[str, dict[str, Any | None]]
is_experimental
is_experimental = True
kernel
kernel: Kernel
outgoing_event_queue
outgoing_event_queue: Queue[LocalEvent]
output_edges
output_edges: dict[str, list[KernelProcessEdge]]
parent_process_id
parent_process_id: str | None
stage_status
stage_status = 'experimental'
step_info
step_info: KernelProcessStepInfo
step_state
step_state: KernelProcessStepState