Skip to content

plugins.aea-cli-benchmark.aea_cli_benchmark.utils

Performance checks utils.

ROOT_DIR

type: ignore

wait_for_condition

def wait_for_condition(condition_checker: Callable,
                       timeout: int = 2,
                       error_msg: str = "Timeout") -> None

Wait for condition occurs in selected timeout.

make_agent

def make_agent(agent_name: str = "my_agent",
               runtime_mode: str = "threaded",
               resources: Optional[Resources] = None,
               wallet: Optional[Wallet] = None,
               identity: Optional[Identity] = None,
               packages_dir=PACKAGES_DIR,
               default_ledger=None) -> AEA

Make AEA instance.

make_envelope

def make_envelope(sender: str,
                  to: str,
                  message: Optional[Message] = None) -> Envelope

Make an envelope.

GeneratorConnection Objects

class GeneratorConnection(Connection)

Generates messages and count.

__init__

def __init__(*args: Any, **kwargs: Any)

Init connection.

enable

def enable() -> None

Enable message generation.

disable

def disable() -> None

Disable message generation.

connect

async def connect() -> None

Connect connection.

disconnect

async def disconnect() -> None

Disconnect connection.

send

async def send(envelope: "Envelope") -> None

Handle incoming envelope.

receive

async def receive(*args: Any, **kwargs: Any) -> Optional["Envelope"]

Generate an envelope.

make

@classmethod
def make(cls) -> "GeneratorConnection"

Construct connection instance.

SyncedGeneratorConnection Objects

class SyncedGeneratorConnection(GeneratorConnection)

Synchronized message generator.

__init__

def __init__(*args: Any, **kwargs: Any) -> None

Init connection.

condition

@property
def condition() -> asyncio.Event

Get condition.

connect

async def connect() -> None

Connect connection.

send

async def send(envelope: "Envelope") -> None

Handle incoming envelope.

receive

async def receive(*args: Any, **kwargs: Any) -> Optional["Envelope"]

Generate an envelope.

make_skill

def make_skill(agent: AEA,
               handlers: Optional[Dict[str, Type[Handler]]] = None,
               behaviours: Optional[Dict[str, Type[Behaviour]]] = None,
               skill_id: Optional[PublicId] = None) -> Skill

Construct skill instance for agent from behaviours.

get_mem_usage_in_mb

def get_mem_usage_in_mb() -> float

Get memory usage of the current process in megabytes.

multi_run

def multi_run(num_runs: int, fn: Callable,
              args: Tuple) -> List[Tuple[str, Any, Any, Any]]

Perform multiple test runs.

Arguments:

  • num_runs: host many times to run
  • fn: callable that returns list of tuples with result
  • args: args to pass to callable

Returns:

list of tuples of results

def print_results(
        output_format: str, parameters: Dict,
        result_fn: Callable[..., List[Tuple[str, Any, Any, Any]]]) -> Any

Print result for multi_run response.

make_identity_from_wallet

def make_identity_from_wallet(name, wallet, default_ledger)

Make indentity for ledger id and wallet specified.

with_packages

@contextmanager
def with_packages(packages: List[Tuple[str, str]],
                  packages_dir: Optional[Path] = None)

Download and install packages context manager.