aea.skills.tasks
This module contains the classes for tasks.
Task Objects
class Task(WithLogger)
This class implements an abstract task.
__
init__
def __init__(**kwargs: Any) -> None
Initialize a task.
__
call__
def __call__(*args: Any, **kwargs: Any) -> Any
Execute the task.
Arguments:
args
: positional arguments forwarded to the 'execute' method.kwargs
: keyword arguments forwarded to the 'execute' method.
Raises:
ValueError
: if the task has already been executed.
Returns:
the task instance
is_
executed
@property
def is_executed() -> bool
Check if the task has already been executed.
result
@property
def result() -> Any
Get the result.
Raises:
ValueError
: if the task has not been executed yet.
Returns:
the result from the execute method.
setup
def setup() -> None
Implement the task setup.
execute
@abstractmethod
def execute(*args: Any, **kwargs: Any) -> Any
Run the task logic.
Arguments:
args
: the positional argumentskwargs
: the keyword arguments
Returns:
any
teardown
def teardown() -> None
Implement the task teardown.
init_
worker
def init_worker() -> None
Initialize a worker.
Disable the SIGINT handler of process pool is using. Related to a well-known bug: https://bugs.python.org/issue8296
TaskManager Objects
class TaskManager(WithLogger)
A Task manager.
__
init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
is_lazy_pool_start: bool = True,
logger: Optional[logging.Logger] = None,
pool_mode: str = THREAD_POOL_MODE) -> None
Initialize the task manager.
Arguments:
nb_workers
: the number of worker processes.is_lazy_pool_start
: option to postpone pool creation till the first enqueue_task called.logger
: the logger.pool_mode
: str. multithread or multiprocess
is_
started
@property
def is_started() -> bool
Get started status of TaskManager.
Returns:
bool
nb_
workers
@property
def nb_workers() -> int
Get the number of workers.
Returns:
int
enqueue_
task
def enqueue_task(func: Callable,
args: Sequence = (),
kwargs: Optional[Dict[str, Any]] = None) -> int
Enqueue a task with the executor.
Arguments:
func
: the callable instance to be enqueuedargs
: the positional arguments to be passed to the function.kwargs
: the keyword arguments to be passed to the function.
Raises:
ValueError
: if the task manager is not running.
Returns:
the task id to get the the result.
get_
task_
result
def get_task_result(task_id: int) -> AsyncResult
Get the result from a task.
Arguments:
task_id
: the task id
Returns:
async result for task_id
start
def start() -> None
Start the task manager.
stop
def stop() -> None
Stop the task manager.
ThreadedTaskManager Objects
class ThreadedTaskManager(TaskManager)
A threaded task manager.
__
init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
is_lazy_pool_start: bool = True,
logger: Optional[logging.Logger] = None) -> None
Initialize the task manager.
Arguments:
nb_workers
: the number of worker processes.is_lazy_pool_start
: option to postpone pool creation till the first enqueue_task called.logger
: the logger.
ProcessTaskManager Objects
class ProcessTaskManager(TaskManager)
A multiprocess task manager.
__
init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
is_lazy_pool_start: bool = True,
logger: Optional[logging.Logger] = None) -> None
Initialize the task manager.
Arguments:
nb_workers
: the number of worker processes.is_lazy_pool_start
: option to postpone pool creation till the first enqueue_task called.logger
: the logger.