blob: 6600c911f36fd041e6a88700344c7773ab59f45e [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import typing
import unittest
import fidl.fuchsia_controller_othertest as fc_othertest
import fidl.fuchsia_controller_test as fc_test
import fidl.fuchsia_developer_ffx as ffx
import fidl.fuchsia_io as f_io
from fuchsia_controller_py import Channel, ZxStatus
from fidl import DomainError, FrameworkError, StopEventHandler, StopServer
# [START echo_server_impl]
class TestEchoer(ffx.Echo.Server):
def echo_string(self, request: ffx.EchoEchoStringRequest):
return ffx.EchoEchoStringResponse(response=request.value)
# [END echo_server_impl]
class AsyncEchoer(ffx.Echo.Server):
async def echo_string(self, request: ffx.EchoEchoStringRequest):
await asyncio.sleep(0.1) # This isn't necessary, but it is fun.
return ffx.EchoEchoStringResponse(response=request.value)
class TargetCollectionReaderImpl(ffx.TargetCollectionReader.Server):
def __init__(self, channel: Channel, target_list):
super().__init__(channel)
self.target_list = target_list
def next(self, request: ffx.TargetCollectionReaderNextRequest):
if not request.entry:
raise StopServer
self.target_list.extend(request.entry)
class TargetCollectionImpl(ffx.TargetCollection.Server):
async def list_targets(
self, request: ffx.TargetCollectionListTargetsRequest
):
reader = ffx.TargetCollectionReader.Client(request.reader)
await reader.next(
entry=[
ffx.TargetInfo(nodename="foo"),
ffx.TargetInfo(nodename="bar"),
]
)
await reader.next(
entry=[
ffx.TargetInfo(nodename="baz"),
]
)
await reader.next(entry=[])
class StubFileServer(f_io.File.Server):
def read(self, request: f_io.ReadableReadRequest):
return f_io.ReadableReadResponse(data=[1, 2, 3, 4])
class TestEventHandler(fc_othertest.CrossLibraryNoop.EventHandler):
def __init__(
self,
client: fc_othertest.CrossLibraryNoop.Client,
random_event_handler: typing.Callable[
[fc_othertest.CrossLibraryNoopOnRandomEventRequest], None
],
):
super().__init__(client)
self.random_event_handler = random_event_handler
def on_random_event(
self, request: fc_othertest.CrossLibraryNoopOnRandomEventRequest
):
self.random_event_handler(request)
def on_empty_event(self):
raise StopEventHandler
class FailingFileServer(f_io.File.Server):
def read(self, _: f_io.ReadableReadRequest):
return DomainError(ZxStatus.ZX_ERR_PEER_CLOSED)
class TestingServer(fc_test.Testing.Server):
def return_union(self):
res = fc_test.TestingReturnUnionResponse()
res.y = "foobar"
return res
def return_union_with_table(self):
res = fc_test.TestingReturnUnionWithTableResponse()
res.y = fc_test.NoopTable(str="bazzz", integer=-2)
return res
class ServerTests(unittest.IsolatedAsyncioTestCase):
async def test_echo_server_sync(self):
# [START use_echoer_example]
(tx, rx) = Channel.create()
server = TestEchoer(rx)
client = ffx.Echo.Client(tx)
server_task = asyncio.get_running_loop().create_task(server.serve())
res = await client.echo_string(value="foobar")
self.assertEqual(res.response, "foobar")
server_task.cancel()
# [END use_echoer_example]
async def test_epitaph_propagation(self):
(tx, rx) = Channel.create()
client = ffx.Echo.Client(tx)
coro1 = client.echo_string(value="foobar")
# Creating a task here so at least one task is awaiting on a staged
# message/notification.
task = asyncio.get_running_loop().create_task(
client.echo_string(value="foobar")
)
coro2 = client.echo_string(value="foobar")
# Put `task` onto the executor so it makes partial progress, since this will yield
# to the executor.
await asyncio.sleep(0)
err_msg = ZxStatus.ZX_ERR_NOT_SUPPORTED
rx.close_with_epitaph(err_msg)
# The main thing here is to ensure that PEER_CLOSED is not sent early.
# After running rx.close_with_epitaph, the channel will be closed, and
# that message will have been queued for the client.
with self.assertRaises(ZxStatus):
try:
await coro1
except ZxStatus as e:
self.assertEqual(e.args[0], err_msg)
raise e
with self.assertRaises(ZxStatus):
try:
await task
except ZxStatus as e:
self.assertEqual(e.args[0], err_msg)
raise e
with self.assertRaises(ZxStatus):
try:
await coro2
except ZxStatus as e:
self.assertEqual(e.args[0], err_msg)
raise e
# Finally, ensure that the channel is just plain-old closed for new
# interactions.
with self.assertRaises(ZxStatus):
try:
await client.echo_string(value="foobar")
except ZxStatus as e:
self.assertEqual(e.args[0], ZxStatus.ZX_ERR_PEER_CLOSED)
raise e
async def test_echo_server_async(self):
(tx, rx) = Channel.create()
server = AsyncEchoer(rx)
client = ffx.Echo.Client(tx)
server_task = asyncio.get_running_loop().create_task(server.serve())
res = await client.echo_string(value="foobar")
self.assertEqual(res.response, "foobar")
server_task.cancel()
# TODO(https://fxbug.dev/364878315): This test sends many messages to AsyncEchoer which causes
# many spurious wakeups as a side-effect. Having this test ensures ServerBase is resilient
# to spurious wakeups, even several at a time.
async def test_echo_server_async_stress(self):
(tx, rx) = Channel.create()
server = AsyncEchoer(rx)
client = ffx.Echo.Client(tx)
server_task = asyncio.get_running_loop().create_task(server.serve())
for _ in range(12):
res = await client.echo_string(value="foobar")
self.assertEqual(res.response, "foobar")
server_task.cancel()
async def test_not_implemented(self):
(tx, rx) = Channel.create()
server = ffx.Echo.Server(rx)
client = ffx.Echo.Client(tx)
task = asyncio.get_running_loop().create_task(server.serve())
# The first thing that will happen is the server will receive the
# request, then attempt to call the corresponding function. Since it is not implemented,
# An exception will be raised, closing the channel. The client will then receive a
# PEER_CLOSED error. In order to diagnose the root cause the task must then be awaited.
with self.assertRaises(ZxStatus):
try:
res = await client.echo_string(value="foobar")
except ZxStatus as e:
self.assertEqual(e.args[0], ZxStatus.ZX_ERR_PEER_CLOSED)
raise e
self.assertTrue(task.done())
with self.assertRaises(NotImplementedError):
await task
async def test_target_iterator(self):
(reader_client_channel, reader_server_channel) = Channel.create()
target_list: typing.List[typing.Any] = []
server = TargetCollectionReaderImpl(reader_server_channel, target_list)
(tc_client_channel, tc_server_channel) = Channel.create()
target_collection_server = TargetCollectionImpl(tc_server_channel)
loop = asyncio.get_running_loop()
reader_task = loop.create_task(server.serve())
tc_task = loop.create_task(target_collection_server.serve())
tc_client = ffx.TargetCollection.Client(tc_client_channel)
tc_client.list_targets(
query=ffx.TargetQuery(), reader=reader_client_channel.take()
)
done, pending = await asyncio.wait(
[reader_task, tc_task], return_when=asyncio.FIRST_COMPLETED
)
# This will just surface exceptions if they happen. For correct behavior this should just
# return the result of the reader task.
done.pop().result()
self.assertEqual(len(target_list), 3)
foo_targets = [x for x in target_list if x.nodename == "foo"]
self.assertEqual(len(foo_targets), 1)
bar_targets = [x for x in target_list if x.nodename == "bar"]
self.assertEqual(len(bar_targets), 1)
baz_targets = [x for x in target_list if x.nodename == "baz"]
self.assertEqual(len(baz_targets), 1)
async def test_file_server(self):
# This handles the kind of case where a method has a signature of `-> (data) error Error;`
client, server = Channel.create()
file_proxy = f_io.File.Client(client)
file_server = StubFileServer(server)
server_task = asyncio.get_running_loop().create_task(
file_server.serve()
)
res = await file_proxy.read(count=4)
self.assertEqual(res.response.data, [1, 2, 3, 4])
server_task.cancel()
async def test_failing_file_server(self):
client, server = Channel.create()
file_proxy = f_io.File.Client(client)
file_server = FailingFileServer(server)
server_task = asyncio.get_running_loop().create_task(
file_server.serve()
)
res = await file_proxy.read(count=4)
self.assertEqual(res.response, None)
self.assertEqual(res.err, ZxStatus.ZX_ERR_PEER_CLOSED)
server_task.cancel()
async def test_testing_server(self):
client, server = Channel.create()
t_client = fc_test.Testing.Client(client)
t_server = TestingServer(server)
server_task = asyncio.get_running_loop().create_task(t_server.serve())
res = await t_client.return_union()
self.assertEqual(res.y, "foobar")
res = await t_client.return_union_with_table()
self.assertEqual(res.y.str, "bazzz")
self.assertEqual(res.y.integer, -2)
server_task.cancel()
async def test_flexible_method_framework_err(self):
class FlexibleMethodTesterServer(fc_test.FlexibleMethodTester.Server):
def some_method(self):
# This should be handled internally, but right now there's not really
# a good way to force this interaction without making multiple FIDL
# versions run in this program simultaneously somehow.
return FrameworkError.UNKNOWN_METHOD
def some_method_without_error(self):
return FrameworkError.UNKNOWN_METHOD
def some_method_just_error(self):
return FrameworkError.UNKNOWN_METHOD
client, server = Channel.create()
t_client = fc_test.FlexibleMethodTester.Client(client)
t_server = FlexibleMethodTesterServer(server)
server_task = asyncio.get_running_loop().create_task(t_server.serve())
res = await t_client.some_method()
self.assertEqual(res.framework_err, FrameworkError.UNKNOWN_METHOD)
res = await t_client.some_method_without_error()
self.assertEqual(res.framework_err, FrameworkError.UNKNOWN_METHOD)
res = await t_client.some_method_just_error()
self.assertEqual(res.framework_err, FrameworkError.UNKNOWN_METHOD)
server_task.cancel()
async def test_flexible_method_unwrap_response(self):
class FlexibleMethodTesterServer(fc_test.FlexibleMethodTester.Server):
def some_method(self):
return fc_test.FlexibleMethodTesterSomeMethodResponse(
some_bool_value=True
)
def some_method_without_error(self):
return (
fc_test.FlexibleMethodTesterSomeMethodWithoutErrorResponse(
some_bool_value=False
)
)
def some_method_just_error(self):
return fc_test.FlexibleMethodTesterSomeMethodJustErrorResponse()
client, server = Channel.create()
t_client = fc_test.FlexibleMethodTester.Client(client)
t_server = FlexibleMethodTesterServer(server)
server_task = asyncio.get_running_loop().create_task(t_server.serve())
res = await t_client.some_method()
self.assertTrue(res.unwrap().some_bool_value)
res = await t_client.some_method_without_error()
self.assertFalse(res.unwrap().some_bool_value)
res = await t_client.some_method_just_error()
self.assertEqual(
res.unwrap(),
fc_test.FlexibleMethodTesterSomeMethodJustErrorResponse(),
)
server_task.cancel()
async def test_flexible_method_unwrap_error(self):
class FlexibleMethodTesterServer(fc_test.FlexibleMethodTester.Server):
def some_method(self):
return DomainError(error=ZxStatus.ZX_ERR_INTERNAL)
def some_method_without_error(self):
return FrameworkError.UNKNOWN_METHOD
def some_method_just_error(self):
return DomainError(error=ZxStatus.ZX_ERR_INTERNAL)
client, server = Channel.create()
t_client = fc_test.FlexibleMethodTester.Client(client)
t_server = FlexibleMethodTesterServer(server)
server_task = asyncio.get_running_loop().create_task(t_server.serve())
res = await t_client.some_method()
with self.assertRaisesRegex(RuntimeError, "Result error"):
res.unwrap()
res = await t_client.some_method_without_error()
with self.assertRaisesRegex(RuntimeError, "Result framework error"):
res.unwrap()
res = await t_client.some_method_just_error()
with self.assertRaisesRegex(RuntimeError, "Result error"):
res.unwrap()
server_task.cancel()
async def test_sending_and_receiving_event(self):
client, server = Channel.create()
t_client = fc_othertest.CrossLibraryNoop.Client(client)
THIS_EXPECTED = 3
THAT_EXPECTED = fc_othertest.TestingEnum.FLIPPED_OTHER_TEST
def random_event_handler(
request: fc_othertest.CrossLibraryNoopOnRandomEventRequest,
):
self.assertEqual(request.this, THIS_EXPECTED)
self.assertEqual(request.that, THAT_EXPECTED)
raise StopEventHandler
# It's okay to use an unimplemented server here since we're not fielding any calls.
t_server = fc_othertest.CrossLibraryNoop.Server(server)
event_handler = TestEventHandler(t_client, random_event_handler)
t_server.on_random_event(this=THIS_EXPECTED, that=THAT_EXPECTED)
task = asyncio.get_running_loop().create_task(event_handler.serve())
# If the event is never received this will loop forever (since the channel is open, and the
# handler should stop service after receiving the event.
await task
async def test_sending_and_receiving_empty_event(self):
client, server = Channel.create()
t_client = fc_othertest.CrossLibraryNoop.Client(client)
# It's okay to use an unimplemented server here since we're not fielding any calls.
t_server = fc_othertest.CrossLibraryNoop.Server(server)
event_handler = TestEventHandler(t_client, self)
t_server.on_empty_event()
task = asyncio.get_running_loop().create_task(event_handler.serve())
# If the event is never received this will loop forever (since the channel is open, and the
# handler should stop service after receiving the event.
await task
async def test_closing_channel_closes_event_loop(self):
client, server = Channel.create()
t_client = fc_othertest.CrossLibraryNoop.Client(client)
del server
# A generic unimplemented event handler is fine, since we're just making it exit.
event_handler = fc_othertest.CrossLibraryNoop.EventHandler(t_client)
task = asyncio.get_running_loop().create_task(event_handler.serve())
await task
async def test_echo_server_unimplemented(self):
_client, server = Channel.create()
s = fc_test.Noop.Server(server)
with self.assertRaises(NotImplementedError):
s.do_noop()