Merge pull request #627 from openweave/bug/TunnelCASEFix
Fix for resumption message id valid tag.
diff --git a/src/device-manager/WeaveDataManagementClient.cpp b/src/device-manager/WeaveDataManagementClient.cpp
index 11343bb..6fb3d92 100644
--- a/src/device-manager/WeaveDataManagementClient.cpp
+++ b/src/device-manager/WeaveDataManagementClient.cpp
@@ -37,6 +37,7 @@
#include <errno.h>
#include <time.h>
#include <string>
+#include <chrono>
#include <Weave/Core/WeaveCore.h>
@@ -65,6 +66,8 @@
using namespace ::nl::Weave;
using namespace ::nl::Weave::TLV;
using namespace Schema::Weave::Common;
+using namespace nl::Weave::Profiles::DataManagement_Current::Event;
+using std::to_string;
class GenericTraitUpdatableDataSink;
class WdmClient;
@@ -125,7 +128,7 @@
WEAVE_ERROR GenericTraitUpdatableDataSink::RefreshData(void * appReqState, DMCompleteFunct onComplete, DMErrorFunct onError)
{
ClearVersion();
- mpWdmClient->RefreshData(appReqState, this, onComplete, onError, LocateTraitHandle);
+ mpWdmClient->RefreshData(appReqState, this, onComplete, onError, LocateTraitHandle, false);
return WEAVE_NO_ERROR;
}
@@ -878,9 +881,11 @@
const nl::Weave::ExchangeContext::Timeout kResponseTimeoutMsec = 15000;
+// mpWdmEventProcessor is unique_ptr, so we should use nullptr instead of NULL
WdmClient::WdmClient() :
State(kState_NotInitialized), mpAppState(NULL), mOnError(NULL), mGetDataHandle(NULL), mpPublisherPathList(NULL),
- mpSubscriptionClient(NULL), mpMsgLayer(NULL), mpContext(NULL), mpAppReqState(NULL), mOpState(kOpState_Idle)
+ mpSubscriptionClient(NULL), mpMsgLayer(NULL), mpContext(NULL), mpAppReqState(NULL), mOpState(kOpState_Idle),
+ mpWdmEventProcessor(nullptr), mEventStrBuffer(""), mEnableEventFetching(false)
{ }
void WdmClient::Close(void)
@@ -907,6 +912,10 @@
mpAppReqState = NULL;
mOnError = NULL;
+    mEventStrBuffer.clear();
+    // reset() destroys the owned event processor. release() would only give up
+    // ownership and return the raw pointer, leaking the object.
+    mpWdmEventProcessor.reset();
+    mEventFetchingTLE = false;
+
State = kState_NotInitialized;
ClearOpState();
}
@@ -922,6 +931,173 @@
}
}
+namespace {
+/**
+ * Append a string to aBuf, escaping the characters that may not appear
+ * verbatim inside a JSON string literal.
+ *
+ * Control characters (below 0x20), '"', '\\' and '/' are written as \u00XX;
+ * every other byte is copied unchanged. Copying stops at aLen bytes or at the
+ * first NUL byte, whichever comes first.
+ *
+ * @param apStr  Input bytes (expected to be UTF-8).
+ * @param aLen   Maximum number of bytes to read from apStr.
+ * @param aBuf   Output string the escaped text is appended to.
+ */
+void WriteEscapedString(const char * apStr, size_t aLen, std::string & aBuf)
+{
+    // Every byte of a multi-byte UTF-8 sequence has its most significant bit
+    // set, so such bytes never match the ASCII characters escaped below and
+    // are emitted without escaping.
+    static const char kHexDigits[] = "0123456789abcdef";
+
+    for (size_t i = 0; i < aLen && apStr[i] != '\0'; i++)
+    {
+        // Work on the unsigned value so the range test does not depend on
+        // whether plain char is signed on this platform.
+        const unsigned char ch = static_cast<unsigned char>(apStr[i]);
+
+        if (ch < 0x20 || ch == '"' || ch == '\\' || ch == '/')
+        {
+            // escape it using JSON style
+            aBuf += "\\u00";
+            aBuf += kHexDigits[(ch >> 4) & 0xf];
+            aBuf += kHexDigits[ch & 0xf];
+        }
+        else
+        {
+            aBuf += apStr[i];
+        }
+    }
+}
+
+/**
+ * Append a JSON rendering of the TLV element at the reader's current position
+ * to aBuf.
+ *
+ * Structures become JSON objects keyed by the decimal tag number of each
+ * member, arrays become JSON arrays, integers and booleans become literals,
+ * UTF-8 strings become escaped JSON strings, byte strings become arrays of
+ * byte values and TLV null becomes "null". Strings that do not fit the local
+ * 256-byte buffer are replaced by a placeholder string, and element types not
+ * handled here are rendered as "<error data>".
+ *
+ * @param aInReader  Reader positioned on the element to format. It is taken by
+ *                   value, so the caller's reader is not advanced.
+ * @param aBuf       Output string the JSON text is appended to.
+ *
+ * @return WEAVE_NO_ERROR on success, otherwise the error reported by the TLV
+ *         reader. aBuf may hold partial output when an error is returned.
+ */
+WEAVE_ERROR FormatEventData(TLVReader aInReader, std::string & aBuf)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    const nl::Weave::TLV::TLVType elementType = aInReader.GetType();
+
+    switch (elementType)
+    {
+    case nl::Weave::TLV::kTLVType_Structure:
+    case nl::Weave::TLV::kTLVType_Array: {
+        const bool insideStructure  = (elementType == nl::Weave::TLV::kTLVType_Structure);
+        const char terminating_char = (insideStructure ? '}' : ']');
+        nl::Weave::TLV::TLVType outerType;
+        bool isFirstChild = true;
+
+        aBuf += (insideStructure ? '{' : '[');
+
+        err = aInReader.EnterContainer(outerType);
+        SuccessOrExit(err);
+
+        while ((err = aInReader.Next()) == WEAVE_NO_ERROR)
+        {
+            if (!isFirstChild)
+                aBuf += ",";
+            isFirstChild = false;
+
+            if (insideStructure)
+            {
+                // Structure members are keyed by their tag number.
+                uint32_t tagNum = nl::Weave::TLV::TagNumFromTag(aInReader.GetTag());
+                aBuf += "\"" + to_string(tagNum) + "\":";
+            }
+
+            // The child is formatted from a copy of the reader, so Next()
+            // above still steps over the whole child element.
+            err = FormatEventData(aInReader, aBuf);
+            SuccessOrExit(err);
+        }
+
+        aBuf += terminating_char;
+
+        // A well-formed container ends with WEAVE_END_OF_TLV. Any other value
+        // is a real decoding failure and must not be overwritten below.
+        VerifyOrExit(err == WEAVE_END_OF_TLV, );
+
+        err = aInReader.ExitContainer(outerType);
+        SuccessOrExit(err);
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_SignedInteger: {
+        int64_t value_s64;
+
+        err = aInReader.Get(value_s64);
+        SuccessOrExit(err);
+
+        aBuf += to_string(value_s64);
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_UnsignedInteger: {
+        uint64_t value_u64;
+
+        err = aInReader.Get(value_u64);
+        SuccessOrExit(err);
+
+        aBuf += to_string(value_u64);
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_Boolean: {
+        bool value_b;
+
+        err = aInReader.Get(value_b);
+        SuccessOrExit(err);
+
+        aBuf += (value_b ? "true" : "false");
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_UTF8String: {
+        char value_s[256];
+
+        err = aInReader.GetString(value_s, sizeof(value_s));
+        VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_ERROR_BUFFER_TOO_SMALL, );
+
+        if (err == WEAVE_ERROR_BUFFER_TOO_SMALL)
+        {
+            // An oversized string is reported in-band, not as a failure.
+            aBuf += "\"(utf8 string too long)\"";
+            err = WEAVE_NO_ERROR;
+        }
+        else
+        {
+            aBuf += '"';
+            // String data needs to be escaped
+            WriteEscapedString(value_s, min(sizeof(value_s), strlen(value_s)), aBuf);
+            aBuf += '"';
+        }
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_ByteString: {
+        uint8_t value_b[256];
+        const uint32_t readerLen = aInReader.GetLength();
+
+        err = aInReader.GetBytes(value_b, sizeof(value_b));
+        VerifyOrExit(err == WEAVE_NO_ERROR || err == WEAVE_ERROR_BUFFER_TOO_SMALL, );
+
+        if (err == WEAVE_ERROR_BUFFER_TOO_SMALL)
+        {
+            // Same policy as for UTF-8 strings: emit a placeholder and carry
+            // on instead of failing the whole event.
+            aBuf += "\"(byte string too long)\"";
+            err = WEAVE_NO_ERROR;
+        }
+        else
+        {
+            // Never read past the local buffer, whatever the reader reports.
+            const size_t len = (readerLen < sizeof(value_b)) ? readerLen : sizeof(value_b);
+
+            aBuf += "[";
+            for (size_t i = 0; i < len; i++)
+            {
+                aBuf += (i > 0 ? "," : "");
+                aBuf += to_string(value_b[i]);
+            }
+            aBuf += "]";
+        }
+        break;
+    }
+
+    case nl::Weave::TLV::kTLVType_Null:
+        aBuf += "null";
+        break;
+
+    default:
+        aBuf += "\"<error data>\"";
+        break;
+    }
+
+exit:
+    return err;
+}
+} // namespace
+
void WdmClient::ClearDataSinkVersion(void * aTraitInstance, TraitDataHandle aHandle, void * aContext)
{
GenericTraitUpdatableDataSink * pGenericTraitUpdatableDataSink = NULL;
@@ -953,6 +1129,7 @@
{
uint16_t pathListLen = 0;
uint16_t traitListLen = 0;
+ uint32_t eventListLen = 0;
TraitDataHandle handle;
bool needSubscribeSpecificTrait = false;
@@ -966,35 +1143,54 @@
else
{
traitListLen = pWdmClient->mSinkCatalog.Size();
- VerifyOrExit(traitListLen != 0, err = WEAVE_ERROR_INVALID_LIST_LENGTH);
}
WeaveLogDetail(DataManagement, "prepare to subscribe %d trait data sink", traitListLen);
- if (pWdmClient->mpPublisherPathList != NULL)
+ if (!pWdmClient->mEnableEventFetching)
{
- delete[] pWdmClient->mpPublisherPathList;
- }
+ if (pWdmClient->mpPublisherPathList != NULL)
+ {
+ delete[] pWdmClient->mpPublisherPathList;
+ pWdmClient->mpPublisherPathList = NULL;
+ }
- pWdmClient->mpPublisherPathList = new TraitPath[traitListLen];
+ if (traitListLen > 0)
+ {
+ pWdmClient->mpPublisherPathList = new TraitPath[traitListLen];
+ }
- if (needSubscribeSpecificTrait)
- {
- pathListLen = 1;
- err = pWdmClient->mSinkCatalog.PrepareSubscriptionSpecificPathList(pWdmClient->mpPublisherPathList, traitListLen,
- handle);
+ if (needSubscribeSpecificTrait)
+ {
+ pathListLen = 1;
+ err = pWdmClient->mSinkCatalog.PrepareSubscriptionSpecificPathList(pWdmClient->mpPublisherPathList, traitListLen,
+ handle);
+ }
+ else
+ {
+ err = pWdmClient->mSinkCatalog.PrepareSubscriptionPathList(pWdmClient->mpPublisherPathList, traitListLen,
+ pathListLen);
+ }
+ SuccessOrExit(err);
}
else
{
- err = pWdmClient->mSinkCatalog.PrepareSubscriptionPathList(pWdmClient->mpPublisherPathList, traitListLen,
- pathListLen);
- }
- SuccessOrExit(err);
+ // When mEnableEventFetching is true, we will not subscribe to any traits
+ pWdmClient->mpPublisherPathList = NULL;
+ pathListLen = 0;
- aOutParam.mSubscribeRequestPrepareNeeded.mPathList = pWdmClient->mpPublisherPathList;
- aOutParam.mSubscribeRequestPrepareNeeded.mPathListSize = pathListLen;
- aOutParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents = false;
- aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList = NULL;
- aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize = 0;
+ err = pWdmClient->PrepareLastObservedEventList(eventListLen);
+ SuccessOrExit(err);
+ }
+
+ // Reset the tle state
+ pWdmClient->mEventFetchingTLE = false;
+
+ aOutParam.mSubscribeRequestPrepareNeeded.mPathList = pWdmClient->mpPublisherPathList;
+ aOutParam.mSubscribeRequestPrepareNeeded.mPathListSize = pathListLen;
+ aOutParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents = pWdmClient->mEnableEventFetching;
+ // When aEnableEventFetching is false, the eventListLen will be 0
+ aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList = pWdmClient->mLastObservedEventByImportanceForSending;
+ aOutParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize = eventListLen;
aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin = 30;
aOutParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax = 120;
}
@@ -1014,7 +1210,7 @@
break;
case SubscriptionClient::kEvent_OnNotificationProcessed:
WeaveLogDetail(DataManagement, "Client->kEvent_OnNotificationProcessed");
- VerifyOrExit(kOpState_RefreshData == pWdmClient->mOpState, err = WEAVE_ERROR_INCORRECT_STATE);
+ VerifyOrExit((kOpState_RefreshData == pWdmClient->mOpState) || pWdmClient->mEventFetchingTLE, err = WEAVE_ERROR_INCORRECT_STATE);
break;
case SubscriptionClient::kEvent_OnSubscriptionTerminated:
WeaveLogDetail(DataManagement, "Client->kEvent_OnSubscriptionTerminated. Reason: %u, peer = 0x%" PRIX64 "\n",
@@ -1063,6 +1259,31 @@
pWdmClient->ClearOpState();
break;
+ case SubscriptionClient::kEvent_OnEventStreamReceived:
+ WeaveLogDetail(DataManagement, "kEvent_OnEventStreamReceived");
+
+ if (pWdmClient->mpWdmEventProcessor != nullptr)
+ {
+ // Remove ']' at end of buffer and append data then push ']' back to the buffer
+ pWdmClient->mEventStrBuffer.pop_back();
+ err = pWdmClient->mpWdmEventProcessor->ProcessEvents(*(aInParam.mEventStreamReceived.mReader),
+ *(pWdmClient->mpSubscriptionClient));
+ pWdmClient->mEventStrBuffer += ']';
+
+ SuccessOrExit(err);
+
+ if (pWdmClient->mLimitEventFetchTimeout && std::chrono::system_clock::now() >= pWdmClient->mEventFetchTimeout)
+ {
+ pWdmClient->mEventFetchingTLE = true;
+ pWdmClient->mpSubscriptionClient->AbortSubscription();
+ VerifyOrExit(kOpState_RefreshData == pWdmClient->mOpState, err = WEAVE_ERROR_INCORRECT_STATE);
+ pWdmClient->mOnComplete.General(pWdmClient->mpContext, pWdmClient->mpAppReqState);
+ pWdmClient->mpContext = NULL;
+ pWdmClient->ClearOpState();
+ }
+ }
+ break;
+
default:
SubscriptionClient::DefaultEventHandler(aEvent, aInParam, aOutParam);
break;
@@ -1128,6 +1349,12 @@
mFailedFlushPathStatus.clear();
mFailedPaths.clear();
+ mEventStrBuffer.clear();
+ mLimitEventFetchTimeout = false;
+ memset(mLastObservedEventByImportance, 0, sizeof mLastObservedEventByImportance);
+
+ mEventFetchingTLE = false;
+
exit:
return WEAVE_NO_ERROR;
}
@@ -1135,6 +1362,16 @@
void WdmClient::SetNodeId(uint64_t aNodeId)
{
mSinkCatalog.SetNodeId(aNodeId);
+
+ mpWdmEventProcessor.reset(new WdmEventProcessor(aNodeId, this));
+
+ WeaveLogError(DataManagement, "mpWdmEventProcessor set to %p", mpWdmEventProcessor.get());
+}
+
+/**
+ * Configure the time budget used when fetching events.
+ *
+ * @param aTimeoutSec  Time limit in seconds. A value of 0 disables the limit.
+ */
+void WdmClient::SetEventFetchingTimeout(uint32_t aTimeoutSec)
+{
+ // The limit flag is only set for a non-zero timeout.
+ mLimitEventFetchTimeout = (aTimeoutSec != 0);
+ // Stored as a duration; PrepareLastObservedEventList() adds it to the current time to get the deadline.
+ mEventFetchTimeLimit = std::chrono::seconds(aTimeoutSec);
}
WEAVE_ERROR WdmClient::UpdateFailedPathResults(WdmClient * const apWdmClient, TraitDataHandle mTraitDataHandle,
@@ -1243,29 +1480,30 @@
}
WEAVE_ERROR WdmClient::RefreshData(void * apAppReqState, DMCompleteFunct onComplete, DMErrorFunct onError,
- GetDataHandleFunct getDataHandleCb)
+ GetDataHandleFunct getDataHandleCb, bool aFetchEvents)
{
VerifyOrExit(mpSubscriptionClient != NULL, WeaveLogError(DataManagement, "mpSubscriptionClient is NULL"));
mSinkCatalog.Iterate(ClearDataSinkVersion, this);
- RefreshData(apAppReqState, this, onComplete, onError, getDataHandleCb);
+ RefreshData(apAppReqState, this, onComplete, onError, getDataHandleCb, aFetchEvents);
exit:
return WEAVE_NO_ERROR;
}
WEAVE_ERROR WdmClient::RefreshData(void * apAppReqState, void * apContext, DMCompleteFunct onComplete, DMErrorFunct onError,
- GetDataHandleFunct getDataHandleCb)
+ GetDataHandleFunct getDataHandleCb, bool aFetchEvents)
{
VerifyOrExit(mOpState == kOpState_Idle, WeaveLogError(DataManagement, "RefreshData with OpState %d", mOpState));
- mpAppReqState = apAppReqState;
- mOnComplete.General = onComplete;
- mOnError = onError;
- mOpState = kOpState_RefreshData;
- mGetDataHandle = getDataHandleCb;
- mpContext = apContext;
+ mpAppReqState = apAppReqState;
+ mOnComplete.General = onComplete;
+ mOnError = onError;
+ mOpState = kOpState_RefreshData;
+ mGetDataHandle = getDataHandleCb;
+ mpContext = apContext;
+ mEnableEventFetching = aFetchEvents;
mpSubscriptionClient->InitiateSubscription();
@@ -1290,6 +1528,114 @@
return mSinkCatalog.Remove(apDataSink);
}
+/**
+ * Expose the accumulated event text to the caller without copying it.
+ *
+ * @param apBytesData  Receives a pointer to, and the length of, the internal
+ *                     event string buffer. No message buffer is attached.
+ *
+ * @return Always WEAVE_NO_ERROR.
+ */
+WEAVE_ERROR WdmClient::GetEvents(BytesData * apBytesData)
+{
+    const std::string & events = mEventStrBuffer;
+
+    // The data pointer aliases the internal buffer; it is not a copy.
+    apBytesData->mpMsgBuf  = NULL;
+    apBytesData->mDataLen  = events.size();
+    apBytesData->mpDataBuf = reinterpret_cast<const uint8_t *>(events.c_str());
+
+    return WEAVE_NO_ERROR;
+}
+
+/**
+ * Append one received event to the JSON array held in mEventStrBuffer and
+ * record it as the last observed event for its importance level.
+ *
+ * The buffer is expected to hold the array with its closing ']' temporarily
+ * removed; the event stream handler pops the bracket before processing and
+ * pushes it back afterwards.
+ *
+ * @param inReader       Reader positioned on the event data element.
+ * @param inEventHeader  Decoded header of the event.
+ *
+ * @return The result of formatting the event data.
+ */
+WEAVE_ERROR WdmClient::ProcessEvent(nl::Weave::TLV::TLVReader inReader, const EventProcessor::EventHeader & inEventHeader)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+
+    const int importanceIdx  = static_cast<int>(inEventHeader.mImportance) - static_cast<int>(kImportanceType_First);
+    const int importanceSpan = static_cast<int>(kImportanceType_Last) - static_cast<int>(kImportanceType_First);
+
+    if (mEventStrBuffer.size() > 1)
+    {
+        // The buffer starts with an opening bracket, so its size is 1 before the first event
+        mEventStrBuffer += ",";
+    }
+
+    mEventStrBuffer += "{";
+
+    mEventStrBuffer += "\"Source\":";
+    mEventStrBuffer += to_string(inEventHeader.mSource);
+    mEventStrBuffer += ",\"Importance\":";
+    mEventStrBuffer += to_string(inEventHeader.mImportance);
+    mEventStrBuffer += ",\"Id\":";
+    mEventStrBuffer += to_string(inEventHeader.mId);
+
+    mEventStrBuffer += ",\"RelatedImportance\":";
+    mEventStrBuffer += to_string(inEventHeader.mRelatedImportance);
+    mEventStrBuffer += ",\"RelatedId\":";
+    mEventStrBuffer += to_string(inEventHeader.mRelatedId);
+    mEventStrBuffer += ",\"UTCTimestamp\":";
+    mEventStrBuffer += to_string(inEventHeader.mUTCTimestamp);
+    mEventStrBuffer += ",\"SystemTimestamp\":";
+    mEventStrBuffer += to_string(inEventHeader.mSystemTimestamp);
+    mEventStrBuffer += ",\"ResourceId\":";
+    mEventStrBuffer += to_string(inEventHeader.mResourceId);
+    mEventStrBuffer += ",\"TraitProfileId\":";
+    mEventStrBuffer += to_string(inEventHeader.mTraitProfileId);
+    mEventStrBuffer += ",\"TraitInstanceId\":";
+    mEventStrBuffer += to_string(inEventHeader.mTraitInstanceId);
+    mEventStrBuffer += ",\"Type\":";
+    mEventStrBuffer += to_string(inEventHeader.mType);
+
+    mEventStrBuffer += ",\"DeltaUTCTime\":";
+    mEventStrBuffer += to_string(inEventHeader.mDeltaUTCTime);
+    mEventStrBuffer += ",\"DeltaSystemTime\":";
+    mEventStrBuffer += to_string(inEventHeader.mDeltaSystemTime);
+
+    mEventStrBuffer += ",\"PresenceMask\":";
+    mEventStrBuffer += to_string(inEventHeader.mPresenceMask);
+    mEventStrBuffer += ",\"DataSchemaVersionRange\": {\"MinVersion\":";
+    mEventStrBuffer += to_string(inEventHeader.mDataSchemaVersionRange.mMinVersion);
+    mEventStrBuffer += ",\"MaxVersion\":";
+    mEventStrBuffer += to_string(inEventHeader.mDataSchemaVersionRange.mMaxVersion);
+    mEventStrBuffer += "}";
+
+    mEventStrBuffer += ",\"Data\":";
+    err = FormatEventData(inReader, mEventStrBuffer);
+
+    // Close the object even when formatting failed so the braces stay balanced.
+    mEventStrBuffer += "}";
+
+    // The importance value comes from the received event header. Only use it
+    // as an index when it lies inside the table, so a malformed event cannot
+    // write outside mLastObservedEventByImportance.
+    if (importanceIdx >= 0 && importanceIdx <= importanceSpan)
+    {
+        SubscriptionClient::LastObservedEvent & lastObserved = mLastObservedEventByImportance[importanceIdx];
+
+        lastObserved.mSourceId   = inEventHeader.mSource;
+        lastObserved.mImportance = inEventHeader.mImportance;
+        lastObserved.mEventId    = inEventHeader.mId;
+    }
+
+    return err;
+}
+
+/**
+ * Build the list of last observed events to send with the next subscribe
+ * request and reset the per-fetch state.
+ *
+ * Only importance levels with a non-zero recorded event id are copied into
+ * mLastObservedEventByImportanceForSending. The fetch deadline is restarted
+ * from the current time and the event buffer is reset to an empty JSON array.
+ *
+ * @param aEventListLen  Receives the number of entries written to
+ *                       mLastObservedEventByImportanceForSending.
+ *
+ * @return Always WEAVE_NO_ERROR.
+ */
+WEAVE_ERROR WdmClient::PrepareLastObservedEventList(uint32_t & aEventListLen)
+{
+    // The count doubles as the write index, so always start from zero rather
+    // than trusting whatever value the caller passed in.
+    aEventListLen = 0;
+
+    for (int i = 0; i < kImportanceType_Last - kImportanceType_First + 1; i++)
+    {
+        if (mLastObservedEventByImportance[i].mEventId)
+        {
+            mLastObservedEventByImportanceForSending[aEventListLen++] = mLastObservedEventByImportance[i];
+        }
+    }
+
+    mEventFetchTimeout = std::chrono::system_clock::now() + mEventFetchTimeLimit;
+    mEventStrBuffer    = "[]";
+
+    return WEAVE_NO_ERROR;
+}
+
+// Binds the processor to the node id it handles and to the client that will
+// receive every processed event.
+WdmEventProcessor::WdmEventProcessor(uint64_t aNodeId, WdmClient * aWdmClient) :
+    EventProcessor(aNodeId),
+    mWdmClient(aWdmClient)
+{ }
+
+// Hands each event over to the owning WdmClient; the subscription client
+// argument is not used here.
+WEAVE_ERROR WdmEventProcessor::ProcessEvent(nl::Weave::TLV::TLVReader inReader,
+                                            nl::Weave::Profiles::DataManagement::SubscriptionClient & inClient,
+                                            const EventHeader & inEventHeader)
+{
+    const WEAVE_ERROR result = mWdmClient->ProcessEvent(inReader, inEventHeader);
+    return result;
+}
+
+// Gaps in the event stream are ignored: nothing is recorded or reported, and
+// success is always returned.
+WEAVE_ERROR WdmEventProcessor::GapDetected(const EventHeader & inEventHeader)
+{
+    // Do nothing
+    return WEAVE_NO_ERROR;
+}
+
} // namespace DeviceManager
} // namespace Weave
} // namespace nl
diff --git a/src/device-manager/WeaveDataManagementClient.h b/src/device-manager/WeaveDataManagementClient.h
index 04b31df..8b96878 100644
--- a/src/device-manager/WeaveDataManagementClient.h
+++ b/src/device-manager/WeaveDataManagementClient.h
@@ -36,10 +36,14 @@
#include <SystemLayer/SystemPacketBuffer.h>
#include <Weave/Profiles/data-management/SubscriptionClient.h>
#include <Weave/Profiles/data-management/Current/GenericTraitCatalogImpl.h>
+#include <Weave/Profiles/data-management/Current/EventProcessor.h>
#include <map>
+#include <memory>
#include <vector>
#include "WeaveDeviceManager.h"
+#include <chrono>
+
namespace nl {
namespace Weave {
namespace DeviceManager {
@@ -70,6 +74,7 @@
class GenericTraitUpdatableDataSink;
class WdmClient;
+class WdmEventProcessor;
class WdmClientFlushUpdateStatus
{
@@ -167,6 +172,7 @@
class NL_DLL_EXPORT WdmClient
{
friend class GenericTraitUpdatableDataSink;
+ friend class WdmEventProcessor;
public:
enum
@@ -189,7 +195,10 @@
WEAVE_ERROR FlushUpdate(void * apAppReqState, DMFlushUpdateCompleteFunct onComplete, DMErrorFunct onError);
WEAVE_ERROR RefreshData(void * apAppReqState, DMCompleteFunct onComplete, DMErrorFunct onError,
- GetDataHandleFunct getDataHandleCb);
+ GetDataHandleFunct getDataHandleCb, bool aFetchEvents = false);
+
+ WEAVE_ERROR GetEvents(BytesData * aBytes);
+ void SetEventFetchingTimeout(uint32_t aTimeoutSec);
void * mpAppState;
@@ -216,9 +225,8 @@
static void ClientEventCallback(void * const aAppState, SubscriptionClient::EventID aEvent,
const SubscriptionClient::InEventParam & aInParam,
SubscriptionClient::OutEventParam & aOutParam);
-
WEAVE_ERROR RefreshData(void * apAppReqState, void * apContext, DMCompleteFunct onComplete, DMErrorFunct onError,
- GetDataHandleFunct getDataHandleCb);
+ GetDataHandleFunct getDataHandleCb, bool aWithEvents);
WEAVE_ERROR GetDataSink(const ResourceIdentifier & aResourceId, uint32_t aProfileId, uint64_t aInstanceId,
GenericTraitUpdatableDataSink *& apGenericTraitUpdatableDataSink);
WEAVE_ERROR SubscribePublisherTrait(const ResourceIdentifier & aResourceId, const uint64_t & aInstanceId,
@@ -230,6 +238,10 @@
PropertyPathHandle mPropertyPathHandle, uint32_t aReason, uint32_t aStatusProfileId,
uint16_t aStatusCode);
+ WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, const EventProcessor::EventHeader & inEventHeader);
+
+ WEAVE_ERROR PrepareLastObservedEventList(uint32_t & aEventListLen);
+
GenericTraitSinkCatalog mSinkCatalog;
TraitPath * mpPublisherPathList;
@@ -240,7 +252,39 @@
OpState mOpState;
std::vector<std::string> mFailedPaths;
std::vector<WdmClientFlushUpdateStatus> mFailedFlushPathStatus;
+
+ std::unique_ptr<WdmEventProcessor> mpWdmEventProcessor;
+ std::string mEventStrBuffer;
+
+ // Three flags for event fetching
+ // If EventFetching is enabled
+ // If we limit the event fetching time
+ // If event fetching time limit exceeded (TLE)
+ bool mEnableEventFetching, mLimitEventFetchTimeout, mEventFetchingTLE;
+
+ std::chrono::time_point<std::chrono::system_clock> mEventFetchTimeout;
+ std::chrono::seconds mEventFetchTimeLimit;
+ SubscriptionClient::LastObservedEvent mLastObservedEventByImportance[kImportanceType_Last - kImportanceType_First + 1];
+ SubscriptionClient::LastObservedEvent
+ mLastObservedEventByImportanceForSending[kImportanceType_Last - kImportanceType_First + 1];
};
+
+/**
+ * EventProcessor specialization that forwards every received event to a
+ * WdmClient, which formats it into its event buffer.
+ */
+class WdmEventProcessor : public EventProcessor
+{
+public:
+ // aWdmClient is stored as a raw pointer and receives every processed event.
+ WdmEventProcessor(uint64_t aNodeId, WdmClient * aWdmClient);
+ virtual ~WdmEventProcessor() = default;
+
+protected:
+ // Forwards the event to WdmClient::ProcessEvent(); inClient is not used.
+ WEAVE_ERROR ProcessEvent(nl::Weave::TLV::TLVReader inReader, nl::Weave::Profiles::DataManagement::SubscriptionClient & inClient,
+ const EventHeader & inEventHeader) override;
+
+ // Does nothing and returns WEAVE_NO_ERROR.
+ WEAVE_ERROR GapDetected(const EventHeader & inEventHeader) override;
+
+private:
+ // Not owned here: WdmClient holds this processor in a unique_ptr and passes itself in.
+ WdmClient * mWdmClient;
+};
+
} // namespace DeviceManager
} // namespace Weave
} // namespace nl
diff --git a/src/device-manager/cocoa/NLWdmClient.h b/src/device-manager/cocoa/NLWdmClient.h
index b01b748..8d59868 100644
--- a/src/device-manager/cocoa/NLWdmClient.h
+++ b/src/device-manager/cocoa/NLWdmClient.h
@@ -59,6 +59,7 @@
*/
- (void)setNodeId:(uint64_t)nodeId;
+
/**
* Create the new data newDataSink
*
@@ -89,4 +90,38 @@
*/
- (void)refreshData:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler;
+/**
+ * Begins a sync of all events. The outcome is reported asynchronously through completionHandler or
+ * failureHandler. Once the timeout is reached the fetch is stopped and reported as complete.
+ * The fetched events can then be read with getEvents:.
+ *
+ * @param completionHandler Block invoked when the fetch completes.
+ * @param failureHandler    Block invoked when the fetch fails or is rejected because another request is running.
+ * @param timeoutSec        Time limit in seconds; 0 means no limit.
+ */
+- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler
+ timeoutSec:(uint32_t)timeoutSec;
+
+/**
+ * getEvents: returns the fetched events as a JSON array in *events.
+ * If a fetch received no events, the array is empty ("[]").
+ * The internal buffer is reset each time a fetch started with fetchEvents:failure:timeoutSec: begins.
+ * Each element in this array is an object with the following fields:
+ *
+ * Field                  | Type        | Description
+ * -----------------------+-------------+--------------
+ * Source                 | uint64      | Event Header
+ * Importance             | int (enum)
+ * Id                     | uint64
+ * RelatedImportance      | uint64
+ * RelatedId              | uint64
+ * UTCTimestamp           | uint64
+ * SystemTimestamp        | uint64
+ * ResourceId             | uint64
+ * TraitProfileId         | uint64
+ * TraitInstanceId        | uint64
+ * Type                   | uint64
+ * DeltaUTCTime           | int
+ * DeltaSystemTime        | int
+ * PresenceMask           | uint64
+ * DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
+ * Data                   | Object      | Event Trait Data
+ *
+ * @param events Receives the JSON text; only written when WEAVE_NO_ERROR is returned.
+ *
+ * @return WEAVE_NO_ERROR on success, otherwise a Weave error code. The return type
+ *         matches the implementation in NLWdmClient.mm.
+ */
+- (WEAVE_ERROR)getEvents:(NSString **)events;
+
+
@end
diff --git a/src/device-manager/cocoa/NLWdmClient.mm b/src/device-manager/cocoa/NLWdmClient.mm
index 58f9a63..1a1e613 100644
--- a/src/device-manager/cocoa/NLWdmClient.mm
+++ b/src/device-manager/cocoa/NLWdmClient.mm
@@ -547,5 +547,61 @@
});
}
+// Starts an asynchronous event fetch on the Weave work queue. The request is
+// rejected with WEAVE_ERROR_INCORRECT_STATE when another request is running.
+- (void)fetchEvents:(WdmClientCompletionBlock)completionHandler failure:(WdmClientFailureBlock)failureHandler timeoutSec:(uint32_t)timeoutSec
+{
+    WDM_LOG_METHOD_SIG();
+
+    NSString * taskName = @"FetchEvents";
+
+    // we use async for the results are sent back to caller via async means also
+    dispatch_async(_mWeaveWorkQueue, ^() {
+        if (nil == _mRequestName) {
+            _mRequestName = taskName;
+            _mCompletionHandler = [completionHandler copy];
+            _mFailureHandler = [failureHandler copy];
+
+            // Apply the timeout only once this request has been accepted, so a
+            // rejected call cannot change the time limit of a fetch that is
+            // already in flight.
+            _mWeaveCppWdmClient->SetEventFetchingTimeout(timeoutSec);
+
+            WEAVE_ERROR err = _mWeaveCppWdmClient->RefreshData((__bridge void *) self, onWdmClientComplete, onWdmClientError, NULL, true);
+
+            if (WEAVE_NO_ERROR != err) {
+                [self dispatchAsyncDefaultFailureBlockWithCode:err];
+            }
+        } else {
+            WDM_LOG_ERROR(@"%@: Attemp to %@ while we're still executing %@, ignore", _name, taskName, _mRequestName);
+
+            // do not change _mRequestName, as we're rejecting this request
+            [self dispatchAsyncFailureBlock:WEAVE_ERROR_INCORRECT_STATE taskName:taskName handler:failureHandler];
+        }
+    });
+}
+
+// Copies the fetched events (JSON text) into *events.
+// Returns WEAVE_ERROR_INVALID_ARGUMENT when events is NULL and
+// WEAVE_ERROR_INCORRECT_STATE when the native client is gone; *events is only
+// written on success.
+- (WEAVE_ERROR)getEvents:(NSString **)events
+{
+    __block WEAVE_ERROR err = WEAVE_NO_ERROR;
+    __block nl::Weave::DeviceManager::BytesData bytesData;
+
+    WDM_LOG_METHOD_SIG();
+
+    // The result is written through this pointer, so it must be valid.
+    VerifyOrExit(NULL != events, err = WEAVE_ERROR_INVALID_ARGUMENT);
+    VerifyOrExit(NULL != _mWeaveCppWdmClient, err = WEAVE_ERROR_INCORRECT_STATE);
+
+    // need this bracket to use Verify macros
+    {
+        // we use sync so the bytesData is immediately available to the caller upon return
+        dispatch_sync(_mWeaveWorkQueue, ^() {
+            err = _mWeaveCppWdmClient->GetEvents(&bytesData);
+        });
+    }
+
+exit:
+    if (WEAVE_NO_ERROR == err)
+    {
+        // NOTE(review): bytesData points into the client's internal buffer and is read
+        // here outside the work queue - confirm nothing modifies it concurrently.
+        *events = [[NSString alloc] initWithBytes:bytesData.mpDataBuf length:bytesData.mDataLen encoding:NSUTF8StringEncoding];
+    }
+    return err;
+}
+
@end
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_CLIENT_EXPERIMENTAL
diff --git a/src/device-manager/java/WeaveDeviceManager-JNI.cpp b/src/device-manager/java/WeaveDeviceManager-JNI.cpp
index 33773f1..3491723 100644
--- a/src/device-manager/java/WeaveDeviceManager-JNI.cpp
+++ b/src/device-manager/java/WeaveDeviceManager-JNI.cpp
@@ -184,6 +184,8 @@
NL_DLL_EXPORT jlong Java_nl_Weave_DataManagement_WdmClientImpl_newDataSink(JNIEnv *env, jobject self, jlong wdmClientPtr, jobject resourceIdentifierObj, jlong profileId, jlong instanceId, jstring path);
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFlushUpdate(JNIEnv *env, jobject self, jlong wdmClientPtr);
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginRefreshData(JNIEnv *env, jobject self, jlong wdmClientPtr);
+ NL_DLL_EXPORT void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec);
+ NL_DLL_EXPORT jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr);
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_shutdown(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
NL_DLL_EXPORT void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_clear(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr);
@@ -3995,6 +3997,47 @@
}
}
+// JNI entry point for WdmClientImpl.beginFetchEvents(): sets the fetch time
+// limit and starts a refresh with event fetching enabled, all under the stack
+// lock. Throws a Java error when the request could not be started.
+void Java_nl_Weave_DataManagement_WdmClientImpl_beginFetchEvents(JNIEnv *env, jobject self, jlong wdmClientPtr, jint timeoutSec)
+{
+    WdmClient * const client = (WdmClient *)wdmClientPtr;
+    WEAVE_ERROR result = WEAVE_NO_ERROR;
+
+    WeaveLogProgress(DeviceManager, "beginFetchEvents() called");
+
+    pthread_mutex_lock(&sStackLock);
+    client->SetEventFetchingTimeout((uint32_t)timeoutSec);
+    result = client->RefreshData((void *)"FetchEvents", HandleWdmClientComplete, HandleWdmClientError, NULL, true);
+    pthread_mutex_unlock(&sStackLock);
+
+    // Nothing to report on success, or when an exception is already pending.
+    if (result == WEAVE_NO_ERROR || result == WDM_JNI_ERROR_EXCEPTION_THROWN)
+    {
+        return;
+    }
+
+    ThrowError(env, result);
+}
+
+
+// JNI entry point for WdmClientImpl.getEvents(): returns the fetched events
+// (JSON text) as a Java string, or NULL with a pending Java error on failure.
+jstring Java_nl_Weave_DataManagement_WdmClientImpl_getEvents(JNIEnv *env, jobject self, jlong wdmClientPtr)
+{
+    WEAVE_ERROR err = WEAVE_NO_ERROR;
+    WdmClient *wdmClient = (WdmClient *)wdmClientPtr;
+    BytesData bytesData;
+    jstring eventsStr = NULL;
+
+    WeaveLogProgress(DeviceManager, "getEvents() called");
+
+    // GetEvents() returns a pointer into the client's own event buffer. Hold
+    // the stack lock until the text has been copied into the Java string, as
+    // the other entry points do when touching the client.
+    pthread_mutex_lock(&sStackLock);
+    err = wdmClient->GetEvents(&bytesData);
+    if (err == WEAVE_NO_ERROR)
+    {
+        // bytesData.mpDataBuf should be a pointer get by mEventBuffer.c_str()
+        // NOTE(review): NewStringUTF expects modified UTF-8 - confirm event
+        // strings never carry 4-byte UTF-8 sequences.
+        eventsStr = env->NewStringUTF(reinterpret_cast<const char *>(bytesData.mpDataBuf));
+    }
+    pthread_mutex_unlock(&sStackLock);
+
+    if (err != WEAVE_NO_ERROR && err != WDM_JNI_ERROR_EXCEPTION_THROWN)
+    {
+        ThrowError(env, err);
+    }
+
+    // On failure bytesData was never filled in, so no string is built from it.
+    return eventsStr;
+}
+
void Java_nl_Weave_DataManagement_GenericTraitUpdatableDataSinkImpl_init(JNIEnv *env, jobject self, jlong genericTraitUpdatableDataSinkPtr)
{
GenericTraitUpdatableDataSink *pDataSink = NULL;
diff --git a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
index ee0978e..b6332b0 100644
--- a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
+++ b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClient.java
@@ -67,15 +67,51 @@
*/
public void beginRefreshData();
+ /**
+ * Begins fetching events.
+ * The result of this operation can be observed through the {@link CompletionHandler}
+ * that has been assigned via {@link #setCompletionHandler}.
+ * The fetched events can be accessed via {@link #getEvents}.
+ *
+ * @param timeoutSec time limit for the operation in seconds; fetching may take a long time
+ *                   when there are many events. A value of 0 disables the limit.
+ */
+ public void beginFetchEvents(int timeoutSec);
+
public CompletionHandler getCompletionHandler();
public void setCompletionHandler(CompletionHandler compHandler);
public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr);
+ /**
+ * getEvents will return a list of event data in json array representation.
+ * If a fetch received no events, an empty array ("[]") will be returned.
+ * The internal buffer will be reset when beginFetchEvents() is called.
+ * Each element in this array should be an object
+ *
+ * Field | Type | Description
+ * -----------------------+-------------+--------------
+ * Source | uint64 | Event Header
+ * Importance | int (enum)
+ * Id | uint64
+ * RelatedImportance | uint64
+ * RelatedId | uint64
+ * UTCTimestamp | uint64
+ * SystemTimestamp | uint64
+ * ResourceId | uint64
+ * TraitProfileId | uint64
+ * TraitInstanceId | uint64
+ * Type | uint64
+ * DeltaUTCTime | int
+ * DeltaSystemTime | int
+ * PresenceMask | uint64
+ * DataSchemaVersionRange | Object{MinVersion: uint64, MaxVersion: uint64}
+ * Data | Object | Event Trait Data
+ *
+ * @return the fetched events as JSON text
+ */
+ public String getEvents();
+
public interface CompletionHandler
{
void onFlushUpdateComplete(Throwable[] exceptions, WdmClient wdmClient);
void onRefreshDataComplete();
+ void onFetchEventsComplete();
void onError(Throwable err);
}
};
diff --git a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
index 5418bb8..3dd221c 100644
--- a/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
+++ b/src/device-manager/java/src/nl/Weave/DataManagement/WdmClientImpl.java
@@ -124,6 +124,13 @@
}
@Override
+ // Starts an event fetch on the native client; throws IllegalStateException if this client was closed.
+ public void beginFetchEvents(int timeoutSec)
+ {
+ // Must run first: the native call needs a valid client pointer.
+ ensureNotClosed();
+ beginFetchEvents(mWdmClientPtr, timeoutSec);
+ }
+
+ @Override
public CompletionHandler getCompletionHandler()
{
return mCompHandler;
@@ -135,6 +142,13 @@
mCompHandler = compHandler;
}
+ // Returns the string produced by the native getEvents call; throws IllegalStateException if this client was closed.
+ @Override
+ public String getEvents()
+ {
+ // Must run first: the native call needs a valid client pointer.
+ ensureNotClosed();
+ return getEvents(mWdmClientPtr);
+ }
+
public GenericTraitUpdatableDataSink getDataSink(long traitInstancePtr)
{
GenericTraitUpdatableDataSink trait = null;
@@ -179,6 +193,11 @@
requireCompletionHandler().onRefreshDataComplete();
}
+ // Relays completion of an event fetch to the registered CompletionHandler.
+ // NOTE(review): no Java caller is visible here - presumably invoked from native code; confirm.
+ private void onFetchEventsComplete()
+ {
+ requireCompletionHandler().onFetchEventsComplete();
+ }
+
private void ensureNotClosed() {
if (mWdmClientPtr == 0) {
throw new IllegalStateException("This WdmClient has already been closed.");
@@ -205,4 +224,6 @@
private native long newDataSink(long wdmClientPtr, ResourceIdentifier resourceIdentifier, long profileId, long instanceId, String path);
private native void beginFlushUpdate(long wdmClientPtr);
private native void beginRefreshData(long wdmClientPtr);
+ private native void beginFetchEvents(long wdmClientPtr, int timeoutSec);
+ private native String getEvents(long wdmClientPtr);
};
diff --git a/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java b/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
index 76ad3aa..f2f77c4 100644
--- a/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
+++ b/src/device-manager/java/src/nl/Weave/DeviceManager/TestMain.java
@@ -70,6 +70,7 @@
mainObj.RunUnitTests();
}
catch (TestFailedException ex) {
+ ex.printStackTrace();
System.exit(-1);
}
}
@@ -661,6 +662,95 @@
System.out.println("testWdmClientDataSinkSetBigIntegerFlushRefreshGetBigInteger Succeeded");
}
+    /**
+     * Exercises WdmClient.beginFetchEvents()/getEvents() against the mock device.
+     *
+     * Fetches events twice, five seconds apart, and requires a non-empty result each time,
+     * then checks that a plain beginRefreshData() does not empty the fetched events.
+     * Event contents are not inspected here; the happy tests cover that.
+     *
+     * @param deviceMgr device manager used to create the WdmClient under test
+     */
+    void testWdmClientGetEvents(WeaveDeviceManager deviceMgr)
+    {
+        WdmClientFactory wdmClientFactory = new WdmClientFactory();
+        WdmClient mockWdmClient = wdmClientFactory.create(deviceMgr);
+
+        mockWdmClient.setCompletionHandler(this);
+
+        mockWdmClient.setNodeId(new BigInteger("-2"));
+
+        TestResult = null;
+        mockWdmClient.beginFetchEvents(1);
+        ExpectSuccess("FetchEvents"); // Need to wait for the data, or the events are not prepared
+        String event = mockWdmClient.getEvents();
+
+        // Just check that data came back; the content is verified in happy tests.
+        // An empty result is "[]" (length 2), so anything that short, or null, is a failure.
+        if (event == null || event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        // Sleep 5 seconds to wait for more events generated by mock-device
+        try { Thread.sleep(5000); }
+        catch (InterruptedException ex) { Thread.currentThread().interrupt(); } // keep the interrupt visible to callers
+
+        TestResult = null;
+        mockWdmClient.beginFetchEvents(1);
+        ExpectSuccess("FetchEvents"); // Need to wait for the data, or the events are not prepared
+        event = mockWdmClient.getEvents();
+
+        // Same emptiness check as above for the second fetch
+        if (event == null || event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        // Sleep 5 seconds to wait for more events generated by mock-device
+        try { Thread.sleep(5000); }
+        catch (InterruptedException ex) { Thread.currentThread().interrupt(); } // keep the interrupt visible to callers
+
+        GenericTraitUpdatableDataSink localSettingsTrait;
+
+        ResourceIdentifier resourceIdentifier = new ResourceIdentifier();
+        localSettingsTrait = mockWdmClient.newDataSink(resourceIdentifier, 20, 0, "/");
+        localSettingsTrait.setCompletionHandler(this);
+
+        TestResult = null;
+        mockWdmClient.beginRefreshData();
+        ExpectSuccess("RefreshData"); // Need to wait for the data before reading the events again
+        event = mockWdmClient.getEvents();
+
+        // RefreshData should not clear the internal event buffer, so the result must
+        // still be longer than the empty "[]" (checked the same way as the fetches above).
+        if (event == null || event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetEvents");
+
+        System.out.println("testWdmClientGetEvents Succeeded");
+    }
+
+    /**
+     * Checks that a fetch with a one-second timeout finishes in a reasonable time
+     * and still returns events after the mock device has been left running for a while.
+     *
+     * @param deviceMgr device manager used to create the WdmClient under test
+     */
+    void testWdmClientGetLargeEvents(WeaveDeviceManager deviceMgr)
+    {
+        WdmClientFactory wdmClientFactory = new WdmClientFactory();
+        WdmClient mockWdmClient = wdmClientFactory.create(deviceMgr);
+        mockWdmClient.setCompletionHandler(this);
+
+        // Sleep 12 seconds for enough events, connection timeout is 15s
+        try { Thread.sleep(12000); }
+        catch (InterruptedException ex) { Thread.currentThread().interrupt(); } // keep the interrupt visible to callers
+
+        GenericTraitUpdatableDataSink localSettingsTrait;
+
+        ResourceIdentifier resourceIdentifier = new ResourceIdentifier();
+        localSettingsTrait = mockWdmClient.newDataSink(resourceIdentifier, 20, 0, "/");
+        localSettingsTrait.setCompletionHandler(this);
+
+        mockWdmClient.setNodeId(new BigInteger("-2"));
+
+        TestResult = null;
+        // Start timing before the request is issued so the measured span covers the whole fetch
+        long fetchStart = System.currentTimeMillis();
+        mockWdmClient.beginFetchEvents(1);
+        ExpectSuccess("FetchEvents"); // Need to wait for the data, or the events are not prepared
+        // The timeout is not intended to be accurate, check if the fetch finished in a reasonable time
+        if (System.currentTimeMillis() > fetchStart + 2000)
+            throw new TestFailedException("testWdmClientGetLargeEvents");
+        String event = mockWdmClient.getEvents();
+
+        // Just check that data came back; the content is verified in happy tests.
+        // An empty result is "[]" (length 2), so anything that short, or null, is a failure.
+        if (event == null || event.length() <= 2)
+            throw new TestFailedException("testWdmClientGetLargeEvents");
+
+        System.out.println("testWdmClientGetLargeEvents Succeeded");
+    }
+
void testWdmClientDataSinkSetRefreshFlushGetData(WeaveDeviceManager deviceMgr)
{
WdmClientFactory wdmClientFactory = new WdmClientFactory();
@@ -869,6 +959,8 @@
testWdmClientDataSinkSetFlushEmptyStringRefreshGetData(deviceMgr);
testWdmClientDataSinkSetFlushRefreshGetBigInteger(deviceMgr);
testWdmClientDataSinkSetBigIntegerFlushRefreshGetBigInteger(deviceMgr);
+ testWdmClientGetEvents(deviceMgr);
+ testWdmClientGetLargeEvents(deviceMgr);
testWdmClientDataSinkSetRefreshFlushGetData(deviceMgr);
testWdmClientDataSinkResourceIdentifierMakeResTypeIDInt(deviceMgr);
testWdmClientDataSinkResourceIdentifierMakeResTypeIdBytes(deviceMgr);
@@ -1395,6 +1487,12 @@
TestResult = "Success";
}
+ public void onFetchEventsComplete()
+ {
+ // Completion callback for a fetch-events request: log it and record success in TestResult.
+ // NOTE(review): ExpectSuccess() presumably waits on TestResult — confirm.
+ System.out.println(" Fetch Events complete");
+ TestResult = "Success";
+ }
+
public void print(WeaveDeviceDescriptor deviceDesc, String prefix)
{
BigInteger twoToThe64 = BigInteger.ONE.shiftLeft(64);
diff --git a/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp b/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
index 4345847..7119252 100644
--- a/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
+++ b/src/device-manager/python/WeaveDeviceManager-ScriptBinding.cpp
@@ -269,6 +269,8 @@
NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_NewDataSink(WdmClient *wdmClient, const ResourceIdentifier *resourceIdentifier, uint32_t aProfileId, uint64_t aInstanceId, const char * apPath, GenericTraitUpdatableDataSink ** outGenericTraitUpdatableDataSink);
NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_FlushUpdate(WdmClient *wdmClient, DMFlushUpdateCompleteFunct onComplete, DMErrorFunct onError);
NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_RefreshData(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError);
+ NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_FetchEvents(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError, uint32_t aTimeoutSec);
+ NL_DLL_EXPORT WEAVE_ERROR nl_Weave_WdmClient_GetEvents(WdmClient *wdmClient, ConstructBytesArrayFunct aCallback);
NL_DLL_EXPORT WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_Clear(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink);
NL_DLL_EXPORT WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_RefreshData(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink, DMCompleteFunct onComplete, DMErrorFunct onError);
@@ -1445,6 +1447,27 @@
return err;
}
+/**
+ * Script-binding entry point that starts an event fetch on @p wdmClient.
+ *
+ * Stores @p aTimeoutSec through WdmClient::SetEventFetchingTimeout() and then calls
+ * WdmClient::RefreshData() with its final argument set to true.
+ * NOTE(review): that trailing `true` presumably selects event fetching — confirm
+ * against WdmClient::RefreshData().
+ *
+ * @return the WEAVE_ERROR produced by WdmClient::RefreshData().
+ */
+WEAVE_ERROR nl_Weave_WdmClient_FetchEvents(WdmClient *wdmClient, DMCompleteFunct onComplete, DMErrorFunct onError, uint32_t aTimeoutSec)
+{
+    wdmClient->SetEventFetchingTimeout(aTimeoutSec);
+    return wdmClient->RefreshData(NULL, onComplete, onError, NULL, true);
+}
+
+/**
+ * Script-binding entry point that hands the client's events to @p aCallback.
+ *
+ * Calls WdmClient::GetEvents() to fill a local BytesData. On success the buffer
+ * pointer and length are passed to @p aCallback and the BytesData is then cleared,
+ * so the callback must copy whatever it wants to keep. On failure the callback
+ * is not invoked.
+ *
+ * @return the WEAVE_ERROR produced by WdmClient::GetEvents().
+ */
+WEAVE_ERROR nl_Weave_WdmClient_GetEvents(WdmClient *wdmClient, ConstructBytesArrayFunct aCallback)
+{
+    BytesData eventBytes;
+    const WEAVE_ERROR status = wdmClient->GetEvents(&eventBytes);
+
+    if (status == WEAVE_NO_ERROR)
+    {
+        aCallback(eventBytes.mpDataBuf, eventBytes.mDataLen);
+        eventBytes.Clear();
+    }
+
+    return status;
+}
+
WEAVE_ERROR nl_Weave_GenericTraitUpdatableDataSink_Clear(GenericTraitUpdatableDataSink * apGenericTraitUpdatableDataSink)
{
if (apGenericTraitUpdatableDataSink != NULL)
diff --git a/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py b/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
index 93b2af8..e3a4fa6 100644
--- a/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
+++ b/src/device-manager/python/openweave/GenericTraitUpdatableDataSink.py
@@ -90,7 +90,9 @@
def refreshData(self):
self._ensureNotClosed()
- self._weaveStack.CallAsync(lambda: self._generictraitupdatabledatasinkLib.nl_Weave_GenericTraitUpdatableDataSink_RefreshData(self._traitInstance, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError))
+ return self._weaveStack.CallAsync(
+ lambda: self._generictraitupdatabledatasinkLib.nl_Weave_GenericTraitUpdatableDataSink_RefreshData(self._traitInstance, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError)
+ )
def setData(self, path, val, isConditional=False):
self._ensureNotClosed()
diff --git a/src/device-manager/python/openweave/WdmClient.py b/src/device-manager/python/openweave/WdmClient.py
index 31be50b..58f27d9 100644
--- a/src/device-manager/python/openweave/WdmClient.py
+++ b/src/device-manager/python/openweave/WdmClient.py
@@ -29,7 +29,7 @@
from .WeaveUtility import WeaveUtility
__all__ = [ 'WdmClient', 'WdmFlushUpdateStatusStruct', 'WdmClientFlushUpdateError', 'WdmClientFlushUpdateDeviceError' ]
-
+_ConstructBytesArrayFunct = CFUNCTYPE(None, c_void_p, c_uint32)
WEAVE_ERROR_STATUS_REPORT = 4044
class WdmFlushUpdateStatusStruct(Structure):
@@ -103,6 +103,7 @@
self._weaveStack.Call(
lambda: self._datamanagmentLib.nl_Weave_WdmClient_SetNodeId(self._wdmClientPtr, nodeId)
)
+
def newDataSink(self, resourceIdentifier, profileId, instanceId, path):
self._ensureNotClosed()
traitInstance = c_void_p(None)
@@ -162,6 +163,33 @@
lambda: self._datamanagmentLib.nl_Weave_WdmClient_RefreshData(self._wdmClientPtr, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError)
)
+    def fetchEvents(self, timeoutSec):
+        """Run nl_Weave_WdmClient_FetchEvents through WeaveStack.CallAsync.
+
+        timeoutSec is passed to the native call unchanged, together with the
+        stack's complete/error callbacks. Returns whatever CallAsync returns.
+        Raises ValueError (from _ensureNotClosed) when the client is not ready.
+        """
+        self._ensureNotClosed()
+
+        def startFetch():
+            return self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents(self._wdmClientPtr, self._weaveStack.cbHandleComplete, self._weaveStack.cbHandleError, timeoutSec)
+
+        return self._weaveStack.CallAsync(startFetch)
+
+
+    def getEvents(self):
+        """Return the result of _getEvents().
+
+        An int result is treated as an error code and raised through
+        WeaveStack.ErrorToException(); any other result is returned unchanged.
+        Raises ValueError (from _ensureNotClosed) when the client is not ready.
+        """
+        self._ensureNotClosed()
+        events = self._getEvents()
+
+        if not isinstance(events, int):
+            return events
+
+        raise self._weaveStack.ErrorToException(events)
+
+ def _getEvents(self):
+ # Ask the native layer for the events; the data comes back through a ctypes callback.
+ # NOTE(review): presumably WeaveStack.Call() returns callbackRes on success and the
+ # integer error code otherwise (getEvents() checks for int) — confirm in WeaveStack.
+ self._ensureNotClosed()
+
+ # Called by nl_Weave_WdmClient_GetEvents with the buffer pointer and its length;
+ # stores the result of WeaveUtility.VoidPtrToByteArray on the stack as callbackRes.
+ def HandleConstructBytesArray(dataBuf, dataLen):
+ self._weaveStack.callbackRes = WeaveUtility.VoidPtrToByteArray(dataBuf, dataLen)
+
+ # Keep the CFUNCTYPE wrapper in a local so it stays referenced while native code may call it.
+ cbHandleConstructBytesArray = _ConstructBytesArrayFunct(HandleConstructBytesArray)
+ return self._weaveStack.Call(
+ lambda: self._datamanagmentLib.nl_Weave_WdmClient_GetEvents(self._wdmClientPtr, cbHandleConstructBytesArray)
+ )
+
def _ensureNotClosed(self):
if (self._wdmClientPtr == None):
raise ValueError("wdmClient is not ready")
@@ -195,6 +223,12 @@
self._datamanagmentLib.nl_Weave_WdmClient_RefreshData.argtypes = [ c_void_p, _CompleteFunct, _ErrorFunct ]
self._datamanagmentLib.nl_Weave_WdmClient_RefreshData.restype = c_uint32
+ self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents.argtypes = [ c_void_p, _CompleteFunct, _ErrorFunct, c_uint32 ]
+ self._datamanagmentLib.nl_Weave_WdmClient_FetchEvents.restype = c_uint32
+
+ self._datamanagmentLib.nl_Weave_WdmClient_GetEvents.argtypes = [ c_void_p, _ConstructBytesArrayFunct ]
+ self._datamanagmentLib.nl_Weave_WdmClient_GetEvents.restype = c_uint32
+
res = self._datamanagmentLib.nl_Weave_WdmClient_Init()
if (res != 0):
raise self._weaveStack.ErrorToException(res)
diff --git a/src/device-manager/python/openweave/WeaveStack.py b/src/device-manager/python/openweave/WeaveStack.py
index 03498c2..b51de84 100644
--- a/src/device-manager/python/openweave/WeaveStack.py
+++ b/src/device-manager/python/openweave/WeaveStack.py
@@ -41,7 +41,7 @@
from ctypes import *
from .WeaveUtility import WeaveUtility
-__all__ = [ 'DeviceStatusStruct', 'WeaveStackException', 'DeviceError', 'WeaveStackError', 'WeaveStack']
+__all__ = [ 'DeviceStatusStruct', 'WeaveStackException', 'DeviceError', 'WeaveStackError', 'WeaveStack', 'WeaveLogFormatter']
WeaveStackDLLBaseName = '_WeaveDeviceMgr.so'
diff --git a/src/lib/profiles/data-management/Current/MessageDef.cpp b/src/lib/profiles/data-management/Current/MessageDef.cpp
index e07bddf..f4f9085 100644
--- a/src/lib/profiles/data-management/Current/MessageDef.cpp
+++ b/src/lib/profiles/data-management/Current/MessageDef.cpp
@@ -1706,6 +1706,7 @@
#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
// Roughly verify the schema is right, including
// 1) at least one element is there
+// (no longer enforced: an empty path list is valid when subscribing to events only)
// 2) all elements are anonymous and of Path type
// 3) every path is also valid in schema
WEAVE_ERROR PathList::Parser::CheckSchemaValidity(void) const
@@ -1744,11 +1745,7 @@
// if we have exhausted this container
if (WEAVE_END_OF_TLV == err)
{
- // if we have at least one Path element
- if (NumPath > 0)
- {
- err = WEAVE_NO_ERROR;
- }
+ err = WEAVE_NO_ERROR;
}
exit:
diff --git a/src/lib/profiles/data-management/Current/SubscriptionClient.cpp b/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
index 8163a4b..3603532 100644
--- a/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
+++ b/src/lib/profiles/data-management/Current/SubscriptionClient.cpp
@@ -149,6 +149,8 @@
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
MoveToState(kState_Initialized);
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
@@ -427,6 +429,8 @@
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
if (mBinding->IsReady())
@@ -778,6 +782,9 @@
{
--mRefCount;
}
+
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
}
Binding * SubscriptionClient::GetBinding() const
@@ -796,6 +803,8 @@
InEventParam inParam;
OutEventParam outParam;
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
// Make sure we're not freed by accident.
_AddRef();
@@ -972,7 +981,10 @@
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
mUpdateClient.Shutdown();
- mDataSinkCatalog->Iterate(CleanupUpdatableSinkTrait, this);
+ if (NULL != mDataSinkCatalog)
+ {
+ mDataSinkCatalog->Iterate(CleanupUpdatableSinkTrait, this);
+ }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
}
@@ -1088,6 +1100,8 @@
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
// this check serves to see whether we already have a timer set
@@ -1164,6 +1178,9 @@
{
SubscriptionClient * const pClient = reinterpret_cast<SubscriptionClient *>(aAppState);
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(pClient),
+ pClient->GetStateStr(), __func__, pClient->mRefCount);
+
pClient->_AddRef();
switch (aEvent)
@@ -1344,6 +1361,9 @@
ExitNow();
}
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
+
// Make sure we're not freed by accident
_AddRef();
@@ -1451,6 +1471,8 @@
AlwaysAcceptDataElementAccessControlDelegate acDelegate;
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
LockUpdateMutex();
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
@@ -2476,6 +2498,8 @@
bool isPathPrivate;
bool willRetryPath;
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
// This method invokes callbacks into the upper layer.
_AddRef();
@@ -2770,6 +2794,8 @@
TraitPath traitPath;
WEAVE_ERROR err = WEAVE_NO_ERROR;
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
LockUpdateMutex();
@@ -3331,6 +3357,8 @@
err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->ScheduleWork(OnUpdateScheduleWorkCallback, this);
SuccessOrExit(err);
+ WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
+ GetStateStr(), __func__, mRefCount);
_AddRef();
mUpdateFlushScheduled = true;
diff --git a/src/test-apps/MockLoggingManager.cpp b/src/test-apps/MockLoggingManager.cpp
index c7be8b7..323f562 100644
--- a/src/test-apps/MockLoggingManager.cpp
+++ b/src/test-apps/MockLoggingManager.cpp
@@ -59,7 +59,7 @@
public:
MockEventGeneratorImpl(void);
WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager * aExchangeMgr, EventGenerator * aEventGenerator, int aDelayBetweenEvents,
- bool aWraparound);
+ bool aWraparound, size_t aBatch=1);
void SetEventGeneratorStop();
bool IsEventGeneratorStopped();
@@ -67,6 +67,7 @@
static void HandleNextEvent(nl::Weave::System::Layer * aSystemLayer, void * aAppState, ::nl::Weave::System::Error aErr);
nl::Weave::WeaveExchangeManager * mExchangeMgr;
int mTimeBetweenEvents; ///< delay, in miliseconds, between events.
+ size_t mBatch;
bool mEventWraparound; ///< does the event generator run indefinitely, or does it stop after iterating through its states
EventGenerator * mEventGenerator; ///< the event generator to use
int32_t mEventsLeft;
@@ -93,10 +94,10 @@
} // namespace Weave
} // namespace nl
-uint64_t gDebugEventBuffer[192];
-uint64_t gInfoEventBuffer[64];
-uint64_t gProdEventBuffer[256];
-uint64_t gCritEventBuffer[256];
+uint64_t gDebugEventBuffer[192000];
+uint64_t gInfoEventBuffer[64000];
+uint64_t gProdEventBuffer[256000];
+uint64_t gCritEventBuffer[256000];
bool gMockEventStop = false;
bool gEventIsStopped = false;
@@ -152,11 +153,11 @@
}
MockEventGeneratorImpl::MockEventGeneratorImpl(void) :
- mExchangeMgr(NULL), mTimeBetweenEvents(0), mEventWraparound(false), mEventGenerator(NULL), mEventsLeft(0)
+ mExchangeMgr(NULL), mTimeBetweenEvents(0), mBatch(1), mEventWraparound(false), mEventGenerator(NULL), mEventsLeft(0)
{ }
WEAVE_ERROR MockEventGeneratorImpl::Init(nl::Weave::WeaveExchangeManager * aExchangeMgr, EventGenerator * aEventGenerator,
- int aDelayBetweenEvents, bool aWraparound)
+ int aDelayBetweenEvents, bool aWraparound, size_t aBatch)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
mExchangeMgr = aExchangeMgr;
@@ -164,6 +165,8 @@
mTimeBetweenEvents = aDelayBetweenEvents;
mEventWraparound = aWraparound;
+ mBatch = aBatch;
+
if (mEventWraparound)
mEventsLeft = INT32_MAX;
else
@@ -186,8 +189,12 @@
}
else
{
- generator->mEventGenerator->Generate();
- generator->mEventsLeft--;
+ for (size_t i = 0; i < generator->mBatch && generator->mEventsLeft > 0; i++)
+ {
+ generator->mEventGenerator->Generate();
+ generator->mEventsLeft--;
+ }
+
if ((generator->mEventWraparound) || (generator->mEventsLeft > 0))
{
aSystemLayer->StartTimer(generator->mTimeBetweenEvents, HandleNextEvent, generator);
diff --git a/src/test-apps/MockLoggingManager.h b/src/test-apps/MockLoggingManager.h
index 551e786..5a21bd1 100644
--- a/src/test-apps/MockLoggingManager.h
+++ b/src/test-apps/MockLoggingManager.h
@@ -45,7 +45,7 @@
{
public:
static MockEventGenerator * GetInstance(void);
- virtual WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager *aExchangeMgr, EventGenerator *aEventGenerator, int aDelayBetweenEvents, bool aWraparound) = 0;
+ virtual WEAVE_ERROR Init(nl::Weave::WeaveExchangeManager *aExchangeMgr, EventGenerator *aEventGenerator, int aDelayBetweenEvents, bool aWraparound, size_t aBatch=1) = 0;
virtual void SetEventGeneratorStop() = 0;
virtual bool IsEventGeneratorStopped() = 0;
};
diff --git a/src/test-apps/MockWdmNodeOptions.cpp b/src/test-apps/MockWdmNodeOptions.cpp
index ca3f4ae..4091e64 100644
--- a/src/test-apps/MockWdmNodeOptions.cpp
+++ b/src/test-apps/MockWdmNodeOptions.cpp
@@ -44,6 +44,7 @@
mEnableDataFlip(true),
mEventGeneratorType(kGenerator_None),
mTimeBetweenEvents(1000),
+ mEventBatchSize(1),
mTimeBetweenLivenessCheckSec(NULL),
mEnableDictionaryTest(false),
mEnableRetry(false),
@@ -71,6 +72,7 @@
{ "enable-dictionary-test", kNoArgument, kToolOpt_EnableDictionaryTest },
{ "event-generator", kArgumentRequired, kToolOpt_EventGenerator },
{ "inter-event-period", kArgumentRequired, kToolOpt_TimeBetweenEvents },
+ { "event-batch-size", kArgumentRequired, kToolOpt_EventBatchSize },
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
{ "wdm-publisher", kArgumentRequired, kToolOpt_WdmPublisherNodeId },
{ "wdm-subless-notify-dest-node", kArgumentRequired, kToolOpt_WdmSublessNotifyDestNodeId },
@@ -179,7 +181,10 @@
" TestTrait: TestETrait events which cover a range of types.\n"
"\n"
" --inter-event-period <ms>\n"
- " Delay between emitting consecutive events (default 1s)\n"
+ " Delay between emitting consecutive event batches (default 1s)\n"
+ "\n"
+ " --event-batch-size <n>\n"
+ " Number of events generated per batch\n"
"\n"
" --enable-retry\n"
" Enable automatic subscription retries by WDM\n"
@@ -491,6 +496,17 @@
}
break;
}
+    case kToolOpt_EventBatchSize:
+    {
+        // Parse the --event-batch-size value; base 0 lets strtoul accept decimal, hex and octal.
+        char *endptr;
+        const unsigned long parsed = strtoul(arg, &endptr, 0);
+        const int batchSize = static_cast<int>(parsed);
+        // Reject input with no digits, and values that do not fit mEventBatchSize (an int)
+        // as a non-negative number -- e.g. "-1", which strtoul wraps to a huge value.
+        if (endptr == arg || batchSize < 0 || static_cast<unsigned long>(batchSize) != parsed)
+        {
+            PrintArgError("%s: Invalid value specified for event batch size: %s\n", progName, arg);
+            return false;
+        }
+        mEventBatchSize = batchSize;
+        break;
+    }
case kToolOpt_WdmUpdateMutation:
{
size_t i;
diff --git a/src/test-apps/MockWdmNodeOptions.h b/src/test-apps/MockWdmNodeOptions.h
index 6834916..8398902 100644
--- a/src/test-apps/MockWdmNodeOptions.h
+++ b/src/test-apps/MockWdmNodeOptions.h
@@ -68,6 +68,7 @@
kToolOpt_WdmUpdateConditionality,
kToolOpt_WdmUpdateTiming,
kToolOpt_WdmUpdateDiscardOnError,
+ kToolOpt_EventBatchSize,
};
class MockWdmNodeOptions : public OptionSetBase
@@ -141,6 +142,7 @@
bool mEnableDataFlip;
EventGeneratorType mEventGeneratorType;
int mTimeBetweenEvents;
+ int mEventBatchSize;
const char * mTimeBetweenLivenessCheckSec;
bool mEnableDictionaryTest;
bool mEnableRetry;
diff --git a/src/test-apps/happy/lib/WeaveDeviceManager.py b/src/test-apps/happy/lib/WeaveDeviceManager.py
index 1fc2029..0247d39 100644
--- a/src/test-apps/happy/lib/WeaveDeviceManager.py
+++ b/src/test-apps/happy/lib/WeaveDeviceManager.py
@@ -35,6 +35,7 @@
import base64
import json
import logging
+import time
weaveDeviceMgrPath = os.environ['WEAVE_DEVICE_MGR_PATH']
print('weaveDeviceMgrPath is %s' % weaveDeviceMgrPath)
@@ -205,6 +206,18 @@
print(str(ex))
return
+    def fetchEvents(self, timeoutSec):
+        """Ask the WDM client to fetch events, passing timeoutSec through.
+
+        Prints a completion message on success, or the exception text when the
+        client raises WeaveStackException. Always returns None.
+        """
+        if self.wdmClient == None:
+            print("wdmclient not initialized")
+            return
+
+        try:
+            self.wdmClient.fetchEvents(timeoutSec)
+            print("fetch events complete")
+        except WeaveStack.WeaveStackException as ex:
+            print(str(ex))
+            return
+
+
def refreshIndividualData(self, traitInstance):
if self.wdmClient == None or traitInstance == None:
print("wdmclient or traitInstance not initialized")
@@ -261,6 +274,20 @@
print(str(ex))
return
+    def getEvents(self):
+        """Return the events held by the WDM client, printing them on the way.
+
+        Returns None when no client has been created or when the client raises
+        WeaveStackException (the exception text is printed in that case).
+        """
+        if self.wdmClient == None:
+            print("wdmclient not initialized")
+            return
+
+        try:
+            events = self.wdmClient.getEvents()
+        except WeaveStack.WeaveStackException as ex:
+            print(str(ex))
+            return
+
+        print("get events")
+        print(events)
+        return events
+
def getVersion(self, traitInstance):
if self.wdmClient == None or traitInstance == None:
print("wdmclient or traitInstance not initialized")
@@ -446,6 +473,84 @@
testObject.closeWdmClient()
print("testWdmClientDataSinkRefreshGetDataRefresh completes")
+def testWdmClientDataSinkRefreshDataWithEvents(testObject):
+ # Create the client, then sleep 5 seconds so that enough events exist before fetching.
+ testObject.createWdmClient()
+ time.sleep(5)
+ # The node id needs to be set for the following calls to work
+ testObject.setNodeId(-2)
+
+ testObject.fetchEvents(1)
+ data1 = testObject.getEvents()
+ # Sleep 5 seconds to wait for more events generated by mock-device
+ time.sleep(5)
+ testObject.fetchEvents(1)
+ data2 = testObject.getEvents()
+
+ # getEvents() returns bytes holding a JSON list of events
+ jsondata1 = json.loads(data1.decode("utf-8"))
+ jsondata2 = json.loads(data2.decode("utf-8"))
+ # Mock device should have generated events for both fetches
+ if (len(jsondata1) == 0) or (len(jsondata2) == 0):
+ raise ValueError("testWdmClientDataSinkRefreshDataWithEvents fails")
+ # Events should not overlap: the second batch must start after the last "Id" of the first
+ if jsondata2[0]["Id"] <= jsondata1[-1]["Id"]:
+ raise ValueError("testWdmClientDataSinkRefreshDataWithEvents fails")
+ # Sleep 5 seconds to wait for more events generated by mock-device
+ time.sleep(5)
+
+ # The returned sink is not used further in this test
+ TestCTrait = testObject.newDataSink(593165827, 0, b"/")
+ testObject.refreshData()
+ val = testObject.getEvents()
+ # The buffer shouldn't be cleared when refreshData() (not fetchEvents) is called.
+ if val.decode("utf-8") == "[]":
+ raise ValueError("testWdmClientDataSinkRefreshDataWithEvents fails")
+
+ testObject.closeWdmClient()
+ print("testWdmClientDataSinkRefreshDataWithEvents completes")
+
+def testWdmClientDataSinkRefreshDataWithLargeEventList(testObject):
+ # Mock device should generate events at 1 per second.
+ # NOTE(review): the rate depends on how the mock device was launched (inter-event period
+ # and batch size) — confirm it still matches the test template in use.
+ testObject.createWdmClient()
+ # Sleep 12 seconds for enough events, connection timeout is 15s
+ time.sleep(12)
+ # The node id needs to be set for the following calls to work
+ testObject.setNodeId(-2)
+ startTime = time.time()
+ testObject.fetchEvents(1)
+ # The timeout is not intended to be accurate, check if the fetch finished in a reasonable time
+ if time.time() - startTime > 2:
+ raise ValueError("testWdmClientDataSinkRefreshDataWithLargeEventList fails: Failed to limit fetching time")
+ data1 = testObject.getEvents()
+
+ # getEvents() returns bytes holding a JSON list of events
+ jsondata1 = json.loads(data1.decode("utf-8"))
+ # Mock device should have generated enough events (at least 10)
+ if (len(jsondata1) < 10):
+ raise ValueError("testWdmClientDataSinkRefreshDataWithLargeEventList fails")
+
+ testObject.closeWdmClient()
+ print("testWdmClientDataSinkRefreshDataWithLargeEventList completes")
+
+def testWdmClientDataSinkRefreshDataWithoutTraits(testObject):
+ # Fetch events on a client that has no data sinks registered.
+ # Mock device should generate events at 1 per second.
+ testObject.createWdmClient()
+ # Sleep 5 seconds for enough events, connection timeout is 15s
+ time.sleep(5)
+ # The node id needs to be set for the following calls to work
+ testObject.setNodeId(-2)
+ startTime = time.time()
+ testObject.fetchEvents(1)
+ # The timeout is not intended to be accurate, check if the fetch finished in a reasonable time
+ if time.time() - startTime > 2:
+ raise ValueError("testWdmClientDataSinkRefreshDataWithoutTraits fails: Failed to limit fetching time")
+ data1 = testObject.getEvents()
+
+ # getEvents() returns bytes holding a JSON list of events
+ jsondata1 = json.loads(data1.decode("utf-8"))
+ # Mock device should have generated at least one event
+ if (len(jsondata1) == 0):
+ raise ValueError("testWdmClientDataSinkRefreshDataWithoutTraits fails")
+
+ testObject.closeWdmClient()
+ print("testWdmClientDataSinkRefreshDataWithoutTraits completes")
+
def testWdmClientDataSinkCloseIndividualData(testObject):
testObject.createWdmClient()
localeSettingsTrait = testObject.newDataSink(20, 0, b"/")
@@ -582,6 +687,9 @@
testWdmClientDataSinkSetFlushRefreshGetData(testObject)
testWdmClientDataSinkRefreshGetDataRefresh(testObject)
testWdmClientDataSinkRefreshIndividualGetDataRefresh(testObject)
+ testWdmClientDataSinkRefreshDataWithEvents(testObject)
+ testWdmClientDataSinkRefreshDataWithLargeEventList(testObject)
+ testWdmClientDataSinkRefreshDataWithoutTraits(testObject)
testWdmClientDataSinkCloseIndividualData(testObject)
testWdmClientDataSinkSetRefreshFlushGetData(testObject)
testWdmClientDataSinkResourceIdentifierMakeResTypeIdInt(testObject)
@@ -598,7 +706,6 @@
if __name__ == '__main__':
-
# Override the default logging for WeaveStack to include timestamps.
weaveStackLogger = logging.getLogger('openweave.WeaveStack')
logHandler = logging.StreamHandler(stream=sys.stdout)
diff --git a/src/test-apps/happy/test-templates/WeaveBle.py b/src/test-apps/happy/test-templates/WeaveBle.py
index dfb8641..b8a0a8e 100755
--- a/src/test-apps/happy/test-templates/WeaveBle.py
+++ b/src/test-apps/happy/test-templates/WeaveBle.py
@@ -176,6 +176,7 @@
cmd += " --enable-bluez-peripheral --peripheral-name N0001 --peripheral-address " + self.interfaces[0]["bd_address"]
cmd += " --print-fault-counters --wdm-resp-mutual-sub --test-case 10 --total-count 0 --wdm-update-timing NoSub "
+ cmd += " --event-generator TestTrait"
self.start_weave_process(node_id=self.server_node_id, cmd=cmd, tag=self.server_process_tag, rootMode=True)
def resetBluez(self):
diff --git a/src/test-apps/happy/test-templates/WeavePairing.py b/src/test-apps/happy/test-templates/WeavePairing.py
index 8ac46df..51f0a71 100644
--- a/src/test-apps/happy/test-templates/WeavePairing.py
+++ b/src/test-apps/happy/test-templates/WeavePairing.py
@@ -344,6 +344,7 @@
cmd += " --node-addr " + device_info['device_ip'] + " --pairing-code TEST"
cmd += " --wdm-resp-mutual-sub --test-case 10 --total-count 0 --wdm-update-timing NoSub "
+ cmd += " --event-generator TestTrait --event-batch-size 500"
if self.server is not None:
cmd += " --pairing-server " + self.server_ip \
diff --git a/src/test-apps/happy/test-templates/WeaveWdmNext.py b/src/test-apps/happy/test-templates/WeaveWdmNext.py
index 34efbf9..b0094c4 100644
--- a/src/test-apps/happy/test-templates/WeaveWdmNext.py
+++ b/src/test-apps/happy/test-templates/WeaveWdmNext.py
@@ -348,12 +348,12 @@
client_parser_error, client_leak_detected = WeaveUtilities.scan_for_leaks_and_parser_errors(client_output)
result["no_client_parser_error"] = not client_parser_error
result["no_client_leak_detected"] = not client_leak_detected
- if server_output is not "":
+ if server_output != "":
server_parser_error, server_leak_detected = WeaveUtilities.scan_for_leaks_and_parser_errors(server_output)
result["no_server_parser_error"] = not client_parser_error
result["no_server_leak_detected"] = not server_leak_detected
- if self.quiet is False:
+ if not self.quiet:
print("weave-wdm-next %s from node %s (%s) to node %s (%s) : "\
% (self.wdm_option, client_info["client_node_id"], client_info["client_ip"], self.server_node_id, self.server_ip))
diff --git a/src/test-apps/mock-device.cpp b/src/test-apps/mock-device.cpp
index 5f7b0cd..1fbebc2 100644
--- a/src/test-apps/mock-device.cpp
+++ b/src/test-apps/mock-device.cpp
@@ -427,6 +427,9 @@
" --inter-event-period <ms>"
" Delay between emitting consecutive events (default 1s)\n"
"\n"
+ " --event-batch-size <n>\n"
+ " Number of events generated per batch\n"
+ "\n"
" --test-case <test case id>\n"
" Further configure device behavior with this test case id\n"
"\n"
@@ -868,7 +871,7 @@
if (gEventGenerator != NULL) {
printf("Starting Event Generator\n");
MockEventGenerator::GetInstance()->Init(&ExchangeMgr, gEventGenerator,
- gMockWdmNodeOptions.mTimeBetweenEvents, true);
+ gMockWdmNodeOptions.mTimeBetweenEvents, true, gMockWdmNodeOptions.mEventBatchSize);
}
}