blob: b88bbc583341c3fcac735de4f7bec0448466bace [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <lib/fdio/fd.h>
#include <lib/fdio/fdio.h>
#include <lib/fdio/unsafe.h>
#include <lib/zx/clock.h>
#include <lib/zx/thread.h>
#include <poll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <zircon/syscalls.h>
#include <zircon/threads.h>
#include <array>
#include <future>
#include <fbl/unique_fd.h>
#include <zxtest/zxtest.h>
// TODO(fxbug.dev/44390): Remove once parameterized tests are supported.
// This macro is a cheap workaround, as each test must be uniquely named
// and it's best to have each test exercise a single use case.
#define TEST_P(TestCase, Test) \
TEST(TestCase, Socket##Test) { Test(SOCK_STREAM); } \
TEST(TestCase, Datagram##Test) { Test(SOCK_DGRAM); }
void SocketpairSetup(std::array<fbl::unique_fd, 2>& fds, uint16_t type) {
int int_fds[fds.size()];
int status = socketpair(AF_UNIX, type, 0, int_fds);
ASSERT_EQ(status, 0, "socketpair(AF_UNIX, %u, 0, fds) failed", type);
fds[0].reset(int_fds[0]);
fds[1].reset(int_fds[1]);
}
void SocketpairShutdownSetup(std::array<fbl::unique_fd, 2>& fds, uint16_t type) {
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// Set both ends to non-blocking to make testing for readability/writability easier.
ASSERT_EQ(fcntl(fds[0].get(), F_SETFL, O_NONBLOCK), 0);
ASSERT_EQ(fcntl(fds[1].get(), F_SETFL, O_NONBLOCK), 0);
char buf[1] = {};
// Both sides should be readable.
errno = 0;
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), -1, "fds[0] should initially be readable");
EXPECT_EQ(errno, EAGAIN);
errno = 0;
EXPECT_EQ(read(fds[1].get(), buf, sizeof(buf)), -1, "fds[1] should initially be readable");
EXPECT_EQ(errno, EAGAIN);
// Both sides should be writable.
EXPECT_EQ(write(fds[0].get(), buf, sizeof(buf)), 1, "fds[0] should be initially writable");
EXPECT_EQ(write(fds[1].get(), buf, sizeof(buf)), 1, "fds[1] should be initially writable");
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), 1);
EXPECT_EQ(read(fds[1].get(), buf, sizeof(buf)), 1);
}
void Control(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// write() and read() should work.
const char buf[] = "abc";
ASSERT_EQ(write(fds[0].get(), buf, sizeof(buf)), sizeof(buf), "%s", strerror(errno));
char recvbuf[sizeof(buf)];
ASSERT_EQ(read(fds[1].get(), recvbuf, sizeof(recvbuf)), sizeof(buf), "%s", strerror(errno));
recvbuf[sizeof(recvbuf) - 1] = 0;
ASSERT_STR_EQ(recvbuf, buf);
// send() and recv() should also work.
ASSERT_EQ(send(fds[1].get(), buf, sizeof(buf), 0), sizeof(buf), "%s", strerror(errno));
ASSERT_EQ(recv(fds[0].get(), recvbuf, sizeof(recvbuf), 0), sizeof(buf), "%s", strerror(errno));
recvbuf[sizeof(recvbuf) - 1] = 0;
ASSERT_STR_EQ(recvbuf, buf);
EXPECT_EQ(close(fds[0].release()), 0, "close(fds[0]) failed");
EXPECT_EQ(close(fds[1].release()), 0, "close(fds[1]) failed");
}
TEST_P(SocketpairTest, Control)
static_assert(EAGAIN == EWOULDBLOCK, "Assuming EAGAIN and EWOULDBLOCK have same value");
#if defined(__Fuchsia__)
#define SEND_FLAGS 0
#else
#define SEND_FLAGS MSG_NOSIGNAL
#endif
void ShutdownRead(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairShutdownSetup(fds, type));
// Write a byte into fds[1] to test for readability later.
char buf[1] = {};
EXPECT_EQ(write(fds[1].get(), buf, sizeof(buf)), 1);
// Close one side down for reading.
int status = shutdown(fds[0].get(), SHUT_RD);
EXPECT_EQ(status, 0, "shutdown(fds[0], SHUT_RD)");
if (status != 0)
printf("\nerrno %d\n", errno);
// Can read the byte already written into the pipe.
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), 1, "fds[0] should not be readable after SHUT_RD");
// But not send any further bytes
EXPECT_EQ(send(fds[1].get(), buf, sizeof(buf), SEND_FLAGS), -1);
EXPECT_EQ(errno, EPIPE, "send should return EPIPE after shutdown(SHUT_RD) on other side");
// Or read any more
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), 0);
EXPECT_EQ(close(fds[0].release()), 0);
EXPECT_EQ(close(fds[1].release()), 0);
}
TEST_P(SocketpairTest, ShutdownRead)
void ShutdownWrite(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairShutdownSetup(fds, type));
// Close one side down for writing.
int status = shutdown(fds[0].get(), SHUT_WR);
EXPECT_EQ(status, 0, "shutdown(fds[0], SHUT_WR)");
if (status != 0)
printf("\nerrno %d\n", errno);
char buf[1] = {};
// Should still be readable.
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), -1);
EXPECT_EQ(errno, EAGAIN, "errno after read after SHUT_WR");
// But not writable
EXPECT_EQ(send(fds[0].get(), buf, sizeof(buf), SEND_FLAGS), -1, "write after SHUT_WR");
EXPECT_EQ(errno, EPIPE, "errno after write after SHUT_WR");
// Should still be able to write + read a message in the other direction.
EXPECT_EQ(write(fds[1].get(), buf, sizeof(buf)), 1);
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), 1);
EXPECT_EQ(close(fds[0].release()), 0);
EXPECT_EQ(close(fds[1].release()), 0);
}
TEST_P(SocketpairTest, ShutdownWrite)
void ShutdownReadWrite(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairShutdownSetup(fds, type));
// Close one side for reading and writing.
int status = shutdown(fds[0].get(), SHUT_RDWR);
EXPECT_EQ(status, 0, "shutdown(fds[0], SHUT_RDWR");
if (status != 0)
printf("\nerrno %d\n", errno);
char buf[1] = {};
// Writing should fail.
EXPECT_EQ(send(fds[0].get(), buf, sizeof(buf), SEND_FLAGS), -1);
EXPECT_EQ(errno, EPIPE, "errno after write after SHUT_RDWR");
// Reading should return no data.
EXPECT_EQ(read(fds[0].get(), buf, sizeof(buf)), 0);
}
TEST_P(SocketpairTest, ShutdownReadWrite)
std::thread poll_for_read_with_timeout(const fbl::unique_fd& fd) {
return std::thread([&]() {
struct pollfd pfd = {
.fd = fd.get(),
.events = POLLIN,
};
int timeout_ms = 100;
zx_time_t time_before = zx_clock_get_monotonic();
EXPECT_EQ(poll(&pfd, 1, timeout_ms), 1, "poll should have one entry");
zx_time_t time_after = zx_clock_get_monotonic();
EXPECT_LT(time_after - time_before, 100u * 1000 * 1000, "poll should not have timed out");
int num_readable = 0;
EXPECT_EQ(ioctl(pfd.fd, FIONREAD, &num_readable), 0, "ioctl(FIONREAD)");
EXPECT_EQ(num_readable, 0);
});
}
void ShutdownSelfWritePoll(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairShutdownSetup(fds, type));
std::thread poll_thread = poll_for_read_with_timeout(fds[0]);
EXPECT_EQ(shutdown(fds[0].get(), SHUT_RDWR), 0, "%s", strerror(errno));
poll_thread.join();
}
TEST_P(SocketpairTest, ShutdownSelfWritePoll)
void ShutdownPeerWritePoll(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairShutdownSetup(fds, type));
std::thread poll_thread = poll_for_read_with_timeout(fds[0]);
EXPECT_EQ(shutdown(fds[1].get(), SHUT_RDWR), 0, "%s", strerror(errno));
poll_thread.join();
}
TEST_P(SocketpairTest, ShutdownPeerWritePoll)
std::thread recv_thread(const fbl::unique_fd& fd) {
return std::thread([&]() {
std::array<char, 256> buf;
EXPECT_EQ(recv(fd.get(), buf.data(), buf.size(), 0), 0, "%s", strerror(errno));
});
}
void ShutdownSelfReadDuringRecv(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
std::thread t = recv_thread(fds[0]);
EXPECT_EQ(shutdown(fds[0].get(), SHUT_RD), 0, "%s", strerror(errno));
t.join();
}
TEST_P(SocketpairTest, ShutdownSelfReadDuringRecv)
void ShutdownSelfWriteDuringRecv(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
std::thread t = recv_thread(fds[0]);
EXPECT_EQ(shutdown(fds[1].get(), SHUT_WR), 0, "%s", strerror(errno));
t.join();
}
TEST_P(SocketpairTest, ShutdownSelfWriteDuringRecv)
std::thread send_thread(const fbl::unique_fd& fd) {
return std::thread([&]() {
std::array<char, 256> buf;
EXPECT_EQ(send(fd.get(), buf.data(), buf.size(), 0), -1);
EXPECT_EQ(errno, EPIPE, "%s", strerror(errno));
});
}
constexpr zx::duration kStateCheckIntervals = zx::usec(5);
// Wait until |thread| has entered |state|.
zx_status_t WaitForState(const zx::thread& thread, zx_thread_state_t desired_state) {
while (true) {
zx_info_thread_t info;
zx_status_t status = thread.get_info(ZX_INFO_THREAD, &info, sizeof(info), nullptr, nullptr);
if (status != ZX_OK) {
return status;
}
if (info.state == desired_state) {
return ZX_OK;
}
zx::nanosleep(zx::deadline_after(kStateCheckIntervals));
}
}
void ShutdownSelfWriteDuringSend(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// First, fill up the socket so the next send() will block.
std::array<char, 256> buf;
while (true) {
ssize_t status = send(fds[0].get(), buf.data(), buf.size(), MSG_DONTWAIT);
if (status < 0) {
ASSERT_EQ(errno, EAGAIN, "send should eventually return EAGAIN when full");
break;
}
}
// Then start a thread blocking on a send().
std::thread t = send_thread(fds[0]);
// Wait for the thread to sleep in send.
ASSERT_OK(WaitForState(*(zx::unowned_thread(thrd_get_zx_handle(t.native_handle()))),
ZX_THREAD_STATE_BLOCKED_WAIT_ONE));
EXPECT_EQ(shutdown(fds[0].get(), SHUT_WR), 0, "%s", strerror(errno));
t.join();
}
TEST_P(SocketpairTest, ShutdownSelfWriteDuringSend)
void ShutdownSelfWriteBeforeSend(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// First, fill up the socket so the next send() will block.
std::array<char, 256> buf;
while (true) {
ssize_t status = send(fds[0].get(), buf.data(), buf.size(), MSG_DONTWAIT);
if (status < 0) {
ASSERT_EQ(errno, EAGAIN, "send should eventually return EAGAIN when full");
break;
}
}
EXPECT_EQ(shutdown(fds[0].get(), SHUT_WR), 0, "%s", strerror(errno));
// Then start a thread blocking on a send().
std::thread t = send_thread(fds[0]);
t.join();
}
TEST_P(SocketpairTest, ShutdownSelfWriteBeforeSend)
void ShutdownPeerReadDuringSend(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// First, fill up the socket so the next send() will block.
std::array<char, 256> buf;
while (true) {
ssize_t status = send(fds[0].get(), buf.data(), buf.size(), MSG_DONTWAIT);
if (status < 0) {
ASSERT_EQ(errno, EAGAIN, "send should eventually return EAGAIN when full");
break;
}
}
// Then start a thread blocking on a send().
std::thread t = send_thread(fds[0]);
// Wait for the thread to sleep in send.
ASSERT_OK(WaitForState(*(zx::unowned_thread(thrd_get_zx_handle(t.native_handle()))),
ZX_THREAD_STATE_BLOCKED_WAIT_ONE));
EXPECT_EQ(shutdown(fds[1].get(), SHUT_RD), 0, "%s", strerror(errno));
t.join();
}
TEST_P(SocketpairTest, ShutdownPeerReadDuringSend)
void ShutdownPeerReadBeforeSend(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
// First, fill up the socket so the next send() will block.
std::array<char, 256> buf;
while (true) {
ssize_t status = send(fds[0].get(), buf.data(), buf.size(), MSG_DONTWAIT);
if (status < 0) {
ASSERT_EQ(errno, EAGAIN, "send should eventually return EAGAIN when full");
break;
}
}
EXPECT_EQ(shutdown(fds[1].get(), SHUT_RD), 0, "%s", strerror(errno));
std::thread t = send_thread(fds[0]);
t.join();
}
TEST_P(SocketpairTest, ShutdownPeerReadBeforeSend)
void CloneOrUnwrapAndWrap(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
zx::handle handle;
int status = fdio_fd_clone(fds[0].get(), handle.reset_and_get_address());
ASSERT_OK(status, "fdio_fd_clone() failed");
fbl::unique_fd cloned_fd;
status = fdio_fd_create(handle.release(), cloned_fd.reset_and_get_address());
EXPECT_EQ(status, 0, "fdio_fd_create(..., &cloned_fd) failed");
status = fdio_fd_transfer(fds[0].release(), handle.reset_and_get_address());
ASSERT_OK(status, "fdio_fd_transfer() failed");
fbl::unique_fd transferred_fd;
status = fdio_fd_create(handle.release(), transferred_fd.reset_and_get_address());
EXPECT_EQ(status, 0, "fdio_fd_create(..., &transferred_fd) failed");
// Verify that an operation specific to socketpairs works on these fds.
ASSERT_EQ(0, shutdown(transferred_fd.get(), SHUT_WR), "shutdown(transferred_fd, SHUT_WR) failed");
ASSERT_EQ(0, shutdown(cloned_fd.get(), SHUT_RD), "shutdown(cloned_fd, SHUT_RD) failed");
}
TEST_P(SocketpairTest, CloneOrUnwrapAndWrap)
// Verify scenario, where multi-segment recvmsg is requested, but the socket has
// just enough data to *completely* fill one segment.
// In this scenario, an attempt to read data for the next segment immediately
// fails with ZX_ERR_SHOULD_WAIT; at this point recvmsg should report total
// number of bytes read, instead of failing with EAGAIN.
TEST(SocketpairTest, StreamRecvmsgNonblockBoundary) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, SOCK_STREAM));
ASSERT_EQ(fcntl(fds[0].get(), F_SETFL, O_NONBLOCK), 0);
ASSERT_EQ(fcntl(fds[1].get(), F_SETFL, O_NONBLOCK), 0);
// Write 4 bytes of data to socket.
size_t actual;
const uint32_t data_out = 0x12345678;
EXPECT_EQ((ssize_t)sizeof(data_out), write(fds[0].get(), &data_out, sizeof(data_out)),
"Socket write failed");
uint32_t data_in1, data_in2;
// Fail at compilation stage if anyone changes types.
// This is mandatory here: we need the first chunk to be exactly the same
// length as total size of data we just wrote.
assert(sizeof(data_in1) == sizeof(data_out));
struct iovec iov[] = {{
.iov_base = &data_in1,
.iov_len = sizeof(data_in1),
},
{
.iov_base = &data_in2,
.iov_len = sizeof(data_in2),
}};
struct msghdr msg = {
.msg_iov = iov,
.msg_iovlen = std::size(iov),
};
actual = recvmsg(fds[1].get(), &msg, 0);
EXPECT_EQ(sizeof(data_in1), actual, "Socket read failed");
}
// Verify scenario, where multi-segment sendmsg is requested, but the socket has
// just enough spare buffer to *completely* read one segment.
// In this scenario, an attempt to send second segment should immediately fail
// with ZX_ERR_SHOULD_WAIT, but the sendmsg should report first segment length
// rather than failing with EAGAIN.
TEST(SocketpairTest, StreamSendmsgNonblockBoundary) {
const ssize_t memlength = 65536;
void* memchunk = malloc(memlength);
struct iovec iov[] = {
{
.iov_base = memchunk,
.iov_len = memlength,
},
{
.iov_base = memchunk,
.iov_len = memlength,
},
};
std::array<fbl::unique_fd, 2> fds;
SocketpairSetup(fds, SOCK_STREAM);
ASSERT_EQ(fcntl(fds[0].get(), F_SETFL, O_NONBLOCK), 0);
ASSERT_EQ(fcntl(fds[1].get(), F_SETFL, O_NONBLOCK), 0);
struct msghdr msg = {
.msg_iov = iov,
.msg_iovlen = std::size(iov),
};
// 1. Keep sending data until socket is saturated.
while (sendmsg(fds[0].get(), &msg, 0) > 0)
;
// 2. Consume one segment of the data.
EXPECT_EQ(memlength, read(fds[1].get(), memchunk, memlength), "Socket read failed.");
// 3. Push again 2 packets of <memlength> bytes, observe only one sent.
EXPECT_EQ(memlength, sendmsg(fds[0].get(), &msg, 0),
"Partial sendmsg failed; is the socket buffer varying?");
free(memchunk);
}
void WaitBeginEnd(uint16_t type) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, type));
fdio_t* io = fdio_unsafe_fd_to_io(fds[0].get());
// fdio_unsafe_wait_begin
zx::handle handle;
{
zx_signals_t signals;
fdio_unsafe_wait_begin(io, POLLIN, handle.reset_and_get_address(), &signals);
EXPECT_NE(handle.get(), ZX_HANDLE_INVALID);
EXPECT_EQ(signals, ZX_SOCKET_READABLE | ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED);
}
{
zx_signals_t signals;
fdio_unsafe_wait_begin(io, POLLOUT, handle.reset_and_get_address(), &signals);
EXPECT_NE(handle.get(), ZX_HANDLE_INVALID);
EXPECT_EQ(signals, ZX_SOCKET_WRITABLE | ZX_SOCKET_WRITE_DISABLED);
}
{
zx_signals_t signals;
fdio_unsafe_wait_begin(io, POLLRDHUP, handle.reset_and_get_address(), &signals);
EXPECT_NE(handle.get(), ZX_HANDLE_INVALID);
EXPECT_EQ(signals, ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_PEER_WRITE_DISABLED);
}
{
zx_signals_t signals;
fdio_unsafe_wait_begin(io, POLLHUP, handle.reset_and_get_address(), &signals);
EXPECT_NE(handle.get(), ZX_HANDLE_INVALID);
EXPECT_EQ(signals, ZX_SIGNAL_NONE);
}
// fdio_unsafe_wait_end
{
uint32_t events;
fdio_unsafe_wait_end(io, ZX_SOCKET_READABLE, &events);
EXPECT_EQ(events, POLLIN);
}
{
uint32_t events;
fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_CLOSED, &events);
EXPECT_EQ(events, POLLIN | POLLRDHUP);
}
{
uint32_t events;
fdio_unsafe_wait_end(io, ZX_SOCKET_PEER_WRITE_DISABLED, &events);
EXPECT_EQ(events, POLLIN | POLLRDHUP);
}
{
uint32_t events;
fdio_unsafe_wait_end(io, ZX_SOCKET_WRITABLE, &events);
EXPECT_EQ(events, POLLOUT);
}
{
uint32_t events;
fdio_unsafe_wait_end(io, ZX_SOCKET_WRITE_DISABLED, &events);
EXPECT_EQ(events, POLLOUT);
}
fdio_unsafe_release(io);
}
TEST_P(SocketpairTest, WaitBeginEnd)
static constexpr ssize_t WRITE_DATA_SIZE = 1024 * 1024;
TEST(SocketpairTest, StreamPartialWrite) {
std::array<fbl::unique_fd, 2> fds;
ASSERT_NO_FATAL_FAILURES(SocketpairSetup(fds, SOCK_STREAM));
// Start a thread that reads everything we write.
auto fut = std::async(
std::launch::async,
[](int fd) {
static char buf[WRITE_DATA_SIZE];
ssize_t progress = 0;
while (progress < WRITE_DATA_SIZE) {
size_t n = WRITE_DATA_SIZE - progress;
ssize_t status = read(fd, buf, n);
if (status < 0) {
return status;
}
progress += status;
}
return progress;
},
fds[1].get());
// Write more data than can fit in the socket send buffer.
static char buf[WRITE_DATA_SIZE];
size_t progress = 0;
while (progress < WRITE_DATA_SIZE) {
size_t n = WRITE_DATA_SIZE - progress;
ssize_t status = write(fds[0].get(), buf, n);
if (status < 0) {
ASSERT_EQ(errno, EAGAIN, "%s", strerror(errno));
}
progress += status;
}
// Make sure the other thread read everything.
ASSERT_EQ(fut.wait_for(std::chrono::seconds(1)), std::future_status::ready);
ASSERT_EQ(fut.get(), WRITE_DATA_SIZE, "other thread did not read everything");
}
#undef TEST_P