aea.helpers.async_utils
This module contains miscellaneous utilities for async code.
ensure_list
def ensure_list(value: Any) -> List
Return [value] or list(value) if value is a sequence.
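The behaviour described above can be sketched as follows. This is a minimal illustration, not the library's code: `ensure_list_sketch` is a hypothetical name, and it assumes that only `list` and `tuple` count as sequences, so a string is wrapped rather than split into characters.

```python
from typing import Any, List


def ensure_list_sketch(value: Any) -> List:
    """Wrap a single value in a list; copy a list or tuple into a new list."""
    # Assumption: only list and tuple are treated as sequences here.
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


print(ensure_list_sketch(1))       # [1]
print(ensure_list_sketch((1, 2)))  # [1, 2]
print(ensure_list_sketch("ab"))    # ['ab'] under the assumption above
```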
AsyncState Objects
class AsyncState()
Awaitable state.
__init__
def __init__(initial_state: Any = None,
states_enum: Optional[Container[Any]] = None) -> None
Init async state.
Arguments:
initial_state: state to set on start.
states_enum: container of valid states; if not provided, the state is not checked on set.
set
def set(state: Any) -> None
Set state.
add_callback
def add_callback(callback_fn: Callable[[Any], None]) -> None
Add callback to track state changes.
Arguments:
callback_fn: callable object to be called when the state changes.
get
def get() -> Any
Get state.
wait
async def wait(state_or_states: Union[Any, Sequence[Any]]) -> Tuple[Any, Any]
Wait for the state to be set.
Arguments:
state_or_states: state or list of states.
Returns:
tuple of previous state and new state.
transit
@contextmanager
def transit(initial: Any = not_set,
success: Any = not_set,
fail: Any = not_set) -> Generator
Change state on entering and leaving a context, depending on whether the block succeeds or fails.
Arguments:
initial: state to set on context enter; not_set by default.
success: state to set when the context block completes; not_set by default.
fail: state to set when the context block raises an exception; not_set by default.
Returns:
generator
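To make the interplay of set, get, wait and transit concrete, here is a simplified, single-loop stand-in. `MiniAsyncState` and `_NOT_SET` are hypothetical names; the sketch omits callbacks, state validation and thread safety, and its internals are an assumption rather than the library's implementation.

```python
import asyncio
from contextlib import contextmanager
from typing import Any, Generator, List, Tuple

_NOT_SET = object()  # hypothetical sentinel standing in for not_set


class MiniAsyncState:
    """Simplified stand-in for an awaitable state (single event loop only)."""

    def __init__(self, initial_state: Any = None) -> None:
        self._state = initial_state
        self._watchers: List[Tuple[asyncio.Future, list]] = []

    def get(self) -> Any:
        return self._state

    def set(self, state: Any) -> None:
        previous, self._state = self._state, state
        # Wake every waiter that is interested in the new state.
        for future, states in list(self._watchers):
            if state in states and not future.done():
                future.set_result((previous, state))

    async def wait(self, state_or_states: Any) -> Tuple[Any, Any]:
        if isinstance(state_or_states, (list, tuple)):
            states = list(state_or_states)
        else:
            states = [state_or_states]
        if self._state in states:
            return None, self._state  # already there, no previous state known
        entry = (asyncio.get_running_loop().create_future(), states)
        self._watchers.append(entry)
        try:
            return await entry[0]
        finally:
            self._watchers.remove(entry)

    @contextmanager
    def transit(self, initial: Any = _NOT_SET, success: Any = _NOT_SET,
                fail: Any = _NOT_SET) -> Generator:
        if initial is not _NOT_SET:
            self.set(initial)
        try:
            yield
            if success is not _NOT_SET:
                self.set(success)
        except BaseException:
            if fail is not _NOT_SET:
                self.set(fail)
            raise


async def demo() -> list:
    state = MiniAsyncState("idle")
    seen = []
    waiter = asyncio.ensure_future(state.wait("done"))
    await asyncio.sleep(0)  # let the waiter register itself
    with state.transit(initial="running", success="done", fail="failed"):
        await asyncio.sleep(0)
    seen.append(await waiter)  # (previous state, new state)
    try:
        with state.transit(initial="running", success="done", fail="failed"):
            raise RuntimeError("boom")
    except RuntimeError:
        seen.append(state.get())
    return seen


result = asyncio.run(demo())
print(result)  # [('running', 'done'), 'failed']
```

The first block leaves the state at its success value and wakes the waiter; the second block raises, so the state ends at the fail value and the exception still propagates.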
PeriodicCaller Objects
class PeriodicCaller()
Schedule a periodic call of callable using event loop.
Used for periodic function run using asyncio.
__init__
def __init__(callback: Callable,
period: float,
start_at: Optional[datetime.datetime] = None,
exception_callback: Optional[Callable[[Callable, Exception],
None]] = None,
loop: Optional[AbstractEventLoop] = None) -> None
Init periodic caller.
Arguments:
callback: function to call periodically.
period: period in seconds.
start_at: optional first call datetime.
exception_callback: optional handler to call on exception raised.
loop: optional asyncio event loop.
start
def start() -> None
Activate periodic calls.
stop
def stop() -> None
Remove from schedule.
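One common way to schedule a periodic call on an event loop is to re-arm `loop.call_later` after each call. The sketch below illustrates that idea only; `MiniPeriodicCaller` is a hypothetical class, it leaves out start_at and exception_callback, and it is not claimed to match the library's internals.

```python
import asyncio
from typing import Callable, Optional, Tuple


class MiniPeriodicCaller:
    """Hypothetical stand-in: call a function every `period` seconds."""

    def __init__(self, callback: Callable[[], None], period: float,
                 loop: asyncio.AbstractEventLoop) -> None:
        self._callback = callback
        self._period = period
        self._loop = loop
        self._handle: Optional[asyncio.TimerHandle] = None
        self._active = False

    def _tick(self) -> None:
        self._callback()
        # Re-arm the timer unless stop() was called in the meantime.
        if self._active:
            self._handle = self._loop.call_later(self._period, self._tick)

    def start(self) -> None:
        if not self._active:
            self._active = True
            self._handle = self._loop.call_later(self._period, self._tick)

    def stop(self) -> None:
        self._active = False
        if self._handle is not None:
            self._handle.cancel()  # remove the pending call from the schedule
            self._handle = None


async def demo() -> Tuple[int, int]:
    calls = []
    caller = MiniPeriodicCaller(lambda: calls.append(1), 0.01,
                                asyncio.get_running_loop())
    caller.start()
    await asyncio.sleep(0.1)
    caller.stop()
    count_at_stop = len(calls)
    await asyncio.sleep(0.05)  # nothing should fire after stop()
    return count_at_stop, len(calls)


counts = asyncio.run(demo())
```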
AnotherThreadTask Objects
class AnotherThreadTask()
Schedule a task to run on the loop in another thread.
Provides better cancel behaviour: on cancel, it waits until the task is completely cancelled.
__init__
def __init__(coro: Coroutine, loop: AbstractEventLoop) -> None
Init the task.
Arguments:
coro: coroutine to schedule.
loop: an event loop to schedule on.
result
def result(timeout: Optional[float] = None) -> Any
Wait for coroutine execution result.
Arguments:
timeout: optional timeout to wait in seconds.
Returns:
result
cancel
def cancel() -> None
Cancel coroutine task execution in a target loop.
done
def done() -> bool
Check whether the task is done.
ThreadedAsyncRunner Objects
class ThreadedAsyncRunner(Thread)
Utility to run a thread with an event loop and execute coroutines inside it.
__init__
def __init__(loop: Optional[AbstractEventLoop] = None) -> None
Init threaded runner.
Arguments:
loop: optional event loop. If it is a running loop, the threaded runner will use it.
start
def start() -> None
Start event loop in dedicated thread.
run
def run() -> None
Run code inside thread.
call
def call(coro: Coroutine) -> Any
Run a coroutine inside the event loop.
Arguments:
coro
: a coroutine to run.
Returns:
task
stop
def stop() -> None
Stop event loop in thread.
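The general pattern behind running a coroutine on a loop that lives in another thread can be sketched with the standard library alone. The example below uses `asyncio.run_coroutine_threadsafe`; it is an illustration of the pattern under that assumption, not the implementation of AnotherThreadTask or ThreadedAsyncRunner, and the `add` coroutine is hypothetical.

```python
import asyncio
import threading


async def add(a: int, b: int) -> int:
    """Hypothetical coroutine to run on the background loop."""
    await asyncio.sleep(0.01)
    return a + b


# Start an event loop in a dedicated thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Schedule the coroutine from the calling thread; this returns a
# concurrent.futures.Future that can be waited on synchronously.
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
value = future.result(timeout=5)
print(value)  # 5

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

Calling `future.cancel()` on the returned future requests cancellation in the target loop but does not by itself wait for the coroutine to finish unwinding, which is the gap the "wait till cancelled completely" behaviour described above is meant to close.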
Runnable Objects
class Runnable(ABC)
Abstract Runnable class.
Use it to run an async task in the same event loop or in a dedicated thread.
Provides start and stop sync methods to start and stop the task.
Use wait_completed to wait until the task has completed.
__init__
def __init__(loop: Optional[asyncio.AbstractEventLoop] = None,
threaded: bool = False) -> None
Init runnable.
Arguments:
loop: asyncio event loop to use.
threaded: bool. Start in a thread if True.
start
def start() -> bool
Start runnable.
Returns:
bool started or not.
is_running
@property
def is_running() -> bool
Get running state.
run
@abstractmethod
async def run() -> Any
Implement the run logic so that it respects asyncio.CancelledError on termination.
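asyncio raises `asyncio.CancelledError` inside a task when it is cancelled, so a run coroutine would typically clean up and then re-raise. The following standalone sketch shows that shape with a plain coroutine; the `log` parameter and the `demo` helper are hypothetical and no Runnable subclass is involved.

```python
import asyncio
from typing import List


async def run(log: List[str]) -> None:
    """Loop until cancelled, then clean up and propagate the cancellation."""
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here
        raise  # re-raise so the task is marked as cancelled


async def demo() -> List[str]:
    log: List[str] = []
    task = asyncio.ensure_future(run(log))
    await asyncio.sleep(0.03)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return log


events = asyncio.run(demo())
```

Swallowing the exception instead of re-raising it would leave the task looking as if it had finished normally, which can confuse whoever requested the stop.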
wait_completed
def wait_completed(
sync: bool = False,
timeout: Optional[float] = None,
force_result: bool = False) -> Union[Coroutine, asyncio.Future]
Wait for the runnable execution to complete.
Arguments:
sync: bool. Blocking wait.
timeout: float, in seconds.
force_result: check the result even if it was already waited for.
Returns:
awaitable if sync is False, otherwise None
stop
def stop(force: bool = False) -> None
Stop runnable.
start_and_wait_completed
def start_and_wait_completed(*args: Any,
**kwargs: Any) -> Union[Coroutine, Future]
Alias for start and wait methods.