// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <fcntl.h>
#include <lib/fit/defer.h>
#include <time.h>
#include <unistd.h>

#include <gtest/gtest.h>

#include "src/developer/debug/shared/platform_message_loop.h"

#if defined(__Fuchsia__)
#include <lib/zx/socket.h>

#include "src/developer/debug/shared/socket_watcher.h"
#endif

namespace debug {

namespace {

// This helper class sets a boolean when it's destructed. Tests can use it to ensure that the
// lifetime of a lambda is correct.
class SetOnDestruct {
 public:
  // The flag must outlive this class.
  explicit SetOnDestruct(bool* flag) : flag_(flag) {}
  ~SetOnDestruct() { *flag_ = true; }

 private:
  bool* flag_;
};

}  // namespace

// This test either passes or hangs forever because the post didn't work.
// We could add a timer timeout, but if regular task posting doesn't work it's
// not clear why timer tasks would.
TEST(MessageLoop, PostQuit) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  loop.PostTask(FROM_HERE, [loop_ptr = &loop]() { loop_ptr->QuitNow(); });
  loop.Run();

  loop.Cleanup();
}

// Like the above but expresses the task as a fpromise::promise.
TEST(MessageLoop, PostPendingTaskQuit) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  loop.PostTask(FROM_HERE, fpromise::make_promise([&loop]() { loop.QuitNow(); }));
  loop.Run();

  loop.Cleanup();
}

TEST(MessageLoop, TimerQuit) {
  const uint64_t kNano = 1000000000;

  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  struct timespec start;
  ASSERT_FALSE(clock_gettime(CLOCK_MONOTONIC, &start));

  loop.PostTimer(FROM_HERE, 50, [loop_ptr = &loop]() { loop_ptr->QuitNow(); });
  loop.Run();

  struct timespec end;
  ASSERT_FALSE(clock_gettime(CLOCK_MONOTONIC, &end));
  ASSERT_GE(end.tv_sec, start.tv_sec);

  uint64_t nsec = (end.tv_sec - start.tv_sec) * kNano;
  nsec += end.tv_nsec;
  nsec -= start.tv_nsec;

  EXPECT_GE(nsec, 50u);

  // If we test an upper bound for nsec this test could potentially be flaky.
  // We don't actually make any guarantees about the upper bound anyway.

  loop.Cleanup();
}

// Tests a promise that suspends itself and then continues.
TEST(MessageLoop, SuspendPromise) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  bool lambda_destructed = false;

  fpromise::suspended_task suspended;
  int run_count = 0;
  bool should_complete = false;
  loop.PostTask(FROM_HERE, fpromise::make_promise(
                               [&should_complete, &run_count, &suspended,
                                destructed = std::make_shared<SetOnDestruct>(&lambda_destructed)](
                                   fpromise::context& context) -> fpromise::result<> {
                                 run_count++;

                                 if (should_complete)
                                   return fpromise::ok();

                                 suspended = context.suspend_task();  // So we can signal later.
                                 return fpromise::pending();
                               }));

  // Should not have run yet.
  EXPECT_EQ(0, run_count);

  // Pulse the message loop. The task should have run once (responded with "suspend") and set the
  // suspended_task.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, run_count);
  EXPECT_TRUE(suspended);

  // Run the loop again without doing anything. Nothing should have happened because the task was
  // not unsuspended.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, run_count);

  // Mark the task as runnable again. It should run once but still report pending. Note that we
  // run the task synchronously from the resume_task() message as explained in
  // MessageLoop::resolve_ticket().
  suspended.resume_task();
  EXPECT_EQ(2, run_count);
  EXPECT_TRUE(suspended);
  EXPECT_FALSE(lambda_destructed);  // Lambda should not be deleted.

  // Tell the task to complete and signal again. It should be done.
  should_complete = true;
  suspended.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(3, run_count);
  EXPECT_FALSE(suspended);
  EXPECT_TRUE(lambda_destructed);  // Lambda should be deleted.

  loop.Cleanup();
}

// Duplicates the suspended_task controlling the suspended promise.
TEST(MessageLoop, DuplicateSuspendedPromise) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  bool lambda_destructed = false;

  fpromise::suspended_task suspended;
  int run_count = 0;
  bool should_complete = false;
  loop.PostTask(FROM_HERE, fpromise::make_promise(
                               [&should_complete, &run_count, &suspended,
                                destructed = std::make_shared<SetOnDestruct>(&lambda_destructed)](
                                   fpromise::context& context) -> fpromise::result<> {
                                 run_count++;

                                 if (should_complete)
                                   return fpromise::ok();

                                 suspended = context.suspend_task();  // So we can signal later.
                                 return fpromise::pending();
                               }));

  // Should not have run yet.
  EXPECT_EQ(0, run_count);

  // Pulse the message loop. The task should have run once (responded with "suspend") and set the
  // suspended_task.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, run_count);
  EXPECT_TRUE(suspended);

  // Duplicate the suspended task handle.
  fpromise::suspended_task suspended2 = suspended;
  should_complete = true;
  suspended.resume_task();  // Should run synchronously.
  EXPECT_EQ(2, run_count);

  // Resuming the other one does nothing. This suspend was already marked resolved from the other
  // token.
  should_complete = true;
  suspended2.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(2, run_count);  // Same as before.
  EXPECT_TRUE(lambda_destructed);

  loop.Cleanup();
}

// Tests a promise that suspends itself and then becomes abandoned (deleted before it's runnable).
TEST(MessageLoop, AbandonPromise) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  bool lambda_destructed = false;

  fpromise::suspended_task suspended;
  int run_count = 0;
  loop.PostTask(FROM_HERE, fpromise::make_promise(
                               [&run_count, &suspended,
                                destructed = std::make_shared<SetOnDestruct>(&lambda_destructed)](
                                   fpromise::context& context) -> fpromise::result<> {
                                 run_count++;
                                 suspended = context.suspend_task();  // So we can signal later.
                                 return fpromise::pending();
                               }));

  // Should not have run yet.
  EXPECT_EQ(0, run_count);

  // Pulse the message loop. The task should have run once (responded with "suspend") and set the
  // suspended_task.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, run_count);
  EXPECT_TRUE(suspended);

  // Free the suspended task. This should free the lambda and not run it.
  suspended = fpromise::suspended_task();
  EXPECT_EQ(1, run_count);
  EXPECT_TRUE(lambda_destructed);

  loop.Cleanup();
}

// Runs a promise right away without posting to the message loop.
TEST(MessageLoop, RunPromiseSync) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  bool lambda_destructed = false;

  fpromise::suspended_task suspended;
  int run_count = 0;
  bool should_complete = false;
  loop.RunTask(FROM_HERE, fpromise::make_promise(
                              [&run_count, &suspended, &should_complete,
                               destructed = std::make_shared<SetOnDestruct>(&lambda_destructed)](
                                  fpromise::context& context) -> fpromise::result<> {
                                run_count++;

                                if (should_complete)
                                  return fpromise::ok();

                                suspended = context.suspend_task();  // So we can signal later.
                                return fpromise::pending();
                              }));

  // Should have run but not completed.
  EXPECT_EQ(1, run_count);
  EXPECT_FALSE(lambda_destructed);
  EXPECT_TRUE(suspended);

  // Pulse the message loop.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, run_count);  // Same as before.

  // Unsuspend, the task should complete.
  should_complete = true;
  suspended.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(2, run_count);
  EXPECT_FALSE(suspended);
  EXPECT_TRUE(lambda_destructed);

  loop.Cleanup();
}

// Runs a promise without posting from inside another promise.
TEST(MessageLoop, RunNestedPromiseSync) {
  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  fpromise::suspended_task inner_suspended;
  int inner_run_count = 0;
  bool inner_should_complete = false;

  fpromise::suspended_task outer_suspended;
  int outer_run_count = 0;
  bool outer_should_complete = false;

  loop.PostTask(
      FROM_HERE, fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
        outer_run_count++;

        if (outer_should_complete)
          return fpromise::ok();

        int old_inner_run_count = inner_run_count;
        loop.RunTask(FROM_HERE,
                     fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
                       inner_run_count++;

                       if (inner_should_complete)
                         return fpromise::ok();

                       inner_suspended = context.suspend_task();
                       return fpromise::pending();
                     }));
        EXPECT_EQ(inner_run_count, old_inner_run_count + 1);  // Should have run once.

        outer_suspended = context.suspend_task();  // So we can signal later.
        return fpromise::pending();
      }));

  // Nothing should have happened yet.
  EXPECT_EQ(0, inner_run_count);
  EXPECT_EQ(0, outer_run_count);

  // Pulse the message loop. Both outer and inner loops should run once and suspend.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(1, inner_run_count);
  EXPECT_EQ(1, outer_run_count);

  // Let the inner one complete.
  inner_should_complete = true;
  inner_suspended.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(2, inner_run_count);  // One more run.
  EXPECT_EQ(1, outer_run_count);  // Same as before.

  // Run the outer one again but it will still return async. This will queue up another inner loop
  // but this time the inner loop should exit right away and not set the inner suspended.
  inner_suspended = fpromise::suspended_task();
  outer_suspended.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(3, inner_run_count);  // One more run.
  EXPECT_EQ(2, outer_run_count);  // One more run.
  EXPECT_FALSE(inner_suspended);  // Not suspended, it completed synchronously now.

  // Complete the outer one.
  outer_should_complete = true;
  outer_suspended.resume_task();
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });
  loop.Run();
  EXPECT_EQ(3, inner_run_count);  // Same as before
  EXPECT_EQ(3, outer_run_count);  // One more run.

  loop.Cleanup();
}

TEST(MessageLoop, WatchPipeFD) {
  // Make a pipe to talk about.
  fbl::unique_fd pipe_out;
  fbl::unique_fd pipe_in;
  ASSERT_TRUE(CreateLocalNonBlockingPipe(&pipe_out, &pipe_in));

  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  // Scope everything to before MessageLoop::Cleanup().
  {
    bool got_read = false;
    bool got_write = true;
    bool got_err = true;

    // Going to write to pipefd[1] -> read from pipefd[0].
    MessageLoop::WatchHandle watch_handle =
        loop.WatchFD(MessageLoop::WatchMode::kRead, pipe_out.get(),
                     [&](int fd, bool read, bool write, bool err) {
                       got_read = read;
                       got_write = write;
                       got_err = err;
                       loop.QuitNow();
                     });
    ASSERT_TRUE(watch_handle.watching());

    // Enqueue a task that should cause pipefd[0] to become readable.
    loop.PostTask(FROM_HERE, [write_fd = pipe_in.get()]() { write(write_fd, "Hello", 5); });

    // This will quit on success because the OnFDReady callback called QuitNow,
    // or hang forever on failure.
    // TODO(brettw) add a timeout when timers are supported in the message loop.
    loop.Run();

    EXPECT_TRUE(got_read);
    EXPECT_FALSE(got_write);
    EXPECT_FALSE(got_err);
  }
  loop.Cleanup();
}

TEST(MessageLoop, RunUntilNoTasks) {
  PlatformMessageLoop loop;

  static constexpr int kCallCount = 5;
  struct Calls {
    int called[kCallCount];
  };

  Calls calls;
  for (int i = 0; i < kCallCount; i++) {
    calls.called[i] = -1;
  }

  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;
  {
    loop.PostTask(FROM_HERE, [&calls]() mutable { calls.called[0] = 0; });
    loop.PostTask(FROM_HERE, [&calls]() mutable { calls.called[1] = 1; });

    // Nested calles should work.
    loop.PostTask(FROM_HERE, [&calls, &loop]() mutable {
      loop.PostTask(FROM_HERE, [&calls, &loop]() {
        loop.PostTask(FROM_HERE, [&calls]() { calls.called[4] = 4; });
        calls.called[3] = 3;
      });
      calls.called[2] = 2;
    });

    loop.RunUntilNoTasks();

    // All should've been called in the expected order.
    EXPECT_EQ(calls.called[0], 0);
    EXPECT_EQ(calls.called[1], 1);
    EXPECT_EQ(calls.called[2], 2);
    EXPECT_EQ(calls.called[3], 3);
    EXPECT_EQ(calls.called[4], 4);
  }

  loop.Cleanup();
}

TEST(MessageLoop, RunUntilNoTasks_EmptyQueue) {
  PlatformMessageLoop loop;

  static constexpr int kCallCount = 5;
  struct Calls {
    int called[kCallCount];
  };

  Calls calls;
  for (int i = 0; i < kCallCount; i++) {
    calls.called[i] = -1;
  }

  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;
  { loop.RunUntilNoTasks(); }

  loop.Cleanup();
}

// Cleanup should destruct tasks correctly.
TEST(MessageLoop, CorrectCleanup) {
  PlatformMessageLoop loop;

  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  // Post a task that quits the loop immediately.
  loop.PostTask(FROM_HERE, [&loop]() { loop.QuitNow(); });

  // Post a timer in the future.
  bool timer_destructed = false;
  auto on_timer_destruct = fit::defer([&timer_destructed]() { timer_destructed = true; });
  loop.PostTimer(FROM_HERE, 100, [on_timer_destruct = std::move(on_timer_destruct)]() {});

  // Watch a pipe forever.
  fbl::unique_fd pipe_out;
  fbl::unique_fd pipe_in;
  ASSERT_TRUE(CreateLocalNonBlockingPipe(&pipe_out, &pipe_in));
  MessageLoop::WatchHandle watch_handle;
  auto on_destruct = fit::defer([&watch_handle]() { watch_handle = {}; });
  watch_handle = loop.WatchFD(
      MessageLoop::WatchMode::kRead, pipe_out.get(),
      [on_destruct = std::move(on_destruct)](int fd, bool read, bool write, bool err) {});

  loop.Run();
  EXPECT_FALSE(timer_destructed);
  EXPECT_TRUE(watch_handle.watching());

  loop.Cleanup();
  EXPECT_TRUE(timer_destructed);
  EXPECT_FALSE(watch_handle.watching());
}

#if defined(__Fuchsia__)
TEST(MessageLoop, ZirconSocket) {
  zx::socket sender, receiver;
  ASSERT_EQ(ZX_OK, zx::socket::create(ZX_SOCKET_STREAM, &sender, &receiver));

  class ReadableWatcher : public SocketWatcher {
   public:
    explicit ReadableWatcher(MessageLoop* loop) : loop_(loop) {}
    void OnSocketReadable(zx_handle_t socket_handle) override { loop_->QuitNow(); }

   private:
    MessageLoop* loop_;
  };

  PlatformMessageLoop loop;
  std::string error_message;
  ASSERT_TRUE(loop.Init(&error_message)) << error_message;

  // Scope everything to before MessageLoop::Cleanup().
  {
    ReadableWatcher watcher(&loop);

    MessageLoop::WatchHandle watch_handle;
    ASSERT_EQ(ZX_OK, loop.WatchSocket(MessageLoop::WatchMode::kRead, receiver.get(), &watcher,
                                      &watch_handle));
    ASSERT_TRUE(watch_handle.watching());

    // Enqueue a task that should cause receiver to become readable.
    loop.PostTask(FROM_HERE, [&sender]() { sender.write(0, "Hello", 5, nullptr); });

    // This will quit on success because the OnSocketReadable callback called
    // QuitNow, or hang forever on failure.
    // TODO(brettw) add a timeout when timers are supported in the message loop.
    loop.Run();
  }
  loop.Cleanup();
}
#endif

}  // namespace debug
