aea.multiplexer
Module for the multiplexer class and related classes.
MultiplexerStatus Objects
class MultiplexerStatus(AsyncState)
The connection status class.
__init__
def __init__() -> None
Initialize the connection status.
is_connected
@property
def is_connected() -> bool
Return is connected.
is_connecting
@property
def is_connecting() -> bool
Return is connecting.
is_disconnected
@property
def is_disconnected() -> bool
Return is disconnected.
is_disconnecting
@property
def is_disconnecting() -> bool
Return is disconnecting.
AsyncMultiplexer Objects
class AsyncMultiplexer(Runnable, WithLogger)
This class can handle multiple connections at once.
__init__
def __init__(
connections: Optional[Sequence[Connection]] = None,
default_connection_index: int = 0,
loop: Optional[AbstractEventLoop] = None,
exception_policy: ExceptionPolicyEnum = ExceptionPolicyEnum.propagate,
threaded: bool = False,
agent_name: str = "standalone",
default_routing: Optional[Dict[PublicId, PublicId]] = None,
default_connection: Optional[PublicId] = None,
protocols: Optional[List[Union[Protocol, Message]]] = None) -> None
Initialize the connection multiplexer.
Arguments:
- connections: a sequence of connections.
- default_connection_index: the index of the connection to use as default. This information is used for envelopes which don't specify any routing context. If connections is None, this parameter is ignored.
- loop: the event loop to run the multiplexer. If None, a new event loop is created.
- exception_policy: the exception policy used for connections.
- threaded: if True, run in threaded mode, else async.
- agent_name: the name of the agent that owns the multiplexer, for logging purposes.
- default_routing: default routing map.
- default_connection: default connection.
- protocols: protocols used.
default_connection
@property
def default_connection() -> Optional[Connection]
Get the default connection.
in_queue
@property
def in_queue() -> AsyncFriendlyQueue
Get the in queue.
out_queue
@property
def out_queue() -> asyncio.Queue
Get the out queue.
connections
@property
def connections() -> Tuple[Connection, ...]
Get the connections.
is_connected
@property
def is_connected() -> bool
Check whether the multiplexer is processing envelopes.
default_routing
@property
def default_routing() -> Dict[PublicId, PublicId]
Get the default routing.
default_routing
@default_routing.setter
def default_routing(default_routing: Dict[PublicId, PublicId]) -> None
Set the default routing.
connection_status
@property
def connection_status() -> MultiplexerStatus
Get the connection status.
run
async def run() -> None
Run multiplexer connect and receive/send tasks.
set_loop
def set_loop(loop: AbstractEventLoop) -> None
Set event loop and all event loop related objects.
Arguments:
- loop: asyncio event loop.
add_connection
def add_connection(connection: Connection, is_default: bool = False) -> None
Add a connection to the multiplexer.
Arguments:
- connection: the connection to add.
- is_default: whether the connection added should be the default one.
connect
async def connect() -> None
Connect the multiplexer.
disconnect
async def disconnect() -> None
Disconnect the multiplexer.
get
def get(block: bool = False,
timeout: Optional[float] = None) -> Optional[Envelope]
Get an envelope within a timeout.
Arguments:
- block: make the call blocking (ignore the timeout).
- timeout: the timeout to wait until an envelope is received.
Returns:
the envelope, or None if no envelope is available within a timeout.
async_get
async def async_get() -> Envelope
Get an envelope async way.
Returns:
the envelope
async_wait
async def async_wait() -> None
Wait asynchronously until an envelope is available.
Returns:
None
put
def put(envelope: Envelope) -> None
Schedule an envelope for sending it.
Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.
Arguments:
- envelope: the envelope to be sent.
Multiplexer Objects
class Multiplexer(AsyncMultiplexer)
Transit sync multiplexer for compatibility.
__init__
def __init__(*args: Any, **kwargs: Any) -> None
Initialize the connection multiplexer.
Arguments:
- args: arguments
- kwargs: keyword arguments
set_loop
def set_loop(loop: AbstractEventLoop) -> None
Set event loop and all event loop related objects.
Arguments:
- loop: asyncio event loop.
connect
def connect() -> None
Connect the multiplexer.
Synchronously in thread spawned if new loop created.
disconnect
def disconnect() -> None
Disconnect the multiplexer.
Also stops a dedicated thread for event loop if spawned on connect.
put
def put(envelope: Envelope) -> None
Schedule an envelope for sending it.
Notice that the output queue is an asyncio.Queue which uses an event loop running on a different thread than the one used in this function.
Arguments:
- envelope: the envelope to be sent.
InBox Objects
class InBox()
A queue from where you can only consume envelopes.
__init__
def __init__(multiplexer: AsyncMultiplexer) -> None
Initialize the inbox.
Arguments:
- multiplexer: the multiplexer
empty
def empty() -> bool
Check for an envelope on the in queue.
Returns:
boolean indicating whether there is an envelope or not
get
def get(block: bool = False, timeout: Optional[float] = None) -> Envelope
Check for an envelope on the in queue.
Arguments:
- block: make the call blocking (ignore the timeout).
- timeout: times out the block after timeout seconds.
Raises:
- Empty: if the attempt to get an envelope fails.
Returns:
the envelope object.
get_nowait
def get_nowait() -> Optional[Envelope]
Check for an envelope on the in queue without waiting.
Returns:
the envelope object
async_get
async def async_get() -> Envelope
Check for an envelope on the in queue.
Returns:
the envelope object.
async_wait
async def async_wait() -> None
Check for an envelope on the in queue.
OutBox Objects
class OutBox()
A queue from where you can only enqueue envelopes.
__init__
def __init__(multiplexer: AsyncMultiplexer) -> None
Initialize the outbox.
Arguments:
- multiplexer: the multiplexer
empty
def empty() -> bool
Check for an envelope on the out queue.
Returns:
boolean indicating whether there is an envelope or not
put
def put(envelope: Envelope) -> None
Put an envelope into the queue.
Arguments:
- envelope: the envelope.
put_message
def put_message(message: Message,
context: Optional[EnvelopeContext] = None) -> None
Put a message in the outbox.
This constructs an envelope with the input arguments.
Arguments:
- message: the message
- context: the envelope context