|  | // Copyright 2019 Google LLC | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     https://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | #include "content_stream.h" | 
|  |  | 
|  | #include "dap/any.h" | 
|  | #include "dap/session.h" | 
|  |  | 
|  | #include "chan.h" | 
|  | #include "json_serializer.h" | 
|  | #include "socket.h" | 
|  |  | 
|  | #include <stdarg.h> | 
|  | #include <stdio.h> | 
|  | #include <atomic> | 
|  | #include <deque> | 
|  | #include <memory> | 
|  | #include <mutex> | 
|  | #include <thread> | 
|  | #include <unordered_map> | 
|  | #include <vector> | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | class Impl : public dap::Session { | 
|  | public: | 
|  | void onError(const ErrorHandler& handler) override { handlers.put(handler); } | 
|  |  | 
|  | void registerHandler(const dap::TypeInfo* typeinfo, | 
|  | const GenericRequestHandler& handler) override { | 
|  | handlers.put(typeinfo, handler); | 
|  | } | 
|  |  | 
|  | void registerHandler(const dap::TypeInfo* typeinfo, | 
|  | const GenericEventHandler& handler) override { | 
|  | handlers.put(typeinfo, handler); | 
|  | } | 
|  |  | 
|  | void registerHandler(const dap::TypeInfo* typeinfo, | 
|  | const GenericResponseSentHandler& handler) override { | 
|  | handlers.put(typeinfo, handler); | 
|  | } | 
|  |  | 
|  | std::function<void()> getPayload() override { | 
|  | auto request = reader.read(); | 
|  | if (request.size() > 0) { | 
|  | if (auto payload = processMessage(request)) { | 
|  | return payload; | 
|  | } | 
|  | } | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | void connect(const std::shared_ptr<dap::Reader>& r, | 
|  | const std::shared_ptr<dap::Writer>& w) override { | 
|  | if (isBound.exchange(true)) { | 
|  | handlers.error("Session is already bound!"); | 
|  | return; | 
|  | } | 
|  |  | 
|  | reader = dap::ContentReader(r); | 
|  | writer = dap::ContentWriter(w); | 
|  | } | 
|  |  | 
|  | void startProcessingMessages() override { | 
|  | recvThread = std::thread([this] { | 
|  | while (reader.isOpen()) { | 
|  | if (auto payload = getPayload()) { | 
|  | inbox.put(std::move(payload)); | 
|  | } | 
|  | } | 
|  | }); | 
|  |  | 
|  | dispatchThread = std::thread([this] { | 
|  | while (auto payload = inbox.take()) { | 
|  | payload.value()(); | 
|  | } | 
|  | }); | 
|  | } | 
|  |  | 
|  | bool send(const dap::TypeInfo* requestTypeInfo, | 
|  | const dap::TypeInfo* responseTypeInfo, | 
|  | const void* request, | 
|  | const GenericResponseHandler& responseHandler) override { | 
|  | int seq = nextSeq++; | 
|  |  | 
|  | handlers.put(seq, responseTypeInfo, responseHandler); | 
|  |  | 
|  | dap::json::Serializer s; | 
|  | if (!s.object([&](dap::FieldSerializer* fs) { | 
|  | return fs->field("seq", dap::integer(seq)) && | 
|  | fs->field("type", "request") && | 
|  | fs->field("command", requestTypeInfo->name()) && | 
|  | fs->field("arguments", [&](dap::Serializer* s) { | 
|  | return requestTypeInfo->serialize(s, request); | 
|  | }); | 
|  | })) { | 
|  | return false; | 
|  | } | 
|  | return send(s.dump()); | 
|  | } | 
|  |  | 
|  | bool send(const dap::TypeInfo* typeinfo, const void* event) override { | 
|  | dap::json::Serializer s; | 
|  | if (!s.object([&](dap::FieldSerializer* fs) { | 
|  | return fs->field("seq", dap::integer(nextSeq++)) && | 
|  | fs->field("type", "event") && | 
|  | fs->field("event", typeinfo->name()) && | 
|  | fs->field("body", [&](dap::Serializer* s) { | 
|  | return typeinfo->serialize(s, event); | 
|  | }); | 
|  | })) { | 
|  | return false; | 
|  | } | 
|  | return send(s.dump()); | 
|  | } | 
|  |  | 
|  | ~Impl() { | 
|  | inbox.close(); | 
|  | reader.close(); | 
|  | writer.close(); | 
|  | if (recvThread.joinable()) { | 
|  | recvThread.join(); | 
|  | } | 
|  | if (dispatchThread.joinable()) { | 
|  | dispatchThread.join(); | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | using Payload = std::function<void()>; | 
|  |  | 
|  | class EventHandlers { | 
|  | public: | 
|  | void put(const ErrorHandler& handler) { | 
|  | std::unique_lock<std::mutex> lock(errorMutex); | 
|  | errorHandler = handler; | 
|  | } | 
|  |  | 
|  | void error(const char* format, ...) { | 
|  | va_list vararg; | 
|  | va_start(vararg, format); | 
|  | std::unique_lock<std::mutex> lock(errorMutex); | 
|  | errorLocked(format, vararg); | 
|  | va_end(vararg); | 
|  | } | 
|  |  | 
|  | std::pair<const dap::TypeInfo*, GenericRequestHandler> request( | 
|  | const std::string& name) { | 
|  | std::unique_lock<std::mutex> lock(requestMutex); | 
|  | auto it = requestMap.find(name); | 
|  | return (it != requestMap.end()) ? it->second : decltype(it->second){}; | 
|  | } | 
|  |  | 
|  | void put(const dap::TypeInfo* typeinfo, | 
|  | const GenericRequestHandler& handler) { | 
|  | std::unique_lock<std::mutex> lock(requestMutex); | 
|  | auto added = | 
|  | requestMap | 
|  | .emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) | 
|  | .second; | 
|  | if (!added) { | 
|  | errorfLocked("Request handler for '%s' already registered", | 
|  | typeinfo->name().c_str()); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::pair<const dap::TypeInfo*, GenericResponseHandler> response( | 
|  | int64_t seq) { | 
|  | std::unique_lock<std::mutex> lock(responseMutex); | 
|  | auto responseIt = responseMap.find(seq); | 
|  | if (responseIt == responseMap.end()) { | 
|  | errorfLocked("Unknown response with sequence %d", seq); | 
|  | return {}; | 
|  | } | 
|  | auto out = std::move(responseIt->second); | 
|  | responseMap.erase(seq); | 
|  | return out; | 
|  | } | 
|  |  | 
|  | void put(int seq, | 
|  | const dap::TypeInfo* typeinfo, | 
|  | const GenericResponseHandler& handler) { | 
|  | std::unique_lock<std::mutex> lock(responseMutex); | 
|  | auto added = | 
|  | responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second; | 
|  | if (!added) { | 
|  | errorfLocked("Response handler for sequence %d already registered", | 
|  | seq); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::pair<const dap::TypeInfo*, GenericEventHandler> event( | 
|  | const std::string& name) { | 
|  | std::unique_lock<std::mutex> lock(eventMutex); | 
|  | auto it = eventMap.find(name); | 
|  | return (it != eventMap.end()) ? it->second : decltype(it->second){}; | 
|  | } | 
|  |  | 
|  | void put(const dap::TypeInfo* typeinfo, | 
|  | const GenericEventHandler& handler) { | 
|  | std::unique_lock<std::mutex> lock(eventMutex); | 
|  | auto added = | 
|  | eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) | 
|  | .second; | 
|  | if (!added) { | 
|  | errorfLocked("Event handler for '%s' already registered", | 
|  | typeinfo->name().c_str()); | 
|  | } | 
|  | } | 
|  |  | 
|  | GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) { | 
|  | std::unique_lock<std::mutex> lock(responseSentMutex); | 
|  | auto it = responseSentMap.find(typeinfo); | 
|  | return (it != responseSentMap.end()) ? it->second | 
|  | : decltype(it->second){}; | 
|  | } | 
|  |  | 
|  | void put(const dap::TypeInfo* typeinfo, | 
|  | const GenericResponseSentHandler& handler) { | 
|  | std::unique_lock<std::mutex> lock(responseSentMutex); | 
|  | auto added = responseSentMap.emplace(typeinfo, handler).second; | 
|  | if (!added) { | 
|  | errorfLocked("Response sent handler for '%s' already registered", | 
|  | typeinfo->name().c_str()); | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | void errorfLocked(const char* format, ...) { | 
|  | va_list vararg; | 
|  | va_start(vararg, format); | 
|  | errorLocked(format, vararg); | 
|  | va_end(vararg); | 
|  | } | 
|  |  | 
|  | void errorLocked(const char* format, va_list args) { | 
|  | char buf[2048]; | 
|  | vsnprintf(buf, sizeof(buf), format, args); | 
|  | if (errorHandler) { | 
|  | errorHandler(buf); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::mutex errorMutex; | 
|  | ErrorHandler errorHandler; | 
|  |  | 
|  | std::mutex requestMutex; | 
|  | std::unordered_map<std::string, | 
|  | std::pair<const dap::TypeInfo*, GenericRequestHandler>> | 
|  | requestMap; | 
|  |  | 
|  | std::mutex responseMutex; | 
|  | std::unordered_map<int64_t, | 
|  | std::pair<const dap::TypeInfo*, GenericResponseHandler>> | 
|  | responseMap; | 
|  |  | 
|  | std::mutex eventMutex; | 
|  | std::unordered_map<std::string, | 
|  | std::pair<const dap::TypeInfo*, GenericEventHandler>> | 
|  | eventMap; | 
|  |  | 
|  | std::mutex responseSentMutex; | 
|  | std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler> | 
|  | responseSentMap; | 
|  | };  // EventHandlers | 
|  |  | 
|  | Payload processMessage(const std::string& str) { | 
|  | auto d = dap::json::Deserializer(str); | 
|  | dap::string type; | 
|  | if (!d.field("type", &type)) { | 
|  | handlers.error("Message missing string 'type' field"); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | dap::integer sequence = 0; | 
|  | if (!d.field("seq", &sequence)) { | 
|  | handlers.error("Message missing number 'seq' field"); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | if (type == "request") { | 
|  | return processRequest(&d, sequence); | 
|  | } else if (type == "event") { | 
|  | return processEvent(&d); | 
|  | } else if (type == "response") { | 
|  | processResponse(&d); | 
|  | return {}; | 
|  | } else { | 
|  | handlers.error("Unknown message type '%s'", type.c_str()); | 
|  | } | 
|  |  | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) { | 
|  | dap::string command; | 
|  | if (!d->field("command", &command)) { | 
|  | handlers.error("Request missing string 'command' field"); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | const dap::TypeInfo* typeinfo; | 
|  | GenericRequestHandler handler; | 
|  | std::tie(typeinfo, handler) = handlers.request(command); | 
|  | if (!typeinfo) { | 
|  | handlers.error("No request handler registered for command '%s'", | 
|  | command.c_str()); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | auto data = new uint8_t[typeinfo->size()]; | 
|  | typeinfo->construct(data); | 
|  |  | 
|  | if (!d->field("arguments", [&](dap::Deserializer* d) { | 
|  | return typeinfo->deserialize(d, data); | 
|  | })) { | 
|  | handlers.error("Failed to deserialize request"); | 
|  | typeinfo->destruct(data); | 
|  | delete[] data; | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | return [=] { | 
|  | handler( | 
|  | data, | 
|  | [=](const dap::TypeInfo* typeinfo, const void* data) { | 
|  | // onSuccess | 
|  | dap::json::Serializer s; | 
|  | s.object([&](dap::FieldSerializer* fs) { | 
|  | return fs->field("seq", dap::integer(nextSeq++)) && | 
|  | fs->field("type", "response") && | 
|  | fs->field("request_seq", sequence) && | 
|  | fs->field("success", dap::boolean(true)) && | 
|  | fs->field("command", command) && | 
|  | fs->field("body", [&](dap::Serializer* s) { | 
|  | return typeinfo->serialize(s, data); | 
|  | }); | 
|  | }); | 
|  | send(s.dump()); | 
|  |  | 
|  | if (auto handler = handlers.responseSent(typeinfo)) { | 
|  | handler(data, nullptr); | 
|  | } | 
|  | }, | 
|  | [=](const dap::TypeInfo* typeinfo, const dap::Error& error) { | 
|  | // onError | 
|  | dap::json::Serializer s; | 
|  | s.object([&](dap::FieldSerializer* fs) { | 
|  | return fs->field("seq", dap::integer(nextSeq++)) && | 
|  | fs->field("type", "response") && | 
|  | fs->field("request_seq", sequence) && | 
|  | fs->field("success", dap::boolean(false)) && | 
|  | fs->field("command", command) && | 
|  | fs->field("message", error.message); | 
|  | }); | 
|  | send(s.dump()); | 
|  |  | 
|  | if (auto handler = handlers.responseSent(typeinfo)) { | 
|  | handler(nullptr, &error); | 
|  | } | 
|  | }); | 
|  | typeinfo->destruct(data); | 
|  | delete[] data; | 
|  | }; | 
|  | } | 
|  |  | 
|  | Payload processEvent(dap::json::Deserializer* d) { | 
|  | dap::string event; | 
|  | if (!d->field("event", &event)) { | 
|  | handlers.error("Event missing string 'event' field"); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | const dap::TypeInfo* typeinfo; | 
|  | GenericEventHandler handler; | 
|  | std::tie(typeinfo, handler) = handlers.event(event); | 
|  | if (!typeinfo) { | 
|  | handlers.error("No event handler registered for event '%s'", | 
|  | event.c_str()); | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | auto data = new uint8_t[typeinfo->size()]; | 
|  | typeinfo->construct(data); | 
|  |  | 
|  | if (!d->field("body", [&](dap::Deserializer* d) { | 
|  | return typeinfo->deserialize(d, data); | 
|  | })) { | 
|  | handlers.error("Failed to deserialize event '%s' body", event.c_str()); | 
|  | typeinfo->destruct(data); | 
|  | delete[] data; | 
|  | return {}; | 
|  | } | 
|  |  | 
|  | return [=] { | 
|  | handler(data); | 
|  | typeinfo->destruct(data); | 
|  | delete[] data; | 
|  | }; | 
|  | } | 
|  |  | 
|  | void processResponse(const dap::Deserializer* d) { | 
|  | dap::integer requestSeq = 0; | 
|  | if (!d->field("request_seq", &requestSeq)) { | 
|  | handlers.error("Response missing int 'request_seq' field"); | 
|  | return; | 
|  | } | 
|  |  | 
|  | const dap::TypeInfo* typeinfo; | 
|  | GenericResponseHandler handler; | 
|  | std::tie(typeinfo, handler) = handlers.response(requestSeq); | 
|  | if (!typeinfo) { | 
|  | handlers.error("Unknown response with sequence %d", requestSeq); | 
|  | return; | 
|  | } | 
|  |  | 
|  | dap::boolean success = false; | 
|  | if (!d->field("success", &success)) { | 
|  | handlers.error("Response missing int 'success' field"); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (success) { | 
|  | auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]); | 
|  | typeinfo->construct(data.get()); | 
|  |  | 
|  | // "body" field in Response is an optional field. | 
|  | d->field("body", [&](const dap::Deserializer* d) { | 
|  | return typeinfo->deserialize(d, data.get()); | 
|  | }); | 
|  |  | 
|  | handler(data.get(), nullptr); | 
|  | typeinfo->destruct(data.get()); | 
|  | } else { | 
|  | std::string message; | 
|  | if (!d->field("message", &message)) { | 
|  | handlers.error("Failed to deserialize message"); | 
|  | return; | 
|  | } | 
|  | auto error = dap::Error("%s", message.c_str()); | 
|  | handler(nullptr, &error); | 
|  | } | 
|  | } | 
|  |  | 
|  | bool send(const std::string& s) { | 
|  | std::unique_lock<std::mutex> lock(sendMutex); | 
|  | if (!writer.isOpen()) { | 
|  | handlers.error("Send failed as the writer is closed"); | 
|  | return false; | 
|  | } | 
|  | return writer.write(s); | 
|  | } | 
|  |  | 
|  | std::atomic<bool> isBound = {false}; | 
|  | dap::ContentReader reader; | 
|  | dap::ContentWriter writer; | 
|  |  | 
|  | std::atomic<bool> shutdown = {false}; | 
|  | EventHandlers handlers; | 
|  | std::thread recvThread; | 
|  | std::thread dispatchThread; | 
|  | dap::Chan<Payload> inbox; | 
|  | std::atomic<uint32_t> nextSeq = {1}; | 
|  | std::mutex sendMutex; | 
|  | }; | 
|  |  | 
|  | }  // anonymous namespace | 
|  |  | 
|  | namespace dap { | 
|  |  | 
|  | Error::Error(const std::string& message) : message(message) {} | 
|  |  | 
|  | Error::Error(const char* msg, ...) { | 
|  | char buf[2048]; | 
|  | va_list vararg; | 
|  | va_start(vararg, msg); | 
|  | vsnprintf(buf, sizeof(buf), msg, vararg); | 
|  | va_end(vararg); | 
|  | message = buf; | 
|  | } | 
|  |  | 
|  | Session::~Session() = default; | 
|  |  | 
|  | std::unique_ptr<Session> Session::create() { | 
|  | return std::unique_ptr<Session>(new Impl()); | 
|  | } | 
|  |  | 
|  | }  // namespace dap |