aea.helpers.multiple_executor

This module contains helpers to run multiple stoppable tasks in different modes: async, threaded, and multiprocess.

ExecutorExceptionPolicies Objects

class ExecutorExceptionPolicies(Enum)

Runner exception policy modes.

stop_all

Stop all agents when one agent fails, and log the exception.

propagate

Log the exception and re-raise it to the upper level.

log_only

Log the exception and skip it.
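The three policies can be illustrated with a minimal sketch. This is not the library's implementation: `Policy`, `handle_failure` and `stop_all_tasks` are hypothetical names introduced here, and only the three member names come from the documentation above.

```python
import logging
from enum import Enum
from typing import Callable, List

logger = logging.getLogger(__name__)


class Policy(Enum):
    """Hypothetical stand-in that reuses the three documented policy names."""

    stop_all = "stop_all"
    propagate = "propagate"
    log_only = "log_only"


def handle_failure(
    exc: Exception, policy: Policy, stop_all_tasks: Callable[[], None]
) -> None:
    """Apply one policy to a task failure (illustrative logic only)."""
    logger.error("task failed: %r", exc)  # every policy logs the exception
    if policy is Policy.stop_all:
        stop_all_tasks()  # one failure stops everything
    elif policy is Policy.propagate:
        raise exc  # hand the exception to the caller
    # Policy.log_only: nothing more to do, the failure is skipped


stopped: List[str] = []
handle_failure(RuntimeError("boom"), Policy.stop_all, lambda: stopped.append("stopped"))
handle_failure(RuntimeError("boom"), Policy.log_only, lambda: stopped.append("stopped"))
```

In this sketch only the `stop_all` call triggers the stop callback; the `log_only` call leaves the other tasks untouched.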

AbstractExecutorTask Objects

class AbstractExecutorTask(ABC)

Abstract task class to create Task classes.

__init__

def __init__() -> None

Init task.

future

@property
def future() -> Optional[TaskAwaitable]

Return awaitable to get result of task execution.

future

@future.setter
def future(future: TaskAwaitable) -> None

Set awaitable to get result of task execution.

start

@abstractmethod
def start() -> Tuple[Callable, Sequence[Any]]

Implement start task function here.

stop

@abstractmethod
def stop() -> None

Implement stop task function here.

create_async_task

@abstractmethod
def create_async_task(loop: AbstractEventLoop) -> TaskAwaitable

Create asyncio task for task run in asyncio loop.

Arguments:

  • loop: the event loop

Returns:

task to run in asyncio loop.
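As a rough illustration of what an implementation of `create_async_task` might look like, the sketch below schedules a coroutine on the given loop with the standard `loop.create_task` call. The coroutine `work` and the free function form are assumptions made for the example; a real task class would define this as a method.

```python
import asyncio
from asyncio import AbstractEventLoop


async def work() -> str:
    """Hypothetical coroutine standing in for the task's real job."""
    await asyncio.sleep(0)
    return "finished"


def create_async_task(loop: AbstractEventLoop) -> asyncio.Task:
    """Schedule the coroutine on the given loop and return the awaitable."""
    return loop.create_task(work())


loop = asyncio.new_event_loop()
try:
    task = create_async_task(loop)
    result = loop.run_until_complete(task)  # drive the loop until the task ends
finally:
    loop.close()
```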

id

@property
def id() -> Any

Return task id.

failed

@property
def failed() -> bool

Return whether the task failed with an exception.

A task that is still running is not considered failed.

Returns:

bool
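The relationship between `future` and `failed` can be sketched with a plain `concurrent.futures.Future`. `SketchTask` is a hypothetical stand-in, not the library class, and treating a cancelled future as not failed is an assumption of this example.

```python
from concurrent.futures import Future
from typing import Optional


class SketchTask:
    """Hypothetical stand-in; only `future` and `failed` are mirrored."""

    def __init__(self) -> None:
        self._future: Optional[Future] = None

    @property
    def future(self) -> Optional[Future]:
        return self._future

    @future.setter
    def future(self, future: Future) -> None:
        self._future = future

    @property
    def failed(self) -> bool:
        fut = self._future
        if fut is None or not fut.done() or fut.cancelled():
            return False  # not started, still running, or cancelled
        return fut.exception() is not None


running, broken, ok = SketchTask(), SketchTask(), SketchTask()
running.future = Future()  # never completed, so it counts as still running
broken.future = Future()
broken.future.set_exception(RuntimeError("boom"))
ok.future = Future()
ok.future.set_result(None)
```

Here only `broken.failed` is true; the task whose future has not completed reports `False`, matching the rule that a running task is not failed.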

AbstractMultiprocessExecutorTask Objects

class AbstractMultiprocessExecutorTask(AbstractExecutorTask)

Task for multiprocess executor.

start

@abstractmethod
def start() -> Tuple[Callable, Sequence[Any]]

Return function and arguments to call within subprocess.

create_async_task

def create_async_task(loop: AbstractEventLoop) -> TaskAwaitable

Create asyncio task for task run in asyncio loop.

Always raises an error: async mode is not supported, because this task is designed for the multiprocess executor only.

Arguments:

  • loop: the event loop

Raises:

  • ValueError: async task construction not possible
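A minimal sketch of this contract, assuming a hypothetical `SketchProcessTask` and worker function `work`: `start` only returns the callable and its arguments, and `create_async_task` refuses to build an asyncio task. The call is made in-process here to keep the example small; an executor would run it in a subprocess instead.

```python
from asyncio import AbstractEventLoop
from typing import Any, Callable, Sequence, Tuple


def work(name: str, repeat: int) -> str:
    """Module-level function, so it could be pickled for a subprocess."""
    return name * repeat


class SketchProcessTask:
    """Hypothetical stand-in mirroring the two documented methods."""

    def start(self) -> Tuple[Callable, Sequence[Any]]:
        # Return what to call, rather than calling it here.
        return work, ("ab", 2)

    def create_async_task(self, loop: AbstractEventLoop) -> Any:
        raise ValueError("This task is designed for the multiprocess executor only")


fn, args = SketchProcessTask().start()
result = fn(*args)  # an executor would run this call in a subprocess
```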

AbstractMultipleExecutor Objects

class AbstractMultipleExecutor(ABC)

Abstract base class for executors that run multiple tasks.

__init__

def __init__(
    tasks: Sequence[AbstractExecutorTask],
    task_fail_policy: ExecutorExceptionPolicies = ExecutorExceptionPolicies.propagate
) -> None

Init executor.

Arguments:

  • tasks: sequence of AbstractExecutorTask instances to run.
  • task_fail_policy: the exception policy of all the tasks

is_running

@property
def is_running() -> bool

Return running state of the executor.

start

def start() -> None

Start tasks.

stop

def stop() -> None

Stop tasks.

num_failed

@property
def num_failed() -> int

Return number of failed tasks.

failed_tasks

@property
def failed_tasks() -> Sequence[AbstractExecutorTask]

Return the sequence of failed tasks.

not_failed_tasks

@property
def not_failed_tasks() -> Sequence[AbstractExecutorTask]

Return the sequence of tasks that have not failed.
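The idea behind `num_failed`, `failed_tasks` and `not_failed_tasks` can be sketched with the standard library alone. This example uses `ThreadPoolExecutor` directly rather than the executor classes documented here, and the job names and functions are made up for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


def succeed() -> str:
    return "done"


def fail() -> str:
    raise RuntimeError("boom")


# Hypothetical jobs keyed by name.
jobs: Dict[str, Callable[[], str]] = {"a": succeed, "b": fail, "c": succeed}

with ThreadPoolExecutor(max_workers=3) as pool:
    futures: Dict[str, Future] = {name: pool.submit(fn) for name, fn in jobs.items()}
    wait(list(futures.values()))  # block until every job has finished

# Partition the finished jobs by whether their future holds an exception.
failed_tasks: List[str] = [n for n, f in futures.items() if f.exception() is not None]
not_failed_tasks: List[str] = [n for n, f in futures.items() if f.exception() is None]
num_failed = len(failed_tasks)
```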

ThreadExecutor Objects

class ThreadExecutor(AbstractMultipleExecutor)

Thread based executor to run multiple agents in threads.

ProcessExecutor Objects

class ProcessExecutor(ThreadExecutor)

Subprocess-based executor to run multiple agents in separate processes.

AsyncExecutor Objects

class AsyncExecutor(AbstractMultipleExecutor)

Asyncio-based executor to run multiple agents in an asyncio event loop.

AbstractMultipleRunner Objects

class AbstractMultipleRunner()

Abstract multiple runner to create classes to launch tasks with selected mode.

__init__

def __init__(
    mode: str,
    fail_policy: ExecutorExceptionPolicies = ExecutorExceptionPolicies.propagate
) -> None

Init with selected executor mode.

Arguments:

  • mode: one of supported executor modes
  • fail_policy: one of ExecutorExceptionPolicies to be used with Executor

is_running

@property
def is_running() -> bool

Return state of the executor.

start

def start(threaded: bool = False) -> None

Run agents.

Arguments:

  • threaded: run in dedicated thread without blocking current thread.

stop

def stop(timeout: Optional[float] = None) -> None

Stop agents.

Arguments:

  • timeout: time in seconds to wait for the thread to stop; applies only if the runner was started in threaded mode.

num_failed

@property
def num_failed() -> int

Return number of failed tasks.

failed

@property
def failed() -> Sequence[Task]

Return the sequence of failed tasks.

not_failed

@property
def not_failed() -> Sequence[Task]

Return the sequence of tasks that have not failed.

try_join_thread

def try_join_thread() -> None

Try to join thread if running in thread mode.
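The threaded start and stop behaviour of the runner can be pictured with a small sketch built on `threading`. `SketchRunner` is a hypothetical stand-in that reuses the documented method names; the stop event used to simulate running tasks is an assumption of the example, not the library's mechanism.

```python
import threading
from typing import Optional


class SketchRunner:
    """Hypothetical stand-in mirroring start, stop, try_join_thread, is_running."""

    def __init__(self) -> None:
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running

    def _run(self) -> None:
        try:
            self._stop_event.wait()  # stands in for running the tasks
        finally:
            self._running = False

    def start(self, threaded: bool = False) -> None:
        self._running = True
        if threaded:
            # Run in a dedicated thread so the caller is not blocked.
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
        else:
            self._run()  # blocks until stop() is called from elsewhere

    def stop(self, timeout: Optional[float] = None) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)  # the timeout only matters in threaded mode

    def try_join_thread(self) -> None:
        if self._thread is not None:
            self._thread.join()


runner = SketchRunner()
runner.start(threaded=True)
was_running = runner.is_running
runner.stop(timeout=5.0)
```

With `threaded=True` the call to `start` returns immediately, so the caller can later call `stop` with a timeout that bounds how long it waits for the worker thread.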