plugins.aea-cli-benchmark.aea_cli_benchmark.case_reactive.case

Latency and throughput check.

TestConnectionMixIn Objects

class TestConnectionMixIn()

Test connection with messages timing.

__init__

def __init__(*args: Any, **kwargs: Any)

Init connection.

send

async def send(envelope: Envelope) -> None

Handle incoming envelope.

receive

async def receive(*args: Any, **kwargs: Any) -> Optional[Envelope]

Generate outgoing envelope.
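
A minimal sketch of how a connection can time messages through a send/receive pair like the one above. It does not use the AEA classes: `FakeEnvelope` and `TimingConnectionSketch` are hypothetical stand-ins invented for illustration, and the actual mixin may record timings differently.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeEnvelope:
    """Hypothetical stand-in for an envelope; it only carries a creation timestamp."""

    created_at: float = field(default_factory=time.perf_counter)


class TimingConnectionSketch:
    """Hypothetical connection that measures the round trip of each envelope."""

    def __init__(self) -> None:
        self.latencies: List[float] = []

    async def receive(self) -> Optional[FakeEnvelope]:
        # Generate an envelope, stamped with the current time.
        await asyncio.sleep(0)  # yield control to the event loop once
        return FakeEnvelope()

    async def send(self, envelope: FakeEnvelope) -> None:
        # Handle the envelope and record the time elapsed since it was created.
        self.latencies.append(time.perf_counter() - envelope.created_at)


async def demo(count: int = 5) -> List[float]:
    """Pass `count` envelopes through the connection and return the latencies."""
    conn = TimingConnectionSketch()
    for _ in range(count):
        envelope = await conn.receive()
        if envelope is not None:
            await conn.send(envelope)
    return conn.latencies


latencies = asyncio.run(demo())
```

In this sketch the envelope goes straight from `receive` to `send`; in a real benchmark an agent and its handler would sit between the two calls, and that processing time is what the latency would capture.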

TestHandler Objects

class TestHandler(Handler)

Dummy handler to handle messages.

setup

def setup() -> None

Noop setup.

teardown

def teardown() -> None

Noop teardown.

handle

def handle(message: Message) -> None

Handle incoming message.
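
The handler shape described above (no-op setup and teardown, plus a `handle` method) can be sketched without the framework. `DummyHandlerSketch` is a hypothetical class that does not subclass the AEA `Handler`, and storing messages in a list is an assumption made only so the example has something to observe.

```python
from typing import Any, List


class DummyHandlerSketch:
    """Hypothetical handler with no-op lifecycle hooks."""

    def __init__(self) -> None:
        self.seen: List[Any] = []

    def setup(self) -> None:
        # Nothing to prepare.
        return None

    def teardown(self) -> None:
        # Nothing to clean up.
        return None

    def handle(self, message: Any) -> None:
        # Record the incoming message; a real handler might reply instead.
        self.seen.append(message)


handler = DummyHandlerSketch()
handler.setup()
handler.handle("first")
handler.handle("second")
handler.teardown()
```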

run

def run(duration: int, runtime_mode: str,
        connection_mode: str) -> List[Tuple[str, Union[int, float]]]

Run the latency and throughput check and return the results as a list of (label, value) pairs.
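
The return annotation of `run` is a list of `(str, number)` tuples. The sketch below shows one way a result of that shape could be built and printed. The `summarize` helper, the labels, and the input values are all hypothetical and chosen only for illustration; they are not the output of the real benchmark.

```python
from statistics import mean
from typing import List, Tuple, Union

Result = List[Tuple[str, Union[int, float]]]


def summarize(latencies: List[float], duration: int) -> Result:
    """Build a list of (label, value) pairs from raw latency samples (hypothetical helper)."""
    count = len(latencies)
    return [
        ("envelopes received", count),
        ("rate (envelopes/second)", count / duration),
        ("mean latency (seconds)", mean(latencies)),
    ]


def format_result(result: Result) -> str:
    """Render each (label, value) pair on its own line."""
    return "\n".join(f"{label}: {value}" for label, value in result)


# Illustrative inputs only, not measured data.
result = summarize([0.25, 0.75], duration=2)
report = format_result(result)
print(report)
```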