aea.helpers.file_
io
Read to and write from file with envelopes.
lock_
file
@contextmanager
def lock_file(file_descriptor: IO[bytes],
logger: Logger = _default_logger) -> Generator
Lock file in context manager.
Arguments:
file_descriptor
: file descriptor of file to lock.logger
: the logger.
Returns:
generator
write_
envelope
def write_envelope(envelope: Envelope,
file_pointer: IO[bytes],
separator: bytes = SEPARATOR,
logger: Logger = _default_logger) -> None
Write envelope to file.
write_
with_
lock
def write_with_lock(file_pointer: IO[bytes],
data: Union[bytes],
logger: Logger = _default_logger) -> None
Write bytes to file protected with file lock.
envelope_
from_
bytes
def envelope_from_bytes(
bytes_: bytes,
separator: bytes = SEPARATOR,
logger: Logger = _default_logger) -> Optional[Envelope]
Decode bytes to get the envelope.
Arguments:
bytes_
: the encoded envelopeseparator
: the separator usedlogger
: the logger
Returns:
Envelope