Skip to content

Use multiplexer stand-alone

The Multiplexer can be used stand-alone. This way a developer can utilise the protocols and connections independent of the Agent or AEA classes.

First, get the required packages from IPFS.

mkdir packages
aea create my_aea
cd my_aea
aea add protocol fetchai/default:1.0.0:bafybeihdvtmnz7fzy7kwi3wlo6rfl27f6q3g5entplgvq7y23i3v5uoz24 --remote
aea push connection fetchai/default --local
aea add connection fetchai/stub:0.21.0:bafybeibybboiwgklfiqpkkcw6rwj65s5jalzfzf6mh6fstxdlt6habzwvy --remote
aea push connection fetchai/stub --local
cd ..
aea delete my_aea

Then, import the Python and application specific libraries and set the static variables.

import os
import time
from copy import copy
from threading import Thread
from typing import Optional

from aea.configurations.base import ConnectionConfig
from aea.helpers.file_io import write_with_lock
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer

from packages.fetchai.connections.stub.connection import StubConnection
from packages.fetchai.protocols.default.message import DefaultMessage


INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"

Instantiate a Multiplexer

A Multiplexer only needs a list of connections. The StubConnection is a simple connection which reads from and writes to file.

    # Ensure the input and output files do not exist initially
    if os.path.isfile(INPUT_FILE):
        os.remove(INPUT_FILE)
    if os.path.isfile(OUTPUT_FILE):
        os.remove(OUTPUT_FILE)

    # create the connection and multiplexer objects
    configuration = ConnectionConfig(
        input_file=INPUT_FILE,
        output_file=OUTPUT_FILE,
        connection_id=StubConnection.connection_id,
    )
    stub_connection = StubConnection(
        configuration=configuration,
        data_dir=".",
        identity=Identity("some_agent", "some_address", "some_public_key"),
    )
    multiplexer = Multiplexer([stub_connection], protocols=[DefaultMessage])

Start the Multiplexer

We can run a multiplexer by calling, connect() which starts the receive and sending loops. We run the multiplexer from a different thread so that we can still use the main thread to pass it messages.

    try:
        # Set the multiplexer running in a different thread
        t = Thread(target=multiplexer.connect)
        t.start()

        # Wait for everything to start up
        for _ in range(20):
            if multiplexer.is_connected:
                break
            time.sleep(1)
        else:
            raise Exception("Not connected")

Send and receive an envelope

We use the input and output text files to send an envelope to our agent and receive a response

        # Create a message inside an envelope and get the stub connection to pass it into the multiplexer
        message_text = (
            "multiplexer,some_agent,fetchai/default:1.0.0,\x08\x01*\x07\n\x05hello,"
        )
        with open(INPUT_FILE, "w") as f:
            write_with_lock(f, message_text)  # type: ignore[arg-type]

        # Wait for the envelope to get processed
        for _ in range(20):
            if not multiplexer.in_queue.empty():
                break
            time.sleep(1)
        else:
            raise Exception("No message!")

        # get the envelope
        envelope: Optional[Envelope] = multiplexer.get()
        assert envelope is not None

        # Inspect its contents
        print(
            "Envelope received by Multiplexer: sender={}, to={}, protocol_specification_id={}, message={}".format(  # type: ignore[str-bytes-safe]
                envelope.sender,
                envelope.to,
                envelope.protocol_specification_id,
                envelope.message,
            )
        )

        # Create a mirrored response envelope
        response_envelope = copy(envelope)
        response_envelope.to = envelope.sender
        response_envelope.sender = envelope.to

        # Send the envelope back
        multiplexer.put(response_envelope)

        # Read the output envelope generated by the multiplexer
        with open(OUTPUT_FILE, "r") as f:
            print("Envelope received from Multiplexer: " + f.readline())

Shutdown

Finally stop our multiplexer and wait for it to finish

    finally:
        # Shut down the multiplexer
        multiplexer.disconnect()
        t.join()

Your turn

Now it is your turn to develop a simple use case which utilises the Multiplexer to send and receive Envelopes.

Entire code listing

If you just want to copy and paste the entire script in you can find it here:

Click here to see full listing

import os
import time
from copy import copy
from threading import Thread
from typing import Optional

from aea.configurations.base import ConnectionConfig
from aea.helpers.file_io import write_with_lock
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer

from packages.fetchai.connections.stub.connection import StubConnection
from packages.fetchai.protocols.default.message import DefaultMessage


INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"


def run() -> None:
    """Run demo."""

    # Ensure the input and output files do not exist initially
    if os.path.isfile(INPUT_FILE):
        os.remove(INPUT_FILE)
    if os.path.isfile(OUTPUT_FILE):
        os.remove(OUTPUT_FILE)

    # create the connection and multiplexer objects
    configuration = ConnectionConfig(
        input_file=INPUT_FILE,
        output_file=OUTPUT_FILE,
        connection_id=StubConnection.connection_id,
    )
    stub_connection = StubConnection(
        configuration=configuration,
        data_dir=".",
        identity=Identity("some_agent", "some_address", "some_public_key"),
    )
    multiplexer = Multiplexer([stub_connection], protocols=[DefaultMessage])
    try:
        # Set the multiplexer running in a different thread
        t = Thread(target=multiplexer.connect)
        t.start()

        # Wait for everything to start up
        for _ in range(20):
            if multiplexer.is_connected:
                break
            time.sleep(1)
        else:
            raise Exception("Not connected")

        # Create a message inside an envelope and get the stub connection to pass it into the multiplexer
        message_text = (
            "multiplexer,some_agent,fetchai/default:1.0.0,\x08\x01*\x07\n\x05hello,"
        )
        with open(INPUT_FILE, "w") as f:
            write_with_lock(f, message_text)  # type: ignore[arg-type]

        # Wait for the envelope to get processed
        for _ in range(20):
            if not multiplexer.in_queue.empty():
                break
            time.sleep(1)
        else:
            raise Exception("No message!")

        # get the envelope
        envelope: Optional[Envelope] = multiplexer.get()
        assert envelope is not None

        # Inspect its contents
        print(
            "Envelope received by Multiplexer: sender={}, to={}, protocol_specification_id={}, message={}".format(  # type: ignore[str-bytes-safe]
                envelope.sender,
                envelope.to,
                envelope.protocol_specification_id,
                envelope.message,
            )
        )

        # Create a mirrored response envelope
        response_envelope = copy(envelope)
        response_envelope.to = envelope.sender
        response_envelope.sender = envelope.to

        # Send the envelope back
        multiplexer.put(response_envelope)

        # Read the output envelope generated by the multiplexer
        with open(OUTPUT_FILE, "r") as f:
            print("Envelope received from Multiplexer: " + f.readline())
    finally:
        # Shut down the multiplexer
        multiplexer.disconnect()
        t.join()


if __name__ == "__main__":
    run()