fmq_fuzzer: Exit blocking threads when no more work

One configuration of the fuzzer will run a blocking writer thread with a
number of blocking reader threads. The blocking operations time out
after 10ms. It's possible for one side (readers or writer) to end up
with thousands of extra operations to do after the other side has
finished their job. This leads to thousands of timeouts, greatly
increasing the time of the test.
Since there is no benefit to continuously timing out, we now exit when
that happens.

Bug: 218521670
Test: Download corpus from failing runs
Test: m SANITIZE_HOST=address fmq_fuzzer
Test: ./fmq_fuzzer -max_len=50000 -timeout=25 -rss_limit_mb=2560 -use_value_profile=1 -max_total_time=2700 -print_final_stats=1 -artifact_prefix=/usr/local/google/home/devinmoore/workspace/fmq_fuzzer_artifacts  corpus/
Change-Id: Icfe014f07b9838ac28a55cf3f010ee8f33a346de
diff --git a/fuzzer/fmq_fuzzer.cpp b/fuzzer/fmq_fuzzer.cpp
index 79ce5fc..8c8a78e 100644
--- a/fuzzer/fmq_fuzzer.cpp
+++ b/fuzzer/fmq_fuzzer.cpp
@@ -21,6 +21,7 @@
 #include <thread>
 
 #include <android-base/logging.h>
+#include <android-base/scopeguard.h>
 #include <fmq/AidlMessageQueue.h>
 #include <fmq/ConvertMQDescriptors.h>
 #include <fmq/EventFlag.h>
@@ -58,7 +59,7 @@
 
 static constexpr int kMaxNumSyncReaders = 1;
 static constexpr int kMaxNumUnsyncReaders = 5;
-static constexpr int kMaxDataPerReader = 5;
+static constexpr int kMaxDataPerReader = 1000;
 
 typedef android::AidlMessageQueue<payload_t, SynchronizedReadWrite> AidlMessageQueueSync;
 typedef android::AidlMessageQueue<payload_t, UnsynchronizedWrite> AidlMessageQueueUnsync;
@@ -111,30 +112,35 @@
 }
 
 template <typename Queue, typename Desc>
-void readerBlocking(const Desc& desc, std::vector<uint8_t> readerData) {
+void readerBlocking(const Desc& desc, std::vector<uint8_t>& readerData,
+                    std::atomic<size_t>& readersNotFinished,
+                    std::atomic<size_t>& writersNotFinished) {
+    android::base::ScopeGuard guard([&readersNotFinished]() { readersNotFinished--; });
     Queue readMq(desc);
     if (!readMq.isValid()) {
         LOG(ERROR) << "read mq invalid";
         return;
     }
     FuzzedDataProvider fdp(&readerData[0], readerData.size());
-    bool success;
     do {
         size_t count = fdp.remaining_bytes()
                                ? fdp.ConsumeIntegralInRange<size_t>(1, readMq.getQuantumCount())
                                : 1;
         std::vector<payload_t> data;
         data.resize(count);
-        success = readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
-    } while (success == true || fdp.remaining_bytes() > sizeof(size_t));
+        readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
+    } while (fdp.remaining_bytes() > sizeof(size_t) && writersNotFinished > 0);
 }
 
 // Can't use blocking calls with Unsync queues(there is a static_assert)
 template <>
 void readerBlocking<AidlMessageQueueUnsync, AidlMQDescUnsync>(const AidlMQDescUnsync&,
-                                                              std::vector<uint8_t>) {}
+                                                              std::vector<uint8_t>&,
+                                                              std::atomic<size_t>&,
+                                                              std::atomic<size_t>&) {}
 template <>
-void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>) {}
+void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>&,
+                                                      std::atomic<size_t>&, std::atomic<size_t>&) {}
 
 template <typename Queue>
 void writer(Queue& writeMq, FuzzedDataProvider& fdp, bool userFd) {
@@ -168,8 +174,11 @@
 }
 
 template <typename Queue>
-void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp) {
-    while (fdp.remaining_bytes() > sizeof(size_t)) {
+void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp,
+                    std::atomic<size_t>& writersNotFinished,
+                    std::atomic<size_t>& readersNotFinished) {
+    android::base::ScopeGuard guard([&writersNotFinished]() { writersNotFinished--; });
+    while (fdp.remaining_bytes() > sizeof(size_t) && readersNotFinished > 0) {
         size_t count = fdp.ConsumeIntegralInRange<size_t>(1, writeMq.getQuantumCount());
         std::vector<payload_t> data;
         for (int i = 0; i < count; i++) {
@@ -181,9 +190,11 @@
 
 // Can't use blocking calls with Unsync queues(there is a static_assert)
 template <>
-void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&,
+                                            std::atomic<size_t>&, std::atomic<size_t>&) {}
 template <>
-void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&,
+                                        std::atomic<size_t>&, std::atomic<size_t>&) {}
 
 template <typename Queue, typename Desc>
 void fuzzAidlWithReaders(std::vector<uint8_t>& writerData,
@@ -207,25 +218,29 @@
     const auto desc = writeMq.dupeDesc();
     CHECK(desc.handle.fds[0].get() != -1);
 
-    std::vector<std::thread> clients;
+    std::atomic<size_t> readersNotFinished = readerData.size();
+    std::atomic<size_t> writersNotFinished = 1;
+    std::vector<std::thread> readers;
     for (int i = 0; i < readerData.size(); i++) {
         if (blocking) {
-            clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
-                                 std::ref(readerData[i]));
+            readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
+                                 std::ref(readerData[i]), std::ref(readersNotFinished),
+                                 std::ref(writersNotFinished));
+
         } else {
-            clients.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
+            readers.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
                                  userFd);
         }
     }
 
     if (blocking) {
-        writerBlocking<Queue>(writeMq, fdp);
+        writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
     } else {
         writer<Queue>(writeMq, fdp, userFd);
     }
 
-    for (auto& client : clients) {
-        client.join();
+    for (auto& reader : readers) {
+        reader.join();
     }
 }
 
@@ -251,25 +266,28 @@
     const auto desc = writeMq.getDesc();
     CHECK(desc->isHandleValid());
 
-    std::vector<std::thread> clients;
+    std::atomic<size_t> readersNotFinished = readerData.size();
+    std::atomic<size_t> writersNotFinished = 1;
+    std::vector<std::thread> readers;
     for (int i = 0; i < readerData.size(); i++) {
         if (blocking) {
-            clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
-                                 std::ref(readerData[i]));
+            readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
+                                 std::ref(readerData[i]), std::ref(readersNotFinished),
+                                 std::ref(writersNotFinished));
         } else {
-            clients.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
+            readers.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
                                  userFd);
         }
     }
 
     if (blocking) {
-        writerBlocking<Queue>(writeMq, fdp);
+        writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
     } else {
         writer<Queue>(writeMq, fdp, userFd);
     }
 
-    for (auto& client : clients) {
-        client.join();
+    for (auto& reader : readers) {
+        reader.join();
     }
 }