| // Example of parsing JSON to document by parts. |
| |
| // Using C++11 threads |
| // Temporarily disable for clang (older version) due to incompatibility with libstdc++ |
| #if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__) |
| |
| #include "rapidjson/document.h" |
| #include "rapidjson/error/en.h" |
| #include "rapidjson/writer.h" |
| #include "rapidjson/ostreamwrapper.h" |
| #include <condition_variable> |
| #include <iostream> |
| #include <mutex> |
| #include <thread> |
| |
| using namespace rapidjson; |
| |
| template<unsigned parseFlags = kParseDefaultFlags> |
| class AsyncDocumentParser { |
| public: |
| AsyncDocumentParser(Document& d) |
| : stream_(*this) |
| , d_(d) |
| , parseThread_() |
| , mutex_() |
| , notEmpty_() |
| , finish_() |
| , completed_() |
| { |
| // Create and execute thread after all member variables are initialized. |
| parseThread_ = std::thread(&AsyncDocumentParser::Parse, this); |
| } |
| |
| ~AsyncDocumentParser() { |
| if (!parseThread_.joinable()) |
| return; |
| |
| { |
| std::unique_lock<std::mutex> lock(mutex_); |
| |
| // Wait until the buffer is read up (or parsing is completed) |
| while (!stream_.Empty() && !completed_) |
| finish_.wait(lock); |
| |
| // Automatically append '\0' as the terminator in the stream. |
| static const char terminator[] = ""; |
| stream_.src_ = terminator; |
| stream_.end_ = terminator + 1; |
| notEmpty_.notify_one(); // unblock the AsyncStringStream |
| } |
| |
| parseThread_.join(); |
| } |
| |
| void ParsePart(const char* buffer, size_t length) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| |
| // Wait until the buffer is read up (or parsing is completed) |
| while (!stream_.Empty() && !completed_) |
| finish_.wait(lock); |
| |
| // Stop further parsing if the parsing process is completed. |
| if (completed_) |
| return; |
| |
| // Set the buffer to stream and unblock the AsyncStringStream |
| stream_.src_ = buffer; |
| stream_.end_ = buffer + length; |
| notEmpty_.notify_one(); |
| } |
| |
| private: |
| void Parse() { |
| d_.ParseStream<parseFlags>(stream_); |
| |
| // The stream may not be fully read, notify finish anyway to unblock ParsePart() |
| std::unique_lock<std::mutex> lock(mutex_); |
| completed_ = true; // Parsing process is completed |
| finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting. |
| } |
| |
| struct AsyncStringStream { |
| typedef char Ch; |
| |
| AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {} |
| |
| char Peek() const { |
| std::unique_lock<std::mutex> lock(parser_.mutex_); |
| |
| // If nothing in stream, block to wait. |
| while (Empty()) |
| parser_.notEmpty_.wait(lock); |
| |
| return *src_; |
| } |
| |
| char Take() { |
| std::unique_lock<std::mutex> lock(parser_.mutex_); |
| |
| // If nothing in stream, block to wait. |
| while (Empty()) |
| parser_.notEmpty_.wait(lock); |
| |
| count_++; |
| char c = *src_++; |
| |
| // If all stream is read up, notify that the stream is finish. |
| if (Empty()) |
| parser_.finish_.notify_one(); |
| |
| return c; |
| } |
| |
| size_t Tell() const { return count_; } |
| |
| // Not implemented |
| char* PutBegin() { return 0; } |
| void Put(char) {} |
| void Flush() {} |
| size_t PutEnd(char*) { return 0; } |
| |
| bool Empty() const { return src_ == end_; } |
| |
| AsyncDocumentParser& parser_; |
| const char* src_; //!< Current read position. |
| const char* end_; //!< End of buffer |
| size_t count_; //!< Number of characters taken so far. |
| }; |
| |
| AsyncStringStream stream_; |
| Document& d_; |
| std::thread parseThread_; |
| std::mutex mutex_; |
| std::condition_variable notEmpty_; |
| std::condition_variable finish_; |
| bool completed_; |
| }; |
| |
| int main() { |
| Document d; |
| |
| { |
| AsyncDocumentParser<> parser(d); |
| |
| const char json1[] = " { \"hello\" : \"world\", \"t\" : tr"; |
| //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error |
| const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14"; |
| const char json3[] = "16, \"a\":[1, 2, 3, 4] } "; |
| |
| parser.ParsePart(json1, sizeof(json1) - 1); |
| parser.ParsePart(json2, sizeof(json2) - 1); |
| parser.ParsePart(json3, sizeof(json3) - 1); |
| } |
| |
| if (d.HasParseError()) { |
| std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl; |
| return EXIT_FAILURE; |
| } |
| |
| // Stringify the JSON to cout |
| OStreamWrapper os(std::cout); |
| Writer<OStreamWrapper> writer(os); |
| d.Accept(writer); |
| std::cout << std::endl; |
| |
| return EXIT_SUCCESS; |
| } |
| |
| #else // Not supporting C++11 |
| |
| #include <iostream> |
| int main() { |
| std::cout << "This example requires C++11 compiler" << std::endl; |
| } |
| |
| #endif |