blob: 5ca2c3745634434314474466cda4961741a0b035 [file]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import fuchsia_controller_py as fc
from ._ipc import GlobalHandleWaker
from ._ipc import HandleWaker
class AsyncSocket:
    """Represents an async socket.

    In 99% of cases it is recommended to use this instead of the standard
    fuchsia-controller Socket object. This has built-in support for handling
    waits.

    In the remaining 1% of cases it may be useful for the user to do a one-off
    attempt at reading a socket directly and immediately exiting in the event
    that ZX_ERR_SHOULD_WAIT is encountered. Such a case would likely involve
    adding some custom behavior to existing async code, like registering
    custom wakers. Another case would be where the user is _only_ writing to
    the socket, as writes in AsyncSocket are also synchronous and wrap
    Socket.write directly.

    Args:
        socket: The socket which will be asynchronously readable.
        waker: (Optional) the HandleWaker implementation (defaults to
            GlobalHandleWaker).
    """

    def __init__(self, socket: fc.Socket, waker: HandleWaker | None = None):
        self.socket = socket
        # Always bind self.waker: a caller-supplied waker must be honored, and
        # every other method (including __del__) reads this attribute.
        self.waker = GlobalHandleWaker() if waker is None else waker

    def __del__(self):
        # __del__ can run on a partially constructed object (e.g. if
        # __init__ raised), so do not assume the attribute exists.
        waker = getattr(self, "waker", None)
        if waker is not None:
            waker.unregister(self.socket)

    async def read(self) -> bytearray:
        """Attempts to read off of the socket.

        The socket is registered with the waker for the duration of the call.
        While the underlying read reports ZX_ERR_SHOULD_WAIT, this awaits the
        waker and then retries.

        Returns:
            A byte array to the caller.

        Raises:
            ZxStatus exception outlining the specific failure of the underlying
            handle (anything other than ZX_ERR_SHOULD_WAIT).
        """
        self.waker.register(self.socket)
        try:
            while True:
                try:
                    return self.socket.read()
                except fc.ZxStatus as e:
                    if e.args[0] != fc.ZxStatus.ZX_ERR_SHOULD_WAIT:
                        # Bare raise keeps the original traceback intact.
                        raise
                # Nothing to read yet: suspend until the waker signals that
                # the socket is ready, then try again.
                await self.waker.wait_channel_ready(self.socket)
        finally:
            # Unregister on every exit path (success, error, or the await
            # being cancelled) so the registration never outlives this call.
            self.waker.unregister(self.socket)

    async def read_all(self) -> bytearray:
        """Attempts to read all data on the socket until it is closed.

        Repeatedly calls read() and accumulates the results until the read
        fails with ZX_ERR_PEER_CLOSED, which is treated as end of data.

        Returns:
            All bytes read on the socket.

        Raises:
            Any ZX errors encountered besides ZX_ERR_SHOULD_WAIT and
            ZX_ERR_PEER_CLOSED.
        """
        output = bytearray()
        while True:
            try:
                output.extend(await self.read())
            except fc.ZxStatus as zx:
                if zx.args[0] != fc.ZxStatus.ZX_ERR_PEER_CLOSED:
                    raise
                # Peer closed its end: everything available has been read.
                break
        return output

    def write(self, buf: bytes) -> None:
        """Does a blocking write on the socket.

        This is identical to calling the write function on the socket itself.

        Args:
            buf: The array of bytes (read-only) to write to the socket.

        Raises:
            ZxStatus exception on failure of the underlying handle.
        """
        self.socket.write(buf)