Skip to content

aea.helpers.file_io

Read to and write from file with envelopes.

lock_file

@contextmanager
def lock_file(file_descriptor: IO[bytes],
              logger: Logger = _default_logger) -> Generator

Lock file in context manager.

Arguments:

  • file_descriptor: file descriptor of file to lock.
  • logger: the logger.

Returns:

generator

write_envelope

def write_envelope(envelope: Envelope,
                   file_pointer: IO[bytes],
                   separator: bytes = SEPARATOR,
                   logger: Logger = _default_logger) -> None

Write envelope to file.

write_with_lock

def write_with_lock(file_pointer: IO[bytes],
                    data: Union[bytes],
                    logger: Logger = _default_logger) -> None

Write bytes to file protected with file lock.

envelope_from_bytes

def envelope_from_bytes(
        bytes_: bytes,
        separator: bytes = SEPARATOR,
        logger: Logger = _default_logger) -> Optional[Envelope]

Decode bytes to get the envelope.

Arguments:

  • bytes_: the encoded envelope
  • separator: the separator used
  • logger: the logger

Returns:

Envelope