[fdio] Merge pipe() and socketpair() impls

As far as I can tell, there's no reason for these implementations to be
separate. Merging the two means we'll be able to recover the client type
of a zx::socket from the information in the kernel. Specifically, a
stream socket with no control plane will map to a pipe (aka socketpair).

Test: pipe and socketpair tests continue to pass
Change-Id: If0251b662daa1ad4ce4017c7d949016204c667a1
diff --git a/system/ulib/fdio/pipe.c b/system/ulib/fdio/pipe.c
index 43e24e2..4b65fdf 100644
--- a/system/ulib/fdio/pipe.c
+++ b/system/ulib/fdio/pipe.c
@@ -18,10 +18,15 @@
 #include <lib/fdio/util.h>
 #include <lib/fdio/vfs.h>
 
-#include "pipe.h"
 #include "private.h"
 
-ssize_t zx_pipe_read_internal(zx_handle_t h, void* data, size_t len, int nonblock) {
+// Used by pipe(2) and socketpair(2) primitives.
+typedef struct zx_pipe {
+    fdio_t io;
+    zx_handle_t h;
+} zx_pipe_t;
+
+static ssize_t zx_pipe_read_internal(zx_handle_t h, void* data, size_t len, int nonblock) {
     // TODO: let the generic read() to do this loop
     for (;;) {
         size_t bytes_read;
@@ -59,7 +64,7 @@
     }
 }
 
-ssize_t zx_pipe_write_internal(zx_handle_t h, const void* data, size_t len, int nonblock) {
+static ssize_t zx_pipe_write_internal(zx_handle_t h, const void* data, size_t len, int nonblock) {
     // TODO: let the generic write() to do this loop
     for (;;) {
         ssize_t r;
@@ -89,23 +94,23 @@
 }
 
 
-ssize_t zx_pipe_write(fdio_t* io, const void* data, size_t len) {
+static ssize_t zx_pipe_write(fdio_t* io, const void* data, size_t len) {
     zx_pipe_t* p = (zx_pipe_t*)io;
     return zx_pipe_write_internal(p->h, data, len, io->ioflag & IOFLAG_NONBLOCK);
 }
 
-ssize_t zx_pipe_read(fdio_t* io, void* data, size_t len) {
+static ssize_t zx_pipe_read(fdio_t* io, void* data, size_t len) {
     zx_pipe_t* p = (zx_pipe_t*)io;
     return zx_pipe_read_internal(p->h, data, len, io->ioflag & IOFLAG_NONBLOCK);
 }
 
-zx_status_t zx_pipe_get_attr(fdio_t* io, vnattr_t* attr) {
+static zx_status_t zx_pipe_get_attr(fdio_t* io, vnattr_t* attr) {
     memset(attr, 0, sizeof(*attr));
     attr->mode = V_TYPE_PIPE | V_IRUSR | V_IWUSR;
     return ZX_OK;
 }
 
-zx_status_t zx_pipe_close(fdio_t* io) {
+static zx_status_t zx_pipe_close(fdio_t* io) {
     zx_pipe_t* p = (zx_pipe_t*)io;
     zx_handle_t h = p->h;
     p->h = 0;
@@ -113,7 +118,7 @@
     return 0;
 }
 
-void zx_pipe_wait_begin(fdio_t* io, uint32_t events, zx_handle_t* handle, zx_signals_t* _signals) {
+static void zx_pipe_wait_begin(fdio_t* io, uint32_t events, zx_handle_t* handle, zx_signals_t* _signals) {
     zx_pipe_t* p = (void*)io;
     *handle = p->h;
     zx_signals_t signals = 0;
@@ -129,7 +134,7 @@
     *_signals = signals;
 }
 
-void zx_pipe_wait_end(fdio_t* io, zx_signals_t signals, uint32_t* _events) {
+static void zx_pipe_wait_end(fdio_t* io, zx_signals_t signals, uint32_t* _events) {
     uint32_t events = 0;
     if (signals & (ZX_SOCKET_READABLE | ZX_SOCKET_PEER_CLOSED | ZX_SOCKET_READ_DISABLED)) {
         events |= POLLIN;
@@ -143,7 +148,7 @@
     *_events = events;
 }
 
-zx_status_t zx_pipe_clone(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
+static zx_status_t zx_pipe_clone(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
     zx_pipe_t* p = (void*)io;
     zx_status_t status = zx_handle_duplicate(p->h, ZX_RIGHT_SAME_RIGHTS, &handles[0]);
     if (status < 0) {
@@ -153,14 +158,14 @@
     return 1;
 }
 
-zx_status_t zx_pipe_unwrap(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
+static zx_status_t zx_pipe_unwrap(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
     zx_pipe_t* p = (void*)io;
     handles[0] = p->h;
     types[0] = PA_FDIO_PIPE;
     return 1;
 }
 
-ssize_t zx_pipe_posix_ioctl(fdio_t* io, int req, va_list va) {
+static ssize_t zx_pipe_posix_ioctl(fdio_t* io, int req, va_list va) {
     zx_pipe_t* p = (void*)io;
     switch (req) {
     case FIONREAD: {
@@ -181,6 +186,44 @@
     }
 }
 
+static ssize_t zx_pipe_recvfrom(fdio_t* io, void* data, size_t len, int flags, struct sockaddr* restrict addr, socklen_t* restrict addrlen) {
+    if (flags != 0 && flags != MSG_DONTWAIT) {
+        return ZX_ERR_INVALID_ARGS;
+    }
+    zx_pipe_t* p = (zx_pipe_t*)io;
+    int nonblock = (io->ioflag & IOFLAG_NONBLOCK) || (flags & MSG_DONTWAIT);
+    return zx_pipe_read_internal(p->h, data, len, nonblock);
+}
+
+static ssize_t zx_pipe_sendto(fdio_t* io, const void* data, size_t len, int flags, const struct sockaddr* addr, socklen_t addrlen) {
+    if (flags != 0 && flags != MSG_DONTWAIT) {
+        return ZX_ERR_INVALID_ARGS;
+    }
+    if (addr != NULL) {
+        return ZX_ERR_INVALID_ARGS;  // should set errno to EISCONN
+    }
+    zx_pipe_t* p = (zx_pipe_t*)io;
+    int nonblock = (io->ioflag & IOFLAG_NONBLOCK) || (flags & MSG_DONTWAIT);
+    return zx_pipe_write_internal(p->h, data, len, nonblock);
+}
+
+static zx_status_t zx_pipe_shutdown(fdio_t* io, int how) {
+    uint32_t options = 0;
+    switch (how) {
+    case SHUT_RD:
+        options = ZX_SOCKET_SHUTDOWN_READ;
+        break;
+    case SHUT_WR:
+        options = ZX_SOCKET_SHUTDOWN_WRITE;
+        break;
+    case SHUT_RDWR:
+        options = ZX_SOCKET_SHUTDOWN_READ | ZX_SOCKET_SHUTDOWN_WRITE;
+        break;
+    }
+    zx_pipe_t* p = (zx_pipe_t*)io;
+    return zx_socket_write(p->h, options, NULL, 0, NULL);
+}
+
 static fdio_ops_t zx_pipe_ops = {
     .read = zx_pipe_read,
     .read_at = fdio_default_read_at,
@@ -209,11 +252,11 @@
     .link = fdio_default_link,
     .get_flags = fdio_default_get_flags,
     .set_flags = fdio_default_set_flags,
-    .recvfrom = fdio_default_recvfrom,
-    .sendto = fdio_default_sendto,
+    .recvfrom = zx_pipe_recvfrom,
+    .sendto = zx_pipe_sendto,
     .recvmsg = fdio_default_recvmsg,
     .sendmsg = fdio_default_sendmsg,
-    .shutdown = fdio_default_shutdown,
+    .shutdown = zx_pipe_shutdown,
 };
 
 fdio_t* fdio_pipe_create(zx_handle_t h) {
@@ -229,6 +272,10 @@
     return &p->io;
 }
 
+fdio_t* fdio_socketpair_create(zx_handle_t h) {
+    return fdio_pipe_create(h);
+}
+
 int fdio_pipe_pair(fdio_t** _a, fdio_t** _b) {
     zx_handle_t h0, h1;
     fdio_t *a, *b;
diff --git a/system/ulib/fdio/pipe.h b/system/ulib/fdio/pipe.h
deleted file mode 100644
index d94f7cc..0000000
--- a/system/ulib/fdio/pipe.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2017 The Fuchsia Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#pragma once
-
-#include <stdint.h>
-#include <lib/fdio/io.h>
-
-#include "private.h"
-
-// This defines operations shared by pipe(2) and socketpair(2) primitives.
-typedef struct zx_pipe {
-    fdio_t io;
-    zx_handle_t h;
-} zx_pipe_t;
-
-ssize_t zx_pipe_read(fdio_t* io, void* data, size_t len);
-ssize_t zx_pipe_write(fdio_t* io, const void* data, size_t len);
-zx_status_t zx_pipe_get_attr(fdio_t* io, vnattr_t* attr);
-zx_status_t zx_pipe_close(fdio_t* io);
-zx_status_t zx_pipe_clone(fdio_t* io, zx_handle_t* handles, uint32_t* types);
-void zx_pipe_wait_begin(fdio_t* io, uint32_t events, zx_handle_t* handle, zx_signals_t* _signals);
-void zx_pipe_wait_end(fdio_t* io, zx_signals_t signals, uint32_t* _events);
-zx_status_t zx_pipe_unwrap(fdio_t* io, zx_handle_t* handles, uint32_t* types);
-ssize_t zx_pipe_posix_ioctl(fdio_t* io, int req, va_list va);
-
-ssize_t zx_pipe_read_internal(zx_handle_t h, void* data, size_t len, int nonblock);
-ssize_t zx_pipe_write_internal(zx_handle_t h, const void* data, size_t len, int nonblock);
diff --git a/system/ulib/fdio/rules.mk b/system/ulib/fdio/rules.mk
index 8427407..2e7ce68 100644
--- a/system/ulib/fdio/rules.mk
+++ b/system/ulib/fdio/rules.mk
@@ -23,7 +23,6 @@
     $(LOCAL_DIR)/remoteio.c \
     $(LOCAL_DIR)/service.c \
     $(LOCAL_DIR)/socket.c \
-    $(LOCAL_DIR)/socketpair.c \
     $(LOCAL_DIR)/spawn.c \
     $(LOCAL_DIR)/stubs.c \
     $(LOCAL_DIR)/uname.c \
diff --git a/system/ulib/fdio/socketpair.c b/system/ulib/fdio/socketpair.c
deleted file mode 100644
index f4caec0..0000000
--- a/system/ulib/fdio/socketpair.c
+++ /dev/null
@@ -1,156 +0,0 @@
-// Copyright 2017 The Fuchsia Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include <sys/socket.h>
-
-#include <zircon/processargs.h>
-#include <zircon/syscalls.h>
-
-#include <errno.h>
-#include <lib/fdio/io.h>
-#include <lib/fdio/socket.h>
-#include <lib/fdio/util.h>
-
-#include "pipe.h"
-#include "private.h"
-#include "unistd.h"
-
-static ssize_t zx_socketpair_recvfrom(fdio_t* io, void* data, size_t len, int flags, struct sockaddr* restrict addr, socklen_t* restrict addrlen) {
-    if (flags != 0 && flags != MSG_DONTWAIT) {
-        return ZX_ERR_INVALID_ARGS;
-    }
-    zx_pipe_t* p = (zx_pipe_t*)io;
-    int nonblock = (io->ioflag & IOFLAG_NONBLOCK) || (flags & MSG_DONTWAIT);
-    return zx_pipe_read_internal(p->h, data, len, nonblock);
-}
-
-static ssize_t zx_socketpair_sendto(fdio_t* io, const void* data, size_t len, int flags, const struct sockaddr* addr, socklen_t addrlen) {
-    if (flags != 0 && flags != MSG_DONTWAIT) {
-        return ZX_ERR_INVALID_ARGS;
-    }
-    if (addr != NULL) {
-        return ZX_ERR_INVALID_ARGS;  // should set errno to EISCONN
-    }
-    zx_pipe_t* p = (zx_pipe_t*)io;
-    int nonblock = (io->ioflag & IOFLAG_NONBLOCK) || (flags & MSG_DONTWAIT);
-    return zx_pipe_write_internal(p->h, data, len, nonblock);
-}
-
-static zx_status_t zx_socketpair_clone(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
-    zx_status_t status = zx_pipe_clone(io, handles, types);
-    if (status < 0)
-        return status;
-    types[0] = PA_FDIO_SOCKETPAIR;
-    return status;
-}
-
-static zx_status_t zx_socketpair_unwrap(fdio_t* io, zx_handle_t* handles, uint32_t* types) {
-    zx_status_t status = zx_pipe_unwrap(io, handles, types);
-    if (status < 0)
-        return status;
-    types[0] = PA_FDIO_SOCKETPAIR;
-    return status;
-}
-
-static zx_status_t zx_socketpair_create(zx_handle_t h, int* fd) {
-    fdio_t* io;
-    if ((io = fdio_socketpair_create(h)) == NULL)
-        return ZX_ERR_NO_MEMORY;
-    if ((*fd = fdio_bind_to_fd(io, -1, 0)) < 0) {
-        io->ops->close(io);
-        fdio_release(io);
-        return ZX_ERR_NO_MEMORY;
-    }
-    return ZX_OK;
-}
-
-static fdio_ops_t zx_socketpair_ops = {
-    .read = zx_pipe_read,
-    .read_at = fdio_default_read_at,
-    .write = zx_pipe_write,
-    .write_at = fdio_default_write_at,
-    .seek = fdio_default_seek,
-    .misc = fdio_default_misc,
-    .close = zx_pipe_close,
-    .open = fdio_default_open,
-    .clone = zx_socketpair_clone,
-    .ioctl = fdio_default_ioctl,
-    .wait_begin = zx_pipe_wait_begin,
-    .wait_end = zx_pipe_wait_end,
-    .unwrap = zx_socketpair_unwrap,
-    .posix_ioctl = zx_pipe_posix_ioctl,
-    .get_vmo = fdio_default_get_vmo,
-    .get_token = fdio_default_get_token,
-    .get_attr = zx_pipe_get_attr,
-    .set_attr = fdio_default_set_attr,
-    .sync = fdio_default_sync,
-    .readdir = fdio_default_readdir,
-    .rewind = fdio_default_rewind,
-    .unlink = fdio_default_unlink,
-    .truncate = fdio_default_truncate,
-    .rename = fdio_default_rename,
-    .link = fdio_default_link,
-    .get_flags = fdio_default_get_flags,
-    .set_flags = fdio_default_set_flags,
-    .recvfrom = zx_socketpair_recvfrom,
-    .sendto = zx_socketpair_sendto,
-    .recvmsg = fdio_default_recvmsg,
-    .sendmsg = fdio_default_sendmsg,
-    .shutdown = fdio_socketpair_shutdown,
-};
-
-int socketpair(int domain, int type, int protocol, int fd[2]) {
-    if (type != SOCK_STREAM) {  // TODO(jamesr): SOCK_DGRAM
-        errno = EPROTOTYPE;
-        return -1;
-    }
-    if (domain != AF_UNIX) {
-        errno = EAFNOSUPPORT;
-        return -1;
-    }
-    if (protocol != 0) {
-        errno = EPROTONOSUPPORT;
-        return -1;
-    }
-
-    zx_handle_t h[2];
-    zx_status_t r = zx_socket_create(0, &h[0], &h[1]);
-    if (r < 0)
-        return r;
-    if ((r = zx_socketpair_create(h[0], &fd[0])) < 0) {
-        zx_handle_close(h[1]);
-        return STATUS(r);
-    }
-    if ((r = zx_socketpair_create(h[1], &fd[1])) < 0) {
-        close(fd[0]);
-        return STATUS(r);
-    }
-    return 0;
-}
-
-zx_status_t fdio_socketpair_shutdown(fdio_t* io, int how) {
-    zx_pipe_t* p = (zx_pipe_t*)io;
-
-    uint32_t options = 0;
-    switch (how) {
-    case SHUT_RD:
-        options = ZX_SOCKET_SHUTDOWN_READ;
-        break;
-    case SHUT_WR:
-        options = ZX_SOCKET_SHUTDOWN_WRITE;
-        break;
-    case SHUT_RDWR:
-        options = ZX_SOCKET_SHUTDOWN_READ | ZX_SOCKET_SHUTDOWN_WRITE;
-        break;
-    }
-    return zx_socket_write(p->h, options, NULL, 0, NULL);
-}
-
-fdio_t* fdio_socketpair_create(zx_handle_t h) {
-    fdio_t* io;
-    if ((io = fdio_pipe_create(h)) == NULL)
-        return NULL;
-    io->ops = &zx_socketpair_ops;
-    return io;
-}
diff --git a/system/ulib/fdio/unistd.c b/system/ulib/fdio/unistd.c
index dc70655..0b695f5 100644
--- a/system/ulib/fdio/unistd.c
+++ b/system/ulib/fdio/unistd.c
@@ -1636,6 +1636,23 @@
     return pipe2(pipefd, 0);
 }
 
+int socketpair(int domain, int type, int protocol, int fd[2]) {
+    if (type != SOCK_STREAM) {  // TODO(jamesr): SOCK_DGRAM
+        errno = EPROTOTYPE;
+        return -1;
+    }
+    if (domain != AF_UNIX) {
+        errno = EAFNOSUPPORT;
+        return -1;
+    }
+    if (protocol != 0) {
+        errno = EPROTONOSUPPORT;
+        return -1;
+    }
+
+    return pipe(fd);
+}
+
 int faccessat(int dirfd, const char* filename, int amode, int flag) {
     // For now, we just check to see if the file exists, until we
     // model permissions. But first, check that the flags and amode
diff --git a/system/utest/fdio/fdio_socketpair.c b/system/utest/fdio/fdio_socketpair.c
index a539978..f33375a 100644
--- a/system/utest/fdio/fdio_socketpair.c
+++ b/system/utest/fdio/fdio_socketpair.c
@@ -411,7 +411,7 @@
     uint32_t types[FDIO_MAX_HANDLES];
     zx_status_t handle_count = fdio_clone_fd(fds[0], fds[0], handles, types);
     ASSERT_GT(handle_count, 0, "fdio_clone_fd() failed");
-    EXPECT_EQ(PA_HND_TYPE(types[0]), PA_FDIO_SOCKETPAIR, "Wrong cloned fd type");
+    EXPECT_EQ(PA_HND_TYPE(types[0]), PA_FDIO_PIPE, "Wrong cloned fd type");
 
     int cloned_fd = -1;
     status = fdio_create_fd(handles, types, handle_count, &cloned_fd);
@@ -419,7 +419,7 @@
 
     handle_count = fdio_transfer_fd(fds[0], fds[0], handles, types);
     ASSERT_GT(handle_count, 0, "fdio_transfer_fd() failed");
-    EXPECT_EQ(PA_HND_TYPE(types[0]), PA_FDIO_SOCKETPAIR, "Wrong transferred fd type");
+    EXPECT_EQ(PA_HND_TYPE(types[0]), PA_FDIO_PIPE, "Wrong transferred fd type");
 
     int transferred_fd = -1;
     status = fdio_create_fd(handles, types, handle_count, &transferred_fd);