Event Bus

The event bus is the backbone of FlexiFlow. Every state change, message, and component lifecycle event flows through it.

Each subscription has a priority from 1 (highest) to 5 (lowest). Handlers with lower numbers are called first.

handle = await bus.subscribe(
    "my.event", "my_component", handler, priority=2
)
await bus.publish("my.event", data)
bus.unsubscribe(handle)
bus.unsubscribe_all("my_component")
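
To make the priority rule concrete, here is a minimal sketch of a priority-ordered bus. MiniBus is a toy stand-in written for illustration, not FlexiFlow's implementation; the default priority of 3, the tie-breaking by subscription order, and the shape of the returned handle are all assumptions.

```python
import asyncio
from collections import defaultdict


class MiniBus:
    """Toy stand-in for an event bus (hypothetical, not FlexiFlow's code)."""

    def __init__(self):
        # event name -> list of (priority, sequence, component, handler)
        self._subs = defaultdict(list)
        self._seq = 0

    async def subscribe(self, event, component, handler, priority=3):
        if not 1 <= priority <= 5:
            raise ValueError("priority must be between 1 and 5")
        self._seq += 1
        entry = (priority, self._seq, component, handler)
        self._subs[event].append(entry)
        # Lower priority number first; ties keep subscription order (assumption).
        self._subs[event].sort(key=lambda e: (e[0], e[1]))
        return (event, entry)

    def unsubscribe(self, handle):
        event, entry = handle
        if entry in self._subs[event]:
            self._subs[event].remove(entry)

    def unsubscribe_all(self, component):
        for event in self._subs:
            self._subs[event] = [e for e in self._subs[event] if e[2] != component]

    async def publish(self, event, data):
        # Iterate over a copy so handlers may unsubscribe while running.
        for _, _, _, handler in list(self._subs[event]):
            await handler(data)


async def demo():
    bus = MiniBus()
    calls = []

    async def low(data):
        calls.append("low")

    async def high(data):
        calls.append("high")

    await bus.subscribe("my.event", "a", low, priority=5)
    handle = await bus.subscribe("my.event", "b", high, priority=1)
    await bus.publish("my.event", {})   # high runs before low
    bus.unsubscribe(handle)
    await bus.publish("my.event", {})   # only low remains
    return calls
```

Running asyncio.run(demo()) shows the priority 1 handler being called before the priority 5 handler, even though it subscribed later.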

When publishing, you can choose how handlers are invoked:

With sequential delivery, handlers run one at a time, in priority order. If one fails, the error policy determines what happens next.

await bus.publish("my.event", data, delivery="sequential")

With concurrent delivery, handlers run concurrently via asyncio.gather. This is the fastest option when handlers are independent of one another.

await bus.publish("my.event", data, delivery="concurrent")
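
The difference between the two modes can be sketched with plain asyncio. The deliver function below is a hypothetical helper, not part of FlexiFlow; it assumes the handlers are already sorted by priority and only illustrates how awaiting in a loop differs from asyncio.gather.

```python
import asyncio


async def deliver(handlers, data, delivery="sequential"):
    """Invoke handlers (assumed already sorted by priority) in the chosen mode."""
    if delivery == "sequential":
        results = []
        for handler in handlers:
            # Each handler finishes before the next one starts.
            results.append(await handler(data))
        return results
    if delivery == "concurrent":
        # All handlers are scheduled together; results keep the input order.
        return await asyncio.gather(*(handler(data) for handler in handlers))
    raise ValueError(f"unknown delivery mode: {delivery!r}")


async def demo():
    order = []

    async def slow(data):
        await asyncio.sleep(0.05)
        order.append("slow")

    async def fast(data):
        await asyncio.sleep(0.01)
        order.append("fast")

    await deliver([slow, fast], None, delivery="sequential")
    sequential_order = list(order)
    order.clear()
    await deliver([slow, fast], None, delivery="concurrent")
    return sequential_order, list(order)
```

In sequential mode the slow handler completes first because it is called first. In concurrent mode the fast handler completes first, which is why concurrent delivery only suits handlers that do not depend on each other's side effects.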

Error policies control what happens when a handler raises an exception:

| Policy | Behavior |
| --- | --- |
| continue | Log the error and keep calling remaining handlers |
| raise | Fail fast: re-raise the exception immediately |

Set the policy when creating the bus, or on an individual publish call.
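
As a rough illustration of the two policies, the sketch below applies them to sequential delivery. The deliver_sequential helper and its on_error parameter are hypothetical names chosen for this example; the source does not show the real keyword FlexiFlow uses to set the policy.

```python
import asyncio
import logging

logger = logging.getLogger("bus_sketch")  # hypothetical logger name


async def deliver_sequential(handlers, data, on_error="continue"):
    """Call handlers in order and apply the error policy (illustrative only)."""
    if on_error not in ("continue", "raise"):
        raise ValueError(f"unknown error policy: {on_error!r}")
    failures = []
    for handler in handlers:
        try:
            await handler(data)
        except Exception as exc:
            if on_error == "raise":
                # Fail fast: remaining handlers are never called.
                raise
            # continue: record the failure and move on to the next handler.
            logger.error("handler %r failed: %r", handler, exc)
            failures.append(exc)
    return failures
```

With "continue", a failing handler does not stop the handlers after it, which suits monitoring and logging subscribers. With "raise", the publisher sees the exception immediately, which suits cases where a failed handler means the event should not be considered delivered.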

FlexiFlow emits these built-in events so you can monitor component and engine activity:

| Event | When | Payload |
| --- | --- | --- |
| engine.component.registered | A component is registered with the engine | {component} |
| component.message.received | A component receives a message | {component, message} |
| state.changed | A state transition occurs | {component, from_state, to_state} |
| event.handler.failed | A handler raises an exception (continue mode) | {event_name, component_name, exception} |

Subscribe to these events like any other:

await bus.subscribe("state.changed", "monitor", my_state_logger, priority=5)
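
A handler such as my_state_logger might look like the sketch below. It assumes the payload arrives as a dict with the keys listed for state.changed and that component is printable as a name; the source does not specify the payload type, so treat both as assumptions. The format_transition helper and the "monitor" logger name are illustrative.

```python
import asyncio
import logging

logger = logging.getLogger("monitor")  # hypothetical logger name


def format_transition(payload):
    """Build a one-line summary of a state.changed payload (assumed to be a dict)."""
    return f"{payload['component']}: {payload['from_state']} -> {payload['to_state']}"


async def my_state_logger(payload):
    # Handlers are coroutines that receive the event payload as their argument.
    logger.info(format_transition(payload))
```

Keeping the formatting in a separate synchronous function makes the handler easy to test without a running bus.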

For handlers that call flaky services or do I/O, use the retry decorator:

from flexiflow.extras.retry import retry_async, RetryConfig

@retry_async(RetryConfig(max_attempts=3, base_delay=0.2, jitter=0.2))
async def my_handler(data):
    await call_flaky_service(data)

RetryConfig fields:

| Field | Default | Description |
| --- | --- | --- |
| max_attempts | 3 | Total attempts (including the first) |
| base_delay | 1.0 | Seconds to wait between attempts |
| jitter | 0.0 | Random jitter added to delay (in seconds) |
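
To show how the three fields could interact, here is a minimal stand-in retry decorator. SketchRetryConfig and sketch_retry_async are hypothetical names, not the flexiflow.extras.retry implementation. The sketch assumes a constant delay of base_delay plus a uniform random value between 0 and jitter, and that every exception type is retried; the real decorator may differ on both points.

```python
import asyncio
import functools
import random
from dataclasses import dataclass


@dataclass
class SketchRetryConfig:
    max_attempts: int = 3    # total attempts, including the first
    base_delay: float = 1.0  # seconds to wait between attempts
    jitter: float = 0.0      # upper bound of random extra delay, in seconds


def sketch_retry_async(config):
    """Retry an async function according to config (illustrative only)."""
    if config.max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, config.max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == config.max_attempts:
                        # Out of attempts: surface the last error to the caller.
                        raise
                    delay = config.base_delay + random.uniform(0, config.jitter)
                    await asyncio.sleep(delay)

        return wrapper

    return decorator
```

With max_attempts=3, a handler that fails twice and then succeeds returns normally on the third call, while a handler that always fails raises its last exception after the third attempt.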