| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <fcntl.h> |
| #include <fidl/fuchsia.posix.socket/cpp/wire_test_base.h> |
| #include <lib/async-loop/cpp/loop.h> |
| #include <lib/async-loop/default.h> |
| #include <lib/fdio/fd.h> |
| #include <lib/fdio/fdio.h> |
| #include <lib/fdio/unsafe.h> |
| #include <lib/fit/defer.h> |
| #include <netinet/in.h> |
| #include <poll.h> |
| #include <sys/ioctl.h> |
| #include <sys/socket.h> |
| #include <sys/time.h> |
| #include <unistd.h> |
| |
| #include <array> |
| #include <cerrno> |
| #include <chrono> |
| #include <cstring> |
| #include <future> |
| #include <latch> |
| |
| #include <fbl/unique_fd.h> |
| #include <zxtest/base/parameterized-value.h> |
| #include <zxtest/zxtest.h> |
| |
| #include "predicates.h" |
| #include "src/connectivity/network/netstack/udp_serde/udp_serde.h" |
| #include "src/connectivity/network/tests/socket/util.h" |
| |
| namespace { |
| |
| class Server final : public fidl::testing::WireTestBase<fuchsia_posix_socket::StreamSocket> { |
| public: |
| explicit Server(zx::socket peer) : peer_(std::move(peer)) {} |
| |
| void NotImplemented_(const std::string& name, ::fidl::CompleterBase& completer) override { |
| ADD_FAILURE("%s should not be called", name.c_str()); |
| completer.Close(ZX_ERR_NOT_SUPPORTED); |
| } |
| |
| void Query(QueryCompleter::Sync& completer) final { |
| const std::string_view kProtocol = fuchsia_posix_socket::wire::kStreamSocketProtocolName; |
| // TODO(https://fxbug.dev/42052765): avoid the const cast. |
| uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<char*>(kProtocol.data())); |
| completer.Reply(fidl::VectorView<uint8_t>::FromExternal(data, kProtocol.size())); |
| } |
| |
| void Close(CloseCompleter::Sync& completer) override { |
| completer.ReplySuccess(); |
| completer.Close(ZX_OK); |
| } |
| |
| void Shutdown(ShutdownRequestView request, ShutdownCompleter::Sync& completer) override { |
| shutdown_count_++; |
| completer.ReplySuccess(); |
| } |
| |
| void Describe(DescribeCompleter::Sync& completer) override { |
| zx::socket peer; |
| if (const zx_status_t status = |
| peer_.duplicate(ZX_RIGHTS_BASIC | ZX_RIGHT_READ | ZX_RIGHT_WRITE, &peer); |
| status != ZX_OK) { |
| return completer.Close(status); |
| } |
| fidl::Arena alloc; |
| completer.Reply(fuchsia_posix_socket::wire::StreamSocketDescribeResponse::Builder(alloc) |
| .socket(std::move(peer)) |
| .Build()); |
| } |
| |
| void Connect(ConnectRequestView request, ConnectCompleter::Sync& completer) override { |
| if (on_connect_) { |
| on_connect_(peer_, completer); |
| } else { |
| fidl::testing::WireTestBase<fuchsia_posix_socket::StreamSocket>::Connect(request, completer); |
| } |
| } |
| |
| void GetError(GetErrorCompleter::Sync& completer) override { completer.ReplySuccess(); } |
| |
| void FillPeerSocket() const { |
| zx_info_socket_t info; |
| ASSERT_OK(peer_.get_info(ZX_INFO_SOCKET, &info, sizeof(info), nullptr, nullptr)); |
| size_t tx_buf_available = info.tx_buf_max - info.tx_buf_size; |
| std::unique_ptr<uint8_t[]> buf(new uint8_t[tx_buf_available + 1]); |
| size_t actual; |
| ASSERT_OK(peer_.write(0, buf.get(), tx_buf_available, &actual)); |
| ASSERT_EQ(actual, tx_buf_available); |
| } |
| |
| void ResetSocket() { peer_.reset(); } |
| |
| void SetOnConnect(fit::function<void(zx::socket&, ConnectCompleter::Sync&)> cb) { |
| on_connect_ = std::move(cb); |
| } |
| |
| uint16_t ShutdownCount() const { return shutdown_count_.load(); } |
| |
| private: |
| zx::socket peer_; |
| std::atomic<uint16_t> shutdown_count_ = 0; |
| |
| fit::function<void(zx::socket&, ConnectCompleter::Sync&)> on_connect_; |
| }; |
| |
| template <int sock_type> |
| class BaseTest : public zxtest::Test { |
| static_assert(sock_type == ZX_SOCKET_STREAM || sock_type == ZX_SOCKET_DATAGRAM); |
| |
| public: |
| BaseTest() : loop_(&kAsyncLoopConfigNoAttachToCurrentThread) {} |
| |
| protected: |
| void SetUp() override { |
| zx::socket client_socket; |
| ASSERT_OK(zx::socket::create(sock_type, &client_socket, &server_socket_)); |
| server_.emplace(std::move(client_socket)); |
| |
| auto endpoints = fidl::Endpoints<fuchsia_posix_socket::StreamSocket>::Create(); |
| |
| fidl::BindServer(loop_.dispatcher(), std::move(endpoints.server), &server_.value()); |
| ASSERT_OK(loop_.StartThread("fake-socket-server")); |
| ASSERT_OK( |
| fdio_fd_create(endpoints.client.channel().release(), client_fd_.reset_and_get_address())); |
| } |
| |
| const zx::socket& server_socket() { return server_socket_; } |
| |
| zx::socket& mutable_server_socket() { return server_socket_; } |
| |
| const fbl::unique_fd& client_fd() { return client_fd_; } |
| |
| fbl::unique_fd& mutable_client_fd() { return client_fd_; } |
| |
| const Server& server() { return server_.value(); } |
| |
| Server& mutable_server() { return server_.value(); } |
| |
| void set_connected() { |
| mutable_server().SetOnConnect( |
| [connected = false](zx::socket& peer, Server::ConnectCompleter::Sync& completer) mutable { |
| switch (sock_type) { |
| case ZX_SOCKET_STREAM: |
| if (!connected) { |
| connected = true; |
| // We need the FDIO to act like it's connected. |
| EXPECT_OK(peer.signal(0, fuchsia_posix_socket::wire::kSignalStreamConnected)); |
| completer.ReplyError(fuchsia_posix::wire::Errno::kEinprogress); |
| break; |
| } |
| __FALLTHROUGH; |
| case ZX_SOCKET_DATAGRAM: |
| completer.ReplySuccess(); |
| break; |
| } |
| }); |
| const sockaddr_in addr = { |
| .sin_family = AF_INET, |
| }; |
| ASSERT_SUCCESS( |
| connect(client_fd().get(), reinterpret_cast<const sockaddr*>(&addr), sizeof(addr))); |
| } |
| |
| void set_nonblocking_io() { |
| int flags; |
| ASSERT_GE(flags = fcntl(client_fd().get(), F_GETFL), 0, "%s", strerror(errno)); |
| ASSERT_SUCCESS(fcntl(client_fd().get(), F_SETFL, flags | O_NONBLOCK)); |
| } |
| |
| private: |
| zx::socket clientSocket() {} |
| |
| zx::socket server_socket_; |
| fbl::unique_fd client_fd_; |
| std::optional<Server> server_; |
| async::Loop loop_; |
| }; |
| |
| using TcpSocketTest = BaseTest<ZX_SOCKET_STREAM>; |
| TEST_F(TcpSocketTest, CloseZXSocketOnTransfer) { |
| // A socket's peer is not closed until all copies of that peer are closed. Since the server holds |
| // one of those copies (and the file descriptor holds the other), we must destroy the server's |
| // copy before asserting that fdio_fd_transfer closes the file descriptor's copy. |
| mutable_server().ResetSocket(); |
| |
| // The file descriptor still holds a copy of the peer; the peer is still open. |
| ASSERT_OK(server_socket().wait_one(ZX_SOCKET_WRITABLE, zx::time::infinite_past(), nullptr)); |
| |
| zx::handle handle; |
| ASSERT_OK(fdio_fd_transfer(client_fd().get(), handle.reset_and_get_address())); |
| |
| // The file descriptor has been destroyed; the peer is closed. |
| ASSERT_OK(server_socket().wait_one(ZX_SOCKET_PEER_CLOSED, zx::time::infinite_past(), nullptr)); |
| } |
| |
| // Verify scenario, where multi-segment recvmsg is requested, but the socket has |
| // just enough data to *completely* fill one segment. |
| // In this scenario, an attempt to read data for the next segment immediately |
| // fails with ZX_ERR_SHOULD_WAIT, and this may lead to bogus EAGAIN even if some |
| // data has actually been read. |
| TEST_F(TcpSocketTest, RecvmsgNonblockBoundary) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| ASSERT_NO_FATAL_FAILURE(set_nonblocking_io()); |
| |
| // Write 4 bytes of data to socket. |
| size_t actual; |
| const uint32_t data_out = 0x12345678; |
| EXPECT_OK(server_socket().write(0, &data_out, sizeof(data_out), &actual)); |
| EXPECT_EQ(actual, sizeof(data_out)); |
| |
| uint32_t data_in1, data_in2; |
| // Fail at compilation stage if anyone changes types. |
| // This is mandatory here: we need the first chunk to be exactly the same |
| // length as total size of data we just wrote. |
| static_assert(sizeof(data_in1) == sizeof(data_out)); |
| |
| struct iovec iov[] = { |
| { |
| .iov_base = &data_in1, |
| .iov_len = sizeof(data_in1), |
| }, |
| { |
| .iov_base = &data_in2, |
| .iov_len = sizeof(data_in2), |
| }, |
| }; |
| |
| struct msghdr msg = { |
| .msg_iov = iov, |
| .msg_iovlen = std::size(iov), |
| }; |
| |
| EXPECT_EQ(recvmsg(client_fd().get(), &msg, 0), ssize_t(sizeof(data_out)), "%s", strerror(errno)); |
| |
| EXPECT_SUCCESS(close(mutable_client_fd().release())); |
| } |
| |
| // Make sure we can successfully read zero bytes if we pass a zero sized input buffer. |
| TEST_F(TcpSocketTest, RecvmsgEmptyBuffer) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| ASSERT_NO_FATAL_FAILURE(set_nonblocking_io()); |
| |
| // Write 4 bytes of data to socket. |
| size_t actual; |
| const uint32_t data_out = 0x12345678; |
| EXPECT_OK(server_socket().write(0, &data_out, sizeof(data_out), &actual)); |
| EXPECT_EQ(actual, sizeof(data_out)); |
| |
| // Try to read into an empty set of io vectors. |
| struct msghdr msg = {}; |
| |
| // We should "successfully" read zero bytes. |
| EXPECT_SUCCESS(recvmsg(client_fd().get(), &msg, 0)); |
| } |
| |
| // Verify scenario, where multi-segment sendmsg is requested, but the socket has |
| // just enough spare buffer to *completely* read one segment. |
| // In this scenario, an attempt to send second segment should immediately fail |
| // with ZX_ERR_SHOULD_WAIT, but the sendmsg should report first segment length |
| // rather than failing with EAGAIN. |
| TEST_F(TcpSocketTest, SendmsgNonblockBoundary) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| ASSERT_NO_FATAL_FAILURE(set_nonblocking_io()); |
| |
| const size_t memlength = 65536; |
| std::unique_ptr<uint8_t[]> memchunk(new uint8_t[memlength]); |
| |
| struct iovec iov[]{ |
| { |
| .iov_base = memchunk.get(), |
| .iov_len = memlength, |
| }, |
| { |
| .iov_base = memchunk.get(), |
| .iov_len = memlength, |
| }, |
| }; |
| |
| const struct msghdr msg = { |
| .msg_iov = iov, |
| .msg_iovlen = std::size(iov), |
| }; |
| |
| // 1. Fill up the client socket. |
| server().FillPeerSocket(); |
| |
| // 2. Consume one segment of the data |
| size_t actual; |
| EXPECT_OK(server_socket().read(0, memchunk.get(), memlength, &actual)); |
| EXPECT_EQ(memlength, actual); |
| |
| // 3. Push again 2 packets of <memlength> bytes, observe only one sent. |
| EXPECT_EQ(sendmsg(client_fd().get(), &msg, 0), (ssize_t)memlength, "%s", strerror(errno)); |
| |
| EXPECT_SUCCESS(close(mutable_client_fd().release())); |
| } |
| |
| TEST_F(TcpSocketTest, WaitBeginEndConnecting) { |
| ASSERT_NO_FATAL_FAILURE(set_nonblocking_io()); |
| |
| // Like set_connected, but does not advance to the connected state. |
| mutable_server().SetOnConnect([](zx::socket& peer, Server::ConnectCompleter::Sync& completer) { |
| completer.ReplyError(fuchsia_posix::wire::Errno::kEinprogress); |
| }); |
| const sockaddr_in addr = { |
| .sin_family = AF_INET, |
| }; |
| ASSERT_EQ(connect(client_fd().get(), reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)), -1); |
| ASSERT_ERRNO(EINPROGRESS); |
| |
| fdio_t* io = fdio_unsafe_fd_to_io(client_fd().get()); |
| auto release = fit::defer([io]() { fdio_unsafe_release(io); }); |
| |
| zx_handle_t handle; |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLIN, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(fuchsia_posix_socket::wire::kSignalStreamIncoming | ZX_SOCKET_READABLE | |
| ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED, |
| signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLOUT, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(fuchsia_posix_socket::wire::kSignalStreamConnected | ZX_SOCKET_PEER_CLOSED | |
| ZX_SOCKET_WRITE_DISABLED, |
| signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLRDHUP, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED, signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLHUP, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_PEER_CLOSED, signals); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_READABLE, &events); |
| EXPECT_EQ(0, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_CLOSED, &events); |
| EXPECT_EQ(POLLIN | POLLOUT | POLLERR | POLLHUP | POLLRDHUP, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_WRITE_DISABLED, &events); |
| EXPECT_EQ(POLLIN | POLLRDHUP, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_WRITABLE, &events); |
| EXPECT_EQ(0, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_WRITE_DISABLED, &events); |
| EXPECT_EQ(POLLOUT | POLLHUP, int32_t(events)); |
| } |
| } |
| |
| TEST_F(TcpSocketTest, WaitBeginEndConnected) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| fdio_t* io = fdio_unsafe_fd_to_io(client_fd().get()); |
| auto release = fit::defer([io]() { fdio_unsafe_release(io); }); |
| |
| zx_handle_t handle; |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLIN, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_READABLE | ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED, signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLOUT, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_WRITABLE | ZX_SOCKET_WRITE_DISABLED, signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLRDHUP, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED, signals); |
| } |
| |
| { |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(io, POLLHUP, &handle, &signals); |
| EXPECT_NE(ZX_HANDLE_INVALID, handle); |
| EXPECT_EQ(ZX_SOCKET_PEER_CLOSED, signals); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_READABLE, &events); |
| EXPECT_EQ(POLLIN, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_CLOSED, &events); |
| EXPECT_EQ(POLLIN | POLLOUT | POLLERR | POLLHUP | POLLRDHUP, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_WRITE_DISABLED, &events); |
| EXPECT_EQ(POLLIN | POLLRDHUP, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_WRITABLE, &events); |
| EXPECT_EQ(POLLOUT, int32_t(events)); |
| } |
| |
| { |
| uint32_t events; |
| fdio_unsafe_wait_end(io, ZX_SOCKET_WRITE_DISABLED, &events); |
| EXPECT_EQ(POLLOUT | POLLHUP, int32_t(events)); |
| } |
| } |
| |
| TEST_F(TcpSocketTest, Shutdown) { |
| ASSERT_EQ(shutdown(client_fd().get(), SHUT_RD), 0, "%s", strerror(errno)); |
| ASSERT_EQ(server().ShutdownCount(), 1); |
| } |
| |
| TEST_F(TcpSocketTest, GetReadBufferAvailable) { |
| int available = 0; |
| EXPECT_EQ(ioctl(client_fd().get(), FIONREAD, &available), 0, "%s", strerror(errno)); |
| EXPECT_EQ(available, 0); |
| |
| constexpr size_t data_size = 47; |
| std::array<char, data_size> data_buf; |
| size_t actual = 0; |
| EXPECT_OK(server_socket().write(0u, data_buf.data(), data_buf.size(), &actual)); |
| EXPECT_EQ(actual, data_size); |
| |
| EXPECT_EQ(ioctl(client_fd().get(), FIONREAD, &available), 0, "%s", strerror(errno)); |
| EXPECT_EQ(available, data_size); |
| } |
| |
| TEST_F(TcpSocketTest, PollNoEvents) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| struct pollfd pfds[] = { |
| { |
| .fd = client_fd().get(), |
| .events = 0, |
| }, |
| }; |
| |
| EXPECT_EQ(poll(pfds, std::size(pfds), 5), 0, "error: %s", strerror(errno)); |
| } |
| |
| using UdpSocketTest = BaseTest<ZX_SOCKET_DATAGRAM>; |
| TEST_F(UdpSocketTest, DatagramSendMsg) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| |
| { |
| const struct msghdr msg = {}; |
| // sendmsg should accept 0 length payload. |
| EXPECT_SUCCESS(sendmsg(client_fd().get(), &msg, 0)); |
| // no data will have arrived on the other end. |
| constexpr size_t prior = 1337; |
| size_t actual = prior; |
| std::array<char, 1> rcv_buf; |
| EXPECT_EQ(server_socket().read(0, rcv_buf.data(), rcv_buf.size(), &actual), ZX_ERR_SHOULD_WAIT); |
| EXPECT_EQ(actual, prior); |
| } |
| |
| struct sockaddr_in addr = { |
| .sin_family = AF_INET, |
| .sin_addr = |
| { |
| .s_addr = htonl(INADDR_LOOPBACK), |
| }, |
| }; |
| const socklen_t addrlen = sizeof(addr); |
| |
| constexpr char payload[] = "hello"; |
| struct iovec iov[] = { |
| { |
| .iov_base = static_cast<void*>(const_cast<char*>(payload)), |
| .iov_len = sizeof(payload), |
| }, |
| }; |
| |
| struct msghdr msg = { |
| .msg_name = &addr, |
| .msg_namelen = addrlen, |
| .msg_iov = iov, |
| .msg_iovlen = std::size(iov), |
| }; |
| |
| EXPECT_EQ(sendmsg(client_fd().get(), &msg, 0), ssize_t(sizeof(payload)), "%s", strerror(errno)); |
| |
| // sendmsg doesn't fail when msg_namelen is greater than sizeof(struct sockaddr_storage) because |
| // what's being tested here is a fuchsia.posix.socket.StreamSocket backed by a |
| // zx::socket(ZX_SOCKET_DATAGRAM), a Frankenstein's monster which implements stream semantics on |
| // the network and datagram semantics on the transport to the netstack. |
| msg.msg_namelen = sizeof(sockaddr_storage) + 1; |
| EXPECT_EQ(sendmsg(client_fd().get(), &msg, 0), ssize_t(sizeof(payload)), "%s", strerror(errno)); |
| |
| { |
| size_t actual; |
| std::array<char, sizeof(payload) + 1> rcv_buf; |
| for (int i = 0; i < 2; i++) { |
| EXPECT_OK(server_socket().read(0, rcv_buf.data(), rcv_buf.size(), &actual)); |
| EXPECT_EQ(actual, sizeof(payload)); |
| } |
| } |
| |
| EXPECT_SUCCESS(close(mutable_client_fd().release())); |
| } |
| |
| TEST_F(UdpSocketTest, Shutdown) { |
| ASSERT_EQ(shutdown(client_fd().get(), SHUT_RD), 0, "%s", strerror(errno)); |
| ASSERT_EQ(server().ShutdownCount(), 1); |
| } |
| |
| class TcpSocketTimeoutTest : public TcpSocketTest { |
| protected: |
| template <int optname> |
| void timeout(fbl::unique_fd& fd, zx::socket& server_socket) { |
| static_assert(optname == SO_RCVTIMEO || optname == SO_SNDTIMEO); |
| |
| // We want this to be a small number so the test is fast, but at least 1 |
| // second so that we exercise `tv_sec`. |
| const auto timeout = std::chrono::seconds(1) + std::chrono::milliseconds(50); |
| { |
| const auto sec = std::chrono::duration_cast<std::chrono::seconds>(timeout); |
| const struct timeval tv = { |
| .tv_sec = sec.count(), |
| .tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(timeout - sec).count(), |
| }; |
| ASSERT_SUCCESS(setsockopt(fd.get(), SOL_SOCKET, optname, &tv, sizeof(tv))); |
| struct timeval actual_tv; |
| socklen_t optlen = sizeof(actual_tv); |
| ASSERT_EQ(getsockopt(fd.get(), SOL_SOCKET, optname, &actual_tv, &optlen), 0, "%s", |
| strerror(errno)); |
| ASSERT_EQ(optlen, sizeof(actual_tv)); |
| ASSERT_EQ(actual_tv.tv_sec, tv.tv_sec); |
| ASSERT_EQ(actual_tv.tv_usec, tv.tv_usec); |
| } |
| |
| const auto margin = std::chrono::milliseconds(50); |
| |
| uint8_t buf[16]; |
| |
| // Perform the read/write. This is the core of the test - we expect the operation to time out |
| // per our setting of the timeout above. |
| |
| const auto start = std::chrono::steady_clock::now(); |
| |
| switch (optname) { |
| case SO_RCVTIMEO: |
| ASSERT_EQ(read(fd.get(), buf, sizeof(buf)), -1); |
| break; |
| case SO_SNDTIMEO: |
| ASSERT_EQ(write(fd.get(), buf, sizeof(buf)), -1); |
| break; |
| } |
| ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK, "%s", strerror(errno)); |
| |
| const auto elapsed = std::chrono::steady_clock::now() - start; |
| |
| // Check that the actual time waited was close to the expectation. |
| const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed); |
| const auto timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(timeout); |
| |
| // TODO(https://fxbug.dev/42116074): Only the lower bound of the elapsed time is checked. The |
| // upper bound check is ignored as the syscall could far miss the defined deadline to return. |
| EXPECT_GT(elapsed, timeout - margin, "elapsed=%lld ms (which is not within %lld ms of %lld ms)", |
| elapsed_ms.count(), margin.count(), timeout_ms.count()); |
| |
| // Remove the timeout. |
| const struct timeval tv = {}; |
| ASSERT_SUCCESS(setsockopt(fd.get(), SOL_SOCKET, optname, &tv, sizeof(tv))); |
| // Wrap the read/write in a future to enable a timeout. We expect the future |
| // to time out. |
| std::latch fut_started(1); |
| auto fut = std::async(std::launch::async, [&]() -> std::pair<ssize_t, int> { |
| fut_started.count_down(); |
| |
| switch (optname) { |
| case SO_RCVTIMEO: |
| return std::make_pair(read(fd.get(), buf, sizeof(buf)), errno); |
| case SO_SNDTIMEO: |
| return std::make_pair(write(fd.get(), buf, sizeof(buf)), errno); |
| } |
| }); |
| fut_started.wait(); |
| EXPECT_EQ(fut.wait_for(margin), std::future_status::timeout); |
| // Resetting the remote end socket should cause the read/write to complete. |
| server_socket.reset(); |
| // Closing the socket without returning an error from `getsockopt(_, SO_ERROR, ...)` looks like |
| // the connection was gracefully closed. The same behavior is exercised in |
| // src/connectivity/network/tests/bsdsocket_test.cc:{StopListenWhileConnect,BlockedIOTest/CloseWhileBlocked}. |
| auto return_code_and_errno = fut.get(); |
| switch (optname) { |
| case SO_RCVTIMEO: |
| EXPECT_EQ(return_code_and_errno.first, 0); |
| break; |
| case SO_SNDTIMEO: |
| EXPECT_EQ(return_code_and_errno.first, -1); |
| ASSERT_EQ(return_code_and_errno.second, EPIPE, "%s", |
| strerror(return_code_and_errno.second)); |
| break; |
| } |
| |
| ASSERT_SUCCESS(close(fd.release())); |
| } |
| }; |
| |
| TEST_F(TcpSocketTimeoutTest, Rcv) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| timeout<SO_RCVTIMEO>(mutable_client_fd(), mutable_server_socket()); |
| } |
| |
| TEST_F(TcpSocketTimeoutTest, Snd) { |
| ASSERT_NO_FATAL_FAILURE(set_connected()); |
| server().FillPeerSocket(); |
| timeout<SO_SNDTIMEO>(mutable_client_fd(), mutable_server_socket()); |
| } |
| |
| // An arbitrary maximum payload size. |
| constexpr size_t kUdpMaxPayloadSize = 60000; |
| |
| class DatagramSocketServer final |
| : public fidl::testing::WireTestBase<fuchsia_posix_socket::DatagramSocket> { |
| public: |
| explicit DatagramSocketServer(zx::socket socket) : socket_(std::move(socket)) {} |
| |
| private: |
| void NotImplemented_(const std::string& name, fidl::CompleterBase& completer) final { |
| ADD_FAILURE("unexpected message received: %s", name.c_str()); |
| completer.Close(ZX_ERR_NOT_SUPPORTED); |
| } |
| |
| void Close(CloseCompleter::Sync& completer) override { |
| completer.ReplySuccess(); |
| completer.Close(ZX_OK); |
| } |
| |
| void Query(QueryCompleter::Sync& completer) final { |
| const std::string_view kProtocol = fuchsia_posix_socket::wire::kDatagramSocketProtocolName; |
| // TODO(https://fxbug.dev/42052765): avoid the const cast. |
| uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<char*>(kProtocol.data())); |
| completer.Reply(fidl::VectorView<uint8_t>::FromExternal(data, kProtocol.size())); |
| } |
| |
| void Describe(DescribeCompleter::Sync& completer) final { |
| ASSERT_TRUE(socket_.is_valid()); |
| fidl::Arena alloc; |
| completer.Reply(fuchsia_posix_socket::wire::DatagramSocketDescribeResponse::Builder(alloc) |
| .socket(std::move(socket_)) |
| .tx_meta_buf_size(kTxUdpPreludeSize) |
| .rx_meta_buf_size(kRxUdpPreludeSize) |
| .metadata_encoding_protocol_version({}) |
| .Build()); |
| } |
| |
| void SendMsgPreflight(fuchsia_posix_socket::wire::DatagramSocketSendMsgPreflightRequest* request, |
| SendMsgPreflightCompleter::Sync& completer) override { |
| fuchsia_net::wire::Ipv4SocketAddress fidl_addr = { |
| .port = 8080, |
| }; |
| in_addr_t addr = htonl(INADDR_LOOPBACK); |
| memcpy(fidl_addr.address.addr.data(), &addr, sizeof(addr)); |
| // Providing no eventpairs means that the client's cache will never be |
| // invalidated; every subsequent preflight check will succeed client-side. |
| std::array<zx::eventpair, 0> eventpairs = {}; |
| |
| fidl::Arena alloc; |
| fidl::WireTableBuilder response_builder = |
| fuchsia_posix_socket::wire::DatagramSocketSendMsgPreflightResponse::Builder(alloc); |
| response_builder.to(fuchsia_net::wire::SocketAddress::WithIpv4(alloc, fidl_addr)) |
| .validity(fidl::VectorView<zx::eventpair>::FromExternal(eventpairs)) |
| .maximum_size(kUdpMaxPayloadSize); |
| completer.ReplySuccess(response_builder.Build()); |
| } |
| |
| zx::socket socket_; |
| }; |
| |
| class BlockingOp { |
| public: |
| enum class Op { |
| SEND, |
| POLL, |
| SELECT, |
| }; |
| |
| explicit BlockingOp(const Op& op) : op_(op) {} |
| |
| void Execute(const fbl::unique_fd& fd, const std::vector<char>& buf) { |
| switch (op_) { |
| case Op::SEND: { |
| ssize_t n = send(fd.get(), buf.data(), buf.size(), /* flags */ 0); |
| ASSERT_GE(n, 0, "%s", strerror(errno)); |
| ASSERT_EQ(n, ssize_t(buf.size())); |
| } break; |
| case Op::POLL: { |
| pollfd pfd = { |
| .fd = fd.get(), |
| .events = POLLOUT, |
| }; |
| int n = poll(&pfd, 1, /* infinite timeout */ -1); |
| ASSERT_GE(n, 0, "%s", strerror(errno)); |
| ASSERT_EQ(n, 1); |
| ASSERT_EQ(pfd.revents, POLLOUT); |
| } break; |
| case Op::SELECT: { |
| fd_set writefds; |
| FD_ZERO(&writefds); |
| FD_SET(fd.get(), &writefds); |
| int n = select(fd.get() + 1, /* readfds */ nullptr, &writefds, |
| /* exceptfds */ nullptr, /* infinite timeout */ nullptr); |
| ASSERT_GE(n, 0, "%s", strerror(errno)); |
| ASSERT_EQ(n, 1); |
| ASSERT_TRUE(FD_ISSET(fd.get(), &writefds)); |
| } break; |
| } |
| } |
| |
| static constexpr const char* Name(enum Op op) { |
| switch (op) { |
| case Op::SEND: |
| return "Send"; |
| case Op::POLL: |
| return "Poll"; |
| case Op::SELECT: |
| return "Select"; |
| } |
| } |
| |
| private: |
| const enum Op op_; |
| }; |
| |
| class DatagramSocketTest : public zxtest::TestWithParam<BlockingOp::Op> { |
| public: |
| const size_t kWriteThreshold = kUdpMaxPayloadSize + kTxUdpPreludeSize; |
| |
| void SetUp() final { |
| zx::socket socket; |
| ASSERT_OK(zx::socket::create(ZX_SOCKET_DATAGRAM, &socket, &peer_)); |
| ASSERT_OK(socket.get_info(ZX_INFO_SOCKET, &info_, sizeof(info_), nullptr, nullptr)); |
| ASSERT_OK(socket.set_property(ZX_PROP_SOCKET_TX_THRESHOLD, &kWriteThreshold, |
| sizeof(kWriteThreshold))); |
| |
| zx::result server_end = fidl::CreateEndpoints(&client_end_); |
| ASSERT_OK(server_end.status_value()); |
| fidl::BindServer(control_loop_.dispatcher(), std::move(server_end.value()), |
| &server_.emplace(std::move(socket))); |
| control_loop_.StartThread("control"); |
| } |
| |
| void TearDown() final { control_loop_.Shutdown(); } |
| |
| const zx_info_socket_t& info() const { return info_; } |
| zx::socket TakePeer() { return std::move(peer_); } |
| fidl::ClientEnd<fsocket::DatagramSocket> TakeClientEnd() { return std::move(client_end_); } |
| |
| private: |
| zx::socket peer_; |
| zx_info_socket_t info_; |
| fidl::ClientEnd<fsocket::DatagramSocket> client_end_; |
| std::optional<DatagramSocketServer> server_; |
| async::Loop control_loop_{&kAsyncLoopConfigNoAttachToCurrentThread}; |
| }; |
| |
| TEST_P(DatagramSocketTest, WriteWithTxZirconSocketRemainder) { |
| // Fast datagram sockets on Fuchsia use multiple buffers to store outbound |
| // payloads, some of which are in Netstack memory and some of which are in |
| // kernel memory. Bytes are shuttled between these buffers using goroutines. |
| // |
| // One edge case arises when the kernel buffers have free space, but not so |
| // much space that they can accept the next datagram payload. In this case, |
| // the client waits until the kernel object can accept the maximum payload |
| // size to ensure that the next write will succeed, in an operation known as a |
| // threshold wait. |
| // |
| // This test exercises this scenario. |
| fbl::unique_fd fd; |
| ASSERT_OK(fdio_fd_create(TakeClientEnd().TakeChannel().release(), fd.reset_and_get_address())); |
| |
| // Pick a payload size which ensures that the zircon socket will have a |
| // "remainder". |
| // |
| // In other words, choose a size by which the zircon socket's capacity is not |
| // divisible, such that even when the maximum amount of payloads are written |
| // into the socket, there will be some capacity remaining, and therefore the |
| // socket will still be considered writable. |
| size_t payload_size = std::min(kUdpMaxPayloadSize, info().tx_buf_max - kTxUdpPreludeSize); |
| size_t total_size = payload_size + kTxUdpPreludeSize; |
| { |
| for (; payload_size > 0; --payload_size) { |
| total_size = payload_size + kTxUdpPreludeSize; |
| if (info().tx_buf_max % total_size != 0) { |
| break; |
| } |
| } |
| ASSERT_GT( |
| payload_size, 0, |
| "couldn't find valid UDP payload size for which (zx_socket_info.tx_buf_max %% payload size) != 0"); |
| } |
| |
| // Send enough packets to fill the zircon socket. |
| std::vector<char> buf(payload_size, 'a'); |
| for (size_t remaining = info().tx_buf_max; remaining > total_size; remaining -= total_size) { |
| ASSERT_EQ(send(fd.get(), buf.data(), buf.size(), /* flags */ 0), ssize_t(buf.size()), |
| "%s: %zu capacity remaining in socket", strerror(errno), remaining); |
| } |
| |
| // The next operation should block because the zircon socket is full. |
| BlockingOp op(GetParam()); |
| std::latch op_started(1); |
| const auto fut = std::async(std::launch::async, [&]() { |
| op_started.count_down(); |
| op.Execute(fd, buf); |
| }); |
| op_started.wait(); |
| ASSERT_NO_FATAL_FAILURE(AssertBlocked(fut)); |
| |
| // Dequeue packets from the netstack's end of the zircon socket; the operation |
| // should continue to block until capacity has reached the write threshold. |
| zx::socket peer = TakePeer(); |
| std::vector<char> recvbuf; |
| recvbuf.resize(buf.size() + kTxUdpPreludeSize); |
| for (size_t capacity = info().tx_buf_max % total_size; capacity < kWriteThreshold; |
| capacity += total_size) { |
| ASSERT_NO_FATAL_FAILURE(AssertBlocked(fut)); |
| |
| size_t actual; |
| ASSERT_OK(peer.read(/* options */ 0, recvbuf.data(), recvbuf.size(), &actual)); |
| ASSERT_EQ(actual, recvbuf.size()); |
| EXPECT_EQ(std::string_view(recvbuf.data() + kTxUdpPreludeSize, buf.size()), |
| std::string_view(buf.data(), buf.size())); |
| } |
| |
| // Now that the capacity in the socket has crossed the write threshold, the |
| // blocking operation should be unblocked. |
| fut.wait(); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(DatagramSocketTests, DatagramSocketTest, |
| zxtest::Values(BlockingOp::Op::SEND, BlockingOp::Op::POLL, |
| BlockingOp::Op::SELECT), |
| [](const auto info) { return BlockingOp::Name(info.param); }); |
| |
| } // namespace |