Skip to content

aea.multiplexer

Module for the multiplexer class and related classes.

MultiplexerStatus Objects

class MultiplexerStatus(AsyncState)

The connection status class.

__init__

def __init__() -> None

Initialize the connection status.

is_connected

@property
def is_connected() -> bool

Return is connected.

is_connecting

@property
def is_connecting() -> bool

Return is connecting.

is_disconnected

@property
def is_disconnected() -> bool

Return is disconnected.

is_disconnecting

@property
def is_disconnecting() -> bool

Return is disconnected.

AsyncMultiplexer Objects

class AsyncMultiplexer(Runnable, WithLogger)

This class can handle multiple connections at once.

__init__

def __init__(
        connections: Optional[Sequence[Connection]] = None,
        default_connection_index: int = 0,
        loop: Optional[AbstractEventLoop] = None,
        exception_policy: ExceptionPolicyEnum = ExceptionPolicyEnum.propagate,
        threaded: bool = False,
        agent_name: str = "standalone",
        default_routing: Optional[Dict[PublicId, PublicId]] = None,
        default_connection: Optional[PublicId] = None,
        protocols: Optional[List[Union[Protocol, Message]]] = None) -> None

Initialize the connection multiplexer.

Arguments:

  • connections: a sequence of connections.
  • default_connection_index: the index of the connection to use as default. This information is used for envelopes which don't specify any routing context. If connections is None, this parameter is ignored.
  • loop: the event loop to run the multiplexer. If None, a new event loop is created.
  • exception_policy: the exception policy used for connections.
  • threaded: if True, run in threaded mode, else async
  • agent_name: the name of the agent that owns the multiplexer, for logging purposes.
  • default_routing: default routing map
  • default_connection: default connection
  • protocols: protocols used

default_connection

@property
def default_connection() -> Optional[Connection]

Get the default connection.

in_queue

@property
def in_queue() -> AsyncFriendlyQueue

Get the in queue.

out_queue

@property
def out_queue() -> asyncio.Queue

Get the out queue.

connections

@property
def connections() -> Tuple[Connection, ...]

Get the connections.

is_connected

@property
def is_connected() -> bool

Check whether the multiplexer is processing envelopes.

default_routing

@property
def default_routing() -> Dict[PublicId, PublicId]

Get the default routing.

default_routing

@default_routing.setter
def default_routing(default_routing: Dict[PublicId, PublicId]) -> None

Set the default routing.

connection_status

@property
def connection_status() -> MultiplexerStatus

Get the connection status.

run

async def run() -> None

Run multiplexer connect and receive/send tasks.

set_loop

def set_loop(loop: AbstractEventLoop) -> None

Set event loop and all event loop related objects.

Arguments:

  • loop: asyncio event loop.

add_connection

def add_connection(connection: Connection, is_default: bool = False) -> None

Add a connection to the multiplexer.

Arguments:

  • connection: the connection to add.
  • is_default: whether the connection added should be the default one.

connect

async def connect() -> None

Connect the multiplexer.

disconnect

async def disconnect() -> None

Disconnect the multiplexer.

get

def get(block: bool = False,
        timeout: Optional[float] = None) -> Optional[Envelope]

Get an envelope within a timeout.

Arguments:

  • block: make the call blocking (ignore the timeout).
  • timeout: the timeout to wait until an envelope is received.

Returns:

the envelope, or None if no envelope is available within a timeout.

async_get

async def async_get() -> Envelope

Get an envelope async way.

Returns:

the envelope

async_wait

async def async_wait() -> None

Get an envelope async way.

Returns:

the envelope

put

def put(envelope: Envelope) -> None

Schedule an envelope for sending it.

Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.

Arguments:

  • envelope: the envelope to be sent.

Multiplexer Objects

class Multiplexer(AsyncMultiplexer)

Transit sync multiplexer for compatibility.

__init__

def __init__(*args: Any, **kwargs: Any) -> None

Initialize the connection multiplexer.

Arguments:

  • args: arguments
  • kwargs: keyword arguments

set_loop

def set_loop(loop: AbstractEventLoop) -> None

Set event loop and all event loop related objects.

Arguments:

  • loop: asyncio event loop.

connect

def connect() -> None

Connect the multiplexer.

Synchronously in thread spawned if new loop created.

disconnect

def disconnect() -> None

Disconnect the multiplexer.

Also stops a dedicated thread for event loop if spawned on connect.

put

def put(envelope: Envelope) -> None

Schedule an envelope for sending it.

Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.

Arguments:

  • envelope: the envelope to be sent.

InBox Objects

class InBox()

A queue from where you can only consume envelopes.

__init__

def __init__(multiplexer: AsyncMultiplexer) -> None

Initialize the inbox.

Arguments:

  • multiplexer: the multiplexer

empty

def empty() -> bool

Check for a envelope on the in queue.

Returns:

boolean indicating whether there is an envelope or not

get

def get(block: bool = False, timeout: Optional[float] = None) -> Envelope

Check for a envelope on the in queue.

Arguments:

  • block: make the call blocking (ignore the timeout).
  • timeout: times out the block after timeout seconds.

Raises:

  • Empty: if the attempt to get an envelope fails.

Returns:

the envelope object.

get_nowait

def get_nowait() -> Optional[Envelope]

Check for a envelope on the in queue and wait for no time.

Returns:

the envelope object

async_get

async def async_get() -> Envelope

Check for a envelope on the in queue.

Returns:

the envelope object.

async_wait

async def async_wait() -> None

Check for a envelope on the in queue.

OutBox Objects

class OutBox()

A queue from where you can only enqueue envelopes.

__init__

def __init__(multiplexer: AsyncMultiplexer) -> None

Initialize the outbox.

Arguments:

  • multiplexer: the multiplexer

empty

def empty() -> bool

Check for a envelope on the in queue.

Returns:

boolean indicating whether there is an envelope or not

put

def put(envelope: Envelope) -> None

Put an envelope into the queue.

Arguments:

  • envelope: the envelope.

put_message

def put_message(message: Message,
                context: Optional[EnvelopeContext] = None) -> None

Put a message in the outbox.

This constructs an envelope with the input arguments.

Arguments:

  • message: the message
  • context: the envelope context