Use multiplexer stand-alone
The Multiplexer
can be used stand-alone. This way a developer can utilise the protocols and connections independent of the Agent
or AEA
classes.
First, get the required packages from IPFS.
mkdir packages
aea create my_aea
cd my_aea
aea add protocol fetchai/default:1.0.0:bafybeihdvtmnz7fzy7kwi3wlo6rfl27f6q3g5entplgvq7y23i3v5uoz24 --remote
aea push connection fetchai/default --local
aea add connection fetchai/stub:0.21.0:bafybeibybboiwgklfiqpkkcw6rwj65s5jalzfzf6mh6fstxdlt6habzwvy --remote
aea push connection fetchai/stub --local
cd ..
aea delete my_aea
Then, import the Python and application specific libraries and set the static variables.
import os
import time
from copy import copy
from threading import Thread
from typing import Optional
from aea.configurations.base import ConnectionConfig
from aea.helpers.file_io import write_with_lock
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer
from packages.fetchai.connections.stub.connection import StubConnection
from packages.fetchai.protocols.default.message import DefaultMessage
INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"
Instantiate a Multiplexer
A Multiplexer
only needs a list of connections. The StubConnection
is a simple connection which reads from and writes to file.
# Ensure the input and output files do not exist initially
if os.path.isfile(INPUT_FILE):
os.remove(INPUT_FILE)
if os.path.isfile(OUTPUT_FILE):
os.remove(OUTPUT_FILE)
# create the connection and multiplexer objects
configuration = ConnectionConfig(
input_file=INPUT_FILE,
output_file=OUTPUT_FILE,
connection_id=StubConnection.connection_id,
)
stub_connection = StubConnection(
configuration=configuration,
data_dir=".",
identity=Identity("some_agent", "some_address", "some_public_key"),
)
multiplexer = Multiplexer([stub_connection], protocols=[DefaultMessage])
Start the Multiplexer
We can run a multiplexer by calling, connect()
which starts the receive and sending loops. We run the multiplexer from a different thread so that we can still use the main thread to pass it messages.
try:
# Set the multiplexer running in a different thread
t = Thread(target=multiplexer.connect)
t.start()
# Wait for everything to start up
for _ in range(20):
if multiplexer.is_connected:
break
time.sleep(1)
else:
raise Exception("Not connected")
Send and receive an envelope
We use the input and output text files to send an envelope to our agent and receive a response
# Create a message inside an envelope and get the stub connection to pass it into the multiplexer
message_text = (
"multiplexer,some_agent,fetchai/default:1.0.0,\x08\x01*\x07\n\x05hello,"
)
with open(INPUT_FILE, "w") as f:
write_with_lock(f, message_text) # type: ignore[arg-type]
# Wait for the envelope to get processed
for _ in range(20):
if not multiplexer.in_queue.empty():
break
time.sleep(1)
else:
raise Exception("No message!")
# get the envelope
envelope: Optional[Envelope] = multiplexer.get()
assert envelope is not None
# Inspect its contents
print(
"Envelope received by Multiplexer: sender={}, to={}, protocol_specification_id={}, message={}".format( # type: ignore[str-bytes-safe]
envelope.sender,
envelope.to,
envelope.protocol_specification_id,
envelope.message,
)
)
# Create a mirrored response envelope
response_envelope = copy(envelope)
response_envelope.to = envelope.sender
response_envelope.sender = envelope.to
# Send the envelope back
multiplexer.put(response_envelope)
# Read the output envelope generated by the multiplexer
with open(OUTPUT_FILE, "r") as f:
print("Envelope received from Multiplexer: " + f.readline())
Shutdown
Finally stop our multiplexer and wait for it to finish
finally:
# Shut down the multiplexer
multiplexer.disconnect()
t.join()
Your turn
Now it is your turn to develop a simple use case which utilises the Multiplexer
to send and receive Envelopes.
Entire code listing
If you just want to copy and paste the entire script in you can find it here:
Click here to see full listing
import os
import time
from copy import copy
from threading import Thread
from typing import Optional
from aea.configurations.base import ConnectionConfig
from aea.helpers.file_io import write_with_lock
from aea.identity.base import Identity
from aea.mail.base import Envelope
from aea.multiplexer import Multiplexer
from packages.fetchai.connections.stub.connection import StubConnection
from packages.fetchai.protocols.default.message import DefaultMessage
INPUT_FILE = "input.txt"
OUTPUT_FILE = "output.txt"
def run() -> None:
"""Run demo."""
# Ensure the input and output files do not exist initially
if os.path.isfile(INPUT_FILE):
os.remove(INPUT_FILE)
if os.path.isfile(OUTPUT_FILE):
os.remove(OUTPUT_FILE)
# create the connection and multiplexer objects
configuration = ConnectionConfig(
input_file=INPUT_FILE,
output_file=OUTPUT_FILE,
connection_id=StubConnection.connection_id,
)
stub_connection = StubConnection(
configuration=configuration,
data_dir=".",
identity=Identity("some_agent", "some_address", "some_public_key"),
)
multiplexer = Multiplexer([stub_connection], protocols=[DefaultMessage])
try:
# Set the multiplexer running in a different thread
t = Thread(target=multiplexer.connect)
t.start()
# Wait for everything to start up
for _ in range(20):
if multiplexer.is_connected:
break
time.sleep(1)
else:
raise Exception("Not connected")
# Create a message inside an envelope and get the stub connection to pass it into the multiplexer
message_text = (
"multiplexer,some_agent,fetchai/default:1.0.0,\x08\x01*\x07\n\x05hello,"
)
with open(INPUT_FILE, "w") as f:
write_with_lock(f, message_text) # type: ignore[arg-type]
# Wait for the envelope to get processed
for _ in range(20):
if not multiplexer.in_queue.empty():
break
time.sleep(1)
else:
raise Exception("No message!")
# get the envelope
envelope: Optional[Envelope] = multiplexer.get()
assert envelope is not None
# Inspect its contents
print(
"Envelope received by Multiplexer: sender={}, to={}, protocol_specification_id={}, message={}".format( # type: ignore[str-bytes-safe]
envelope.sender,
envelope.to,
envelope.protocol_specification_id,
envelope.message,
)
)
# Create a mirrored response envelope
response_envelope = copy(envelope)
response_envelope.to = envelope.sender
response_envelope.sender = envelope.to
# Send the envelope back
multiplexer.put(response_envelope)
# Read the output envelope generated by the multiplexer
with open(OUTPUT_FILE, "r") as f:
print("Envelope received from Multiplexer: " + f.readline())
finally:
# Shut down the multiplexer
multiplexer.disconnect()
t.join()
if __name__ == "__main__":
run()