Share via


LocalStep Class

A local step that is part of a local process.

Note: This class is marked as 'experimental' and may change in the future.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Constructor

LocalStep(*, kernel: Kernel, step_info: KernelProcessStepInfo, outgoing_event_queue: Queue[LocalEvent] = None, initialize_task: bool | None = False, event_namespace: str, step_state: KernelProcessStepState, inputs: dict[str, dict[str, Any | None]] = None, initial_inputs: dict[str, dict[str, Any | None]] = None, functions: dict[str, KernelFunction] = None, output_edges: dict[str, list[KernelProcessEdge]] = None, parent_process_id: str | None = None, init_lock: Lock = None, factories: dict[str, Callable])

Keyword-Only Parameters

Name Description
kernel
Required
step_info
Required
outgoing_event_queue
Required
initialize_task
Required
event_namespace
Required
step_state
Required
inputs
Required
initial_inputs
Required
functions
Required
output_edges
Required
parent_process_id
Required
init_lock
Required
factories
Required

Methods

emit_event

Emits an event from the step.

emit_local_event

Emits an event from the step.

get_all_events

Retrieves all events that have been emitted by this step in the previous superstep.

get_edge_for_event

Retrieves all edges that are associated with the provided event Id.

handle_message

Handles a LocalMessage that has been sent to the step.

initialize_step

Initializes the step.

invoke_function

Invokes the function.

parse_initial_configuration

Parses the initial configuration of the step.

scoped_event

Generates a scoped event for the step.

scoped_event_from_kernel_process

Generates a scoped event for the step from a KernelProcessEvent.

to_kernel_process_step_info

Extracts the current state of the step and returns it as a KernelProcessStepInfo.

emit_event

Emits an event from the step.

async emit_event(process_event: KernelProcessEvent)

Parameters

Name Description
process_event
Required

emit_local_event

Emits an event from the step.

async emit_local_event(local_event: LocalEvent)

Parameters

Name Description
local_event
Required

get_all_events

Retrieves all events that have been emitted by this step in the previous superstep.

get_all_events() -> list[LocalEvent]

get_edge_for_event

Retrieves all edges that are associated with the provided event Id.

get_edge_for_event(event_id: str) -> list[KernelProcessEdge]

Parameters

Name Description
event_id
Required

handle_message

Handles a LocalMessage that has been sent to the step.

async handle_message(message: LocalMessage)

Parameters

Name Description
message
Required

initialize_step

Initializes the step.

async initialize_step()

invoke_function

Invokes the function.

async invoke_function(function: KernelFunction, kernel: Kernel, arguments: dict[str, Any])

Parameters

Name Description
function
Required
kernel
Required
arguments
Required

parse_initial_configuration

Parses the initial configuration of the step.

parse_initial_configuration(data: Any) -> Any

Parameters

Name Description
data
Required

scoped_event

Generates a scoped event for the step.

scoped_event(local_event: LocalEvent) -> LocalEvent

Parameters

Name Description
local_event
Required

scoped_event_from_kernel_process

Generates a scoped event for the step from a KernelProcessEvent.

scoped_event_from_kernel_process(process_event: KernelProcessEvent) -> LocalEvent

Parameters

Name Description
process_event
Required

to_kernel_process_step_info

Extracts the current state of the step and returns it as a KernelProcessStepInfo.

async to_kernel_process_step_info() -> KernelProcessStepInfo

Attributes

id

Gets the ID of the step.

name

Gets the name of the step.

event_namespace

event_namespace: str

factories

factories: dict[str, Callable]

functions

functions: dict[str, KernelFunction]

init_lock

init_lock: Lock

initial_inputs

initial_inputs: dict[str, dict[str, Any | None]]

initialize_task

initialize_task: bool | None

inputs

inputs: dict[str, dict[str, Any | None]]

is_experimental

is_experimental = True

kernel

kernel: Kernel

outgoing_event_queue

outgoing_event_queue: Queue[LocalEvent]

output_edges

output_edges: dict[str, list[KernelProcessEdge]]

parent_process_id

parent_process_id: str | None

stage_status

stage_status = 'experimental'

step_info

step_info: KernelProcessStepInfo

step_state

step_state: KernelProcessStepState