| // Copyright 2023 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| // clang-format off |
| #include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first. |
| |
| #include "pw_rpc/fuzz/engine.h" |
| // clang-format on |
| |
| #include <algorithm> |
| #include <cctype> |
| #include <chrono> |
| #include <cinttypes> |
| #include <limits> |
| #include <mutex> |
| |
| #include "pw_assert/check.h" |
| #include "pw_bytes/span.h" |
| #include "pw_log/log.h" |
| #include "pw_span/span.h" |
| #include "pw_status/status.h" |
| #include "pw_string/format.h" |
| |
| namespace pw::rpc::fuzz { |
| namespace { |
| |
| using namespace std::chrono_literals; |
| |
| // Maximum number of bytes written in a single unary or stream request. |
| constexpr size_t kMaxWriteLen = MaxSafePayloadSize(); |
| static_assert(kMaxWriteLen * 0x7E <= std::numeric_limits<uint16_t>::max()); |
| |
| struct ActiveVisitor final { |
| using result_type = bool; |
| result_type operator()(std::monostate&) { return false; } |
| result_type operator()(pw::rpc::RawUnaryReceiver& call) { |
| return call.active(); |
| } |
| result_type operator()(pw::rpc::RawClientReaderWriter& call) { |
| return call.active(); |
| } |
| }; |
| |
| struct CloseClientStreamVisitor final { |
| using result_type = void; |
| result_type operator()(std::monostate&) {} |
| result_type operator()(pw::rpc::RawUnaryReceiver&) {} |
| result_type operator()(pw::rpc::RawClientReaderWriter& call) { |
| call.RequestCompletion().IgnoreError(); |
| } |
| }; |
| |
| struct WriteVisitor final { |
| using result_type = bool; |
| result_type operator()(std::monostate&) { return false; } |
| result_type operator()(pw::rpc::RawUnaryReceiver&) { return false; } |
| result_type operator()(pw::rpc::RawClientReaderWriter& call) { |
| if (!call.active()) { |
| return false; |
| } |
| call.Write(data).IgnoreError(); |
| return true; |
| } |
| ConstByteSpan data; |
| }; |
| |
| struct CancelVisitor final { |
| using result_type = void; |
| result_type operator()(std::monostate&) {} |
| result_type operator()(pw::rpc::RawUnaryReceiver& call) { |
| call.Cancel().IgnoreError(); |
| } |
| result_type operator()(pw::rpc::RawClientReaderWriter& call) { |
| call.Cancel().IgnoreError(); |
| } |
| }; |
| |
| struct AbandonVisitor final { |
| using result_type = void; |
| result_type operator()(std::monostate&) {} |
| result_type operator()(pw::rpc::RawUnaryReceiver& call) { call.Abandon(); } |
| result_type operator()(pw::rpc::RawClientReaderWriter& call) { |
| call.Abandon(); |
| } |
| }; |
| |
| } // namespace |
| |
| // `Action` methods. |
| |
| Action::Action(uint32_t encoded) { |
| // The first byte is used to determine the operation. The ranges used set the |
| // relative likelihood of each result, e.g. `kWait` is more likely than |
| // `kAbandon`. |
| uint32_t raw = encoded & 0xFF; |
| if (raw == 0) { |
| op = kSkip; |
| } else if (raw < 0x60) { |
| op = kWait; |
| } else if (raw < 0x80) { |
| op = kWriteUnary; |
| } else if (raw < 0xA0) { |
| op = kWriteStream; |
| } else if (raw < 0xC0) { |
| op = kCloseClientStream; |
| } else if (raw < 0xD0) { |
| op = kCancel; |
| } else if (raw < 0xE0) { |
| op = kAbandon; |
| } else if (raw < 0xF0) { |
| op = kSwap; |
| } else { |
| op = kDestroy; |
| } |
| target = ((encoded & 0xFF00) >> 8) % Fuzzer::kMaxConcurrentCalls; |
| value = encoded >> 16; |
| } |
| |
| Action::Action(Op op_, size_t target_, uint16_t value_) |
| : op(op_), target(target_), value(value_) {} |
| |
| Action::Action(Op op_, size_t target_, char val, size_t len) |
| : op(op_), target(target_) { |
| PW_ASSERT(op == kWriteUnary || op == kWriteStream); |
| value = static_cast<uint16_t>(((val % 0x80) * kMaxWriteLen) + |
| (len % kMaxWriteLen)); |
| } |
| |
| char Action::DecodeWriteValue(uint16_t value) { |
| return static_cast<char>((value / kMaxWriteLen) % 0x7F); |
| } |
| |
| size_t Action::DecodeWriteLength(uint16_t value) { |
| return value % kMaxWriteLen; |
| } |
| |
| uint32_t Action::Encode() const { |
| uint32_t encoded = 0; |
| switch (op) { |
| case kSkip: |
| encoded = 0x00; |
| break; |
| case kWait: |
| encoded = 0x5F; |
| break; |
| case kWriteUnary: |
| encoded = 0x7F; |
| break; |
| case kWriteStream: |
| encoded = 0x9F; |
| break; |
| case kCloseClientStream: |
| encoded = 0xBF; |
| break; |
| case kCancel: |
| encoded = 0xCF; |
| break; |
| case kAbandon: |
| encoded = 0xDF; |
| break; |
| case kSwap: |
| encoded = 0xEF; |
| break; |
| case kDestroy: |
| encoded = 0xFF; |
| break; |
| } |
| encoded |= |
| ((target < Fuzzer::kMaxConcurrentCalls ? target |
| : Fuzzer::kMaxConcurrentCalls) % |
| 0xFF) |
| << 8; |
| encoded |= (static_cast<uint32_t>(value) << 16); |
| return encoded; |
| } |
| |
| void Action::Log(bool verbose, size_t num_actions, const char* fmt, ...) const { |
| if (!verbose) { |
| return; |
| } |
| char s1[16]; |
| auto result = callback_id < Fuzzer::kMaxConcurrentCalls |
| ? string::Format(s1, "%-3zu", callback_id) |
| : string::Format(s1, "n/a"); |
| va_list ap; |
| va_start(ap, fmt); |
| char s2[128]; |
| if (result.ok()) { |
| result = string::FormatVaList(s2, fmt, ap); |
| } |
| va_end(ap); |
| if (result.ok()) { |
| PW_LOG_INFO("#%-12zu\tthread: %zu\tcallback for: %s\ttarget call: %zu\t%s", |
| num_actions, |
| thread_id, |
| s1, |
| target, |
| s2); |
| } else { |
| LogFailure(verbose, num_actions, result.status()); |
| } |
| } |
| |
| void Action::LogFailure(bool verbose, size_t num_actions, Status status) const { |
| if (verbose && !status.ok()) { |
| PW_LOG_INFO("#%-12zu\tthread: %zu\tFailed to log action: %s", |
| num_actions, |
| thread_id, |
| pw_StatusString(status)); |
| } |
| } |
| |
| // FuzzyCall methods. |
| |
| void FuzzyCall::RecordWrite(size_t num, bool append) { |
| std::lock_guard lock(mutex_); |
| if (append) { |
| last_write_ += num; |
| } else { |
| last_write_ = num; |
| } |
| total_written_ += num; |
| pending_ = true; |
| } |
| |
| void FuzzyCall::Await() { |
| std::unique_lock<sync::Mutex> lock(mutex_); |
| cv_.wait(lock, [this]() PW_NO_LOCK_SAFETY_ANALYSIS { return !pending_; }); |
| } |
| |
| void FuzzyCall::Notify() { |
| if (pending_.exchange(false)) { |
| cv_.notify_all(); |
| } |
| } |
| |
| void FuzzyCall::Swap(FuzzyCall& other) { |
| if (index_ == other.index_) { |
| return; |
| } |
| // Manually acquire locks in an order based on call IDs to prevent deadlock. |
| if (index_ < other.index_) { |
| mutex_.lock(); |
| other.mutex_.lock(); |
| } else { |
| other.mutex_.lock(); |
| mutex_.lock(); |
| } |
| call_.swap(other.call_); |
| std::swap(id_, other.id_); |
| pending_ = other.pending_.exchange(pending_); |
| std::swap(last_write_, other.last_write_); |
| std::swap(total_written_, other.total_written_); |
| mutex_.unlock(); |
| other.mutex_.unlock(); |
| cv_.notify_all(); |
| other.cv_.notify_all(); |
| } |
| |
| void FuzzyCall::Reset(Variant call) { |
| { |
| std::lock_guard lock(mutex_); |
| call_ = std::move(call); |
| } |
| cv_.notify_all(); |
| } |
| |
| void FuzzyCall::Log() { |
| if (mutex_.try_lock_for(100ms)) { |
| PW_LOG_INFO("call %zu:", index_); |
| PW_LOG_INFO(" active: %s", |
| std::visit(ActiveVisitor(), call_) ? "true" : "false"); |
| PW_LOG_INFO(" request pending: %s ", pending_ ? "true" : "false"); |
| PW_LOG_INFO(" last write: %zu bytes", last_write_); |
| PW_LOG_INFO(" total written: %zu bytes", total_written_); |
| mutex_.unlock(); |
| } else { |
| PW_LOG_WARN("call %zu: failed to acquire lock", index_); |
| } |
| } |
| |
| // `Fuzzer` methods. |
| |
| #define FUZZ_LOG_VERBOSE(...) \ |
| if (verbose_) { \ |
| PW_LOG_INFO(__VA_ARGS__); \ |
| } |
| |
| Fuzzer::Fuzzer(Client& client, uint32_t channel_id) |
| : client_(client, channel_id), |
| timer_([this](chrono::SystemClock::time_point) { |
| PW_LOG_ERROR( |
| "Workers performed %zu actions before timing out without an " |
| "update.", |
| num_actions_.load()); |
| PW_LOG_INFO("Additional call details:"); |
| for (auto& call : fuzzy_calls_) { |
| call.Log(); |
| } |
| PW_CRASH("Fuzzer found a fatal error condition: TIMEOUT."); |
| }) { |
| for (size_t index = 0; index < kMaxConcurrentCalls; ++index) { |
| fuzzy_calls_.emplace_back(index); |
| indices_.push_back(index); |
| contexts_.push_back(CallbackContext{.id = index, .fuzzer = this}); |
| } |
| } |
| |
| void Fuzzer::Run(uint64_t seed, size_t num_actions) { |
| FUZZ_LOG_VERBOSE("Fuzzing RPC client with:"); |
| FUZZ_LOG_VERBOSE(" num_actions: %zu", num_actions); |
| FUZZ_LOG_VERBOSE(" seed: %" PRIu64, seed); |
| num_actions_.store(0); |
| random::XorShiftStarRng64 rng(seed); |
| while (true) { |
| { |
| size_t actions_done = num_actions_.load(); |
| if (actions_done >= num_actions) { |
| FUZZ_LOG_VERBOSE("Fuzzing complete; %zu actions performed.", |
| actions_done); |
| break; |
| } |
| FUZZ_LOG_VERBOSE("%zu actions remaining.", num_actions - actions_done); |
| } |
| FUZZ_LOG_VERBOSE("Generating %zu random actions.", kMaxActions); |
| pw::Vector<uint32_t, kMaxActions> actions; |
| for (size_t i = 0; i < kNumThreads; ++i) { |
| size_t num_actions_for_thread; |
| rng.GetInt(num_actions_for_thread, kMaxActionsPerThread + 1); |
| for (size_t j = 0; j < num_actions_for_thread; ++j) { |
| uint32_t encoded = 0; |
| while (!encoded) { |
| rng.GetInt(encoded); |
| } |
| actions.push_back(encoded); |
| } |
| actions.push_back(0); |
| } |
| Run(actions); |
| } |
| } |
| |
| void Fuzzer::Run(const pw::Vector<uint32_t>& actions) { |
| FUZZ_LOG_VERBOSE("Starting %zu threads to perform %zu actions:", |
| kNumThreads - 1, |
| actions.size()); |
| FUZZ_LOG_VERBOSE(" timeout: %lldms", timer_.timeout() / 1ms); |
| auto iter = actions.begin(); |
| timer_.Restart(); |
| for (size_t thread_id = 0; thread_id < kNumThreads; ++thread_id) { |
| pw::Vector<uint32_t, kMaxActionsPerThread> thread_actions; |
| while (thread_actions.size() < kMaxActionsPerThread && |
| iter != actions.end()) { |
| uint32_t encoded = *iter++; |
| if (!encoded) { |
| break; |
| } |
| thread_actions.push_back(encoded); |
| } |
| if (thread_id == 0) { |
| std::lock_guard lock(mutex_); |
| callback_actions_ = std::move(thread_actions); |
| callback_iterator_ = callback_actions_.begin(); |
| } else { |
| threads_.emplace_back( |
| [this, thread_id, actions = std::move(thread_actions)]() { |
| for (const auto& encoded : actions) { |
| Action action(encoded); |
| action.set_thread_id(thread_id); |
| Perform(action); |
| } |
| }); |
| } |
| } |
| for (auto& t : threads_) { |
| t.join(); |
| } |
| for (auto& fuzzy_call : fuzzy_calls_) { |
| fuzzy_call.Reset(); |
| } |
| timer_.Cancel(); |
| } |
| |
| void Fuzzer::Perform(const Action& action) { |
| FuzzyCall& fuzzy_call = FindCall(action.target); |
| switch (action.op) { |
| case Action::kSkip: { |
| if (action.thread_id == 0) { |
| action.Log(verbose_, ++num_actions_, "Callback chain completed"); |
| } |
| break; |
| } |
| case Action::kWait: { |
| if (action.callback_id == action.target) { |
| // Don't wait in a callback of the target call. |
| break; |
| } |
| if (fuzzy_call.pending()) { |
| action.Log(verbose_, ++num_actions_, "Waiting for call."); |
| fuzzy_call.Await(); |
| } |
| break; |
| } |
| case Action::kWriteUnary: |
| case Action::kWriteStream: { |
| if (action.callback_id == action.target) { |
| // Don't create a new call from the call's own callback. |
| break; |
| } |
| char buf[kMaxWriteLen]; |
| char val = Action::DecodeWriteValue(action.value); |
| size_t len = Action::DecodeWriteLength(action.value); |
| memset(buf, val, len); |
| if (verbose_) { |
| char msg_buf[64]; |
| span msg(msg_buf); |
| auto result = string::Format( |
| msg, |
| "Writing %s request of ", |
| action.op == Action::kWriteUnary ? "unary" : "stream"); |
| if (result.ok()) { |
| size_t off = result.size(); |
| result = string::Format( |
| msg.subspan(off), |
| isprint(val) ? "['%c'; %zu]." : "['\\x%02x'; %zu].", |
| val, |
| len); |
| } |
| size_t num_actions = ++num_actions_; |
| if (result.ok()) { |
| action.Log(verbose_, num_actions, "%s", msg.data()); |
| } else if (verbose_) { |
| action.LogFailure(verbose_, num_actions, result.status()); |
| } |
| } |
| bool append = false; |
| if (action.op == Action::kWriteUnary) { |
| // Send a unary request. |
| fuzzy_call.Reset(client_.UnaryEcho( |
| as_bytes(span(buf, len)), |
| /* on completed */ |
| [context = GetContext(action.target)](ConstByteSpan, Status) { |
| context->fuzzer->OnCompleted(context->id); |
| }, |
| /* on error */ |
| [context = GetContext(action.target)](Status status) { |
| context->fuzzer->OnError(context->id, status); |
| })); |
| |
| } else if (fuzzy_call.Visit( |
| WriteVisitor{.data = as_bytes(span(buf, len))})) { |
| // Append to an existing stream |
| append = true; |
| } else { |
| // .Open a new stream. |
| fuzzy_call.Reset(client_.BidirectionalEcho( |
| /* on next */ |
| [context = GetContext(action.target)](ConstByteSpan) { |
| context->fuzzer->OnNext(context->id); |
| }, |
| /* on completed */ |
| [context = GetContext(action.target)](Status) { |
| context->fuzzer->OnCompleted(context->id); |
| }, |
| /* on error */ |
| [context = GetContext(action.target)](Status status) { |
| context->fuzzer->OnError(context->id, status); |
| })); |
| } |
| fuzzy_call.RecordWrite(len, append); |
| break; |
| } |
| case Action::kCloseClientStream: |
| action.Log(verbose_, ++num_actions_, "Closing stream."); |
| fuzzy_call.Visit(CloseClientStreamVisitor()); |
| break; |
| case Action::kCancel: |
| action.Log(verbose_, ++num_actions_, "Canceling call."); |
| fuzzy_call.Visit(CancelVisitor()); |
| break; |
| case Action::kAbandon: { |
| action.Log(verbose_, ++num_actions_, "Abandoning call."); |
| fuzzy_call.Visit(AbandonVisitor()); |
| break; |
| } |
| case Action::kSwap: { |
| size_t other_target = action.value % kMaxConcurrentCalls; |
| if (action.callback_id == action.target || |
| action.callback_id == other_target) { |
| // Don't move a call from within its own callback. |
| break; |
| } |
| action.Log(verbose_, |
| ++num_actions_, |
| "Swapping call with call %zu.", |
| other_target); |
| std::lock_guard lock(mutex_); |
| FuzzyCall& other = FindCallLocked(other_target); |
| std::swap(indices_[fuzzy_call.id()], indices_[other.id()]); |
| fuzzy_call.Swap(other); |
| break; |
| } |
| case Action::kDestroy: { |
| if (action.callback_id == action.target) { |
| // Don't destroy a call from within its own callback. |
| break; |
| } |
| action.Log(verbose_, ++num_actions_, "Destroying call."); |
| fuzzy_call.Reset(); |
| break; |
| } |
| default: |
| break; |
| } |
| timer_.Restart(); |
| } |
| |
| void Fuzzer::OnNext(size_t callback_id) { FindCall(callback_id).Notify(); } |
| |
| void Fuzzer::OnCompleted(size_t callback_id) { |
| uint32_t encoded = 0; |
| { |
| std::lock_guard lock(mutex_); |
| if (callback_iterator_ != callback_actions_.end()) { |
| encoded = *callback_iterator_++; |
| } |
| } |
| Action action(encoded); |
| action.set_callback_id(callback_id); |
| Perform(action); |
| FindCall(callback_id).Notify(); |
| } |
| |
| void Fuzzer::OnError(size_t callback_id, Status status) { |
| FuzzyCall& call = FindCall(callback_id); |
| PW_LOG_WARN("Call %zu received an error from the server: %s", |
| call.id(), |
| pw_StatusString(status)); |
| call.Notify(); |
| } |
| |
| } // namespace pw::rpc::fuzz |