// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "content_stream.h"

#include "dap/any.h"
#include "dap/session.h"

#include "chan.h"
#include "json_serializer.h"
#include "socket.h"

#include <stdarg.h>
#include <stdio.h>
#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

namespace {

// Impl is the dap::Session implementation returned by dap::Session::create().
//
// Once bound to a reader / writer pair, two threads are started:
//  * recvThread reads messages from the reader and decodes them. Responses
//    are handled directly on this thread; requests and events are turned into
//    Payload closures and pushed onto `inbox`.
//  * dispatchThread takes Payloads from `inbox` and runs them, which is where
//    the registered request and event handlers are invoked.
class Impl : public dap::Session {
 public:
  // onError() sets the handler that receives formatted error messages.
  void onError(const ErrorHandler& handler) override { handlers.put(handler); }

  // registerHandler() registers the request handler for the request type
  // described by typeinfo. A duplicate registration is reported as an error.
  void registerHandler(const dap::TypeInfo* typeinfo,
                       const GenericRequestHandler& handler) override {
    handlers.put(typeinfo, handler);
  }

  // registerHandler() registers the event handler for the event type
  // described by typeinfo. A duplicate registration is reported as an error.
  void registerHandler(const dap::TypeInfo* typeinfo,
                       const GenericEventHandler& handler) override {
    handlers.put(typeinfo, handler);
  }

  // registerHandler() registers the handler that is called after a response
  // of the type described by typeinfo has been sent. A duplicate registration
  // is reported as an error.
  void registerHandler(const dap::TypeInfo* typeinfo,
                       const GenericResponseSentHandler& handler) override {
    handlers.put(typeinfo, handler);
  }

  // bind() attaches the session to the reader r and writer w and starts the
  // receive and dispatch threads. A session can only be bound once; further
  // calls report an error and leave the session untouched.
  void bind(const std::shared_ptr<dap::Reader>& r,
            const std::shared_ptr<dap::Writer>& w) override {
    if (isBound.exchange(true)) {
      handlers.error("Session is already bound!");
      return;
    }

    reader = dap::ContentReader(r);
    writer = dap::ContentWriter(w);

    recvThread = std::thread([this] {
      while (reader.isOpen()) {
        auto message = reader.read();
        if (message.size() > 0) {
          // processMessage() returns an empty Payload when there is nothing
          // to dispatch (responses, malformed or unhandled messages).
          if (auto payload = processMessage(message)) {
            inbox.put(std::move(payload));
          }
        }
      }
    });

    dispatchThread = std::thread([this] {
      while (auto payload = inbox.take()) {
        payload.value()();
      }
    });
  }

  // send() serializes and sends the request described by typeinfo.
  // responseHandler is stored against the request's sequence number before
  // the message is written, and is called when the matching response arrives.
  // Returns the result of writing the message.
  bool send(const dap::TypeInfo* typeinfo,
            const void* request,
            const GenericResponseHandler& responseHandler) override {
    int seq = nextSeq++;

    handlers.put(seq, typeinfo, responseHandler);

    dap::json::Serializer s;
    s.field("seq", dap::integer(seq));
    s.field("type", "request");
    s.field("command", typeinfo->name());
    s.field("arguments", [&](dap::Serializer* args) {
      return typeinfo->serialize(args, request);
    });
    return send(s.dump());
  }

  // send() serializes and sends the event described by typeinfo.
  // Returns the result of writing the message.
  bool send(const dap::TypeInfo* typeinfo, const void* event) override {
    dap::json::Serializer s;
    s.field("seq", dap::integer(nextSeq++));
    s.field("type", "event");
    s.field("event", typeinfo->name());
    s.field("body", [&](dap::Serializer* body) {
      return typeinfo->serialize(body, event);
    });
    return send(s.dump());
  }

  // The destructor closes the inbox, reader and writer and then joins both
  // worker threads (if they were started by bind()).
  ~Impl() {
    inbox.close();
    reader.close();
    writer.close();
    if (recvThread.joinable()) {
      recvThread.join();
    }
    if (dispatchThread.joinable()) {
      dispatchThread.join();
    }
  }

 private:
  // Payload is a unit of work queued on `inbox` for the dispatch thread.
  using Payload = std::function<void()>;

  // EventHandlers holds all the registered handlers. Each kind of handler is
  // guarded by its own mutex.
  class EventHandlers {
   public:
    // put() replaces the error handler.
    void put(const ErrorHandler& handler) {
      std::unique_lock<std::mutex> lock(errorMutex);
      errorHandler = handler;
    }

    // error() formats a printf-style message and passes it to the error
    // handler, if one is set. The handler is called with errorMutex held.
    void error(const char* format, ...) {
      va_list vararg;
      va_start(vararg, format);
      std::unique_lock<std::mutex> lock(errorMutex);
      errorLocked(format, vararg);
      va_end(vararg);
    }

    // request() returns the typeinfo and handler registered for the request
    // with the given command name, or a default-constructed pair (null
    // typeinfo) if there is none.
    std::pair<const dap::TypeInfo*, GenericRequestHandler> request(
        const std::string& name) {
      std::unique_lock<std::mutex> lock(requestMutex);
      auto it = requestMap.find(name);
      return (it != requestMap.end()) ? it->second : decltype(it->second){};
    }

    // put() registers the request handler for typeinfo. The first
    // registration wins; later ones are reported as errors.
    void put(const dap::TypeInfo* typeinfo,
             const GenericRequestHandler& handler) {
      bool added = false;
      {
        std::unique_lock<std::mutex> lock(requestMutex);
        added =
            requestMap
                .emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
                .second;
      }
      // Report only after releasing requestMutex: the error handler is
      // guarded by errorMutex, which error() acquires.
      if (!added) {
        error("Request handler for '%s' already registered",
              typeinfo->name().c_str());
      }
    }

    // response() removes and returns the typeinfo and handler stored for the
    // request with sequence number seq, or a default-constructed pair (null
    // typeinfo) if there is none. The caller reports the unknown sequence.
    std::pair<const dap::TypeInfo*, GenericResponseHandler> response(int seq) {
      std::unique_lock<std::mutex> lock(responseMutex);
      auto it = responseMap.find(seq);
      if (it == responseMap.end()) {
        return {};
      }
      auto out = std::move(it->second);
      responseMap.erase(it);
      return out;
    }

    // put() stores the response handler for the request with sequence number
    // seq. A sequence number that is already present is reported as an error.
    void put(int seq,
             const dap::TypeInfo* typeinfo,
             const GenericResponseHandler& handler) {
      bool added = false;
      {
        std::unique_lock<std::mutex> lock(responseMutex);
        added =
            responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second;
      }
      if (!added) {
        error("Response handler for sequence %d already registered", seq);
      }
    }

    // event() returns the typeinfo and handler registered for the event with
    // the given name, or a default-constructed pair (null typeinfo) if there
    // is none.
    std::pair<const dap::TypeInfo*, GenericEventHandler> event(
        const std::string& name) {
      std::unique_lock<std::mutex> lock(eventMutex);
      auto it = eventMap.find(name);
      return (it != eventMap.end()) ? it->second : decltype(it->second){};
    }

    // put() registers the event handler for typeinfo. The first registration
    // wins; later ones are reported as errors.
    void put(const dap::TypeInfo* typeinfo,
             const GenericEventHandler& handler) {
      bool added = false;
      {
        std::unique_lock<std::mutex> lock(eventMutex);
        added = eventMap
                    .emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
                    .second;
      }
      if (!added) {
        error("Event handler for '%s' already registered",
              typeinfo->name().c_str());
      }
    }

    // responseSent() returns the response-sent handler registered for
    // typeinfo, or an empty handler if there is none.
    GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) {
      std::unique_lock<std::mutex> lock(responseSentMutex);
      auto it = responseSentMap.find(typeinfo);
      return (it != responseSentMap.end()) ? it->second
                                           : decltype(it->second){};
    }

    // put() registers the response-sent handler for typeinfo. The first
    // registration wins; later ones are reported as errors.
    void put(const dap::TypeInfo* typeinfo,
             const GenericResponseSentHandler& handler) {
      bool added = false;
      {
        std::unique_lock<std::mutex> lock(responseSentMutex);
        added = responseSentMap.emplace(typeinfo, handler).second;
      }
      if (!added) {
        error("Response sent handler for '%s' already registered",
              typeinfo->name().c_str());
      }
    }

   private:
    // errorLocked() formats the message into a fixed-size buffer and passes
    // it to the error handler, if one is set.
    // errorMutex must be held by the caller.
    void errorLocked(const char* format, va_list args) {
      char buf[2048];
      vsnprintf(buf, sizeof(buf), format, args);
      if (errorHandler) {
        errorHandler(buf);
      }
    }

    std::mutex errorMutex;
    ErrorHandler errorHandler;

    std::mutex requestMutex;
    std::unordered_map<std::string,
                       std::pair<const dap::TypeInfo*, GenericRequestHandler>>
        requestMap;

    std::mutex responseMutex;
    std::unordered_map<int,
                       std::pair<const dap::TypeInfo*, GenericResponseHandler>>
        responseMap;

    std::mutex eventMutex;
    std::unordered_map<std::string,
                       std::pair<const dap::TypeInfo*, GenericEventHandler>>
        eventMap;

    std::mutex responseSentMutex;
    std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler>
        responseSentMap;
  };  // EventHandlers

  // makeObject() allocates storage for, and constructs, an object of the
  // type described by typeinfo. The returned pointer destructs the object and
  // frees the storage when the last reference is dropped, so the object is
  // released even if the Payload that owns it is never run.
  static std::shared_ptr<uint8_t> makeObject(const dap::TypeInfo* typeinfo) {
    std::unique_ptr<uint8_t[]> storage(new uint8_t[typeinfo->size()]);
    typeinfo->construct(storage.get());
    return std::shared_ptr<uint8_t>(storage.release(), [typeinfo](uint8_t* p) {
      typeinfo->destruct(p);
      delete[] p;
    });
  }

  // processMessage() decodes a single message. Responses are handled
  // immediately. For requests and events the returned Payload invokes the
  // registered handler. An empty Payload is returned when there is nothing
  // to dispatch.
  Payload processMessage(const std::string& str) {
    auto d = dap::json::Deserializer(str);
    dap::string type;
    if (!d.field("type", &type)) {
      handlers.error("Message missing string 'type' field");
      return {};
    }

    dap::integer sequence = 0;
    if (!d.field("seq", &sequence)) {
      handlers.error("Message missing number 'seq' field");
      return {};
    }

    if (type == "request") {
      return processRequest(&d, sequence);
    } else if (type == "event") {
      return processEvent(&d);
    } else if (type == "response") {
      processResponse(&d);
      return {};
    } else {
      handlers.error("Unknown message type '%s'", type.c_str());
    }

    return {};
  }

  // processRequest() decodes a request message with the given sequence
  // number and returns a Payload that calls the registered request handler.
  // The handler is given two callbacks which send the success or the error
  // response, and then notify the response-sent handler (if any).
  // Returns an empty Payload if the request is malformed or unhandled.
  Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) {
    dap::string command;
    if (!d->field("command", &command)) {
      handlers.error("Request missing string 'command' field");
      return {};
    }

    const dap::TypeInfo* typeinfo;
    GenericRequestHandler handler;
    std::tie(typeinfo, handler) = handlers.request(command);
    if (!typeinfo) {
      handlers.error("No request handler registered for command '%s'",
                     command.c_str());
      return {};
    }

    auto data = makeObject(typeinfo);

    if (!d->field("arguments", [&](dap::Deserializer* args) {
          return typeinfo->deserialize(args, data.get());
        })) {
      handlers.error("Failed to deserialize request");
      return {};
    }

    return [this, handler, data, sequence, command] {
      // The callbacks capture by value so that they do not refer back into
      // this Payload closure.
      // NOTE(review): they still capture `this`, so they must not be invoked
      // after the session has been destroyed.
      handler(
          data.get(),
          [this, sequence, command](const dap::TypeInfo* responseType,
                                    const void* response) {
            // onSuccess
            dap::json::Serializer s;
            s.field("seq", dap::integer(nextSeq++));
            s.field("type", "response");
            s.field("request_seq", sequence);
            s.field("success", dap::boolean(true));
            s.field("command", command);
            s.field("body", [&](dap::Serializer* body) {
              return responseType->serialize(body, response);
            });
            send(s.dump());

            if (auto sent = handlers.responseSent(responseType)) {
              sent(response, nullptr);
            }
          },
          [this, sequence, command](const dap::TypeInfo* responseType,
                                    const dap::Error& error) {
            // onError
            dap::json::Serializer s;
            s.field("seq", dap::integer(nextSeq++));
            s.field("type", "response");
            s.field("request_seq", sequence);
            s.field("success", dap::boolean(false));
            s.field("command", command);
            s.field("message", error.message);
            send(s.dump());

            if (auto sent = handlers.responseSent(responseType)) {
              sent(nullptr, &error);
            }
          });
    };
  }

  // processEvent() decodes an event message and returns a Payload that calls
  // the registered event handler with the decoded event body.
  // Returns an empty Payload if the event is malformed or unhandled.
  Payload processEvent(dap::json::Deserializer* d) {
    dap::string event;
    if (!d->field("event", &event)) {
      handlers.error("Event missing string 'event' field");
      return {};
    }

    const dap::TypeInfo* typeinfo;
    GenericEventHandler handler;
    std::tie(typeinfo, handler) = handlers.event(event);
    if (!typeinfo) {
      handlers.error("No event handler registered for event '%s'",
                     event.c_str());
      return {};
    }

    auto data = makeObject(typeinfo);

    if (!d->field("body", [&](dap::Deserializer* body) {
          return typeinfo->deserialize(body, data.get());
        })) {
      handlers.error("Failed to deserialize event '%s' body", event.c_str());
      return {};
    }

    return [handler, data] { handler(data.get()); };
  }

  // processResponse() decodes a response message and calls the response
  // handler that was stored when the matching request was sent. The handler
  // is removed from the pending set as soon as it has been looked up, and is
  // called on the calling thread with either the decoded body (success) or a
  // dap::Error built from the 'message' field (failure).
  void processResponse(const dap::Deserializer* d) {
    dap::integer requestSeq = 0;
    if (!d->field("request_seq", &requestSeq)) {
      handlers.error("Response missing int 'request_seq' field");
      return;
    }

    // Convert once to a plain int: it is both the lookup key and a "%d"
    // argument, and only a real int may be passed through C varargs for "%d".
    const int seq = requestSeq;

    const dap::TypeInfo* typeinfo;
    GenericResponseHandler handler;
    std::tie(typeinfo, handler) = handlers.response(seq);
    if (!typeinfo) {
      handlers.error("Unknown response with sequence %d", seq);
      return;
    }

    dap::boolean success = false;
    if (!d->field("success", &success)) {
      handlers.error("Response missing int 'success' field");
      return;
    }

    if (success) {
      auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]);
      typeinfo->construct(data.get());

      const bool decoded =
          d->field("body", [&](const dap::Deserializer* body) {
            return typeinfo->deserialize(body, data.get());
          });
      if (decoded) {
        handler(data.get(), nullptr);
      } else {
        handlers.error("Failed to deserialize request");
      }
      // The object was constructed above, so it has to be destructed whether
      // or not the body could be decoded.
      typeinfo->destruct(data.get());
    } else {
      std::string message;
      if (!d->field("message", &message)) {
        handlers.error("Failed to deserialize message");
        return;
      }
      auto error = dap::Error("%s", message.c_str());
      handler(nullptr, &error);
    }
  }

  // send() writes the already-serialized message s to the writer. Writes are
  // serialized by sendMutex. Reports an error and returns false if the writer
  // is closed; otherwise returns the result of the write.
  bool send(const std::string& s) {
    std::unique_lock<std::mutex> lock(sendMutex);
    if (!writer.isOpen()) {
      handlers.error("Send failed as the writer is closed");
      return false;
    }
    return writer.write(s);
  }

  std::atomic<bool> isBound = {false};
  dap::ContentReader reader;
  dap::ContentWriter writer;

  EventHandlers handlers;
  std::thread recvThread;
  std::thread dispatchThread;
  dap::Chan<Payload> inbox;
  std::atomic<uint32_t> nextSeq = {1};
  std::mutex sendMutex;
};

}  // anonymous namespace

namespace dap {

Error::Error(const std::string& message) : message(message) {}

// Constructs an Error whose message is built from the printf-style format
// string msg and the arguments that follow it.
//
// The message is first formatted into a fixed-size stack buffer. If it does
// not fit, it is formatted again into a heap buffer of exactly the required
// size, so long messages are not truncated. If formatting fails, the raw
// format string is used as the message.
Error::Error(const char* msg, ...) {
  char buf[2048];
  va_list vararg;
  va_start(vararg, msg);

  // vsnprintf() leaves its va_list unusable afterwards, so keep a copy in
  // case a second formatting pass is needed.
  va_list retry;
  va_copy(retry, vararg);

  const int len = vsnprintf(buf, sizeof(buf), msg, vararg);
  va_end(vararg);

  if (len < 0) {
    // Formatting failed; the contents of buf are not usable.
    message = msg;
  } else if (static_cast<size_t>(len) < sizeof(buf)) {
    message = buf;
  } else {
    // len is the length the full message needs, excluding the terminator.
    std::vector<char> big(static_cast<size_t>(len) + 1);
    if (vsnprintf(big.data(), big.size(), msg, retry) < 0) {
      message = msg;
    } else {
      message = big.data();
    }
  }
  va_end(retry);
}

std::unique_ptr<Session> Session::create() {
  return std::unique_ptr<Session>(new Impl());
}

}  // namespace dap
