blob: 4887c17988bd59145b8a28605848329783401349 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import unittest
from unittest.mock import Mock
import fidl_fuchsia_developer_ffx as ffx
from fidl_codec import encode_fidl_message, method_ordinal
from fuchsia_controller_py import Channel, ZxStatus
class FidlClientTests(unittest.IsolatedAsyncioTestCase):
"""Fidl Client tests."""
async def test_read_and_decode_staged_message(self) -> None:
channel = Mock()
channel.__class__ = Channel # type: ignore[assignment]
channel.as_int.return_value = 0
channel.read.side_effect = [
(bytearray([2, 0, 0, 0]), []),
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
(bytearray([1, 0, 0, 0]), []),
]
# The proxy here really doesn't matter, we're trying to access internal methods.
proxy = ffx.EchoClient(channel)
proxy.pending_txids.add(1)
proxy.pending_txids.add(2)
proxy._channel_waker.handle_ready_queues = {}
proxy._channel_waker.handle_ready_queues[0] = asyncio.Queue() # type: ignore[assignment]
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
proxy._decode = Mock() # type: ignore[method-assign]
proxy._decode.return_value = (bytearray([1, 2, 3]), [])
loop = asyncio.get_running_loop()
first_task = loop.create_task(proxy._read_and_decode(1))
second_task = loop.create_task(proxy._read_and_decode(2))
await asyncio.gather(first_task, second_task)
async def test_read_and_decode_blocked(self) -> None:
channel = Mock()
channel.__class__ = Channel # type: ignore[assignment]
channel.as_int.return_value = 0
channel.read.side_effect = [
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
(bytearray([1, 0, 0, 0]), []),
]
proxy = ffx.EchoClient(channel)
proxy.pending_txids.add(1)
proxy._channel_waker.handle_ready_queues = {}
proxy._channel_waker.handle_ready_queues[0] = asyncio.Queue() # type: ignore[assignment]
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
proxy._decode = Mock() # type: ignore[method-assign]
proxy._decode.return_value = (bytearray([1, 2, 3]), [])
await proxy._read_and_decode(1)
async def test_read_and_decode_simul_notification(self) -> None:
channel = Mock()
channel.__class__ = Channel # type: ignore[assignment]
channel.as_int.return_value = 0
channel.read.side_effect = [
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
ZxStatus(ZxStatus.ZX_ERR_SHOULD_WAIT),
]
proxy = ffx.EchoClient(channel)
proxy.pending_txids.add(1)
proxy._channel_waker.handle_ready_queues = {}
proxy._channel_waker.handle_ready_queues[0] = asyncio.Queue() # type: ignore[assignment]
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
proxy.staged_messages[1] = asyncio.Queue(1)
proxy.staged_messages[1].put_nowait((bytearray([1, 0, 0, 0]), []))
proxy._decode = Mock() # type: ignore[method-assign]
proxy._decode.return_value = (bytearray([1, 2, 3]), [])
await proxy._read_and_decode(1)
async def test_unexpected_txid(self) -> None:
channel = Mock()
channel.__class__ = Channel # type: ignore[assignment]
channel.as_int.return_value = 0
channel.read.side_effect = [(bytearray([1, 0, 0, 0]), ())]
proxy = ffx.EchoClient(channel)
proxy._channel_waker.handle_ready_queues = {}
proxy._channel_waker.handle_ready_queues[0] = asyncio.Queue() # type: ignore[assignment]
proxy._channel_waker.handle_ready_queues[0].put_nowait(0)
with self.assertRaises(RuntimeError):
await proxy._read_and_decode(10)
self.assertEqual(proxy._channel, None)
async def test_staging_stages(self) -> None:
channel = Mock()
channel.__class__ = Channel # type: ignore[assignment]
channel.as_int.return_value = 0
proxy = ffx.EchoClient(channel)
proxy.pending_txids.add(1)
proxy._stage_message(1, (bytearray([1, 2, 3]), []))
self.assertEqual(len(proxy.staged_messages), 1)
got = await proxy._get_staged_message(1)
self.assertEqual(got, (bytearray([1, 2, 3]), []))
# This part is a little silly. The decode_fidl_message
# function can't be mocked, so we're decoding with an actual
# FIDL message we know is loaded (the Echo protocol from ffx).
class DecodeObj:
pass
obj = DecodeObj()
obj.__dict__["value"] = "foo"
proxy._decode(
1,
encode_fidl_message(
object=obj,
library="fuchsia.developer.ffx",
type_name="fuchsia.developer.ffx/EchoEchoStringRequest",
txid=1,
ordinal=method_ordinal(
protocol="fuchsia.developer.ffx/Echo", method="EchoString"
),
),
)
# Verifies state is cleaned up.
self.assertEqual(len(proxy.staged_messages), 0)
self.assertEqual(len(proxy.pending_txids), 0)