aea.skills.tasks

This module contains the classes for tasks.

Task Objects

class Task(WithLogger)

This class implements an abstract task.

__init__

def __init__(**kwargs: Any) -> None

Initialize a task.

__call__

def __call__(*args: Any, **kwargs: Any) -> Any

Execute the task.

Arguments:

  • args: positional arguments forwarded to the 'execute' method.
  • kwargs: keyword arguments forwarded to the 'execute' method.

Raises:

  • ValueError: if the task has already been executed.

Returns:

the task instance

is_executed

@property
def is_executed() -> bool

Check if the task has already been executed.

result

@property
def result() -> Any

Get the result.

Raises:

  • ValueError: if the task has not been executed yet.

Returns:

the result from the execute method.

setup

def setup() -> None

Implement the task setup.

execute

@abstractmethod
def execute(*args: Any, **kwargs: Any) -> Any

Run the task logic.

Arguments:

  • args: the positional arguments
  • kwargs: the keyword arguments

Returns:

any

teardown

def teardown() -> None

Implement the task teardown.
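
The contract described above (a task runs at most once, and its result is readable only after it has run) can be sketched with a small stand-in class. This is not the aea implementation: SketchTask and SquareTask are hypothetical names, and the setup, execute, teardown ordering inside __call__ is an assumption based on the method descriptions.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchTask(ABC):
    """Hypothetical stand-in that mirrors the documented Task contract."""

    def __init__(self) -> None:
        self._is_executed = False
        self._result: Any = None

    def __call__(self, *args: Any, **kwargs: Any) -> "SketchTask":
        # A task may only run once.
        if self._is_executed:
            raise ValueError("Task already executed.")
        self.setup()
        try:
            self._result = self.execute(*args, **kwargs)
        finally:
            # Assumption: the task counts as executed, and teardown runs,
            # even when execute raises.
            self._is_executed = True
            self.teardown()
        return self

    @property
    def is_executed(self) -> bool:
        return self._is_executed

    @property
    def result(self) -> Any:
        # The result is only available after execution.
        if not self._is_executed:
            raise ValueError("Task not executed yet.")
        return self._result

    def setup(self) -> None:
        """Optional hook that runs before execute."""

    @abstractmethod
    def execute(self, *args: Any, **kwargs: Any) -> Any:
        """Task logic, supplied by subclasses."""

    def teardown(self) -> None:
        """Optional hook that runs after execute."""


class SquareTask(SketchTask):
    """Hypothetical concrete task: squares its argument."""

    def execute(self, x: int) -> int:
        return x * x
```

Because __call__ returns the task instance rather than the computed value, the caller reads the value from the result property afterwards.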

init_worker

def init_worker() -> None

Initialize a worker.

Disable the SIGINT handler that the process pool is using. This works around a well-known bug: https://bugs.python.org/issue8296
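
A common way to disable SIGINT handling in pool workers is to pass an initializer that tells each worker to ignore the signal, so that Ctrl+C is handled only by the parent process. The sketch below uses only the standard library; ignore_sigint and make_interrupt_safe_pool are hypothetical names, and whether init_worker does exactly this is an assumption.

```python
import signal
from multiprocessing.pool import Pool


def ignore_sigint() -> None:
    """Make the calling process ignore SIGINT (Ctrl+C)."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def make_interrupt_safe_pool(nb_workers: int) -> Pool:
    """Create a process pool whose workers run ignore_sigint on startup."""
    return Pool(nb_workers, initializer=ignore_sigint)
```

With this arrangement the parent process keeps its own SIGINT handler and stays responsible for shutting the pool down.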

TaskManager Objects

class TaskManager(WithLogger)

A Task manager.

__init__

def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None,
             pool_mode: str = THREAD_POOL_MODE) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of workers (threads or processes, depending on pool_mode).
  • is_lazy_pool_start: whether to postpone pool creation until enqueue_task is first called.
  • logger: the logger.
  • pool_mode: the pool mode, either multithread or multiprocess.
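
To illustrate how a pool_mode string can select between a thread pool and a process pool, here is a minimal standard-library sketch. The two mode strings come from the argument description above; the make_pool helper, the constant values, and the mapping onto multiprocessing.pool.ThreadPool and Pool are assumptions for illustration, not the library's code.

```python
from multiprocessing.pool import Pool, ThreadPool

# Assumed values, taken from the wording of the pool_mode description.
THREAD_POOL_MODE = "multithread"
PROCESS_POOL_MODE = "multiprocess"


def make_pool(nb_workers: int, pool_mode: str = THREAD_POOL_MODE) -> Pool:
    """Hypothetical helper: build a pool that matches the requested mode."""
    if pool_mode == THREAD_POOL_MODE:
        # ThreadPool shares the Pool interface but runs workers as threads.
        return ThreadPool(nb_workers)
    if pool_mode == PROCESS_POOL_MODE:
        return Pool(nb_workers)
    raise ValueError(f"Unknown pool mode: {pool_mode!r}")
```

Threads avoid pickling arguments and suit I/O-bound work, while processes sidestep the global interpreter lock for CPU-bound work.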

is_started

@property
def is_started() -> bool

Check whether the task manager has been started.

Returns:

bool

nb_workers

@property
def nb_workers() -> int

Get the number of workers.

Returns:

int

enqueue_task

def enqueue_task(func: Callable,
                 args: Sequence = (),
                 kwargs: Optional[Dict[str, Any]] = None) -> int

Enqueue a task with the executor.

Arguments:

  • func: the callable instance to be enqueued
  • args: the positional arguments to be passed to the function.
  • kwargs: the keyword arguments to be passed to the function.

Raises:

  • ValueError: if the task manager is not running.

Returns:

the task id, used to retrieve the result.

get_task_result

def get_task_result(task_id: int) -> AsyncResult

Get the result from a task.

Arguments:

  • task_id: the task id

Returns:

async result for task_id
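
The enqueue-then-fetch pattern (enqueue_task returns an integer id, and get_task_result exchanges that id for an AsyncResult) can be sketched with the standard library. MiniTaskManager is a hypothetical stand-in built on multiprocessing.pool.ThreadPool. It is not the aea TaskManager, and its internal bookkeeping is an assumption.

```python
from multiprocessing.pool import AsyncResult, ThreadPool
from typing import Any, Callable, Dict, Optional, Sequence


class MiniTaskManager:
    """Hypothetical stand-in that maps integer task ids to AsyncResult objects."""

    def __init__(self, nb_workers: int = 2) -> None:
        self._nb_workers = nb_workers
        self._pool: Optional[ThreadPool] = None
        self._results: Dict[int, AsyncResult] = {}
        self._next_id = 0

    def start(self) -> None:
        if self._pool is None:
            self._pool = ThreadPool(self._nb_workers)

    def stop(self) -> None:
        if self._pool is not None:
            self._pool.close()  # accept no new work
            self._pool.join()   # wait for the workers to finish
            self._pool = None

    def enqueue_task(
        self,
        func: Callable,
        args: Sequence = (),
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
        if self._pool is None:
            raise ValueError("Task manager not running.")
        task_id = self._next_id
        self._next_id += 1
        # apply_async returns immediately with an AsyncResult handle.
        self._results[task_id] = self._pool.apply_async(
            func, args=tuple(args), kwds=kwargs or {}
        )
        return task_id

    def get_task_result(self, task_id: int) -> AsyncResult:
        return self._results[task_id]
```

The returned AsyncResult is a handle, not the value itself: calling get(timeout=...) on it blocks until the callable finishes, and ready() reports completion without blocking.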

start

def start() -> None

Start the task manager.

stop

def stop() -> None

Stop the task manager.

ThreadedTaskManager Objects

class ThreadedTaskManager(TaskManager)

A threaded task manager.

__init__

def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of worker threads.
  • is_lazy_pool_start: whether to postpone pool creation until enqueue_task is first called.
  • logger: the logger.

ProcessTaskManager Objects

class ProcessTaskManager(TaskManager)

A multiprocess task manager.

__init__

def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None) -> None

Initialize the task manager.

Arguments:

  • nb_workers: the number of worker processes.
  • is_lazy_pool_start: whether to postpone pool creation until enqueue_task is first called.
  • logger: the logger.