blob: 960f8bb75bacb5f64888bc2314a00efc54ca8b96 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import os
import os.path
import sys
import typing
import unittest
import fidl_fuchsia_controller_test as fc_test
import fidl_fuchsia_developer_ffx as ffx_fidl
from fidl_codec import encode_fidl_message, method_ordinal
from fuchsia_controller_py import Channel, Context, IsolateDir, Socket, ZxStatus
from fidl import DomainError
class TestSocketServer(fc_test.TestingServer):
    """Test implementation of the ``Testing`` protocol server.

    The server creates a socket pair on construction and keeps both ends as
    attributes. The ``server`` end is handed out by
    ``return_socket_or_error``.
    """

    def __init__(self, ch: Channel):
        """Create the socket pair, then initialise the base server on ``ch``."""
        self.client, self.server = Socket.create()
        super().__init__(ch)

    def return_socket_or_error(
        self,
    ) -> fc_test.TestingServer.ReturnSocketOrErrorResponse:
        """Return the server socket handle, or an error if it reads as 0.

        When ``self.server.as_int()`` is 0 the reply is a ``DomainError``
        carrying ``ZX_ERR_NOT_SUPPORTED``; otherwise the handle is taken out
        of ``self.server`` and returned as ``reader_thing``.
        """
        handle_is_zero = self.server.as_int() == 0
        if handle_is_zero:
            return DomainError(ZxStatus.ZX_ERR_NOT_SUPPORTED)
        taken_handle = self.server.take()
        return fc_test.TestingReturnSocketOrErrorResponse(
            reader_thing=taken_handle
        )

    def return_union(self) -> fc_test.TestingServer.ReturnUnionResponse:
        """Return a response whose ``x`` field is set to 10."""
        return fc_test.TestingReturnUnionResponse(x=10)

    def return_union_with_table(
        self,
    ) -> fc_test.TestingServer.ReturnUnionWithTableResponse:
        """Return a response wrapping a ``NoopUnion`` with ``union_str`` set."""
        return fc_test.TestingReturnUnionWithTableResponse(
            x=fc_test.NoopUnion(union_str="foobar")
        )

    def return_other_composed_protocol(
        self,
    ) -> fc_test.TestingServer.ReturnOtherComposedProtocolResponse:
        """Return a response with ``client_thing`` set to 0."""
        response = fc_test.TestingReturnOtherComposedProtocolResponse(
            client_thing=0
        )
        return response

    def return_possible_error(
        self,
    ) -> fc_test.TestingServer.ReturnPossibleErrorResponse:
        """Always reply with a ``DomainError`` ("not implemented")."""
        failure = DomainError("not implemented")
        return failure

    def return_possible_error2(
        self,
    ) -> fc_test.TestingServer.ReturnPossibleError2Response:
        """Always reply with a ``DomainError`` ("not implemented")."""
        failure = DomainError("not implemented")
        return failure
class EndToEnd(unittest.IsolatedAsyncioTestCase):
def _get_default_config(self) -> typing.Dict[str, str]:
return {}
def _get_isolate_dir(self) -> IsolateDir:
    """Build the ``IsolateDir`` used by the test contexts.

    If the ``TEST_UNDECLARED_OUTPUTS_DIR`` environment variable is set to a
    non-empty value, the isolate directory is the ``isolate`` entry beneath
    it. Otherwise ``IsolateDir`` is constructed with ``dir=None``.
    """
    outputs_dir = os.getenv("TEST_UNDECLARED_OUTPUTS_DIR")
    if not outputs_dir:
        # Unset or empty: pass None through to IsolateDir.
        return IsolateDir(dir=None)
    return IsolateDir(dir=os.path.join(outputs_dir, "isolate"))
def _make_ctx(self) -> Context:
    """Create a ``Context`` from the default config and isolate directory."""
    default_config = self._get_default_config()
    isolate = self._get_isolate_dir()
    return Context(config=default_config, isolate_dir=isolate)
def test_config_get_nonexistent(self) -> None:
    """Looking up a key that was never configured yields ``None``."""
    ctx = self._make_ctx()
    # assertIsNone checks identity and reports a clearer failure than
    # comparing against None with assertEqual.
    self.assertIsNone(ctx.config_get_string("foobarzzzzzzo==?"))
def test_config_get_exists(self) -> None:
    """A key placed in the config dict is readable back from the context."""
    name, value = "foobar", "bazmumble"
    settings = self._get_default_config()
    settings[name] = value
    context = Context(config=settings, isolate_dir=self._get_isolate_dir())
    stored = context.config_get_string(name)
    self.assertEqual(stored, value)
def test_config_get_too_long(self) -> None:
    """Reading back a 50000-character value raises ``BufferError``."""
    name = "foobarzington"
    settings = self._get_default_config()
    settings[name] = "b" * 50000
    context = Context(config=settings, isolate_dir=self._get_isolate_dir())
    # Callable form of assertRaises: only the lookup itself may raise.
    self.assertRaises(BufferError, context.config_get_string, name)
async def test_client_sends_message_before_coro_await(self) -> None:
    """Check that calling a client method writes the request immediately.

    ``echo_string`` is called without awaiting its coroutine and the request
    is then read straight off the peer end of the channel. The test compares
    those bytes with an independently encoded request, writes a hand-encoded
    response back, and only then awaits the coroutine to check the result.
    """
    (ch0, ch1) = Channel.create()
    echo_proxy = ffx_fidl.EchoClient(ch0)
    # Deliberately not awaited yet: the read below only succeeds if the
    # request is already on the channel once this call returns.
    coro = echo_proxy.echo_string(value="foo")
    buf, _ = ch1.read()
    # The transaction id is taken from the first four bytes of the message.
    # The FIDL wire format is little-endian regardless of the host, so use
    # an explicit byte order rather than sys.byteorder.
    txid = int.from_bytes(buf[0:4], "little")
    # Request and response share the same method ordinal; compute it once.
    ordinal = method_ordinal(
        protocol="fuchsia.developer.ffx/Echo", method="EchoString"
    )
    encoded_bytes, _ = encode_fidl_message(
        object=ffx_fidl.EchoEchoStringRequest(value="foo"),
        library="fuchsia.developer.ffx",
        type_name="fuchsia.developer.ffx/EchoEchoStringRequest",
        txid=txid,
        ordinal=ordinal,
    )
    self.assertEqual(buf, encoded_bytes)
    # Reply with the same txid so the pending call is matched to it.
    msg = encode_fidl_message(
        object=ffx_fidl.EchoEchoStringResponse(response="otherthing"),
        library="fuchsia.developer.ffx",
        type_name="fuchsia.developer.ffx/EchoEchoStringResponse",
        txid=txid,
        ordinal=ordinal,
    )
    ch1.write(msg)
    result = await coro
    self.assertEqual(result.response, "otherthing")
def test_context_creation_duplicate_target_raises_exception(self) -> None:
    """Giving both ``target`` and a ``target.default`` config entry fails.

    Constructing the ``Context`` is expected to raise ``RuntimeError``.
    """
    conflicting_config = {"target.default": "bar"}
    self.assertRaises(
        RuntimeError, Context, target="foo", config=conflicting_config
    )
def test_context_creation_no_args(self) -> None:
    """A ``Context`` can be constructed without any arguments.

    The test passes as long as the constructor does not raise.
    """
    # The instance is not needed; only successful construction matters.
    Context()
async def test_sending_fidl_protocol(self) -> None:
    """A channel handle passed as a method argument stays usable.

    The reader channel end is sent through ``list_targets``, pulled back out
    of the handles read from the peer channel, and written to. The bytes
    must then arrive on the opposite end of the original reader channel.
    """
    payload = [5, 6, 7]
    collection_server, collection_client = Channel.create()
    reader_server, reader_client = Channel.create()
    proxy = ffx_fidl.TargetCollectionClient(collection_client)
    target_query = ffx_fidl.TargetQuery(string_matcher="foobar")
    proxy.list_targets(query=target_query, reader=reader_client.take())

    # Only the transferred handles matter here; the message bytes are unused.
    _request_bytes, handles = collection_server.read()
    self.assertEqual(len(handles), 1)

    # Re-wrap the received handle and send data through it.
    rewrapped_reader = Channel(handles[0])
    rewrapped_reader.write((bytearray(payload), []))

    received_bytes, received_handles = reader_server.read()
    self.assertEqual(len(received_handles), 0)
    self.assertEqual(received_bytes, bytearray(payload))
async def test_sending_socket_as_result(self) -> None:
    """Exercise ``TestSocketServer`` through a ``TestingClient``.

    Checks the union and union-with-table replies, then calls
    ``return_socket_or_error`` twice: the first call must yield a non-zero
    ``reader_thing`` and the second must report ``ZX_ERR_NOT_SUPPORTED``.
    """
    t_server, t_client = Channel.create()
    server = TestSocketServer(t_server)
    client = fc_test.TestingClient(t_client)
    server_task = asyncio.get_running_loop().create_task(server.serve())
    # Run the cleanup even when an assertion or await fails, so the serving
    # task and the server are not left behind by a failing test.
    try:
        res1 = await client.return_union()
        self.assertEqual(res1.x, 10)
        res2 = await client.return_union_with_table()
        # This is to placate mypy
        assert res2.x is not None
        self.assertEqual(res2.x.union_str, "foobar")
        res3 = await client.return_socket_or_error()
        # TODO(b/415379365): Given this is intended to be a socket, it should return
        # the appropriate type here.
        assert res3.response is not None
        self.assertNotEqual(res3.response.reader_thing, 0)
        res4 = await client.return_socket_or_error()
        self.assertEqual(res4.err, ZxStatus.ZX_ERR_NOT_SUPPORTED)
    finally:
        server_task.cancel()
        server.close()