Skip to content

aea.helpers.pipe

Portable pipe implementation for Linux, MacOS, and Windows.

IPCChannelClient Objects

class IPCChannelClient(ABC)

Multi-platform interprocess communication channel for the client side.

connect

@abstractmethod
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to communication channel

Arguments:

  • timeout: timeout for other end to connect

Returns:

connection status

write

@abstractmethod
async def write(data: bytes) -> None

Write data bytes to the other end of the channel

Will first write the size, then the actual data.

Arguments:

  • data: bytes to write

read

@abstractmethod
async def read() -> Optional[bytes]

Read bytes from the other end of the channel

Will first read the size, then the actual data.

Returns:

read bytes

close

@abstractmethod
async def close() -> None

Close the communication channel.

IPCChannel Objects

class IPCChannel(IPCChannelClient)

Multi-platform interprocess communication channel.

in_path

@property
@abstractmethod
def in_path() -> str

Rendezvous point for incoming communication.

Returns:

path

out_path

@property
@abstractmethod
def out_path() -> str

Rendezvous point for outgoing communication.

Returns:

path

PosixNamedPipeProtocol Objects

class PosixNamedPipeProtocol()

Posix named pipes async wrapper communication protocol.

__init__

def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize a new posix named pipe.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing data
  • logger: the logger
  • loop: the event loop

connect

async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the pipe

Arguments:

  • timeout: timeout before failing

Returns:

connection success

write

async def write(data: bytes) -> None

Write to pipe.

Arguments:

  • data: bytes to write to pipe

read

async def read() -> Optional[bytes]

Read from pipe.

Returns:

read bytes

close

async def close() -> None

Disconnect pipe.

TCPSocketProtocol Objects

class TCPSocketProtocol()

TCP socket communication protocol.

__init__

def __init__(reader: asyncio.StreamReader,
             writer: asyncio.StreamWriter,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize the tcp socket protocol.

Arguments:

  • reader: established asyncio reader
  • writer: established asyncio writer
  • logger: the logger
  • loop: the event loop

writer

@property
def writer() -> StreamWriter

Get a writer associated with protocol.

write

async def write(data: bytes) -> None

Write to socket.

Arguments:

  • data: bytes to write

read

async def read() -> Optional[bytes]

Read from socket.

Returns:

read bytes

close

async def close() -> None

Disconnect socket.

TCPSocketChannel Objects

class TCPSocketChannel(IPCChannel)

Interprocess communication channel implementation using tcp sockets.

__init__

def __init__(logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize tcp socket interprocess communication channel.

connect

async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Setup communication channel and wait for other end to connect.

Arguments:

  • timeout: timeout for the connection to be established

Returns:

connection status

write

async def write(data: bytes) -> None

Write to channel.

Arguments:

  • data: bytes to write

read

async def read() -> Optional[bytes]

Read from channel.

Returns:

read bytes

close

async def close() -> None

Disconnect from channel and clean it up.

in_path

@property
def in_path() -> str

Rendezvous point for incoming communication.

out_path

@property
def out_path() -> str

Rendezvous point for outgoing communication.

PosixNamedPipeChannel Objects

class PosixNamedPipeChannel(IPCChannel)

Interprocess communication channel implementation using Posix named pipes.

__init__

def __init__(logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize posix named pipe interprocess communication channel.

connect

async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Setup communication channel and wait for other end to connect.

Arguments:

  • timeout: timeout for connection to be established

Returns:

bool, indicating success

write

async def write(data: bytes) -> None

Write to the channel.

Arguments:

  • data: data to write to channel

read

async def read() -> Optional[bytes]

Read from the channel.

Returns:

read bytes

close

async def close() -> None

Close the channel and clean it up.

in_path

@property
def in_path() -> str

Rendezvous point for incoming communication.

out_path

@property
def out_path() -> str

Rendezvous point for outgoing communication.

TCPSocketChannelClient Objects

class TCPSocketChannelClient(IPCChannelClient)

Interprocess communication channel client using tcp sockets.

__init__

def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize a tcp socket communication channel client.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing data
  • logger: the logger
  • loop: the event loop

connect

async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the communication channel.

Arguments:

  • timeout: timeout for connection to be established

Returns:

connection status

write

async def write(data: bytes) -> None

Write data to channel.

Arguments:

  • data: bytes to write

read

async def read() -> Optional[bytes]

Read data from channel.

Returns:

read bytes

close

async def close() -> None

Disconnect from communication channel.

PosixNamedPipeChannelClient Objects

class PosixNamedPipeChannelClient(IPCChannelClient)

Interprocess communication channel client using Posix named pipes.

__init__

def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None

Initialize a posix named pipe communication channel client.

Arguments:

  • in_path: rendezvous point for incoming data
  • out_path: rendezvous point for outgoing data
  • logger: the logger
  • loop: the event loop

connect

async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool

Connect to the other end of the communication channel.

Arguments:

  • timeout: timeout for connection to be established

Returns:

connection status

write

async def write(data: bytes) -> None

Write data to channel.

Arguments:

  • data: bytes to write

read

async def read() -> Optional[bytes]

Read data from channel.

Returns:

read bytes

close

async def close() -> None

Disconnect from communication channel.

make_ipc_channel

def make_ipc_channel(logger: logging.Logger = _default_logger,
                     loop: Optional[AbstractEventLoop] = None) -> IPCChannel

Build a portable bidirectional InterProcess Communication channel

Arguments:

  • logger: the logger
  • loop: the event loop

Returns:

IPCChannel

make_ipc_channel_client

def make_ipc_channel_client(
        in_path: str,
        out_path: str,
        logger: logging.Logger = _default_logger,
        loop: Optional[AbstractEventLoop] = None) -> IPCChannelClient

Build a portable bidirectional InterProcess Communication client channel

Arguments:

  • in_path: rendezvous point for incoming communication
  • out_path: rendezvous point for outgoing communication
  • logger: the logger
  • loop: the event loop

Returns:

IPCChannelClient