Skip to content

aea.helpers.storage.backends.base

This module contains storage abstract backend class.

AbstractStorageBackend Objects

class AbstractStorageBackend(ABC)

Abstract base class for storage backend.

__init__

def __init__(uri: str) -> None

Init backend.

connect

@abstractmethod
async def connect() -> None

Connect to backend.

disconnect

@abstractmethod
async def disconnect() -> None

Disconnect the backend.

ensure_collection

@abstractmethod
async def ensure_collection(collection_name: str) -> None

Create collection if not exits.

Arguments:

  • collection_name: str.

Returns:

None

put

@abstractmethod
async def put(collection_name: str, object_id: str,
              object_body: JSON_TYPES) -> None

Put object into collection.

Arguments:

  • collection_name: str.
  • object_id: str object id
  • object_body: python dict, json compatible.

Returns:

None

get

@abstractmethod
async def get(collection_name: str, object_id: str) -> Optional[JSON_TYPES]

Get object from the collection.

Arguments:

  • collection_name: str.
  • object_id: str object id

Returns:

dict if object exists in collection otherwise None

remove

@abstractmethod
async def remove(collection_name: str, object_id: str) -> None

Remove object from the collection.

Arguments:

  • collection_name: str.
  • object_id: str object id

Returns:

None

find

@abstractmethod
async def find(collection_name: str, field: str,
               equals: EQUALS_TYPE) -> List[OBJECT_ID_AND_BODY]

Get objects from the collection by filtering by field value.

Arguments:

  • collection_name: str.
  • field: field name to search: example "parent.field"
  • equals: value field should be equal to

Returns:

list of objects bodies

list

@abstractmethod
async def list(collection_name: str) -> List[OBJECT_ID_AND_BODY]

List all objects with keys from the collection.

Arguments:

  • collection_name: str.

Returns:

Tuple of objects keys, bodies.