Skip to content

aea.helpers.async_utils

This module contains the misc utils for async code.

ensure_list

def ensure_list(value: Any) -> List

Return [value] or list(value) if value is a sequence.

AsyncState Objects

class AsyncState()

Awaitable state.

__init__

def __init__(initial_state: Any = None,
             states_enum: Optional[Container[Any]] = None) -> None

Init async state.

Arguments:

  • initial_state: state to set on start.
  • states_enum: container of valid states if not provided state not checked on set.

set

def set(state: Any) -> None

Set state.

add_callback

def add_callback(callback_fn: Callable[[Any], None]) -> None

Add callback to track state changes.

Arguments:

  • callback_fn: callable object to be called on state changed.

get

def get() -> Any

Get state.

wait

async def wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]

Wait state to be set.

Arguments:

  • state_or_states: state or list of states.

Returns:

tuple of previous state and new state.

transit

@contextmanager
def transit(initial: Any = not_set,
            success: Any = not_set,
            fail: Any = not_set) -> Generator

Change state context according to success or not.

Arguments:

  • initial: set state on context enter, not_set by default
  • success: set state on context block done, not_set by default
  • fail: set state on context block raises exception, not_set by default

Returns:

generator

PeriodicCaller Objects

class PeriodicCaller()

Schedule a periodic call of callable using event loop.

Used for periodic function run using asyncio.

__init__

def __init__(callback: Callable,
             period: float,
             start_at: Optional[datetime.datetime] = None,
             exception_callback: Optional[Callable[[Callable, Exception],
                                                   None]] = None,
             loop: Optional[AbstractEventLoop] = None) -> None

Init periodic caller.

Arguments:

  • callback: function to call periodically
  • period: period in seconds.
  • start_at: optional first call datetime
  • exception_callback: optional handler to call on exception raised.
  • loop: optional asyncio event loop

start

def start() -> None

Activate period calls.

stop

def stop() -> None

Remove from schedule.

AnotherThreadTask Objects

class AnotherThreadTask()

Schedule a task to run on the loop in another thread.

Provides better cancel behaviour: on cancel it will wait till cancelled completely.

__init__

def __init__(coro: Coroutine, loop: AbstractEventLoop) -> None

Init the task.

Arguments:

  • coro: coroutine to schedule
  • loop: an event loop to schedule on.

result

def result(timeout: Optional[float] = None) -> Any

Wait for coroutine execution result.

Arguments:

  • timeout: optional timeout to wait in seconds.

Returns:

result

cancel

def cancel() -> None

Cancel coroutine task execution in a target loop.

done

def done() -> bool

Check task is done.

ThreadedAsyncRunner Objects

class ThreadedAsyncRunner(Thread)

Util to run thread with event loop and execute coroutines inside.

__init__

def __init__(loop: Optional[AbstractEventLoop] = None) -> None

Init threaded runner.

Arguments:

  • loop: optional event loop. is it's running loop, threaded runner will use it.

start

def start() -> None

Start event loop in dedicated thread.

run

def run() -> None

Run code inside thread.

call

def call(coro: Coroutine) -> Any

Run a coroutine inside the event loop.

Arguments:

  • coro: a coroutine to run.

Returns:

task

stop

def stop() -> None

Stop event loop in thread.

Runnable Objects

class Runnable(ABC)

Abstract Runnable class.

Use to run async task in same event loop or in dedicated thread. Provides: start, stop sync methods to start and stop task Use wait_completed to await task was completed.

__init__

def __init__(loop: Optional[asyncio.AbstractEventLoop] = None,
             threaded: bool = False) -> None

Init runnable.

Arguments:

  • loop: asyncio event loop to use.
  • threaded: bool. start in thread if True.

start

def start() -> bool

Start runnable.

Returns:

bool started or not.

is_running

@property
def is_running() -> bool

Get running state.

run

@abstractmethod
async def run() -> Any

Implement run logic respectful to CancelError on termination.

wait_completed

def wait_completed(
        sync: bool = False,
        timeout: Optional[float] = None,
        force_result: bool = False) -> Union[Coroutine, asyncio.Future]

Wait runnable execution completed.

Arguments:

  • sync: bool. blocking wait
  • timeout: float seconds
  • force_result: check result even it was waited.

Returns:

awaitable if sync is False, otherwise None

stop

def stop(force: bool = False) -> None

Stop runnable.

start_and_wait_completed

def start_and_wait_completed(*args: Any,
                             **kwargs: Any) -> Union[Coroutine, Future]

Alias for start and wait methods.