Skip to content

aea.manager.manager

This module contains the implementation of AEA agents manager.

ProjectNotFoundError Objects

class ProjectNotFoundError(ValueError)

Project not found exception.

ProjectCheckError Objects

class ProjectCheckError(ValueError)

Project check error exception.

__init__

def __init__(msg: str, source_exception: Exception)

Init exception.

ProjectPackageConsistencyCheckError Objects

class ProjectPackageConsistencyCheckError(ValueError)

Check consistency of package versions against already added project.

__init__

def __init__(agent_project_id: PublicId,
             conflicting_packages: List[Tuple[PackageIdPrefix, str, str,
                                              Set[PublicId]]])

Initialize the exception.

Arguments:

  • agent_project_id: the agent project id whose addition has failed.
  • conflicting_packages: the conflicting packages.

BaseAgentRunTask Objects

class BaseAgentRunTask(ABC)

Base abstract class for agent run tasks.

start

@abstractmethod
def start() -> None

Start task.

wait

@abstractmethod
def wait() -> asyncio.Future

Return future to wait task completed.

stop

@abstractmethod
def stop() -> None

Stop task.

is_running

@property
@abstractmethod
def is_running() -> bool

Return is task running.

AgentRunAsyncTask Objects

class AgentRunAsyncTask(BaseAgentRunTask)

Async task wrapper for agent.

__init__

def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None

Init task with agent alias and loop.

create_run_loop

def create_run_loop() -> None

Create run loop.

start

def start() -> None

Start task.

wait

def wait() -> asyncio.Future

Return future to wait task completed.

stop

def stop() -> None

Stop task.

run

async def run() -> None

Run task body.

is_running

@property
def is_running() -> bool

Return is task running.

AgentRunThreadTask Objects

class AgentRunThreadTask(AgentRunAsyncTask)

Threaded wrapper to run agent.

__init__

def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None

Init task with agent alias and loop.

create_run_loop

def create_run_loop() -> None

Create run loop.

start

def start() -> None

Run task in a dedicated thread.

stop

def stop() -> None

Stop the task.

AgentRunProcessTask Objects

class AgentRunProcessTask(BaseAgentRunTask)

Subprocess wrapper to run agent.

PROCESS_JOIN_TIMEOUT

in seconds

PROCESS_ALIVE_SLEEP_TIME

in seconds

__init__

def __init__(agent_alias: AgentAlias, loop: asyncio.AbstractEventLoop) -> None

Init task with agent alias and loop.

start

def start() -> None

Run task in a dedicated process.

wait

def wait() -> asyncio.Future

Return future to wait task completed.

stop

def stop() -> None

Stop the task.

is_running

@property
def is_running() -> bool

Is agent running.

MultiAgentManager Objects

class MultiAgentManager()

Multi agents manager.

__init__

def __init__(working_dir: str,
             mode: str = "async",
             registry_path: str = DEFAULT_REGISTRY_NAME,
             auto_add_remove_project: bool = False,
             password: Optional[str] = None) -> None

Initialize manager.

Arguments:

  • working_dir: directory to store base agents.
  • mode: str. async or threaded
  • registry_path: str. path to the local packages registry
  • auto_add_remove_project: bool. add/remove project on the first agent add/last agent remove
  • password: the password to encrypt/decrypt the private key.

data_dir

@property
def data_dir() -> str

Get the certs directory.

get_data_dir_of_agent

def get_data_dir_of_agent(agent_name: str) -> str

Get the data directory of a specific agent.

is_running

@property
def is_running() -> bool

Is manager running.

dict_state

@property
def dict_state() -> Dict[str, Any]

Create MultiAgentManager dist state.

projects

@property
def projects() -> Dict[PublicId, Project]

Get all projects.

add_error_callback

def add_error_callback(
    error_callback: Callable[[str, BaseException],
                             None]) -> "MultiAgentManager"

Add error callback to call on error raised.

start_manager

def start_manager(local: bool = False,
                  remote: bool = False) -> "MultiAgentManager"

Start manager.

If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).

Arguments:

  • local: whether or not to fetch from local registry.
  • remote: whether or not to fetch from remote registry.

Returns:

the MultiAgentManager instance.

last_start_status

@property
def last_start_status() -> Tuple[
    bool,
    Dict[PublicId, List[Dict]],
    List[Tuple[PublicId, List[Dict], Exception]],
]

Get status of the last agents start loading state.

stop_manager

def stop_manager(cleanup: bool = True,
                 save: bool = False) -> "MultiAgentManager"

Stop manager.

Stops all running agents and stop agent.

Arguments:

  • cleanup: bool is cleanup on stop.
  • save: bool is save state to file on stop.

Returns:

None

add_project

def add_project(public_id: PublicId,
                local: bool = False,
                remote: bool = False,
                restore: bool = False) -> "MultiAgentManager"

Fetch agent project and all dependencies to working_dir.

If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).

Arguments:

  • public_id: the public if of the agent project.
  • local: whether or not to fetch from local registry.
  • remote: whether or not to fetch from remote registry.
  • restore: bool flag for restoring already fetched agent.

Returns:

self

remove_project

def remove_project(public_id: PublicId,
                   keep_files: bool = False) -> "MultiAgentManager"

Remove agent project.

list_projects

def list_projects() -> List[PublicId]

List all agents projects added.

Returns:

list of public ids of projects

add_agent

def add_agent(public_id: PublicId,
              agent_name: Optional[str] = None,
              agent_overrides: Optional[dict] = None,
              component_overrides: Optional[List[dict]] = None,
              local: bool = False,
              remote: bool = False,
              restore: bool = False) -> "MultiAgentManager"

Create new agent configuration based on project with config overrides applied.

Alias is stored in memory only!

Arguments:

  • public_id: base agent project public id
  • agent_name: unique name for the agent
  • agent_overrides: overrides for agent config.
  • component_overrides: overrides for component section.
  • local: whether or not to fetch from local registry.
  • remote: whether or not to fetch from remote registry.
  • restore: bool flag for restoring already fetched agent.

Returns:

self

add_agent_with_config

def add_agent_with_config(
        public_id: PublicId,
        config: List[dict],
        agent_name: Optional[str] = None) -> "MultiAgentManager"

Create new agent configuration based on project with config provided.

Alias is stored in memory only!

Arguments:

  • public_id: base agent project public id
  • agent_name: unique name for the agent
  • config: agent config (used for agent re-creation).

Returns:

manager

get_agent_overridables

def get_agent_overridables(agent_name: str) -> Tuple[Dict, List[Dict]]

Get agent config overridables.

Arguments:

  • agent_name: str

Returns:

Tuple of agent overridables dict and and list of component overridables dict.

set_agent_overrides

def set_agent_overrides(
        agent_name: str, agent_overides: Optional[Dict],
        components_overrides: Optional[List[Dict]]) -> "MultiAgentManager"

Set agent overrides.

Arguments:

  • agent_name: str
  • agent_overides: optional dict of agent config overrides
  • components_overrides: optional list of dict of components overrides

Returns:

self

list_agents_info

def list_agents_info() -> List[Dict[str, Any]]

List agents detailed info.

Returns:

list of dicts that represents agent info: public_id, name, is_running.

list_agents

def list_agents(running_only: bool = False) -> List[str]

List all agents.

Arguments:

  • running_only: returns only running if set to True

Returns:

list of agents names

remove_agent

def remove_agent(
        agent_name: str,
        skip_project_auto_remove: bool = False) -> "MultiAgentManager"

Remove agent alias definition from registry.

Arguments:

  • agent_name: agent name to remove
  • skip_project_auto_remove: disable auto project remove on last agent removed.

Returns:

None

start_agent

def start_agent(agent_name: str) -> "MultiAgentManager"

Start selected agent.

Arguments:

  • agent_name: agent name to start

Returns:

None

start_all_agents

def start_all_agents() -> "MultiAgentManager"

Start all not started agents.

Returns:

None

stop_agent

def stop_agent(agent_name: str) -> "MultiAgentManager"

Stop running agent.

Arguments:

  • agent_name: agent name to stop

Returns:

self

stop_all_agents

def stop_all_agents() -> "MultiAgentManager"

Stop all agents running.

Returns:

self

stop_agents

def stop_agents(agent_names: List[str]) -> "MultiAgentManager"

Stop specified agents.

Arguments:

  • agent_names: names of agents

Returns:

self

start_agents

def start_agents(agent_names: List[str]) -> "MultiAgentManager"

Stop specified agents.

Arguments:

  • agent_names: names of agents

Returns:

self

get_agent_alias

def get_agent_alias(agent_name: str) -> AgentAlias

Return details about agent alias definition.

Arguments:

  • agent_name: name of agent

Returns:

AgentAlias