aea.manager.manager
This module contains the implementation of AEA agents manager.
ProjectNotFoundError Objects
class ProjectNotFoundError(ValueError)
Project not found exception.
ProjectCheckError Objects
class ProjectCheckError(ValueError)
Project check error exception.
__
init__
def __init__(msg: str, source_exception: Exception)
Init exception.
ProjectPackageConsistencyCheckError Objects
class ProjectPackageConsistencyCheckError(ValueError)
Check consistency of package versions against already added project.
__
init__
def __init__(agent_project_id: PublicId,
conflicting_packages: List[Tuple[PackageIdPrefix, str, str,
Set[PublicId]]])
Initialize the exception.
Arguments:
agent_project_id
: the agent project id whose addition has failed.conflicting_packages
: the conflicting packages.
BaseAgentRunTask Objects
class BaseAgentRunTask(ABC)
Base abstract class for agent run tasks.
start
@abstractmethod
def start() -> None
Start task.
wait
@abstractmethod
def wait() -> asyncio.Future
Return future to wait task completed.
stop
@abstractmethod
def stop() -> None
Stop task.
is_
running
@property
@abstractmethod
def is_running() -> bool
Return is task running.
AgentRunAsyncTask Objects
class AgentRunAsyncTask(BaseAgentRunTask)
Async task wrapper for agent.
__
init__
def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
create_
run_
loop
def create_run_loop() -> None
Create run loop.
start
def start() -> None
Start task.
wait
def wait() -> asyncio.Future
Return future to wait task completed.
stop
def stop() -> None
Stop task.
run
async def run() -> None
Run task body.
is_
running
@property
def is_running() -> bool
Return is task running.
AgentRunThreadTask Objects
class AgentRunThreadTask(AgentRunAsyncTask)
Threaded wrapper to run agent.
__
init__
def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
create_
run_
loop
def create_run_loop() -> None
Create run loop.
start
def start() -> None
Run task in a dedicated thread.
stop
def stop() -> None
Stop the task.
AgentRunProcessTask Objects
class AgentRunProcessTask(BaseAgentRunTask)
Subprocess wrapper to run agent.
PROCESS_
JOIN_
TIMEOUT
in seconds
PROCESS_
ALIVE_
SLEEP_
TIME
in seconds
__
init__
def __init__(agent_alias: AgentAlias, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
start
def start() -> None
Run task in a dedicated process.
wait
def wait() -> asyncio.Future
Return future to wait task completed.
stop
def stop() -> None
Stop the task.
is_
running
@property
def is_running() -> bool
Is agent running.
MultiAgentManager Objects
class MultiAgentManager()
Multi agents manager.
__
init__
def __init__(working_dir: str,
mode: str = "async",
registry_path: str = DEFAULT_REGISTRY_NAME,
auto_add_remove_project: bool = False,
password: Optional[str] = None) -> None
Initialize manager.
Arguments:
working_dir
: directory to store base agents.mode
: str. async or threadedregistry_path
: str. path to the local packages registryauto_add_remove_project
: bool. add/remove project on the first agent add/last agent removepassword
: the password to encrypt/decrypt the private key.
data_
dir
@property
def data_dir() -> str
Get the certs directory.
get_
data_
dir_
of_
agent
def get_data_dir_of_agent(agent_name: str) -> str
Get the data directory of a specific agent.
is_
running
@property
def is_running() -> bool
Is manager running.
dict_
state
@property
def dict_state() -> Dict[str, Any]
Create MultiAgentManager dist state.
projects
@property
def projects() -> Dict[PublicId, Project]
Get all projects.
add_
error_
callback
def add_error_callback(
error_callback: Callable[[str, BaseException],
None]) -> "MultiAgentManager"
Add error callback to call on error raised.
start_
manager
def start_manager(local: bool = False,
remote: bool = False) -> "MultiAgentManager"
Start manager.
If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).
Arguments:
local
: whether or not to fetch from local registry.remote
: whether or not to fetch from remote registry.
Returns:
the MultiAgentManager instance.
last_
start_
status
@property
def last_start_status() -> Tuple[
bool,
Dict[PublicId, List[Dict]],
List[Tuple[PublicId, List[Dict], Exception]],
]
Get status of the last agents start loading state.
stop_
manager
def stop_manager(cleanup: bool = True,
save: bool = False) -> "MultiAgentManager"
Stop manager.
Stops all running agents and stop agent.
Arguments:
cleanup
: bool is cleanup on stop.save
: bool is save state to file on stop.
Returns:
None
add_
project
def add_project(public_id: PublicId,
local: bool = False,
remote: bool = False,
restore: bool = False) -> "MultiAgentManager"
Fetch agent project and all dependencies to working_dir.
If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).
Arguments:
public_id
: the public if of the agent project.local
: whether or not to fetch from local registry.remote
: whether or not to fetch from remote registry.restore
: bool flag for restoring already fetched agent.
Returns:
self
remove_
project
def remove_project(public_id: PublicId,
keep_files: bool = False) -> "MultiAgentManager"
Remove agent project.
list_
projects
def list_projects() -> List[PublicId]
List all agents projects added.
Returns:
list of public ids of projects
add_
agent
def add_agent(public_id: PublicId,
agent_name: Optional[str] = None,
agent_overrides: Optional[dict] = None,
component_overrides: Optional[List[dict]] = None,
local: bool = False,
remote: bool = False,
restore: bool = False) -> "MultiAgentManager"
Create new agent configuration based on project with config overrides applied.
Alias is stored in memory only!
Arguments:
public_id
: base agent project public idagent_name
: unique name for the agentagent_overrides
: overrides for agent config.component_overrides
: overrides for component section.local
: whether or not to fetch from local registry.remote
: whether or not to fetch from remote registry.restore
: bool flag for restoring already fetched agent.
Returns:
self
add_
agent_
with_
config
def add_agent_with_config(
public_id: PublicId,
config: List[dict],
agent_name: Optional[str] = None) -> "MultiAgentManager"
Create new agent configuration based on project with config provided.
Alias is stored in memory only!
Arguments:
public_id
: base agent project public idagent_name
: unique name for the agentconfig
: agent config (used for agent re-creation).
Returns:
manager
get_
agent_
overridables
def get_agent_overridables(agent_name: str) -> Tuple[Dict, List[Dict]]
Get agent config overridables.
Arguments:
agent_name
: str
Returns:
Tuple of agent overridables dict and and list of component overridables dict.
set_
agent_
overrides
def set_agent_overrides(
agent_name: str, agent_overides: Optional[Dict],
components_overrides: Optional[List[Dict]]) -> "MultiAgentManager"
Set agent overrides.
Arguments:
agent_name
: stragent_overides
: optional dict of agent config overridescomponents_overrides
: optional list of dict of components overrides
Returns:
self
list_
agents_
info
def list_agents_info() -> List[Dict[str, Any]]
List agents detailed info.
Returns:
list of dicts that represents agent info: public_id, name, is_running.
list_
agents
def list_agents(running_only: bool = False) -> List[str]
List all agents.
Arguments:
running_only
: returns only running if set to True
Returns:
list of agents names
remove_
agent
def remove_agent(
agent_name: str,
skip_project_auto_remove: bool = False) -> "MultiAgentManager"
Remove agent alias definition from registry.
Arguments:
agent_name
: agent name to removeskip_project_auto_remove
: disable auto project remove on last agent removed.
Returns:
None
start_
agent
def start_agent(agent_name: str) -> "MultiAgentManager"
Start selected agent.
Arguments:
agent_name
: agent name to start
Returns:
None
start_
all_
agents
def start_all_agents() -> "MultiAgentManager"
Start all not started agents.
Returns:
None
stop_
agent
def stop_agent(agent_name: str) -> "MultiAgentManager"
Stop running agent.
Arguments:
agent_name
: agent name to stop
Returns:
self
stop_
all_
agents
def stop_all_agents() -> "MultiAgentManager"
Stop all agents running.
Returns:
self
stop_
agents
def stop_agents(agent_names: List[str]) -> "MultiAgentManager"
Stop specified agents.
Arguments:
agent_names
: names of agents
Returns:
self
start_
agents
def start_agents(agent_names: List[str]) -> "MultiAgentManager"
Stop specified agents.
Arguments:
agent_names
: names of agents
Returns:
self
get_
agent_
alias
def get_agent_alias(agent_name: str) -> AgentAlias
Return details about agent alias definition.
Arguments:
agent_name
: name of agent
Returns:
AgentAlias