Revert "[async] Change ZX_WAIT_ASYNC_REPEATING to ZX_WAIT_ASYNC_ONCE"

This reverts commit 3722298390db2032c1c9f3c2fd85f2fb66233d30.

Reason for revert: Breaks global integration.

Original change's description:
> [async] Change ZX_WAIT_ASYNC_REPEATING to ZX_WAIT_ASYNC_ONCE
> 
> This is a step towards removing ZX_WAIT_ASYNC_REPEATING.
> 
> As part of this change, remove port_wait_repeating(), which is a direct
> consumer of ZX_WAIT_ASYNC_REPEATING, and instead rely on port_wait().
> 
> Change utests not use ZX_WAIT_ASYNC_REPEATING.
> 
> ZX-3090
> 
> Test: port-test, threads-test pass.
> 
> Change-Id: I7b769d7917345de2a756e0b5864dc6653f1b162b

TBR=cpu@google.com,jeffbrown@google.com,mfaltesek@google.com

# Not skipping CQ checks because original CL landed > 1 day ago.

Change-Id: I1eb916a88c3d9d6160a01a96357c57eba46320aa
diff --git a/system/ulib/async-loop/loop.c b/system/ulib/async-loop/loop.c
index 17433fa..fe75712 100644
--- a/system/ulib/async-loop/loop.c
+++ b/system/ulib/async-loop/loop.c
@@ -167,6 +167,11 @@
     if (status == ZX_OK)
         status = zx_timer_create(ZX_TIMER_SLACK_LATE, ZX_CLOCK_MONOTONIC, &loop->timer);
     if (status == ZX_OK) {
+        status = zx_object_wait_async(loop->timer, loop->port, KEY_CONTROL,
+                                      ZX_TIMER_SIGNALED,
+                                      ZX_WAIT_ASYNC_REPEATING);
+    }
+    if (status == ZX_OK) {
         *out_loop = loop;
         if (loop->config.make_default_for_current_thread) {
             ZX_DEBUG_ASSERT(async_get_default_dispatcher() == NULL);
@@ -264,7 +269,7 @@
             return ZX_OK;
 
         // Handle task timer expirations.
-        if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE &&
+        if (packet.type == ZX_PKT_TYPE_SIGNAL_REP &&
             packet.signal.observed & ZX_TIMER_SIGNALED) {
             return async_loop_dispatch_tasks(loop);
         }
@@ -726,10 +731,6 @@
 
     zx_status_t status = zx_timer_set(loop->timer, deadline, 0);
     ZX_ASSERT_MSG(status == ZX_OK, "zx_timer_set: status=%d", status);
-    status = zx_object_wait_async(loop->timer, loop->port, KEY_CONTROL,
-                                              ZX_TIMER_SIGNALED,
-                                              ZX_WAIT_ASYNC_ONCE);
-    ZX_ASSERT_MSG(status == ZX_OK, "zx_object_wait_async: status=%d", status);
 }
 
 static void async_loop_invoke_prologue(async_loop_t* loop) {
diff --git a/system/ulib/port/include/port/port.h b/system/ulib/port/include/port/port.h
index 06d7afb..4f700a9 100644
--- a/system/ulib/port/include/port/port.h
+++ b/system/ulib/port/include/port/port.h
@@ -30,6 +30,10 @@
 // the provided port handler.
 zx_status_t port_wait(port_t* port, port_handler_t* ph);
 
+// Wait for an event on a handle, as specified by
+// the provided port handler, in repeating mode.
+zx_status_t port_wait_repeating(port_t* port, port_handler_t* ph);
+
 // Wait for a packet to arrive of for the port to timeout
 // If the port wait returns and error or timeout, returns that.
 // If once is true, returns ZX_OK after handling a packet.
diff --git a/system/ulib/port/port.c b/system/ulib/port/port.c
index cbc5b60..8fdbc3a 100644
--- a/system/ulib/port/port.c
+++ b/system/ulib/port/port.c
@@ -34,6 +34,15 @@
                                 ph->waitfor, ZX_WAIT_ASYNC_ONCE);
 }
 
+zx_status_t port_wait_repeating(port_t* port, port_handler_t* ph) {
+    zprintf("port_wait_repeating(%p, %p) obj=%x port=%x\n",
+            port, ph, ph->handle, port->handle);
+    return zx_object_wait_async(ph->handle, port->handle,
+                                (uint64_t)(uintptr_t)ph,
+                                ph->waitfor, ZX_WAIT_ASYNC_REPEATING);
+}
+
+
 zx_status_t port_cancel(port_t* port, port_handler_t* ph) {
     zx_status_t r = zx_port_cancel(port->handle, ph->handle,
                                    (uint64_t)(uintptr_t)ph);
diff --git a/system/utest/core/port/ports.cpp b/system/utest/core/port/ports.cpp
index 16f6c2f..370ccb3 100644
--- a/system/utest/core/port/ports.cpp
+++ b/system/utest/core/port/ports.cpp
@@ -200,32 +200,38 @@
 
 static bool async_wait_close_order_1() {
     int order[] = {0, 1, 2};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_close_order_2() {
     int order[] = {0, 2, 1};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_close_order_3() {
     int order[] = {1, 2, 0};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_close_order_4() {
     int order[] = {1, 0, 2};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_close_order_5() {
     int order[] = {2, 1, 0};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_close_order_6() {
     int order[] = {2, 0, 1};
-    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE);
+    return async_wait_close_order(order, ZX_WAIT_ASYNC_ONCE) &&
+           async_wait_close_order(order, ZX_WAIT_ASYNC_REPEATING);
 }
 
 static bool async_wait_event_test_single(void) {
@@ -288,20 +294,20 @@
 
     const uint64_t key0 = 1122ull;
 
+    status = zx_object_wait_async(ev, port, key0,
+        ZX_EVENT_SIGNALED | ZX_USER_SIGNAL_2, ZX_WAIT_ASYNC_REPEATING);
+    EXPECT_EQ(status, ZX_OK);
+
     zx_port_packet_t out = {};
     uint64_t count[3] = {};
 
     for (int ix = 0; ix != 24; ++ix) {
-        status = zx_object_wait_async(ev, port, key0,
-            ZX_EVENT_SIGNALED | ZX_USER_SIGNAL_2, ZX_WAIT_ASYNC_ONCE);
-        EXPECT_EQ(status, ZX_OK);
-
         uint32_t ub = (ix % 2) ? 0u : ZX_USER_SIGNAL_2;
         EXPECT_EQ(zx_object_signal(ev, 0u, ZX_EVENT_SIGNALED | ub), ZX_OK);
         EXPECT_EQ(zx_object_signal(ev, ZX_EVENT_SIGNALED | ub, 0u), ZX_OK);
 
         ASSERT_EQ(zx_port_wait(port, 0ull, &out), ZX_OK);
-        ASSERT_EQ(out.type, ZX_PKT_TYPE_SIGNAL_ONE);
+        ASSERT_EQ(out.type, ZX_PKT_TYPE_SIGNAL_REP);
         ASSERT_EQ(out.signal.count, 1u);
         count[0] += (out.signal.observed & ZX_EVENT_SIGNALED) ? 1 : 0;
         count[1] += (out.signal.observed & ZX_USER_SIGNAL_2) ? 1 : 0;
@@ -334,7 +340,7 @@
     zx_handle_t event;
     ASSERT_EQ(zx_event_create(0u, &event), ZX_OK);
     const uint64_t kKey = 0;
-    const uint32_t kInvalidOption = ZX_WAIT_ASYNC_ONCE + 2;
+    const uint32_t kInvalidOption = ZX_WAIT_ASYNC_REPEATING + 1;
     EXPECT_EQ(zx_object_wait_async(event, port, kKey, ZX_EVENT_SIGNALED,
                                    kInvalidOption), ZX_ERR_INVALID_ARGS);
     ASSERT_EQ(zx_handle_close(event), ZX_OK);
@@ -342,7 +348,7 @@
     END_TEST;
 }
 
-static bool channel_pre_writes_test() {
+static bool pre_writes_channel_test(uint32_t mode) {
     BEGIN_TEST;
     zx_status_t status;
 
@@ -364,7 +370,7 @@
     EXPECT_EQ(status, ZX_OK);
 
     status = zx_object_wait_async(ch[1], port, key0,
-        ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, ZX_WAIT_ASYNC_ONCE);
+        ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, mode);
     EXPECT_EQ(status, ZX_OK);
 
     zx_port_packet_t out = {};
@@ -376,7 +382,7 @@
         if (status != ZX_OK)
             break;
         wait_count++;
-        if (out.signal.observed != ZX_CHANNEL_PEER_CLOSED)
+        if (out.signal.trigger != ZX_CHANNEL_PEER_CLOSED)
             read_count += out.signal.count;
         EXPECT_NE(out.signal.count, 0u);
     }
@@ -394,7 +400,15 @@
     END_TEST;
 }
 
-static bool cancel_event_key() {
+static bool channel_pre_writes_once() {
+    return pre_writes_channel_test(ZX_WAIT_ASYNC_ONCE);
+}
+
+static bool channel_pre_writes_repeat() {
+    return pre_writes_channel_test(ZX_WAIT_ASYNC_REPEATING);
+}
+
+static bool cancel_event(uint32_t wait_mode) {
     BEGIN_TEST;
     zx_status_t status;
 
@@ -409,7 +423,7 @@
 
     for (uint32_t ix = 0; ix != fbl::count_of(keys); ++ix) {
         EXPECT_EQ(zx_object_wait_async(
-            ev, port, keys[ix], ZX_EVENT_SIGNALED, ZX_WAIT_ASYNC_ONCE), ZX_OK);
+            ev, port, keys[ix], ZX_EVENT_SIGNALED, wait_mode), ZX_OK);
     }
 
     // We cancel before it is signaled so no packets from |13| are seen.
@@ -434,8 +448,10 @@
         EXPECT_EQ(out.signal.observed, ZX_EVENT_SIGNALED);
     }
 
-    // We cancel after the packet has been delivered.
-    EXPECT_EQ(zx_port_cancel(port, ev, 128u), ZX_ERR_NOT_FOUND);
+    if (wait_mode == ZX_WAIT_ASYNC_ONCE) {
+        // We cancel after the packet has been delivered.
+        EXPECT_EQ(zx_port_cancel(port, ev, 128u), ZX_ERR_NOT_FOUND);
+    }
 
     EXPECT_EQ(wait_count, 2);
     EXPECT_EQ(key_sum, keys[0] + keys[2]);
@@ -445,7 +461,15 @@
     END_TEST;
 }
 
-static bool cancel_event_key_after() {
+static bool cancel_event_key_once() {
+    return cancel_event(ZX_WAIT_ASYNC_ONCE);
+}
+
+static bool cancel_event_key_repeat() {
+    return cancel_event(ZX_WAIT_ASYNC_REPEATING);
+}
+
+static bool cancel_event_after(uint32_t wait_mode) {
     BEGIN_TEST;
 
     zx_status_t status;
@@ -460,7 +484,7 @@
 
         EXPECT_EQ(zx_event_create(0u, &ev[ix]), ZX_OK);
         EXPECT_EQ(zx_object_wait_async(
-            ev[ix], port, keys[ix], ZX_EVENT_SIGNALED, ZX_WAIT_ASYNC_ONCE), ZX_OK);
+            ev[ix], port, keys[ix], ZX_EVENT_SIGNALED, wait_mode), ZX_OK);
     }
 
     EXPECT_EQ(zx_object_signal(ev[0], 0u, ZX_EVENT_SIGNALED), ZX_OK);
@@ -496,6 +520,13 @@
     END_TEST;
 }
 
+static bool cancel_event_key_once_after() {
+    return cancel_event_after(ZX_WAIT_ASYNC_ONCE);
+}
+
+static bool cancel_event_key_repeat_after() {
+    return cancel_event_after(ZX_WAIT_ASYNC_REPEATING);
+}
 
 struct test_context {
     zx_handle_t port;
@@ -515,7 +546,7 @@
     return 0;
 }
 
-static bool threads_event() {
+static bool threads_event(uint32_t wait_mode) {
     BEGIN_TEST;
 
     zx_handle_t port;
@@ -532,7 +563,7 @@
         ctx[ix] = { port, 1u };
 
         EXPECT_EQ(zx_object_wait_async(
-                  ev, port, (500u + ix), ZX_EVENT_SIGNALED, ZX_WAIT_ASYNC_ONCE), ZX_OK);
+                  ev, port, (500u + ix), ZX_EVENT_SIGNALED, wait_mode), ZX_OK);
         EXPECT_EQ(thrd_create(&threads[ix], port_reader_thread, &ctx[ix]),
                   thrd_success);
     }
@@ -552,6 +583,14 @@
     END_TEST;
 }
 
+static bool threads_event_once() {
+    return threads_event(ZX_WAIT_ASYNC_ONCE);
+}
+
+static bool threads_event_repeat() {
+    return threads_event(ZX_WAIT_ASYNC_REPEATING);
+}
+
 
 static constexpr uint32_t kStressCount = 20000u;
 static constexpr uint64_t kSleeps[] = { 0, 10, 2, 0, 15, 0};
@@ -652,10 +691,14 @@
 RUN_TEST(async_wait_close_order_4)
 RUN_TEST(async_wait_close_order_5)
 RUN_TEST(async_wait_close_order_6)
-RUN_TEST(channel_pre_writes_test)
-RUN_TEST(cancel_event_key)
-RUN_TEST(cancel_event_key_after)
-RUN_TEST(threads_event)
+RUN_TEST(channel_pre_writes_once)
+RUN_TEST(channel_pre_writes_repeat)
+RUN_TEST(cancel_event_key_once)
+RUN_TEST(cancel_event_key_repeat)
+RUN_TEST(cancel_event_key_once_after)
+RUN_TEST(cancel_event_key_repeat_after)
+RUN_TEST(threads_event_once)
+RUN_TEST(threads_event_repeat)
 RUN_TEST_LARGE(cancel_stress)
 END_TEST_CASE(port_tests)
 
diff --git a/system/utest/core/threads/threads.cpp b/system/utest/core/threads/threads.cpp
index 10fd889..8f4b23d 100644
--- a/system/utest/core/threads/threads.cpp
+++ b/system/utest/core/threads/threads.cpp
@@ -878,7 +878,7 @@
     END_TEST;
 }
 
-static bool port_wait_for_signal(zx_handle_t port, zx_handle_t thread,
+static bool port_wait_for_signal_once(zx_handle_t port, zx_handle_t thread,
                                       zx_time_t deadline, zx_signals_t mask,
                                       zx_port_packet_t* packet) {
     ASSERT_EQ(zx_object_wait_async(thread, port, 0u, mask,
@@ -889,8 +889,16 @@
     return true;
 }
 
+static bool port_wait_for_signal_repeating(zx_handle_t port,
+                                           zx_time_t deadline,
+                                           zx_port_packet_t* packet) {
+    ASSERT_EQ(zx_port_wait(port, deadline, packet), ZX_OK);
+    ASSERT_EQ(packet->type, ZX_PKT_TYPE_SIGNAL_REP);
+    return true;
+}
+
 // Test signal delivery of suspended threads via async wait.
-static bool TestSuspendWaitAsyncSignalDeliveryWorker(void) {
+static bool TestSuspendWaitAsyncSignalDeliveryWorker(bool use_repeating) {
     zx_handle_t event;
     zx_handle_t port;
     zxr_thread_t thread;
@@ -901,21 +909,35 @@
     ASSERT_TRUE(start_thread(threads_test_wait_fn, &event, &thread, &thread_h));
 
     ASSERT_EQ(zx_port_create(0, &port), ZX_OK);
+    if (use_repeating) {
+        ASSERT_EQ(zx_object_wait_async(thread_h, port, 0u, run_susp_mask,
+                                       ZX_WAIT_ASYNC_REPEATING),
+                  ZX_OK);
+    }
 
     zx_port_packet_t packet;
     // There should be a RUNNING signal packet present and not SUSPENDED.
     // This is from when the thread first started to run.
-    ASSERT_TRUE(port_wait_for_signal(port, thread_h, 0u, run_susp_mask, &packet));
+    if (use_repeating) {
+        ASSERT_TRUE(port_wait_for_signal_repeating(port, 0u, &packet));
+    } else {
+        ASSERT_TRUE(port_wait_for_signal_once(port, thread_h, 0u, run_susp_mask, &packet));
+    }
     ASSERT_EQ(packet.signal.observed & run_susp_mask, ZX_THREAD_RUNNING);
 
     // Make sure there are no more packets.
-    // RUNNING or SUSPENDED is always asserted.
-    ASSERT_EQ(zx_object_wait_async(thread_h, port, 0u,
-                                   ZX_THREAD_SUSPENDED,
-                                   ZX_WAIT_ASYNC_ONCE),
-              ZX_OK);
-    ASSERT_EQ(zx_port_wait(port, 0u, &packet), ZX_ERR_TIMED_OUT);
-    ASSERT_EQ(zx_port_cancel(port, thread_h, 0u), ZX_OK);
+    if (use_repeating) {
+        ASSERT_EQ(zx_port_wait(port, 0u, &packet), ZX_ERR_TIMED_OUT);
+    } else {
+        // In the non-repeating case we have to do things differently as one of
+        // RUNNING or SUSPENDED is always asserted.
+        ASSERT_EQ(zx_object_wait_async(thread_h, port, 0u,
+                                       ZX_THREAD_SUSPENDED,
+                                       ZX_WAIT_ASYNC_ONCE),
+                  ZX_OK);
+        ASSERT_EQ(zx_port_wait(port, 0u, &packet), ZX_ERR_TIMED_OUT);
+        ASSERT_EQ(zx_port_cancel(port, thread_h, 0u), ZX_OK);
+    }
 
     zx_handle_t suspend_token = ZX_HANDLE_INVALID;
     ASSERT_TRUE(suspend_thread_synchronous(thread_h, &suspend_token));
@@ -932,11 +954,21 @@
     ASSERT_TRUE(info.state == ZX_THREAD_STATE_RUNNING ||
                     info.state == ZX_THREAD_STATE_BLOCKED_WAIT_ONE);
 
-    // We should see just RUNNING,
-    // and it should be immediately present (no deadline).
-    ASSERT_TRUE(port_wait_for_signal(port, thread_h, 0u, run_susp_mask,
-                                          &packet));
-    ASSERT_EQ(packet.signal.observed & run_susp_mask, ZX_THREAD_RUNNING);
+    // For repeating async waits we should see both SUSPENDED and RUNNING on
+    // the port. And we should see them at the same time (and not one followed
+    // by the other).
+    if (use_repeating) {
+        ASSERT_TRUE(port_wait_for_signal_repeating(port,
+                                                   zx_deadline_after(ZX_MSEC(100)),
+                                                   &packet));
+        ASSERT_EQ(packet.signal.observed & run_susp_mask, run_susp_mask);
+    } else {
+        // For non-repeating async waits we should see just RUNNING,
+        // and it should be immediately present (no deadline).
+        ASSERT_TRUE(port_wait_for_signal_once(port, thread_h, 0u, run_susp_mask,
+                                              &packet));
+        ASSERT_EQ(packet.signal.observed & run_susp_mask, ZX_THREAD_RUNNING);
+    }
 
     // The thread should still be blocked on the event when it wakes up.
     ASSERT_TRUE(wait_thread_blocked(thread_h, ZX_THREAD_STATE_BLOCKED_WAIT_ONE));
@@ -945,17 +977,29 @@
     // the expected behavior and is visible via async wait.
     suspend_token = ZX_HANDLE_INVALID;
     ASSERT_EQ(zx_task_suspend_token(thread_h, &suspend_token), ZX_OK);
-    ASSERT_TRUE(port_wait_for_signal(port, thread_h,
-                                     zx_deadline_after(ZX_MSEC(100)),
-                                     ZX_THREAD_SUSPENDED, &packet));
+    if (use_repeating) {
+        ASSERT_TRUE(port_wait_for_signal_repeating(port,
+                                                   zx_deadline_after(ZX_MSEC(100)),
+                                                   &packet));
+    } else {
+        ASSERT_TRUE(port_wait_for_signal_once(port, thread_h,
+                                              zx_deadline_after(ZX_MSEC(100)),
+                                              ZX_THREAD_SUSPENDED, &packet));
+    }
     ASSERT_EQ(packet.signal.observed & run_susp_mask, ZX_THREAD_SUSPENDED);
 
     ASSERT_TRUE(get_thread_info(thread_h, &info));
     ASSERT_EQ(info.state, ZX_THREAD_STATE_SUSPENDED);
     ASSERT_EQ(zx_handle_close(suspend_token), ZX_OK);
-    ASSERT_TRUE(port_wait_for_signal(port, thread_h,
-                                     zx_deadline_after(ZX_MSEC(100)),
-                                     ZX_THREAD_RUNNING, &packet));
+    if (use_repeating) {
+        ASSERT_TRUE(port_wait_for_signal_repeating(port,
+                                                   zx_deadline_after(ZX_MSEC(100)),
+                                                   &packet));
+    } else {
+        ASSERT_TRUE(port_wait_for_signal_once(port, thread_h,
+                                              zx_deadline_after(ZX_MSEC(100)),
+                                              ZX_THREAD_RUNNING, &packet));
+    }
     ASSERT_EQ(packet.signal.observed & run_susp_mask, ZX_THREAD_RUNNING);
 
     // Resumption from being suspended back into a blocking syscall will be
@@ -979,14 +1023,14 @@
 // Test signal delivery of suspended threads via single async wait.
 static bool TestSuspendSingleWaitAsyncSignalDelivery() {
     BEGIN_TEST;
-    EXPECT_TRUE(TestSuspendWaitAsyncSignalDeliveryWorker());
+    EXPECT_TRUE(TestSuspendWaitAsyncSignalDeliveryWorker(false));
     END_TEST;
 }
 
 // Test signal delivery of suspended threads via repeating async wait.
 static bool TestSuspendRepeatingWaitAsyncSignalDelivery() {
     BEGIN_TEST;
-    EXPECT_TRUE(TestSuspendWaitAsyncSignalDeliveryWorker());
+    EXPECT_TRUE(TestSuspendWaitAsyncSignalDeliveryWorker(true));
     END_TEST;
 }