Merge pull request #627 from openweave/bug/TunnelCASEFix

Fix for resumption message id valid tag.
diff --git a/src/device-manager/WeaveDataManagementClient.cpp b/src/device-manager/WeaveDataManagementClient.cpp
index 11343bb..6fb3d92 100644
--- a/src/device-manager/WeaveDataManagementClient.cpp
+++ b/src/device-manager/WeaveDataManagementClient.cpp
@@ -37,6 +37,7 @@
 #include <errno.h>
 #include <time.h>
 #include <string>
+#include <chrono>
 
 #include <Weave/Core/WeaveCore.h>
 
@@ -65,6 +66,8 @@
 using namespace ::nl::Weave;
 using namespace ::nl::Weave::TLV;
 using namespace Schema::Weave::Common;
+using namespace nl::Weave::Profiles::DataManagement_Current::Event;
+using std::to_string;
 
 class GenericTraitUpdatableDataSink;
 class WdmClient;
@@ -125,7 +128,7 @@
 WEAVE_ERROR GenericTraitUpdatableDataSink::RefreshData(void * appReqState, DMCompleteFunct onComplete, DMErrorFunct onError)
 {
     ClearVersion();
-    mpWdmClient->RefreshData(appReqState, this, onComplete, onError, LocateTraitHandle);
+    mpWdmClient->RefreshData(appReqState, this, onComplete, onError, LocateTraitHandle, false);
     return WEAVE_NO_ERROR;
 }
 
@@ -878,9 +881,11 @@
 
 const nl::Weave::ExchangeContext::Timeout kResponseTimeoutMsec = 15000;
 
+// mpWdmEventProcessor is unique_ptr, so we should use nullptr instead of NULL
 WdmClient::WdmClient() :
     State(kState_NotInitialized), mpAppState(NULL), mOnError(NULL), mGetDataHandle(NULL), mpPublisherPathList(NULL),
-    mpSubscriptionClient(NULL), mpMsgLayer(NULL), mpContext(NULL), mpAppReqState(NULL), mOpState(kOpState_Idle)
+    mpSubscriptionClient(NULL), mpMsgLayer(NULL), mpContext(NULL), mpAppReqState(NULL), mOpState(kOpState_Idle),
+    mpWdmEventProcessor(nullptr), mEventStrBuffer(""), mEnableEventFetching(false)
 { }
 
 void WdmClient::Close(void)
@@ -907,6 +912,10 @@
     mpAppReqState = NULL;
     mOnError      = NULL;
 
+    mEventStrBuffer.clear();
+    mpWdmEventProcessor.release();
+    mEventFetchingTLE = false;
+
     State = kState_NotInitialized;
     ClearOpState();
 }
@@ -922,6 +931,173 @@
     }
 }
 
+namespace {
+void WriteEscapedString(const char * apStr, size_t aLen, std::string & aBuf)
+{
+    // According to UTF8 encoding, all bytes from a multiple byte UTF8 sequence
+    // will have 1 as most siginificant bit. So this function will output the
+    // multi-byte characters without escape.
+    constexpr char * hex = "0123456789abcdef";
+    for (size_t i = 0; i < aLen && apStr[i]; i++)
+    {
+        switch (apStr[i])
+        {
+        case 0 ... 31:
+        case '"':
+        case '\\':
+        case '/':
+            // escape it using JSON style
+            aBuf += "\\u00";
+            aBuf += hex[(apStr[i] >> 4) & 0xf];
+            aBuf += hex[(apStr[i] & 0xf)];
+            break;
+        default:
+            aBuf += apStr[i];
+            break;
+        }
+    }
+}
+
+WEAVE_ERROR FormatEventData(TLVReader aInReader, std::string & aBuf)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+
+    switch (aInReader.GetType())
+    {
+    case nl::Weave::TLV::kTLVType_Structure:
+    case nl::Weave::TLV::kTLVType_Array:
+        break;
+    case nl::Weave::TLV::kTLVType_SignedInteger: {
+        int64_t value_s64;
+
+        err = aInReader.Get(value_s64);
+        SuccessOrExit(err);
+
+        aBuf += to_string(value_s64);
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_UnsignedInteger: {
+        uint64_t value_u64;
+
+        err = aInReader.Get(value_u64);
+        SuccessOrExit(err);
+
+        aBuf += to_string(value_u64);
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_Boolean: {
+        bool value_b;
+
+        err = aInReader.Get(value_b);
+        SuccessOrExit(err);
+
+        aBuf += (value_b ? "true" : "false");
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_UTF8String: {
+        char value_s[256];
+
+        err = aInReader.GetString(value_s, sizeof(value_s));
+        VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_ERROR_BUFFER_TOO_SMALL, );
+
+        if (err == WEAVE_ERROR_BUFFER_TOO_SMALL)
+        {
+            aBuf += "\"(utf8 string too long)\"";
+            err = WEAVE_NO_ERROR;
+        }
+        else
+        {
+            aBuf += '"';
+            // String data needs escaped
+            WriteEscapedString(value_s, min(sizeof(value_s), strlen(value_s)), aBuf);
+            aBuf += '"';
+        }
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_ByteString: {
+        uint8_t value_b[256];
+        uint32_t len, readerLen;
+
+        readerLen = aInReader.GetLength();
+
+        err = aInReader.GetBytes(value_b, sizeof(value_b));
+        VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_ERROR_BUFFER_TOO_SMALL, );
+
+        if (readerLen < sizeof(value_b))
+        {
+            len = readerLen;
+        }
+        else
+        {
+            len = sizeof(value_b);
+        }
+
+        if (err == WEAVE_ERROR_BUFFER_TOO_SMALL)
+        {
+            aBuf += "\"(byte string too long)\"";
+        }
+        else
+        {
+            aBuf += "[";
+            for (size_t i = 0; i < len; i++)
+            {
+                aBuf += (i > 0 ? "," : "");
+                aBuf += to_string(value_b[i]);
+            }
+            aBuf += "]";
+        }
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_Null:
+        aBuf += "null";
+        break;
+
+    default:
+        aBuf += "\"<error data>\"";
+        break;
+    }
+
+    if (aInReader.GetType() == nl::Weave::TLV::kTLVType_Structure || aInReader.GetType() == nl::Weave::TLV::kTLVType_Array)
+    {
+        bool insideStructure = (aInReader.GetType() == nl::Weave::TLV::kTLVType_Structure);
+        aBuf += (insideStructure ? ' {' : '[');
+        const char terminating_char = (insideStructure ? '}' : ']');
+
+        nl::Weave::TLV::TLVType type;
+        bool isFirstChild = true;
+        err               = aInReader.EnterContainer(type);
+        SuccessOrExit(err);
+
+        while ((err = aInReader.Next()) == WEAVE_NO_ERROR)
+        {
+            if (!isFirstChild)
+                aBuf += ",";
+            isFirstChild = false;
+            if (insideStructure)
+            {
+                uint32_t tagNum = nl::Weave::TLV::TagNumFromTag(aInReader.GetTag());
+                aBuf += "\"" + to_string(tagNum) + "\":";
+            }
+            err = FormatEventData(aInReader, aBuf);
+            SuccessOrExit(err);
+        }
+
+        aBuf += terminating_char;
+
+        err = aInReader.ExitContainer(type);
+        SuccessOrExit(err);
+    }
+
+exit:
+    return err;
+}
+} // namespace
+
 void WdmClient::ClearDataSinkVersion(void * aTraitInstance, TraitDataHandle aHandle, void * aContext)
 {
     GenericTraitUpdatableDataSink * pGenericTraitUpdatableDataSink = NULL;
@@ -953,6 +1129,7 @@
         {
             uint16_t pathListLen  = 0;
             uint16_t traitListLen = 0;
+            uint32_t eventListLen = 0;
             TraitDataHandle handle;
             bool needSubscribeSpecificTrait = false;
 
@@ -966,35 +1143,54 @@
             else
             {
                 traitListLen = pWdmClient->mSinkCatalog.Size();
-                VerifyOrExit(traitListLen != 0, err = WEAVE_ERROR_INVALID_LIST_LENGTH);
             }
             WeaveLogDetail(DataManagement, "prepare to subscribe %d trait data sink", traitListLen);
 
-            if (pWdmClient->mpPublisherPathList != NULL)
+            if (!pWdmClient->mEnableEventFetching)
             {
-                delete[] pWdmClient->mpPublisherPathList;
-            }
+                if (pWdmClient->mpPublisherPathList != NULL)
+                {
+                    delete[] pWdmClient->mpPublisherPathList;
+                    pWdmClient->mpPublisherPathList = NULL;
+                }
 
-            pWdmClient->mpPublisherPathList = new TraitPath[traitListLen];
+                if (traitListLen > 0)
+                {
+                    pWdmClient->mpPublisherPathList = new TraitPath[traitListLen];
+                }
 
-            if (needSubscribeSpecificTrait)
-            {
-                pathListLen = 1;
-                err = pWdmClient->mSinkCatalog.PrepareSubscriptionSpecificPathList(pWdmClient->mpPublisherPathList, traitListLen,
-                                                                                   handle);
+                if (needSubscribeSpecificTrait)
+                {
+                    pathListLen = 1;
+                    err = pWdmClient->mSinkCatalog.PrepareSubscriptionSpecificPathList(pWdmClient->mpPublisherPathList, traitListLen,
+                                                                                    handle);
+                }
+                else
+                {
+                    err = pWdmClient->mSinkCatalog.PrepareSubscriptionPathList(pWdmClient->mpPublisherPathList, traitListLen,
+                                                                            pathListLen);
+                }
+                SuccessOrExit(err);
             }
             else
             {
-                err = pWdmClient->mSinkCatalog.PrepareSubscriptionPathList(pWdmClient->mpPublisherPathList, traitListLen,
-                                                                           pathListLen);
-            }
-            SuccessOrExit(err);
+                // When mEnableEventFetching is true, we will not subscribe to any traits
+                pWdmClient->mpPublisherPathList = NULL;
+                pathListLen = 0;
 
-            aOutParam.mSubscribeRequestPrepareNeeded.mPathList                  = pWdmClient->mpPublisherPathList;
-            aOutParam.mSubscribeRequestPrepareNeeded.mPathListSize              = pathListLen;
-            aOutParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents             = false;
-            aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList     = NULL;
-            aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize = 0;
+                err = pWdmClient->PrepareLastObservedEventList(eventListLen);
+                SuccessOrExit(err);
+            }
+
+            // Reset the tle state
+            pWdmClient->mEventFetchingTLE = false;
+
+            aOutParam.mSubscribeRequestPrepareNeeded.mPathList      = pWdmClient->mpPublisherPathList;
+            aOutParam.mSubscribeRequestPrepareNeeded.mPathListSize  = pathListLen;
+            aOutParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents = pWdmClient->mEnableEventFetching;
+            // When aEnableEventFetching is false, the eventListLen will be 0
+            aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList = pWdmClient->mLastObservedEventByImportanceForSending;
+            aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize = eventListLen;
             aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin             = 30;
             aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax             = 120;
         }
@@ -1014,7 +1210,7 @@
         break;
     case SubscriptionClient::kEvent_OnNotificationProcessed:
         WeaveLogDetail(DataManagement, "Client->kEvent_OnNotificationProcessed");
-        VerifyOrExit(kOpState_RefreshData == pWdmClient->mOpState, err = WEAVE_ERROR_INCORRECT_STATE);
+        VerifyOrExit((kOpState_RefreshData == pWdmClient->mOpState) || pWdmClient->mEventFetchingTLE, err = WEAVE_ERROR_INCORRECT_STATE);
         break;
     case SubscriptionClient::kEvent_OnSubscriptionTerminated:
         WeaveLogDetail(DataManagement, "Client->kEvent_OnSubscriptionTerminated. Reason: %u, peer = 0x%" PRIX64 "\n",
@@ -1063,6 +1259,31 @@
         pWdmClient->ClearOpState();
         break;
 
+    case SubscriptionClient::kEvent_OnEventStreamReceived:
+        WeaveLogDetail(DataManagement, "kEvent_OnEventStreamReceived");
+
+        if (pWdmClient->mpWdmEventProcessor != nullptr)
+        {
+            // Remove ']' at end of buffer and append data then push ']' back to the buffer
+            pWdmClient->mEventStrBuffer.pop_back();
+            err = pWdmClient->mpWdmEventProcessor->ProcessEvents(*(aInParam.mEventStreamReceived.mReader),
+                                                                *(pWdmClient->mpSubscriptionClient));
+            pWdmClient->mEventStrBuffer += ']';
+
+            SuccessOrExit(err);
+
+            if (pWdmClient->mLimitEventFetchTimeout && std::chrono::system_clock::now() >= pWdmClient->mEventFetchTimeout)
+            {
+                pWdmClient->mEventFetchingTLE = true;
+                pWdmClient->mpSubscriptionClient->AbortSubscription();
+                VerifyOrExit(kOpState_RefreshData == pWdmClient->mOpState, err = WEAVE_ERROR_INCORRECT_STATE);
+                pWdmClient->mOnComplete.General(pWdmClient->mpContext, pWdmClient->mpAppReqState);
+                pWdmClient->mpContext = NULL;
+                pWdmClient->ClearOpState();
+            }
+        }
+        break;
+
     default:
         SubscriptionClient::DefaultEventHandler(aEvent, aInParam, aOutParam);
         break;
@@ -1128,6 +1349,12 @@
     mFailedFlushPathStatus.clear();
     mFailedPaths.clear();
 
+    mEventStrBuffer.clear();
+    mLimitEventFetchTimeout = false;
+    memset(mLastObservedEventByImportance, 0, sizeof mLastObservedEventByImportance);
+
+    mEventFetchingTLE = false;
+
 exit:
     return WEAVE_NO_ERROR;
 }
@@ -1135,6 +1362,16 @@
 void WdmClient::SetNodeId(uint64_t aNodeId)
 {
     mSinkCatalog.SetNodeId(aNodeId);
+
+    mpWdmEventProcessor.reset(new WdmEventProcessor(aNodeId, this));
+
+    WeaveLogError(DataManagement, "mpWdmEventProcessor set to %p", mpWdmEventProcessor.get());
+}
+
+void WdmClient::SetEventFetchingTimeout(uint32_t aTimeoutSec)
+{
+    mLimitEventFetchTimeout = (aTimeoutSec != 0);
+    mEventFetchTimeLimit    = std::chrono::seconds(aTimeoutSec);
 }
 
 WEAVE_ERROR WdmClient::UpdateFailedPathResults(WdmClient * const apWdmClient, TraitDataHandle mTraitDataHandle,
@@ -1243,29 +1480,30 @@
 }
 
 WEAVE_ERROR WdmClient::RefreshData(void * apAppReqState, DMCompleteFunct onComplete, DMErrorFunct onError,
-                                   GetDataHandleFunct getDataHandleCb)
+                                   GetDataHandleFunct getDataHandleCb, bool aFetchEvents)
 {
     VerifyOrExit(mpSubscriptionClient != NULL, WeaveLogError(DataManagement, "mpSubscriptionClient is NULL"));
 
     mSinkCatalog.Iterate(ClearDataSinkVersion, this);
 
-    RefreshData(apAppReqState, this, onComplete, onError, getDataHandleCb);
+    RefreshData(apAppReqState, this, onComplete, onError, getDataHandleCb, aFetchEvents);
 
 exit:
     return WEAVE_NO_ERROR;
 }
 
 WEAVE_ERROR WdmClient::RefreshData(void * apAppReqState, void * apContext, DMCompleteFunct onComplete, DMErrorFunct onError,
-                                   GetDataHandleFunct getDataHandleCb)
+                                   GetDataHandleFunct getDataHandleCb, bool aFetchEvents)
 {
     VerifyOrExit(mOpState == kOpState_Idle, WeaveLogError(DataManagement, "RefreshData with OpState %d", mOpState));
 
-    mpAppReqState       = apAppReqState;
-    mOnComplete.General = onComplete;
-    mOnError            = onError;
-    mOpState            = kOpState_RefreshData;
-    mGetDataHandle      = getDataHandleCb;
-    mpContext           = apContext;
+    mpAppReqState        = apAppReqState;
+    mOnComplete.General  = onComplete;
+    mOnError             = onError;
+    mOpState             = kOpState_RefreshData;
+    mGetDataHandle       = getDataHandleCb;
+    mpContext            = apContext;
+    mEnableEventFetching       = aFetchEvents;
 
     mpSubscriptionClient->InitiateSubscription();
 
@@ -1290,6 +1528,114 @@
     return mSinkCatalog.Remove(apDataSink);
 }
 
+WEAVE_ERROR WdmClient::GetEvents(BytesData * apBytesData)
+{
+    apBytesData->mpDataBuf = reinterpret_cast<const uint8_t *>(mEventStrBuffer.c_str());
+    apBytesData->mDataLen  = mEventStrBuffer.size();
+    apBytesData->mpMsgBuf  = NULL;
+
+    return WEAVE_NO_ERROR;
+}
+
+WEAVE_ERROR WdmClient::ProcessEvent(nl::Weave::TLV::TLVReader inReader, const EventProcessor::EventHeader & inEventHeader)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+
+    if (mEventStrBuffer.size() > 1)
+    {
+        // We will insert a bracket first, so the EventStrBuffer will be 1 for first event
+        mEventStrBuffer += ",";
+    }
+
+    mEventStrBuffer += "{";
+
+    mEventStrBuffer += "\"Source\":";
+    mEventStrBuffer += to_string(inEventHeader.mSource);
+    mEventStrBuffer += ",\"Importance\":";
+    mEventStrBuffer += to_string(inEventHeader.mImportance);
+    mEventStrBuffer += ",\"Id\":";
+    mEventStrBuffer += to_string(inEventHeader.mId);
+
+    mEventStrBuffer += ",\"RelatedImportance\":";
+    mEventStrBuffer += to_string(inEventHeader.mRelatedImportance);
+    mEventStrBuffer += ",\"RelatedId\":";
+    mEventStrBuffer += to_string(inEventHeader.mRelatedId);
+    mEventStrBuffer += ",\"UTCTimestamp\":";
+    mEventStrBuffer += to_string(inEventHeader.mUTCTimestamp);
+    mEventStrBuffer += ",\"SystemTimestamp\":";
+    mEventStrBuffer += to_string(inEventHeader.mSystemTimestamp);
+    mEventStrBuffer += ",\"ResourceId\":";
+    mEventStrBuffer += to_string(inEventHeader.mResourceId);
+    mEventStrBuffer += ",\"TraitProfileId\":";
+    mEventStrBuffer += to_string(inEventHeader.mTraitProfileId);
+    mEventStrBuffer += ",\"TraitInstanceId\":";
+    mEventStrBuffer += to_string(inEventHeader.mTraitInstanceId);
+    mEventStrBuffer += ",\"Type\":";
+    mEventStrBuffer += to_string(inEventHeader.mType);
+
+    mEventStrBuffer += ",\"DeltaUTCTime\":";
+    mEventStrBuffer += to_string(inEventHeader.mDeltaUTCTime);
+    mEventStrBuffer += ",\"DeltaSystemTime\":";
+    mEventStrBuffer += to_string(inEventHeader.mDeltaSystemTime);
+
+    mEventStrBuffer += ",\"PresenceMask\":";
+    mEventStrBuffer += to_string(inEventHeader.mPresenceMask);
+    mEventStrBuffer += ",\"DataSchemaVersionRange\": {\"MinVersion\":";
+    mEventStrBuffer += to_string(inEventHeader.mDataSchemaVersionRange.mMinVersion);
+    mEventStrBuffer += ",\"MaxVersion\":";
+    mEventStrBuffer += to_string(inEventHeader.mDataSchemaVersionRange.mMaxVersion);
+    mEventStrBuffer += "}";
+
+    mEventStrBuffer += ",\"Data\":";
+    err = FormatEventData(inReader, mEventStrBuffer);
+
+    mEventStrBuffer += "}";
+
+    mLastObservedEventByImportance[static_cast<int>(inEventHeader.mImportance) - static_cast<int>(kImportanceType_First)]
+        .mSourceId = inEventHeader.mSource;
+    mLastObservedEventByImportance[static_cast<int>(inEventHeader.mImportance) - static_cast<int>(kImportanceType_First)]
+        .mImportance = inEventHeader.mImportance;
+    mLastObservedEventByImportance[static_cast<int>(inEventHeader.mImportance) - static_cast<int>(kImportanceType_First)].mEventId =
+        inEventHeader.mId;
+
+    return err;
+}
+
+WEAVE_ERROR WdmClient::PrepareLastObservedEventList(uint32_t & aEventListLen)
+{
+    for (int i = 0; i < kImportanceType_Last - kImportanceType_First + 1; i++)
+    {
+        if (mLastObservedEventByImportance[i].mEventId)
+        {
+            mLastObservedEventByImportanceForSending[aEventListLen++] =
+                mLastObservedEventByImportance[i];
+        }
+    }
+
+    mEventFetchTimeout = std::chrono::system_clock::now() + mEventFetchTimeLimit;
+    mEventStrBuffer    = "[]";
+
+    return WEAVE_NO_ERROR;
+}
+
+WdmEventProcessor::WdmEventProcessor(uint64_t aNodeId, WdmClient * aWdmClient) : EventProcessor(aNodeId), mWdmClient(aWdmClient)
+{
+    // nothing to do
+}
+
+WEAVE_ERROR WdmEventProcessor::ProcessEvent(nl::Weave::TLV::TLVReader inReader,
+                                            nl::Weave::Profiles::DataManagement::SubscriptionClient & inClient,
+                                            const EventHeader & inEventHeader)
+{
+    return mWdmClient->ProcessEvent(inReader, inEventHeader);
+}
+
+WEAVE_ERROR WdmEventProcessor::GapDetected(const EventHeader & inEventHeader)
+{
+    // Do nothing
+    return WEAVE_NO_ERROR;
+};
+
 } // namespace DeviceManager
 } // namespace Weave
 } // namespace nl
diff --git a/src/device-manager/WeaveDataManagementClient.h b/src/device-manager/WeaveDataManagementClient.h
index 04b31df..8b96878 100644
--- a/src/device-manager/WeaveDataManagementClient.h
+++ b/src/device-manager/WeaveDataManagementClient.h
@@ -36,10 +36,14 @@
 #include <SystemLayer/SystemPacketBuffer.h>
 #include <Weave/Profiles/data-management/SubscriptionClient.h>
 #include <Weave/Profiles/data-management/Current/GenericTraitCatalogImpl.h>
+#include <Weave/Profiles/data-management/Current/EventProcessor.h>
 #include <map>
+#include <memory>
 #include <vector>
 #include "WeaveDeviceManager.h"
 
+#include <chrono>
+
 namespace nl {
 namespace Weave {
 namespace DeviceManager {
@@ -70,6 +74,7 @@
 
 class GenericTraitUpdatableDataSink;
 class WdmClient;
+class WdmEventProcessor;
 
 class WdmClientFlushUpdateStatus
 {
@@ -167,6 +172,7 @@
 class NL_DLL_EXPORT WdmClient
 {
     friend class GenericTraitUpdatableDataSink;
+    friend class WdmEventProcessor;
 
 public:
     enum
@@ -189,7 +195,10 @@
     WEAVE_ERROR FlushUpdate(void * apAppReqState, DMFlushUpdateCompleteFunct onComplete, DMErrorFunct onError);
 
     WEAVE_ERROR RefreshData(void * apAppReqState, DMCompleteFunct onComplete, DMErrorFunct onError,
-                            GetDataHandleFunct getDataHandleCb);
+                            GetDataHandleFunct getDataHandleCb, bool aFetchEvents = false);
+
+    WEAVE_ERROR GetEvents(BytesData * aBytes);
+    void SetEventFetchingTimeout(uint32_t aTimeoutSec);
 
     void * mpAppState;
 
@@ -216,9 +225,8 @@
     static void ClientEventCallback(void * const aAppState, SubscriptionClient::EventID aEvent,
                                     const SubscriptionClient::InEventParam & aInParam,
                                     SubscriptionClient::OutEventParam & aOutParam);
-
     WEAVE_ERROR RefreshData(void * apAppReqState, void * apContext, DMCompleteFunct onComplete, DMErrorFunct onError,
-                            GetDataHandleFunct getDataHandleCb);
+                            GetDataHandleFunct getDataHandleCb, bool aWithEvents);
     WEAVE_ERROR GetDataSink(const ResourceIdentifier & aResourceId, uint32_t aProfileId, uint64_t aInstanceId,
                             GenericTraitUpdatableDataSink *& apGenericTraitUpdatableDataSink);
     WEAVE_ERROR SubscribePublisherTrait(const ResourceIdentifier & aResourceId, const uint64_t & aInstanceId,
@@ -230,6 +238,10 @@
                                         PropertyPathHandle mPropertyPathHandle, uint32_t aReason, uint32_t aStatusProfileId,
                                         uint16_t aStatusCode);
 
+    WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, const EventProcessor::EventHeader & inEventHeader);
+
+    WEAVE_ERROR PrepareLastObservedEventList(uint32_t & aEventListLen);
+
     GenericTraitSinkCatalog mSinkCatalog;
     TraitPath * mpPublisherPathList;
 
@@ -240,7 +252,39 @@
     OpState mOpState;
     std::vector<std::string> mFailedPaths;
     std::vector<WdmClientFlushUpdateStatus> mFailedFlushPathStatus;
+
+    std::unique_ptr<WdmEventProcessor> mpWdmEventProcessor;
+    std::string mEventStrBuffer;
+
+    // Three flags for event fetching
+    // If EventFetching is enabled
+    // If we limit the event fetching time
+    // If event fetching time limit exceeded (TLE)
+    bool mEnableEventFetching, mLimitEventFetchTimeout, mEventFetchingTLE;
+
+    std::chrono::time_point<std::chrono::system_clock> mEventFetchTimeout;
+    std::chrono::seconds mEventFetchTimeLimit;
+    SubscriptionClient::LastObservedEvent mLastObservedEventByImportance[kImportanceType_Last - kImportanceType_First + 1];
+    SubscriptionClient::LastObservedEvent
+        mLastObservedEventByImportanceForSending[kImportanceType_Last - kImportanceType_First + 1];
 };
+
+class WdmEventProcessor : public EventProcessor
+{
+public:
+    WdmEventProcessor(uint64_t aNodeId, WdmClient * aWdmClient);
+    virtual ~WdmEventProcessor() = default;
+
+protected:
+    WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, nl::Weave::Profiles::DataManagement::SubscriptionClient & inClient,
+                             const EventHeader & inEventHeader) override;
+
+    WEAVE_ERROR GapDetected(const EventHeader & inEventHeader) override;
+
+private:
+    WdmClient * mWdmClient;
+};
+
 } // namespace DeviceManager
 } // namespace Weave
 } // namespace nl
diff --git a/src/device-manager/cocoa/NLWdmClient.h b/src/device-manager/cocoa/NLWdmClient.h
index b01b748..8d59868 100644
--- a/src/device-manager/cocoa/NLWdmClient.h
+++ b/src/device-manager/cocoa/NLWdmClient.h
@@ -59,6 +59,7 @@
  */
 - (void)setNodeId:(uint64_t)nodeId;
 
+
 /**
  * Create the new data newDataSink
  *
@@ -89,4 +90,38 @@
  */
 - (void)refreshData:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler;
 
+/**
+ * Begins a sync of all events. The result of this operation can be observed through the CompletionHandler and
+ * failureHandler, the procedure will be terminated after reaching timeout.
+ */
+- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler
+            timeoutSec:(uint32_t)timeoutSec;
+
+/**
+ * getEvents will return a list of event data in json array representation.
+ * If no events have been fetched, an empty array ("[]") will be returned.
+ * The internal buffer will be cleared when beginFetchEvents() is called.
+ * Each element in this array should be an object
+ *
+ * Field                  | Type        | Description
+ * -----------------------+-------------+--------------
+ * Source                 | uint64      | Event Header
+ * Importance             | int (enum)
+ * Id                     | uint64
+ * RelatedImportance      | uint64
+ * RelatedId              | uint64
+ * UTCTimestamp           | uint64
+ * ResourceId             | uint64
+ * TraitProfileId         | uint64
+ * TraitInstanceId        | uint64
+ * Type                   | uint64
+ * DeltaUTCTime           | int
+ * DeltaSystemTime        | int
+ * PresenceMask           | uint64
+ * DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
+ * Data                   | Object      | Event Trait Data
+ */
+- (void)getEvents:(NSString **)events;
+
+
 @end
diff --git a/src/device-manager/cocoa/NLWdmClient.mm b/src/device-manager/cocoa/NLWdmClient.mm
index 58f9a63..1a1e613 100644
--- a/src/device-manager/cocoa/NLWdmClient.mm
+++ b/src/device-manager/cocoa/NLWdmClient.mm
@@ -547,5 +547,61 @@
     });
 }
 
+- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler timeoutSec:(uint32_t)timeoutSec
+{
+    WDM_LOG_METHOD_SIG();
+
+    NSString * taskName = @"FetchEvents";
+
+    dispatch_sync(_mWeaveWorkQueue, ^() {
+        _mWeaveCppWdmClient->SetEventFetchingTimeout(timeoutSec);
+    });
+
+    // we use async for the results are sent back to caller via async means also
+    dispatch_async(_mWeaveWorkQueue, ^() {
+        if (nil == _mRequestName) {
+            _mRequestName = taskName;
+            _mCompletionHandler = [completionHandler copy];
+            _mFailureHandler = [failureHandler copy];
+
+            WEAVE_ERROR err = _mWeaveCppWdmClient->RefreshData((__bridge void *) self, onWdmClientComplete, onWdmClientError, NULL, true);
+
+            if (WEAVE_NO_ERROR != err) {
+                [self dispatchAsyncDefaultFailureBlockWithCode:err];
+            }
+        } else {
+            WDM_LOG_ERROR(@"%@: Attemp to %@ while we're still executing %@, ignore", _name, taskName, _mRequestName);
+
+            // do not change _mRequestName, as we're rejecting this request
+            [self dispatchAsyncFailureBlock:WEAVE_ERROR_INCORRECT_STATE taskName:taskName handler:failureHandler];
+        }
+    });
+}
+
+- (WEAVE_ERROR)getEvents:(NSString **)events
+{
+    __block WEAVE_ERROR err = WEAVE_NO_ERROR;
+    __block nl::Weave::DeviceManager::BytesData bytesData;
+
+    WDM_LOG_METHOD_SIG();
+
+    VerifyOrExit(NULL != _mWeaveCppWdmClient, err = WEAVE_ERROR_INCORRECT_STATE);
+
+    // need this bracket to use Verify macros
+    {
+        // we use sync so the bytesData is immediately available to the caller upon return
+        dispatch_sync(_mWeaveWorkQueue, ^() {
+            err = _mWeaveCppWdmClient->GetEvents(&bytesData);
+        });
+    }
+
+exit:
+    if (WEAVE_NO_ERROR == err)
+    {
+        *events = [[NSString alloc] initWithBytes:bytesData.mpDataBuf length:bytesData.mDataLen encoding:NSUTF8StringEncoding];
+    }
+    return err;
+}
+
 @end
 #endif // WEAVE_CONFIG_DATA_MANAGEMENT_CLIENT_EXPERIMENTAL
diff --git a/src/device-manager/java/WeaveDeviceManager-JNI.cpp b/src/device-manager/java/WeaveDeviceManager-JNI.cpp
index 33773f1..3491723 100644
--- a/src/device-manager/java/WeaveDeviceManager-JNI.cpp
+++ b/src/device-manager/java/WeaveDeviceManager-JNI.cpp
@@ -184,6 +184,8 @@
     NL_DLL_EXPORT jlong Java_nl_Weave_DataManagement_WdmClientImpl_newDataSink(JNIEnv *env, jobject self, jlong wdmClientPtr, jobject resourceIdentifierObj, jlong profileId, jlong instanceId, jstring path);
     NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFlushUpdate(JNIEnv *env, jobject self, jlong wdmClientPtr);
     NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginRefreshData(JNIEnv *env, jobject self, jlong wdmClientPtr);
+    NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec);
+    NL_DLL_EXPORT jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr);
     NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
     NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_shutdown(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
     NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_clear(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
@@ -3995,6 +3997,47 @@
     }
 }
 
+void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    WdmClient *wdmClient = (WdmClient *)wdmClientPtr;
+
+    WeaveLogProgress(DeviceManager, "beginFetchEvents() called");
+
+    pthread_mutex_lock(&sStackLock);
+    wdmClient->SetEventFetchingTimeout((uint32_t)timeoutSec);
+    err = wdmClient->RefreshData((void *)"FetchEvents", HandleWdmClientComplete, HandleWdmClientError, NULL, true);
+    pthread_mutex_unlock(&sStackLock);
+
+    if (err != WEAVE_NO_ERROR && err != WDM_JNI_ERROR_EXCEPTION_THROWN)
+    {
+        ThrowError(env, err);
+    }
+}
+
+
+jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    WdmClient *wdmClient = (WdmClient *)wdmClientPtr;
+    BytesData bytesData;
+
+    WeaveLogProgress(DeviceManager, "getEvents() called");
+
+    err = wdmClient->GetEvents(&bytesData);
+    SuccessOrExit(err);
+
+exit:
+
+    if (err != WEAVE_NO_ERROR && err != WDM_JNI_ERROR_EXCEPTION_THROWN)
+    {
+        ThrowError(env, err);
+    }
+
+    // bytesData.mpDataBuf should be a pointer get by mEventBuffer.c_str()
+    return env->NewStringUTF(reinterpret_cast<const char *>(bytesData.mpDataBuf));
+}
+
 void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr)
 {
     GenericTraitUpdatableDataSink *pDataSink = NULL;
diff --git a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
index ee0978e..b6332b0 100644
--- a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
+++ b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
@@ -67,15 +67,51 @@
      */
     public void beginRefreshData();
 
+    /**
+     * The result of this operation can be observed through the {@link CompletionHandler}
+     * that has been assigned via {@link #setCompletionHandler}.
+     * The fetched events can be accessed via getEvents()
+     *
+     * @param timeoutSec operation timeoutSec, this operation might take a long time if the number of events is too large.
+     */
+    public void beginFetchEvents(int timeoutSec);
+
     public CompletionHandler getCompletionHandler();
     public void setCompletionHandler(CompletionHandler compHandler);
 
     public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr);
 
+    /**
+     * getEvents will return a list of event data in json array representation.
+     * If no events have been fetched, an empty array ("[]") will be returned.
+     * The internal buffer will be cleared when beginFetchEvents() is called.
+     * Each element in this array should be an object
+     *
+     * Field                  | Type        | Description
+     * -----------------------+-------------+--------------
+     * Source                 | uint64      | Event Header
+     * Importance             | int (enum)
+     * Id                     | uint64
+     * RelatedImportance      | uint64
+     * RelatedId              | uint64
+     * UTCTimestamp           | uint64
+     * ResourceId             | uint64
+     * TraitProfileId         | uint64
+     * TraitInstanceId        | uint64
+     * Type                   | uint64
+     * DeltaUTCTime           | int
+     * DeltaSystemTime        | int
+     * PresenceMask           | uint64
+     * DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
+     * Data                   | Object      | Event Trait Data
+     */
+    public String getEvents();
+
     public interface CompletionHandler
     {
         void onFlushUpdateComplete(Throwable[] exceptions, WdmClient wdmClient);
         void onRefreshDataComplete();
+        void onFetchEventsComplete();
         void onError(Throwable err);
     }
 };
diff --git a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
index 5418bb8..3dd221c 100644
--- a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
+++ b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
@@ -124,6 +124,13 @@
     }
 
     @Override
+    public void beginFetchEvents(int timeoutSec)
+    {
+        ensureNotClosed();
+        beginFetchEvents(mWdmClientPtr, timeoutSec);
+    }
+
+    @Override
     public CompletionHandler getCompletionHandler()
     {
         return mCompHandler;
@@ -135,6 +142,13 @@
         mCompHandler = compHandler;
     }
 
+    @Override
+    public String getEvents()
+    {
+        ensureNotClosed();
+        return getEvents(mWdmClientPtr);
+    }
+
     public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr)
     {
         GenericTraitUpdatableDataSink trait = null;
@@ -179,6 +193,11 @@
         requireCompletionHandler().onRefreshDataComplete();
     }
 
+    private void onFetchEventsComplete()
+    {
+        requireCompletionHandler().onFetchEventsComplete();
+    }
+
     private void ensureNotClosed() {
       if (mWdmClientPtr == 0) {
         throw new IllegalStateException("This WdmClient has already been closed.");
@@ -205,4 +224,6 @@
     private native long newDataSink(long wdmClientPtr, ResourceIdentifier resourceIdentifier, long profileId, long instanceId, String path);
     private native void beginFlushUpdate(long wdmClientPtr);
     private native void beginRefreshData(long wdmClientPtr);
+    private native void beginFetchEvents(long wdmClientPtr, int timeoutSec);
+    private native String getEvents(long wdmClientPtr);
 };
diff --git a/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java b/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
index 76ad3aa..f2f77c4 100644
--- a/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
+++ b/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
@@ -70,6 +70,7 @@
             mainObj.RunUnitTests();
         }
         catch (TestFailedException ex) {
+            ex.printStackTrace();
             System.exit(-1);
         }
     }
@@ -661,6 +662,95 @@
         System.out.println("testWdmClientDataSinkSetBigIntegerFlushRefreshGetBigInteger Succeeded");
     }
 
+    void testWdmClientGetEvents(WeaveDeviceManager deviceMgr)
+    {
+        WdmClientFactory wdmClientFactory = new WdmClientFactory();
+        WdmClient mockWdmClient = wdmClientFactory.create(deviceMgr);
+
+        mockWdmClient.setCompletionHandler(this);
+
+        mockWdmClient.setNodeId(new BigInteger("-2"));
+
+        TestResult = null;
+        mockWdmClient.beginFetchEvents(1);
+        ExpectSuccess("FetchEvents"); // Need to wait the data, or the events are not prepared
+        String event = mockWdmClient.getEvents();
+
+        // Just check if it can return data, the content is verified in happy tests
+        // "[]".length() = 2
+        if (event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        // Sleep 5 second to wait for more events generated by mock-device
+        try { Thread.sleep(5000); }
+        catch (Exception ex) { }
+
+        TestResult = null;
+        mockWdmClient.beginFetchEvents(1);
+        ExpectSuccess("FetchEvents"); // Need to wait the data, or the events are not prepared
+        event = mockWdmClient.getEvents();
+
+        // Just check if it can return data, the content is verified in happy tests
+        // "[]".length() = 2
+        if (event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        // Sleep 5 second to wait for more events generated by mock-device
+        try { Thread.sleep(5000); }
+        catch (Exception ex) { }
+
+        GenericTraitUpdatableDataSink localSettingsTrait;
+
+        ResourceIdentifier resourceIdentifier = new ResourceIdentifier();
+        localSettingsTrait = mockWdmClient.newDataSink(resourceIdentifier, 20, 0, "/");
+        localSettingsTrait.setCompletionHandler(this);
+
+        TestResult = null;
+        mockWdmClient.beginRefreshData();
+        ExpectSuccess("RefreshData"); // Need to wait the data, or the events are not prepared
+        event = mockWdmClient.getEvents();
+
+        // RefreshData should not clear the internal event buffer
+        if (event.length() == 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        System.out.println("testWdmClientGetEvents Succeeded");
+    }
+
+    void testWdmClientGetLargeEvents(WeaveDeviceManager deviceMgr)
+    {
+        WdmClientFactory wdmClientFactory = new WdmClientFactory();
+        WdmClient mockWdmClient = wdmClientFactory.create(deviceMgr);
+        mockWdmClient.setCompletionHandler(this);
+
+        // Sleep 12 second for enough events, connection timeout is 15s
+        try { Thread.sleep(12000); }
+        catch (Exception ex) { }
+
+        GenericTraitUpdatableDataSink localSettingsTrait;
+
+        ResourceIdentifier resourceIdentifier = new ResourceIdentifier();
+        localSettingsTrait = mockWdmClient.newDataSink(resourceIdentifier, 20, 0, "/");
+        localSettingsTrait.setCompletionHandler(this);
+
+        mockWdmClient.setNodeId(new BigInteger("-2"));
+
+        TestResult = null;
+        mockWdmClient.beginFetchEvents(1);
+        long fetchStart = System.currentTimeMillis();
+        ExpectSuccess("FetchEvents"); // Need to wait the data, or the events are not prepared
+        // The timeout is not intended to be accurate, check if the event finished in a reasonable time
+        if (System.currentTimeMillis() > fetchStart + 2000)
+            throw new TestFailedException("testWdmClientGetLargeEvents");
+        String event = mockWdmClient.getEvents();
+
+        // Just check if it can return data, the content is verified in happy tests
+        if (event.length() == 2)
+            throw new TestFailedException("testWdmClientGetLargeEvents");
+
+        System.out.println("testWdmClientGetLargeEvents Succeeded");
+    }
+
     void testWdmClientDataSinkSetRefreshFlushGetData(WeaveDeviceManager deviceMgr)
     {
         WdmClientFactory wdmClientFactory = new WdmClientFactory();
@@ -869,6 +959,8 @@
         testWdmClientDataSinkSetFlushEmptyStringRefreshGetData(deviceMgr);
         testWdmClientDataSinkSetFlushRefreshGetBigInteger(deviceMgr);
         testWdmClientDataSinkSetBigIntegerFlushRefreshGetBigInteger(deviceMgr);
+        testWdmClientGetEvents(deviceMgr);
+        testWdmClientGetLargeEvents(deviceMgr);
         testWdmClientDataSinkSetRefreshFlushGetData(deviceMgr);
         testWdmClientDataSinkResourceIdentifierMakeResTypeIDInt(deviceMgr);
         testWdmClientDataSinkResourceIdentifierMakeResTypeIdBytes(deviceMgr);
@@ -1395,6 +1487,12 @@
         TestResult = "Success";
     }
 
+    public void onFetchEventsComplete()
+    {
+        System.out.println("    Fetch Events complete");
+        TestResult = "Success";
+    }
+
     public void print(WeaveDeviceDescriptor deviceDesc, String prefix)
     {
         BigInteger twoToThe64 = BigInteger.ONE.shiftLeft(64);
diff --git a/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp b/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
index 4345847..7119252 100644
--- a/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
+++ b/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
@@ -269,6 +269,8 @@
     NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_NewDataSink(WdmClient *wdmClient, const ResourceIdentifier *resourceIdentifier, uint32_t aProfileId, uint64_t aInstanceId, const char * apPath, GenericTraitUpdatableDataSink ** outGenericTraitUpdatableDataSink);
     NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_FlushUpdate(WdmClient *wdmClient, DMFlushUpdateCompleteFunct onComplete, DMErrorFunct onError);
     NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_RefreshData(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError);
+    NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_FetchEvents(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError, uint32_t aTimeoutSec);
+    NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_GetEvents(WdmClient *wdmClient, ConstructBytesArrayFunct aCallback);
 
     NL_DLL_EXPORT WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_Clear(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink);
     NL_DLL_EXPORT WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_RefreshData(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink, DMCompleteFunct onComplete, DMErrorFunct onError);
@@ -1445,6 +1447,27 @@
     return err;
 }
 
+WEAVE_ERROR nl_Weave_WdmClient_FetchEvents(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError, uint32_t aTimeoutSec)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    wdmClient->SetEventFetchingTimeout(aTimeoutSec);
+    err = wdmClient->RefreshData(NULL, onComplete, onError, NULL, true);
+    return err;
+}
+
+WEAVE_ERROR nl_Weave_WdmClient_GetEvents(WdmClient *wdmClient, ConstructBytesArrayFunct aCallback)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    BytesData bytesData;
+    err = wdmClient->GetEvents(&bytesData);
+    SuccessOrExit(err);
+    aCallback(bytesData.mpDataBuf, bytesData.mDataLen);
+    bytesData.Clear();
+
+exit:
+    return err;
+}
+
 WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_Clear(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink)
 {
     if (apGenericTraitUpdatableDataSink != NULL)
diff --git a/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py b/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
index 93b2af8..e3a4fa6 100644
--- a/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
+++ b/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
@@ -90,7 +90,9 @@
 
     def refreshData(self):
         self._ensureNotClosed()
-        self._weaveStack.CallAsync(lambda: self._generictraitupdatabledatasinkLib.nl_Weave_GenericTraitUpdatableDataSink_RefreshData(self._traitInstance, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError))
+        return self._weaveStack.CallAsync(
+            lambda: self._generictraitupdatabledatasinkLib.nl_Weave_GenericTraitUpdatableDataSink_RefreshData(self._traitInstance, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError)
+        )
 
     def setData(self, path, val, isConditional=False):
         self._ensureNotClosed()
diff --git a/src/device-manager/python/openweave/WdmClient.py b/src/device-manager/python/openweave/WdmClient.py
index 31be50b..58f27d9 100644
--- a/src/device-manager/python/openweave/WdmClient.py
+++ b/src/device-manager/python/openweave/WdmClient.py
@@ -29,7 +29,7 @@
 from .WeaveUtility import WeaveUtility
 
 __all__ = [ 'WdmClient', 'WdmFlushUpdateStatusStruct', 'WdmClientFlushUpdateError', 'WdmClientFlushUpdateDeviceError' ]
-
+_ConstructBytesArrayFunct                   = CFUNCTYPE(None, c_void_p, c_uint32)
 WEAVE_ERROR_STATUS_REPORT = 4044
 
 class WdmFlushUpdateStatusStruct(Structure):
@@ -103,6 +103,7 @@
         self._weaveStack.Call(
             lambda: self._datamanagmentLib.nl_Weave_WdmClient_SetNodeId(self._wdmClientPtr, nodeId)
         )
+
     def newDataSink(self, resourceIdentifier, profileId, instanceId, path):
         self._ensureNotClosed()
         traitInstance = c_void_p(None)
@@ -162,6 +163,33 @@
             lambda: self._datamanagmentLib.nl_Weave_WdmClient_RefreshData(self._wdmClientPtr, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError)
         )
 
+    def fetchEvents(self, timeoutSec):
+        self._ensureNotClosed()
+
+        return self._weaveStack.CallAsync(
+            lambda: self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents(self._wdmClientPtr, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError, timeoutSec)
+        )
+
+    def getEvents(self):
+        self._ensureNotClosed()
+        res = self._getEvents()
+
+        if isinstance(res, int):
+            raise self._weaveStack.ErrorToException(res)
+
+        return res
+    
+    def _getEvents(self):
+        self._ensureNotClosed()
+
+        def HandleConstructBytesArray(dataBuf, dataLen):
+            self._weaveStack.callbackRes = WeaveUtility.VoidPtrToByteArray(dataBuf, dataLen)
+
+        cbHandleConstructBytesArray = _ConstructBytesArrayFunct(HandleConstructBytesArray)
+        return self._weaveStack.Call(
+            lambda: self._datamanagmentLib.nl_Weave_WdmClient_GetEvents(self._wdmClientPtr, cbHandleConstructBytesArray)
+        )
+
     def _ensureNotClosed(self):
         if (self._wdmClientPtr == None):
             raise ValueError("wdmClient is not ready")
@@ -195,6 +223,12 @@
             self._datamanagmentLib.nl_Weave_WdmClient_RefreshData.argtypes = [ c_void_p, _CompleteFunct, _ErrorFunct ]
             self._datamanagmentLib.nl_Weave_WdmClient_RefreshData.restype = c_uint32
 
+            self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents.argtypes = [ c_void_p, _CompleteFunct, _ErrorFunct, c_uint32 ]
+            self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents.restype = c_uint32
+
+            self._datamanagmentLib.nl_Weave_WdmClient_GetEvents.argtypes = [ c_void_p, _ConstructBytesArrayFunct ]
+            self._datamanagmentLib.nl_Weave_WdmClient_GetEvents.restype = c_uint32
+
         res = self._datamanagmentLib.nl_Weave_WdmClient_Init()
         if (res != 0):
             raise self._weaveStack.ErrorToException(res)
diff --git a/src/device-manager/python/openweave/WeaveStack.py b/src/device-manager/python/openweave/WeaveStack.py
index 03498c2..b51de84 100644
--- a/src/device-manager/python/openweave/WeaveStack.py
+++ b/src/device-manager/python/openweave/WeaveStack.py
@@ -41,7 +41,7 @@
 from ctypes import *
 from .WeaveUtility import WeaveUtility
 
-__all__ = [ 'DeviceStatusStruct', 'WeaveStackException', 'DeviceError', 'WeaveStackError', 'WeaveStack']
+__all__ = [ 'DeviceStatusStruct', 'WeaveStackException', 'DeviceError', 'WeaveStackError', 'WeaveStack', 'WeaveLogFormatter']
 
 WeaveStackDLLBaseName = '_WeaveDeviceMgr.so'
 
diff --git a/src/lib/profiles/data-management/Current/MessageDef.cpp b/src/lib/profiles/data-management/Current/MessageDef.cpp
index e07bddf..f4f9085 100644
--- a/src/lib/profiles/data-management/Current/MessageDef.cpp
+++ b/src/lib/profiles/data-management/Current/MessageDef.cpp
@@ -1706,6 +1706,7 @@
 #if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
 // Roughly verify the schema is right, including
 // 1) at least one element is there
+//    (not checked since we want to subscribe to events only some times)
 // 2) all elements are anonymous and of Path type
 // 3) every path is also valid in schema
 WEAVE_ERROR PathList::Parser::CheckSchemaValidity(void) const
@@ -1744,11 +1745,7 @@
     // if we have exhausted this container
     if (WEAVE_END_OF_TLV == err)
     {
-        // if we have at least one Path element
-        if (NumPath > 0)
-        {
-            err = WEAVE_NO_ERROR;
-        }
+        err = WEAVE_NO_ERROR;
     }
 
 exit:
diff --git a/src/lib/profiles/data-management/Current/SubscriptionClient.cpp b/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
index 8163a4b..3603532 100644
--- a/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
+++ b/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
@@ -149,6 +149,8 @@
 #endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
     MoveToState(kState_Initialized);
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
     _AddRef();
 
 #if WEAVE_CONFIG_ENABLE_WDM_UPDATE
@@ -427,6 +429,8 @@
 {
     WEAVE_ERROR err = WEAVE_NO_ERROR;
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
     _AddRef();
 
     if (mBinding->IsReady())
@@ -778,6 +782,9 @@
     {
         --mRefCount;
     }
+
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
 }
 
 Binding * SubscriptionClient::GetBinding() const
@@ -796,6 +803,8 @@
     InEventParam inParam;
     OutEventParam outParam;
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
     // Make sure we're not freed by accident.
     _AddRef();
 
@@ -972,7 +981,10 @@
 #if WEAVE_CONFIG_ENABLE_WDM_UPDATE
     mUpdateClient.Shutdown();
 
-    mDataSinkCatalog->Iterate(CleanupUpdatableSinkTrait, this);
+    if (NULL != mDataSinkCatalog)
+    {
+        mDataSinkCatalog->Iterate(CleanupUpdatableSinkTrait, this);
+    }
 #endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
 }
 
@@ -1088,6 +1100,8 @@
 {
     WEAVE_ERROR err                   = WEAVE_NO_ERROR;
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
     _AddRef();
 
     // this check serves to see whether we already have a timer set
@@ -1164,6 +1178,9 @@
 {
     SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient *>(aAppState);
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(pClient),
+                   pClient->GetStateStr(), __func__, pClient->mRefCount);
+
     pClient->_AddRef();
 
     switch (aEvent)
@@ -1344,6 +1361,9 @@
         ExitNow();
     }
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
+
     // Make sure we're not freed by accident
     _AddRef();
 
@@ -1451,6 +1471,8 @@
     AlwaysAcceptDataElementAccessControlDelegate acDelegate;
 
 #if WEAVE_CONFIG_ENABLE_WDM_UPDATE
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+                   GetStateStr(), __func__, mRefCount);
     _AddRef();
     LockUpdateMutex();
 #endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
@@ -2476,6 +2498,8 @@
     bool isPathPrivate;
     bool willRetryPath;
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+               GetStateStr(), __func__, mRefCount);
     // This method invokes callbacks into the upper layer.
     _AddRef();
 
@@ -2770,6 +2794,8 @@
     TraitPath traitPath;
     WEAVE_ERROR err = WEAVE_NO_ERROR;
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+               GetStateStr(), __func__, mRefCount);
     _AddRef();
 
     LockUpdateMutex();
@@ -3331,6 +3357,8 @@
     err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->ScheduleWork(OnUpdateScheduleWorkCallback, this);
     SuccessOrExit(err);
 
+    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+               GetStateStr(), __func__, mRefCount);
     _AddRef();
     mUpdateFlushScheduled = true;
 
diff --git a/src/test-apps/MockLoggingManager.cpp b/src/test-apps/MockLoggingManager.cpp
index c7be8b7..323f562 100644
--- a/src/test-apps/MockLoggingManager.cpp
+++ b/src/test-apps/MockLoggingManager.cpp
@@ -59,7 +59,7 @@
 public:
     MockEventGeneratorImpl(void);
     WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager * aExchangeMgr, EventGenerator * aEventGenerator, int aDelayBetweenEvents,
-                     bool aWraparound);
+                     bool aWraparound, size_t aBatch=1);
     void SetEventGeneratorStop();
     bool IsEventGeneratorStopped();
 
@@ -67,6 +67,7 @@
     static void HandleNextEvent(nl::Weave::System::Layer * aSystemLayer, void * aAppState, ::nl::Weave::System::Error aErr);
     nl::Weave::WeaveExchangeManager * mExchangeMgr;
     int mTimeBetweenEvents; ///< delay, in miliseconds, between events.
+    size_t mBatch;
     bool mEventWraparound;  ///< does the event generator run indefinitely, or does it stop after iterating through its states
     EventGenerator * mEventGenerator; ///< the event generator to use
     int32_t mEventsLeft;
@@ -93,10 +94,10 @@
 } // namespace Weave
 } // namespace nl
 
-uint64_t gDebugEventBuffer[192];
-uint64_t gInfoEventBuffer[64];
-uint64_t gProdEventBuffer[256];
-uint64_t gCritEventBuffer[256];
+uint64_t gDebugEventBuffer[192000];
+uint64_t gInfoEventBuffer[64000];
+uint64_t gProdEventBuffer[256000];
+uint64_t gCritEventBuffer[256000];
 
 bool gMockEventStop                     = false;
 bool gEventIsStopped                    = false;
@@ -152,11 +153,11 @@
 }
 
 MockEventGeneratorImpl::MockEventGeneratorImpl(void) :
-    mExchangeMgr(NULL), mTimeBetweenEvents(0), mEventWraparound(false), mEventGenerator(NULL), mEventsLeft(0)
+    mExchangeMgr(NULL), mTimeBetweenEvents(0), mBatch(1), mEventWraparound(false), mEventGenerator(NULL), mEventsLeft(0)
 { }
 
 WEAVE_ERROR MockEventGeneratorImpl::Init(nl::Weave::WeaveExchangeManager * aExchangeMgr, EventGenerator * aEventGenerator,
-                                         int aDelayBetweenEvents, bool aWraparound)
+                                         int aDelayBetweenEvents, bool aWraparound, size_t aBatch)
 {
     WEAVE_ERROR err    = WEAVE_NO_ERROR;
     mExchangeMgr       = aExchangeMgr;
@@ -164,6 +165,8 @@
     mTimeBetweenEvents = aDelayBetweenEvents;
     mEventWraparound   = aWraparound;
 
+    mBatch = aBatch;
+
     if (mEventWraparound)
         mEventsLeft = INT32_MAX;
     else
@@ -186,8 +189,12 @@
     }
     else
     {
-        generator->mEventGenerator->Generate();
-        generator->mEventsLeft--;
+        for (size_t i = 0; i < generator->mBatch && generator->mEventsLeft > 0; i++)
+        {
+            generator->mEventGenerator->Generate();
+            generator->mEventsLeft--;
+        }
+
         if ((generator->mEventWraparound) || (generator->mEventsLeft > 0))
         {
             aSystemLayer->StartTimer(generator->mTimeBetweenEvents, HandleNextEvent, generator);
diff --git a/src/test-apps/MockLoggingManager.h b/src/test-apps/MockLoggingManager.h
index 551e786..5a21bd1 100644
--- a/src/test-apps/MockLoggingManager.h
+++ b/src/test-apps/MockLoggingManager.h
@@ -45,7 +45,7 @@
 {
 public:
     static MockEventGenerator * GetInstance(void);
-    virtual WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager *aExchangeMgr, EventGenerator *aEventGenerator, int aDelayBetweenEvents, bool aWraparound) = 0;
+    virtual WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager *aExchangeMgr, EventGenerator *aEventGenerator, int aDelayBetweenEvents, bool aWraparound, size_t aBatch=1) = 0;
     virtual void SetEventGeneratorStop() = 0;
     virtual bool IsEventGeneratorStopped() = 0;
 };
diff --git a/src/test-apps/MockWdmNodeOptions.cpp b/src/test-apps/MockWdmNodeOptions.cpp
index ca3f4ae..4091e64 100644
--- a/src/test-apps/MockWdmNodeOptions.cpp
+++ b/src/test-apps/MockWdmNodeOptions.cpp
@@ -44,6 +44,7 @@
     mEnableDataFlip(true),
     mEventGeneratorType(kGenerator_None),
     mTimeBetweenEvents(1000),
+    mEventBatchSize(1),
     mTimeBetweenLivenessCheckSec(NULL),
     mEnableDictionaryTest(false),
     mEnableRetry(false),
@@ -71,6 +72,7 @@
         { "enable-dictionary-test",                         kNoArgument,        kToolOpt_EnableDictionaryTest },
         { "event-generator",                                kArgumentRequired,  kToolOpt_EventGenerator },
         { "inter-event-period",                             kArgumentRequired,  kToolOpt_TimeBetweenEvents },
+        { "event-batch-size ",                              kArgumentRequired,  kToolOpt_EventBatchSize },
 #if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
         { "wdm-publisher",                                  kArgumentRequired,  kToolOpt_WdmPublisherNodeId },
         { "wdm-subless-notify-dest-node",                   kArgumentRequired,  kToolOpt_WdmSublessNotifyDestNodeId },
@@ -179,7 +181,10 @@
         "         TestTrait: TestETrait events which cover a range of types.\n"
         "\n"
         "  --inter-event-period <ms>\n"
-        "       Delay between emitting consecutive events (default 1s)\n"
+        "       Delay between emitting consecutive event batches (default 1s)\n"
+        "\n"
+        "  --event-batch-size <n>\n"
+        "       Number of events generated per batch\n"
         "\n"
         "  --enable-retry\n"
         "       Enable automatic subscription retries by WDM\n"
@@ -491,6 +496,17 @@
         }
         break;
     }
+    case kToolOpt_EventBatchSize:
+    {
+        char *endptr;
+        mEventBatchSize = strtoul(arg, &endptr, 0);
+        if (endptr == arg)
+        {
+            PrintArgError("%s: Invalid inter-event timeout\n", progName);
+            return false;
+        }
+        break;
+    }
     case kToolOpt_WdmUpdateMutation:
     {
         size_t i;
diff --git a/src/test-apps/MockWdmNodeOptions.h b/src/test-apps/MockWdmNodeOptions.h
index 6834916..8398902 100644
--- a/src/test-apps/MockWdmNodeOptions.h
+++ b/src/test-apps/MockWdmNodeOptions.h
@@ -68,6 +68,7 @@
     kToolOpt_WdmUpdateConditionality,
     kToolOpt_WdmUpdateTiming,
     kToolOpt_WdmUpdateDiscardOnError,
+    kToolOpt_EventBatchSize,
 };
 
 class MockWdmNodeOptions : public OptionSetBase
@@ -141,6 +142,7 @@
     bool mEnableDataFlip;
     EventGeneratorType mEventGeneratorType;
     int mTimeBetweenEvents;
+    int mEventBatchSize;
     const char * mTimeBetweenLivenessCheckSec;
     bool mEnableDictionaryTest;
     bool mEnableRetry;
diff --git a/src/test-apps/happy/lib/WeaveDeviceManager.py b/src/test-apps/happy/lib/WeaveDeviceManager.py
index 1fc2029..0247d39 100644
--- a/src/test-apps/happy/lib/WeaveDeviceManager.py
+++ b/src/test-apps/happy/lib/WeaveDeviceManager.py
@@ -35,6 +35,7 @@
 import base64
 import json
 import logging
+import time
 
 weaveDeviceMgrPath = os.environ['WEAVE_DEVICE_MGR_PATH']
 print('weaveDeviceMgrPath is %s' % weaveDeviceMgrPath)
@@ -205,6 +206,18 @@
             print(str(ex))
             return
 
+    def fetchEvents(self, timeoutSec):
+        if self.wdmClient == None:
+            print("wdmclient not initialized")
+            return
+
+        try:
+            result = self.wdmClient.fetchEvents(timeoutSec)
+            print("refresh trait data complete")
+        except WeaveStack.WeaveStackException as ex:
+            print(str(ex))
+            return
+
     def refreshIndividualData(self, traitInstance):
         if self.wdmClient == None or traitInstance == None:
             print("wdmclient or traitInstance not initialized")
@@ -261,6 +274,20 @@
             print(str(ex))
             return
 
+    def getEvents(self):
+        if self.wdmClient == None:
+            print("wdmclient not initialized")
+            return
+
+        try:
+            val = self.wdmClient.getEvents()
+            print("get events")
+            print(val)
+            return val
+        except WeaveStack.WeaveStackException as ex:
+            print(str(ex))
+            return
+
     def getVersion(self, traitInstance):
         if self.wdmClient == None or traitInstance == None:
             print("wdmclient or traitInstance not initialized")
@@ -446,6 +473,84 @@
     testObject.closeWdmClient()
     print("testWdmClientDataSinkRefreshGetDataRefresh completes")
 
+def testWdmClientDataSinkRefreshDataWithEvents(testObject):
+    # Sleep 5 second for enough events.
+    testObject.createWdmClient()
+    time.sleep(5)
+    # We need to set node Id to get everything works
+    testObject.setNodeId(-2)
+
+    testObject.fetchEvents(1)
+    data1 = testObject.getEvents()
+    # Sleep 5 second to wait for more events generated by mock-device
+    time.sleep(5)
+    testObject.fetchEvents(1)
+    data2 = testObject.getEvents()
+
+    jsondata1 = json.loads(data1.decode("utf-8"))
+    jsondata2 = json.loads(data2.decode("utf-8"))
+    # Mockdevice should generate events
+    if (len(jsondata1) == 0) or (len(jsondata2) == 0):
+        raise ValueError("testWdmClientDataSinkRefreshDataWithEvents  fails")
+    # Event should not overlap
+    if jsondata2[0]["Id"] <= jsondata1[-1]["Id"]:
+        raise ValueError("testWdmClientDataSinkRefreshDataWithEvents  fails")
+    # Sleep 5 second to wait for more events generated by mock-device
+    time.sleep(5)
+
+    TestCTrait = testObject.newDataSink(593165827, 0, b"/")
+    testObject.refreshData()
+    val = testObject.getEvents()
+    # The buffer shouldn't be cleared when refreshData() (not fetchEvents) is called.
+    if val.decode("utf-8") == "[]":
+        raise ValueError("testWdmClientDataSinkRefreshDataWithEvents  fails")
+
+    testObject.closeWdmClient()
+    print("testWdmClientDataSinkRefreshDataWithEvents completes")
+
+def testWdmClientDataSinkRefreshDataWithLargeEventList(testObject):
+    # Mockdevice should generate event at 1 per second.
+    testObject.createWdmClient()
+    # Sleep 12 second for enough events, connection timeout is 15s
+    time.sleep(12)
+    # We need to set node Id to get everything works
+    testObject.setNodeId(-2)
+    startTime = time.time()
+    testObject.fetchEvents(1)
+    # The timeout is not intended to be accurate, check if the event finished in a reasonable time
+    if time.time() - startTime > 2:
+        raise ValueError("testWdmClientDataSinkRefreshDataWithLargeEventList  fails: Failed to limit fetching time")
+    data1 = testObject.getEvents()
+
+    jsondata1 = json.loads(data1.decode("utf-8"))
+    # Mockdevice should generate enough events
+    if (len(jsondata1) < 10):
+        raise ValueError("testWdmClientDataSinkRefreshDataWithLargeEventList  fails")
+
+    testObject.closeWdmClient()
+    print("testWdmClientDataSinkRefreshDataWithLargeEventList completes")
+
+def testWdmClientDataSinkRefreshDataWithoutTraits(testObject):
+    # Mockdevice should generate event at 1 per second.
+    testObject.createWdmClient()
+    # Sleep 12 second for enough events, connection timeout is 15s
+    time.sleep(5)
+    # We need to set node Id to get everything works
+    testObject.setNodeId(-2)
+    startTime = time.time()
+    testObject.fetchEvents(1)
+    if time.time() - startTime > 2:
+        raise ValueError("testWdmClientDataSinkRefreshDataWithoutTraits  fails: Failed to limit fetching time")
+    data1 = testObject.getEvents()
+
+    jsondata1 = json.loads(data1.decode("utf-8"))
+    # Mockdevice should generate enough events
+    if (len(jsondata1) == 0):
+        raise ValueError("testWdmClientDataSinkRefreshDataWithoutTraits  fails")
+
+    testObject.closeWdmClient()
+    print("testWdmClientDataSinkRefreshDataWithoutTraits completes")
+
 def testWdmClientDataSinkCloseIndividualData(testObject):
     testObject.createWdmClient()
     localeSettingsTrait = testObject.newDataSink(20, 0, b"/")
@@ -582,6 +687,9 @@
     testWdmClientDataSinkSetFlushRefreshGetData(testObject)
     testWdmClientDataSinkRefreshGetDataRefresh(testObject)
     testWdmClientDataSinkRefreshIndividualGetDataRefresh(testObject)
+    testWdmClientDataSinkRefreshDataWithEvents(testObject)
+    testWdmClientDataSinkRefreshDataWithLargeEventList(testObject)
+    testWdmClientDataSinkRefreshDataWithoutTraits(testObject)
     testWdmClientDataSinkCloseIndividualData(testObject)
     testWdmClientDataSinkSetRefreshFlushGetData(testObject)
     testWdmClientDataSinkResourceIdentifierMakeResTypeIdInt(testObject)
@@ -598,7 +706,6 @@
 
 
 if __name__ == '__main__':
-    
     # Override the default logging for WeaveStack to include timestamps.
     weaveStackLogger = logging.getLogger('openweave.WeaveStack')
     logHandler = logging.StreamHandler(stream=sys.stdout)
diff --git a/src/test-apps/happy/test-templates/WeaveBle.py b/src/test-apps/happy/test-templates/WeaveBle.py
index dfb8641..b8a0a8e 100755
--- a/src/test-apps/happy/test-templates/WeaveBle.py
+++ b/src/test-apps/happy/test-templates/WeaveBle.py
@@ -176,6 +176,7 @@
 
         cmd += " --enable-bluez-peripheral --peripheral-name N0001 --peripheral-address " + self.interfaces[0]["bd_address"]
         cmd += " --print-fault-counters --wdm-resp-mutual-sub --test-case 10 --total-count 0 --wdm-update-timing NoSub "
+        cmd += " --event-generator TestTrait"
         self.start_weave_process(node_id=self.server_node_id, cmd=cmd, tag=self.server_process_tag, rootMode=True)
 
     def resetBluez(self):
diff --git a/src/test-apps/happy/test-templates/WeavePairing.py b/src/test-apps/happy/test-templates/WeavePairing.py
index 8ac46df..51f0a71 100644
--- a/src/test-apps/happy/test-templates/WeavePairing.py
+++ b/src/test-apps/happy/test-templates/WeavePairing.py
@@ -344,6 +344,7 @@
 
         cmd += " --node-addr " + device_info['device_ip'] + " --pairing-code TEST"
         cmd += " --wdm-resp-mutual-sub --test-case 10 --total-count 0 --wdm-update-timing NoSub "
+        cmd += " --event-generator TestTrait --event-batch-size 500"
 
         if self.server is not None:
             cmd += " --pairing-server " + self.server_ip \
diff --git a/src/test-apps/happy/test-templates/WeaveWdmNext.py b/src/test-apps/happy/test-templates/WeaveWdmNext.py
index 34efbf9..b0094c4 100644
--- a/src/test-apps/happy/test-templates/WeaveWdmNext.py
+++ b/src/test-apps/happy/test-templates/WeaveWdmNext.py
@@ -348,12 +348,12 @@
         client_parser_error, client_leak_detected = WeaveUtilities.scan_for_leaks_and_parser_errors(client_output)
         result["no_client_parser_error"] = not client_parser_error
         result["no_client_leak_detected"] = not client_leak_detected
-        if server_output is not "":
+        if server_output != "":
             server_parser_error, server_leak_detected = WeaveUtilities.scan_for_leaks_and_parser_errors(server_output)
             result["no_server_parser_error"] = not client_parser_error
             result["no_server_leak_detected"] = not server_leak_detected
 
-        if self.quiet is False:
+        if not self.quiet:
             print("weave-wdm-next %s from node %s (%s) to node %s (%s) : "\
                   % (self.wdm_option, client_info["client_node_id"], client_info["client_ip"], self.server_node_id, self.server_ip))
 
diff --git a/src/test-apps/mock-device.cpp b/src/test-apps/mock-device.cpp
index 5f7b0cd..1fbebc2 100644
--- a/src/test-apps/mock-device.cpp
+++ b/src/test-apps/mock-device.cpp
@@ -427,6 +427,9 @@
     "  --inter-event-period <ms>"
     "       Delay between emitting consecutive events (default 1s)\n"
     "\n"
+    "  --event-batch-size <n>\n"
+    "       Number of events generated per batch\n"
+    "\n"
     "  --test-case <test case id>\n"
     "       Further configure device behavior with this test case id\n"
     "\n"
@@ -868,7 +871,7 @@
         if (gEventGenerator != NULL) {
             printf("Starting Event Generator\n");
             MockEventGenerator::GetInstance()->Init(&ExchangeMgr, gEventGenerator,
-                                                    gMockWdmNodeOptions.mTimeBetweenEvents, true);
+                                                    gMockWdmNodeOptions.mTimeBetweenEvents, true, gMockWdmNodeOptions.mEventBatchSize);
         }
 
     }