blob: 509e6929f6eb89106d90ec1f864843e893fe966a [file] [log] [blame]
/*
*
* Copyright (c) 2016-2018 Nest Labs, Inc.
* Copyright (c) 2019 Google, LLC.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* @file
* This file implements notification engine for Weave
* Data Management (WDM) profile.
*
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif //__STDC_LIMIT_MACROS
#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>
#include <Weave/Profiles/data-management/DataManagement.h>
#include <Weave/Profiles/status-report/StatusReportProfile.h>
#include <Weave/Profiles/time/WeaveTime.h>
using namespace ::nl::Weave;
using namespace ::nl::Weave::TLV;
using namespace ::nl::Weave::Profiles;
using namespace ::nl::Weave::Profiles::Common;
using namespace ::nl::Weave::Profiles::DataManagement;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BasicGraphSolver
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool NotificationEngine::BasicGraphSolver::IsPropertyPathSupported(PropertyPathHandle aHandle)
{
// Only support subscriptions to root with the basic solver.
if (aHandle != kRootPropertyPathHandle)
{
return false;
}
else
{
return true;
}
}
WEAVE_ERROR NotificationEngine::BasicGraphSolver::RetrieveTraitInstanceData(NotifyRequestBuilder * aBuilder,
TraitDataHandle aTraitDataHandle,
SchemaVersion aSchemaVersion, bool aRetrieveAll)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
err = aBuilder->WriteDataElement(aTraitDataHandle, kRootPropertyPathHandle, aSchemaVersion, NULL, 0, NULL, 0);
SuccessOrExit(err);
exit:
return err;
}
WEAVE_ERROR NotificationEngine::BasicGraphSolver::SetDirty(TraitDataHandle aDataHandle, PropertyPathHandle aPropertyHandle)
{
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
// Iterate over all subscriptions and their trait instance info lists and mark them dirty as appropriate
for (int i = 0; i < SubscriptionEngine::kMaxNumSubscriptionHandlers; ++i)
{
SubscriptionHandler * subHandler = &subEngine->mHandlers[i];
if (subHandler->IsActive())
{
SubscriptionHandler::TraitInstanceInfo * traitInstance = subHandler->GetTraitInstanceInfoList();
for (size_t j = 0; j < subHandler->GetNumTraitInstances(); j++)
{
if (traitInstance[j].mTraitDataHandle == aDataHandle)
{
WeaveLogDetail(DataManagement, "<BSolver:SetD> Set S%u:T%u dirty", i, j);
traitInstance[j].SetDirty();
}
}
}
}
return WEAVE_NO_ERROR;
}
WEAVE_ERROR NotificationEngine::BasicGraphSolver::ClearDirty()
{
return WEAVE_NO_ERROR;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IntermediateGraphSolver::Store
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
NotificationEngine::IntermediateGraphSolver::Store::Store()
{
mNumItems = 0;
for (size_t i = 0; i < WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE; i++)
{
mStore[i].mPropertyPathHandle = kNullPropertyPathHandle;
mStore[i].mTraitDataHandle = UINT16_MAX;
mValidFlags[i] = false;
}
}
bool NotificationEngine::IntermediateGraphSolver::Store::AddItem(TraitPath aItem)
{
if (mNumItems >= WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE)
{
return false;
}
for (size_t i = 0; i < WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE; i++)
{
if (!mValidFlags[i])
{
mStore[i] = aItem;
mValidFlags[i] = true;
mNumItems++;
return true;
}
}
return false;
// Shouldn't get here since that would imply that mNumItems and mValidFlags are out of sync
// which should never happen unless someone mucked with the flags themselves. Continuing past
// this point runs the risk of unpredictable behavior and so, it's better to just assert
// at this point.
VerifyOrDie(0);
}
void NotificationEngine::IntermediateGraphSolver::Store::RemoveItem(TraitDataHandle aDataHandle)
{
if (mNumItems)
{
for (size_t i = 0; i < WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE; i++)
{
if (mValidFlags[i] && (mStore[i].mTraitDataHandle == aDataHandle))
{
mValidFlags[i] = false;
mNumItems--;
}
}
}
}
void NotificationEngine::IntermediateGraphSolver::Store::RemoveItemAt(uint32_t aIndex)
{
if (mNumItems)
{
mValidFlags[aIndex] = false;
mNumItems--;
}
}
bool NotificationEngine::IntermediateGraphSolver::Store::IsPresent(TraitPath aItem)
{
for (size_t i = 0; i < WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE; i++)
{
if (mValidFlags[i] && (mStore[i] == aItem))
{
return true;
}
}
return false;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IntermediateGraphSolver
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool NotificationEngine::IntermediateGraphSolver::IsPropertyPathSupported(PropertyPathHandle aHandle)
{
// The intermediate solver also only supports subscribing to root.
return BasicGraphSolver::IsPropertyPathSupported(aHandle);
}
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WEAVE_ERROR NotificationEngine::IntermediateGraphSolver::DeleteKey(TraitDataHandle aDataHandle, PropertyPathHandle aPropertyHandle)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
size_t i;
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
TraitDataSource * dataSource;
WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> T%u::(%u:%u), CurDeleteItems = %u/%u", aDataHandle,
GetPropertyDictionaryKey(aPropertyHandle), GetPropertySchemaHandle(aPropertyHandle), mDeleteStore.GetNumItems(),
WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE);
// Check if the data source is already marked dirty at the root.
err = subEngine->mPublisherCatalog->Locate(aDataHandle, &dataSource);
SuccessOrExit(err);
// Set the subscribers dirty.
err = BasicGraphSolver::SetDirty(aDataHandle, aPropertyHandle);
SuccessOrExit(err);
// if it's marked root dirty already, nothing more to be done!
VerifyOrExit(!dataSource->IsRootDirty(), WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> Already root dirty!");
err = WEAVE_NO_ERROR);
// if previously present in the delete store, nothing more to be done!
if (mDeleteStore.IsPresent(TraitPath(aDataHandle, aPropertyHandle)))
{
WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> Previously dirty");
return WEAVE_NO_ERROR;
}
// If we have exceeded the num items in the store, we need to mark the whole trait instance as dirty and remove all
// existing references to this trait instance in the delete store.
if (mDeleteStore.IsFull())
{
WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> No more space in granular store!");
mDeleteStore.RemoveItem(aDataHandle);
// Mark the data source is being entirely dirty.
dataSource->SetRootDirty();
}
else
{
mDeleteStore.AddItem(TraitPath(aDataHandle, aPropertyHandle));
// If we are deleting something, we need to remove any prior additions to this dictionary for this item.
for (i = 0; i < mDirtyStore.GetStoreSize(); i++)
{
if (mDirtyStore.mValidFlags[i] && (mDirtyStore.mStore[i].mTraitDataHandle == aDataHandle))
{
if (mDirtyStore.mStore[i].mPropertyPathHandle == aPropertyHandle ||
dataSource->GetSchemaEngine()->IsParent(mDirtyStore.mStore[i].mPropertyPathHandle, aPropertyHandle))
{
WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> Removing previously added dirty handle (%u:%u)",
GetPropertyDictionaryKey(mDirtyStore.mStore[i].mPropertyPathHandle),
GetPropertySchemaHandle(mDirtyStore.mStore[i].mPropertyPathHandle));
mDirtyStore.RemoveItemAt(i);
}
}
}
}
exit:
return err;
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WEAVE_ERROR NotificationEngine::IntermediateGraphSolver::SetDirty(TraitDataHandle aDataHandle, PropertyPathHandle aPropertyHandle)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
TraitDataSource * dataSource;
WeaveLogDetail(DataManagement, "<ISolver:SetDirty> T%u::(%u:%u), CurDirtyItems = %u/%u", aDataHandle,
GetPropertyDictionaryKey(aPropertyHandle), GetPropertySchemaHandle(aPropertyHandle), mDirtyStore.GetNumItems(),
WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE);
// Check if the data source is already marked dirty at the root.
err = subEngine->mPublisherCatalog->Locate(aDataHandle, &dataSource);
SuccessOrExit(err);
// Set the subscribers dirty.
err = BasicGraphSolver::SetDirty(aDataHandle, aPropertyHandle);
SuccessOrExit(err);
// if it's marked root dirty already, nothing more to be done!
VerifyOrExit(!dataSource->IsRootDirty(), WeaveLogDetail(DataManagement, "<ISolver:SetDirty> Already root dirty!");
err = WEAVE_NO_ERROR);
// if previously present in the delete store, nothing more to be done!
if (mDirtyStore.IsPresent(TraitPath(aDataHandle, aPropertyHandle)))
{
WeaveLogDetail(DataManagement, "<ISolver:SetDirty> Previously dirty");
return WEAVE_NO_ERROR;
}
// If we have exceeded the num items in the store, we need to mark the whole trait instance as dirty and remove all
// existing references to this trait instance in the dirty store.
if (mDirtyStore.IsFull())
{
WeaveLogDetail(DataManagement, "<ISolver:SetDirty> No more space in granular store!");
mDirtyStore.RemoveItem(aDataHandle);
// Mark the data source is being entirely dirty.
dataSource->SetRootDirty();
}
else
{
PropertyPathHandle handleToAdd = aPropertyHandle;
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
// If we're adding/modifying a dictionary element, remove any previous deletions of this element to maintain correctness.
for (size_t i = 0; i < mDeleteStore.GetStoreSize(); i++)
{
if (mDeleteStore.mValidFlags[i] && (mDeleteStore.mStore[i].mTraitDataHandle == aDataHandle))
{
if (aPropertyHandle == mDeleteStore.mStore[i].mPropertyPathHandle ||
dataSource->GetSchemaEngine()->IsParent(aPropertyHandle, mDeleteStore.mStore[i].mPropertyPathHandle))
{
WeaveLogDetail(DataManagement, "<ISolver:DeleteKey> Removing previously deleted element (%u:%u)",
GetPropertyDictionaryKey(mDeleteStore.mStore[i].mPropertyPathHandle),
GetPropertySchemaHandle(mDeleteStore.mStore[i].mPropertyPathHandle));
// Given that the handle to add could be a deep leaf path within the dictionary element, we need to actually
// mark the root dictionary element as being dirty in the case where we previously were tracking a deletion to
// this item. Otherwise, we'll just send a modification to the leaf part of the element which will be incorrect.
dataSource->GetSchemaEngine()->IsInDictionary(aPropertyHandle, handleToAdd);
VerifyOrExit(handleToAdd != kNullPropertyPathHandle, err = WEAVE_ERROR_INCORRECT_STATE);
mDeleteStore.RemoveItemAt(i);
}
}
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
mDirtyStore.AddItem(TraitPath(aDataHandle, handleToAdd));
}
exit:
return err;
}
PropertyPathHandle NotificationEngine::IntermediateGraphSolver::GetNextCandidateHandle(uint32_t & aChangeStoreCursor,
TraitDataHandle aTargetDataHandle,
bool & aCandidateHandleIsDelete)
{
PropertyPathHandle candidateHandle = kNullPropertyPathHandle;
while (aChangeStoreCursor < mDirtyStore.GetStoreSize())
{
TraitPath dirtyPath = mDirtyStore.mStore[aChangeStoreCursor];
if (mDirtyStore.mValidFlags[aChangeStoreCursor] && (dirtyPath.mTraitDataHandle == aTargetDataHandle))
{
candidateHandle = dirtyPath.mPropertyPathHandle;
aCandidateHandleIsDelete = false;
aChangeStoreCursor++;
break;
}
aChangeStoreCursor++;
}
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
while (aChangeStoreCursor >= mDirtyStore.GetStoreSize() &&
aChangeStoreCursor < (mDeleteStore.GetStoreSize() + mDirtyStore.GetStoreSize()))
{
TraitPath deletePath = mDeleteStore.mStore[aChangeStoreCursor - mDirtyStore.GetStoreSize()];
if (mDeleteStore.mValidFlags[aChangeStoreCursor - mDirtyStore.GetStoreSize()] &&
(deletePath.mTraitDataHandle == aTargetDataHandle))
{
candidateHandle = deletePath.mPropertyPathHandle;
aCandidateHandleIsDelete = true;
aChangeStoreCursor++;
break;
}
aChangeStoreCursor++;
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
return candidateHandle;
}
WEAVE_ERROR NotificationEngine::IntermediateGraphSolver::RetrieveTraitInstanceData(NotifyRequestBuilder * aBuilder,
TraitDataHandle aTraitDataHandle,
SchemaVersion aSchemaVersion, bool aRetrieveAll)
{
WEAVE_ERROR err;
PropertyPathHandle mergeHandleSet[WDM_PUBLISHER_INTERMEDIATE_SOLVER_MAX_MERGE_HANDLE_SET] = { kNullPropertyPathHandle };
PropertyPathHandle deleteHandleSet[WDM_PUBLISHER_INTERMEDIATE_SOLVER_MAX_MERGE_HANDLE_SET] = { kNullPropertyPathHandle };
int32_t numMergeHandles = 0;
int32_t numDeleteHandles = 0;
PropertyPathHandle currentCommonHandle = kNullPropertyPathHandle;
TraitDataSource * dataSource;
const TraitSchemaEngine * schemaEngine;
err = SubscriptionEngine::GetInstance()->mPublisherCatalog->Locate(aTraitDataHandle, &dataSource);
SuccessOrExit(err);
schemaEngine = dataSource->GetSchemaEngine();
WeaveLogDetail(DataManagement, "<ISolver::Retr> CurDirtyItems = %u/%u", mDirtyStore.GetNumItems(),
WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE);
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WeaveLogDetail(DataManagement, "<ISolver::Retr> CurDeleteItems = %u/%u", mDeleteStore.GetNumItems(),
WDM_PUBLISHER_MAX_ITEMS_IN_TRAIT_DIRTY_STORE);
#endif
// If we are told to retrieve all (i.e root), our job here is done
if (aRetrieveAll)
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> Retrieving all!");
currentCommonHandle = kRootPropertyPathHandle;
}
// If the data source as a whole has been marked dirty, our job here is done
else if (dataSource->IsRootDirty())
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> Root is dirty!");
currentCommonHandle = kRootPropertyPathHandle;
}
else
{
PropertyPathHandle nextCommonHandle, candidateHandle;
PropertyPathHandle laggingHandles[2] = { kNullPropertyPathHandle, kNullPropertyPathHandle };
bool oldCandidateHandleIsDelete = false, candidateHandleIsDelete = false;
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
bool modifyDeleteToModify = false;
#endif
uint32_t changeStoreCursor = 0;
//
// This loop forms the crux of the TDM part of the NotificationEngine. It is responsible for gathering up the dirty bits
// within a data source instance and generating a *single* data element that maximally encompasses all that dirtiness. To do
// so, it iteratively computes a 'nextCommonHandle' that is the parent to all dirty path handles accumulated up to each
// iteration. This parent handle is termed as the Lowest Common Ancestor, or LCA.
//
// The WDM protocol rules state that all handles in the data at the first level(i.e immediate children of the handle
// referenced by the path) are to be merged into the eventual data, while data at the 2nd level and beyond are to be
// replaced. The algorithm below tries to exploit the merge semantics to just send the handles that are dirty relative to
// the common handle. Given the handle set is finitely sized, an overflow of that set results in all child handles being
// merged in.
//
// It also deals with deletions as well. Deletions are treated somewhat similarly to modifications/additions from the algo
// perspective with some minor adjustments:
//
// 1. Deletions are only applicable so long as all deletions apply to the same dictionary. Once we have deletions that
// span multiple dictionaries, we cannot express a deletion anymore and the deletion is treated like a modify/add
// from the algorithm perspective for the purposes of computing the LCA and adding entries to the merge handle set.
//
// 2. Deletions can co-exist with modifications/additions to the same dictionary. If there are mods/adds present in
// other parts of the tree/other dictionaries, the deletion reverts to the same treatment as mentioned in 1)
//
// Key Variables:
//
// currentCommonHandle = The current LCA of all handles evaluated thus far.
//
// candidateHandle = The next handle picked out from either the dirty or delete stores that will be evaluated against
// the current common handle to compute the next common handle
//
// nextCommonHandle = The next computed LCA of the current handle and the candidate handle
//
// laggingHandles = immediate children of the newly computed LCA that encompass the two
// candidates passed into the LCA computation function respectively. If either of the two input handles
// passed in match the newly computed LCA, the lagging handle will be set to kNullPropertyPathHandle
//
// mergeHandleSet = set of handles that will be merged in relative to the currentCommonHandle. If empty, all children
// under the commonHandle will be included.
//
while ((candidateHandle = GetNextCandidateHandle(changeStoreCursor, aTraitDataHandle, candidateHandleIsDelete)) !=
kNullPropertyPathHandle)
{
oldCandidateHandleIsDelete = candidateHandleIsDelete;
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
// This flag tracks whether we have stopped trying to express deletions (setup in previous iterations) and now have
// reverted to converting them over to look like adds/modifies. This variable will remain set in this value for
// remaining iterations.
if (modifyDeleteToModify)
{
candidateHandleIsDelete = false;
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WeaveLogDetail(DataManagement, "Candidate Handle = %u:%u (%c -> %c)", GetPropertyDictionaryKey(candidateHandle),
GetPropertySchemaHandle(candidateHandle), oldCandidateHandleIsDelete ? 'D' : 'M',
candidateHandleIsDelete ? 'D' : 'M');
//
// Evaluate the next LCA
//
// Given our current common ancestor handle and our candidate handle, we compute the next LCA. The next common handle
// will be stored in 'nextCommonHandle' while the two lagging branches will be represented through laggingHandles[0] and
// laggingHandles[1]. [0] will correspond to the lagging branch for the current common handle while [1] will correspond
// to that for the candidate handle.
//
if (currentCommonHandle == kNullPropertyPathHandle)
{
// If we're first starting out, we need to pick a sensible common handle. Unlike modifications where the LCA is the
// first modified/added handle we encounter, deletions need to be expressed relative to the parent dictionary
// handle. Hence, we setup it up to look like a 'merge' by having the common handle point to the dictionary and the
// lagging handle point to the deleted element.
if (candidateHandleIsDelete)
{
nextCommonHandle = schemaEngine->GetParent(candidateHandle);
laggingHandles[0] = kNullPropertyPathHandle;
laggingHandles[1] = candidateHandle;
}
else
{
nextCommonHandle = candidateHandle;
}
WeaveLogDetail(DataManagement, "<ISolver::Retr> (%c) nextCommonHandle = %u:%u", candidateHandleIsDelete ? 'D' : 'M',
GetPropertyDictionaryKey(nextCommonHandle), GetPropertySchemaHandle(nextCommonHandle));
}
else
{
// Find the lowest common parent of the currently tracked common handle and the next item in the dirty set. Also,
// return the two child handles that lag the ancestor that are parents of the two input handles to the LCA.
nextCommonHandle = schemaEngine->FindLowestCommonAncestor(currentCommonHandle, candidateHandle, &laggingHandles[0],
&laggingHandles[1]);
VerifyOrExit(nextCommonHandle != kNullPropertyPathHandle, err = WEAVE_ERROR_INVALID_ARGUMENT);
WeaveLogDetail(DataManagement,
"<ISolver::Retr> (%c) nextCommonHandle += (%u:%u) = (%u:%u) (Lag-set = (%u:%u), (%u:%u))",
candidateHandleIsDelete ? 'D' : 'M', GetPropertyDictionaryKey(candidateHandle),
GetPropertySchemaHandle(candidateHandle), GetPropertyDictionaryKey(nextCommonHandle),
GetPropertySchemaHandle(nextCommonHandle), GetPropertyDictionaryKey(laggingHandles[0]),
GetPropertySchemaHandle(laggingHandles[0]), GetPropertyDictionaryKey(laggingHandles[1]),
GetPropertySchemaHandle(laggingHandles[1]));
}
// If we compute a new next handle, we'll need to wipe our merge handle set since the old set of merge/delete handles
// were referenced against a now-stale handle
if (currentCommonHandle != nextCommonHandle)
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (%c) nextHandle != currentHandle, wiping merge/delete sets",
candidateHandleIsDelete ? 'D' : 'M');
numMergeHandles = 0;
numDeleteHandles = 0;
}
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
if (candidateHandleIsDelete)
{
// The deleteHandleSet only makes sense as long as the next common handle is the parent of the delete set.
// If not, we start treating it as a add/modify.
if (nextCommonHandle == schemaEngine->GetParent(candidateHandle))
{
int32_t i;
for (i = 0; i < numDeleteHandles; i++)
{
if (deleteHandleSet[i] == laggingHandles[1])
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (D) Handle (%u:%u) already present",
GetPropertyDictionaryKey(laggingHandles[1]), GetPropertySchemaHandle(laggingHandles[1]));
break;
}
}
if (i == numDeleteHandles)
{
// If our delete handle set overflows, we degenerate to expressing the deletes as a replacement of the
// dictionary itself.
if (numDeleteHandles >= WDM_PUBLISHER_INTERMEDIATE_SOLVER_MAX_MERGE_HANDLE_SET)
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (D) delete set overflowed, converting to replace");
laggingHandles[0] = kNullPropertyPathHandle;
laggingHandles[1] = nextCommonHandle;
nextCommonHandle = schemaEngine->GetParent(nextCommonHandle);
numMergeHandles = 0;
numDeleteHandles = 0;
candidateHandleIsDelete = false;
modifyDeleteToModify = true;
}
else
{
WeaveLogDetail(DataManagement,
"<ISolver::Retr> (D) Adding delete handle = (%u:%u) (numCurHandles = %u)",
GetPropertyDictionaryKey(laggingHandles[1]), GetPropertySchemaHandle(laggingHandles[1]),
numDeleteHandles + 1);
deleteHandleSet[numDeleteHandles++] = laggingHandles[1];
// There's always a possibility that the other lagging handle was pointing to a modified/added handle.
// We set the laggingHandle[1] as null to prevent it from getting added but set candidateHandleIsDelete
// to false to force it get evaluated in the section below for addition to the mergeHandleSet.
laggingHandles[1] = kNullPropertyPathHandle;
candidateHandleIsDelete = false;
}
}
}
else
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (D) Making delete a merge instead");
candidateHandleIsDelete = false;
}
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
if (!candidateHandleIsDelete)
{
// If our next handle matches the current dirty handle, we know we cannot do a merge so wipe the merge set.
if (nextCommonHandle == candidateHandle)
{
numMergeHandles = 0;
WeaveLogDetail(DataManagement, "<ISolver::Retr> (M) next is dirty handle - wiping merge set");
// We make a small exception if the dirty handle is a dictionary - it doesn't make a lot of sense to mark a
// dictionary as dirty if you were just intending to convey modifications/additions only. Instead, let's do a
// replace given that makes more sense for a dynamic data type like this.
if (schemaEngine->IsDictionary(candidateHandle))
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (M) next is dictionary - setting up replace");
mergeHandleSet[0] = candidateHandle;
nextCommonHandle = schemaEngine->GetParent(candidateHandle);
numMergeHandles = 1;
}
}
else
{
for (size_t i = 0; i < 2; i++)
{
if (laggingHandles[i] != kNullPropertyPathHandle)
{
int j;
for (j = 0; j < numMergeHandles; j++)
{
if (mergeHandleSet[j] == laggingHandles[i])
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (M) Handle (%u:%u) already present",
GetPropertyDictionaryKey(laggingHandles[i]),
GetPropertySchemaHandle(laggingHandles[i]));
break;
}
}
if (numMergeHandles >= 0 && j == numMergeHandles)
{
if (numMergeHandles >= WDM_PUBLISHER_INTERMEDIATE_SOLVER_MAX_MERGE_HANDLE_SET)
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (M) merge set overflowed");
numMergeHandles = -1;
}
else
{
WeaveLogDetail(DataManagement, "<ISolver::Retr> (M) Merge handle = (%u:%u) (numhandles = %u)",
GetPropertyDictionaryKey(laggingHandles[i]),
GetPropertySchemaHandle(laggingHandles[i]), numMergeHandles + 1);
mergeHandleSet[numMergeHandles++] = laggingHandles[i];
}
}
}
}
}
}
currentCommonHandle = nextCommonHandle;
}
}
// If our algo is working correctly, currentCommonHandle should always be pointing to a valid handle. This is always the case
// since a) this function only gets called if we know there is dirtiness in this trait and b) the current common handle is
// always a function of the dirty handle set, which by definition, cannot be null.
VerifyOrDie(currentCommonHandle != kNullPropertyPathHandle);
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
// If we're expressing a deletion (i.e numDeleteHandles > 0), then it has to be done against a path that points to a dictionary.
// If that isn't the case, something really wrong has happened in the algorithm above.
if (numDeleteHandles > 0)
{
VerifyOrDie(schemaEngine->IsDictionary(currentCommonHandle));
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WeaveLogDetail(DataManagement, "<ISolver::Retr> Final handle = (%u:%u), numMergeHandles = %d, numDeleteHandles = %d",
GetPropertyDictionaryKey(currentCommonHandle), GetPropertySchemaHandle(currentCommonHandle), numMergeHandles,
numDeleteHandles);
// if we overflow, let's clear them back to 0.
if (numMergeHandles < 0)
{
numMergeHandles = 0;
}
// Generate data elements
err = aBuilder->WriteDataElement(aTraitDataHandle, currentCommonHandle, aSchemaVersion, mergeHandleSet, numMergeHandles,
deleteHandleSet, numDeleteHandles);
SuccessOrExit(err);
exit:
return err;
}
void NotificationEngine::IntermediateGraphSolver::ClearTraitInstanceDirty(void * aDataSource, TraitDataHandle aDataHandle,
void * aContext)
{
TraitDataSource * dataSource = static_cast<TraitDataSource *>(aDataSource);
dataSource->ClearRootDirty();
}
WEAVE_ERROR NotificationEngine::IntermediateGraphSolver::ClearDirty()
{
// Iterate over every publisher trait instance and clear their dirty field.
SubscriptionEngine::GetInstance()->mPublisherCatalog->Iterate(ClearTraitInstanceDirty, this);
// Clear out our granular dirty store.
mDirtyStore.Clear();
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
mDeleteStore.Clear();
#endif
return WEAVE_NO_ERROR;
}
void NotificationEngine::IntermediateGraphSolver::Store::Clear()
{
mNumItems = 0;
memset(mValidFlags, 0, sizeof(mValidFlags));
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NotifyRequestBuilder
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::Init(PacketBuffer * aBuf, TLV::TLVWriter * aWriter,
SubscriptionHandler * aSubHandler,
uint32_t aMaxPayloadSize)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(aBuf != NULL, err = WEAVE_ERROR_INVALID_ARGUMENT);
mWriter = aWriter;
mState = kNotifyRequestBuilder_Idle;
mBuf = aBuf;
mSub = aSubHandler;
mMaxPayloadSize = aMaxPayloadSize;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::StartNotifyRequest()
{
TLVType dummyType;
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit((mState == kNotifyRequestBuilder_Idle) && (mBuf != NULL), err = WEAVE_ERROR_INCORRECT_STATE);
mWriter->Init(mBuf, mMaxPayloadSize);
err = mWriter->StartContainer(AnonymousTag, kTLVType_Structure, dummyType);
SuccessOrExit(err);
if (mSub)
{
err = mWriter->Put(ContextTag(BaseMessageWithSubscribeId::kCsTag_SubscriptionId), mSub->mSubscriptionId);
SuccessOrExit(err);
}
mState = kNotifyRequestBuilder_Ready;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::EndNotifyRequest()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(mState == kNotifyRequestBuilder_Ready, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->EndContainer(kTLVType_NotSpecified);
SuccessOrExit(err);
err = mWriter->Finalize();
SuccessOrExit(err);
mState = kNotifyRequestBuilder_Idle;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::StartDataList()
{
TLVType dummyType; // Per spec requirement, will be set to TLVStructure
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(mState == kNotifyRequestBuilder_Ready, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->StartContainer(ContextTag(NotificationRequest::kCsTag_DataList), kTLVType_Array, dummyType);
SuccessOrExit(err);
mState = kNotifyRequestBuilder_BuildDataList;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::EndDataList()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(mState == kNotifyRequestBuilder_BuildDataList, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->EndContainer(kTLVType_Structure); // corresponds to dummyType in Start*List
SuccessOrExit(err);
mState = kNotifyRequestBuilder_Ready;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::StartEventList()
{
TLVType dummyType; // Per spec requirement, will be set to TLVStructure
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(mState == kNotifyRequestBuilder_Ready, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->StartContainer(ContextTag(NotificationRequest::kCsTag_EventList), kTLVType_Array, dummyType);
SuccessOrExit(err);
mState = kNotifyRequestBuilder_BuildEventList;
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::EndEventList()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
VerifyOrExit(mState == kNotifyRequestBuilder_BuildEventList, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->EndContainer(kTLVType_Structure); // corresponds to dummyType in Start*List
SuccessOrExit(err);
mState = kNotifyRequestBuilder_Ready;
exit:
return err;
}
WEAVE_ERROR
NotificationEngine::NotifyRequestBuilder::WriteDataElement(TraitDataHandle aTraitDataHandle, PropertyPathHandle aPropertyPathHandle,
SchemaVersion aSchemaVersion, PropertyPathHandle * aMergeDataHandleSet,
uint32_t aNumMergeDataHandles, PropertyPathHandle * aDeleteHandleSet,
uint32_t aNumDeleteHandles)
{
WEAVE_ERROR err;
TLVType dummyContainerType;
TraitDataSource * dataSource;
bool retrievingData = false;
SchemaVersionRange versionRange;
VerifyOrExit(mState == kNotifyRequestBuilder_BuildDataList, err = WEAVE_ERROR_INCORRECT_STATE);
err = mWriter->StartContainer(AnonymousTag, kTLVType_Structure, dummyContainerType);
SuccessOrExit(err);
err = SubscriptionEngine::GetInstance()->mPublisherCatalog->Locate(aTraitDataHandle, &dataSource);
SuccessOrExit(err);
versionRange.mMaxVersion = aSchemaVersion;
versionRange.mMinVersion = dataSource->GetSchemaEngine()->GetLowestCompatibleVersion(versionRange.mMaxVersion);
err = mWriter->StartContainer(ContextTag(DataElement::kCsTag_Path), kTLVType_Path, dummyContainerType);
SuccessOrExit(err);
err = SubscriptionEngine::GetInstance()->mPublisherCatalog->HandleToAddress(aTraitDataHandle, *mWriter, versionRange);
SuccessOrExit(err);
err = dataSource->GetSchemaEngine()->MapHandleToPath(aPropertyPathHandle, *mWriter);
SuccessOrExit(err);
err = mWriter->EndContainer(dummyContainerType);
SuccessOrExit(err);
err = mWriter->Put(ContextTag(DataElement::kCsTag_Version), dataSource->GetVersion());
SuccessOrExit(err);
if (aNumMergeDataHandles > 0 || aNumDeleteHandles > 0)
{
const TraitSchemaEngine * schemaEngine = dataSource->GetSchemaEngine();
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
if (aNumDeleteHandles > 0)
{
err =
mWriter->StartContainer(ContextTag(DataElement::kCsTag_DeletedDictionaryKeys), kTLVType_Array, dummyContainerType);
SuccessOrExit(err);
for (size_t i = 0; i < aNumDeleteHandles; i++)
{
err = mWriter->Put(AnonymousTag, GetPropertyDictionaryKey(aDeleteHandleSet[i]));
SuccessOrExit(err);
}
err = mWriter->EndContainer(dummyContainerType);
SuccessOrExit(err);
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
if (aNumMergeDataHandles > 0)
{
err = mWriter->StartContainer(ContextTag(DataElement::kCsTag_Data), kTLVType_Structure, dummyContainerType);
SuccessOrExit(err);
retrievingData = true;
for (size_t i = 0; i < aNumMergeDataHandles; i++)
{
WeaveLogDetail(DataManagement, "<NE::WriteDE> Merging in 0x%08x", aMergeDataHandleSet[i]);
err = dataSource->ReadData(aMergeDataHandleSet[i], schemaEngine->GetTag(aMergeDataHandleSet[i]), *mWriter);
SuccessOrExit(err);
}
retrievingData = false;
err = mWriter->EndContainer(dummyContainerType);
SuccessOrExit(err);
}
}
else
{
retrievingData = true;
err = dataSource->ReadData(aPropertyPathHandle, ContextTag(DataElement::kCsTag_Data), *mWriter);
SuccessOrExit(err);
retrievingData = false;
}
err = mWriter->EndContainer(kTLVType_Array);
SuccessOrExit(err);
exit:
if (retrievingData && err != WEAVE_NO_ERROR)
{
WeaveLogError(DataManagement, "Error retrieving data from trait (instanceHandle: %u, profileId: %08x), err = %d",
aTraitDataHandle, dataSource->GetSchemaEngine()->GetProfileId(), err);
}
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::MoveToState(NotifyRequestBuilderState aDesiredState)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// If we're already in the correct builder state, exit without doing anything else
if (aDesiredState == mState)
{
ExitNow();
}
// Get to the toplevel of the request
switch (mState)
{
case kNotifyRequestBuilder_Idle:
err = StartNotifyRequest();
break;
case kNotifyRequestBuilder_Ready:
break;
case kNotifyRequestBuilder_BuildDataList:
err = EndDataList();
break;
case kNotifyRequestBuilder_BuildEventList:
err = EndEventList();
break;
}
// verify that we're at the toplevel
VerifyOrExit(err == WEAVE_NO_ERROR, WeaveLogDetail(DataManagement, "<NE:Builder> Failed to reach Ready: %d", err));
// Extra paranoia: verify that we're in toplevel state
VerifyOrExit(mState == kNotifyRequestBuilder_Ready, err = WEAVE_ERROR_INCORRECT_STATE);
// Now, go to the desired state
switch (aDesiredState)
{
case kNotifyRequestBuilder_Idle:
err = EndNotifyRequest();
break;
case kNotifyRequestBuilder_Ready:
break;
case kNotifyRequestBuilder_BuildDataList:
err = StartDataList();
break;
case kNotifyRequestBuilder_BuildEventList:
err = StartEventList();
break;
}
VerifyOrExit(err == WEAVE_NO_ERROR, WeaveLogDetail(DataManagement, "<NE:Builder> Failed to reach desired state: %d", err));
// Extra paranoia: verify that we're in desired state
VerifyOrExit(mState == aDesiredState, err = WEAVE_ERROR_INCORRECT_STATE);
exit:
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::Checkpoint(TLVWriter & aPoint)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
aPoint = *mWriter;
return err;
}
WEAVE_ERROR NotificationEngine::NotifyRequestBuilder::Rollback(TLVWriter & aPoint)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
*mWriter = aPoint;
return err;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NotificationEngine
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
WEAVE_ERROR NotificationEngine::Init()
{
mCurSubscriptionHandlerIdx = 0;
mCurTraitInstanceIdx = 0;
mNumNotifiesInFlight = 0;
return WEAVE_NO_ERROR;
}
#if TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WEAVE_ERROR NotificationEngine::DeleteKey(TraitDataSource * aDataSource, PropertyPathHandle aPropertyHandle)
{
WEAVE_ERROR err;
TraitDataHandle dataHandle;
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
bool isLocked = false;
err = subEngine->mPublisherCatalog->Locate(aDataSource, dataHandle);
SuccessOrExit(err);
err = SubscriptionEngine::GetInstance()->Lock();
SuccessOrExit(err);
isLocked = true;
err = mGraphSolver.DeleteKey(dataHandle, aPropertyHandle);
SuccessOrExit(err);
exit:
if (isLocked)
{
SubscriptionEngine::GetInstance()->Unlock();
}
return err;
}
#endif // TDM_ENABLE_PUBLISHER_DICTIONARY_SUPPORT
WEAVE_ERROR NotificationEngine::SetDirty(TraitDataSource * aDataSource, PropertyPathHandle aPropertyHandle)
{
WEAVE_ERROR err;
TraitDataHandle dataHandle;
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
bool isLocked = false;
err = subEngine->mPublisherCatalog->Locate(aDataSource, dataHandle);
SuccessOrExit(err);
err = SubscriptionEngine::GetInstance()->Lock();
SuccessOrExit(err);
isLocked = true;
err = mGraphSolver.SetDirty(dataHandle, aPropertyHandle);
SuccessOrExit(err);
exit:
if (isLocked)
{
SubscriptionEngine::GetInstance()->Unlock();
}
return err;
}
WEAVE_ERROR NotificationEngine::RetrieveTraitInstanceData(SubscriptionHandler * aSubHandler,
SubscriptionHandler::TraitInstanceInfo * aTraitInfo,
NotifyRequestBuilder * aBuilder, bool * aPacketFull)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
*aPacketFull = false;
err = mGraphSolver.RetrieveTraitInstanceData(aBuilder, aTraitInfo->mTraitDataHandle, aTraitInfo->mRequestedVersion,
aSubHandler->IsSubscribing());
SuccessOrExit(err);
// Clear out the dirty bit since we're done processing this trait instance.
aTraitInfo->ClearDirty();
exit:
if ((err == WEAVE_ERROR_BUFFER_TOO_SMALL) || (err == WEAVE_ERROR_NO_MEMORY))
{
*aPacketFull = true;
err = WEAVE_NO_ERROR;
}
return err;
}
WEAVE_ERROR NotificationEngine::SendNotify(PacketBuffer * aBuffer, SubscriptionHandler * aSubHandler)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
// We can only have 1 notify in flight for any given subscription - increment and break out.
mNumNotifiesInFlight++;
err = aSubHandler->SendNotificationRequest(aBuffer);
SuccessOrExit(err);
exit:
if (err != WEAVE_NO_ERROR)
{
mNumNotifiesInFlight--;
}
return err;
}
void NotificationEngine::OnNotifyConfirm(SubscriptionHandler * aSubHandler, bool aNotifyDelivered)
{
VerifyOrDie(mNumNotifiesInFlight > 0);
WeaveLogDetail(DataManagement, "<NE> OnNotifyConfirm: NumNotifies-- = %d", mNumNotifiesInFlight - 1);
mNumNotifiesInFlight--;
if (aNotifyDelivered && aSubHandler->mSubscribeToAllEvents)
{
LoggingManagement & logger = LoggingManagement::GetInstance();
for (int iterator = kImportanceType_First; iterator <= kImportanceType_Last; iterator++)
{
size_t i = static_cast<size_t>(iterator - kImportanceType_First);
ImportanceType importance = (ImportanceType) iterator;
logger.NotifyEventsDelivered(importance, aSubHandler->mSelfVendedEvents[i] - 1, aSubHandler->GetPeerNodeId());
}
}
// Run NE again now that a notify has come back/error'ed out and that we might be able to do more work.
Run();
}
/**
* @brief
* Given the `SubscriptionHandler`, fill in the `EventList` element
* within the `NotifyRequest`,
*
* The function will fill in a `NotifyRequest`'s `EventList`. If the
* event logs occupy more space than available in the current
* `NotifyRequest`, the function will only pack enough events to fit
* within the buffer and adjust the state of the
* `SubscriptionHandler` to resume processing at unprocessed event.
* The events are sent in the order of priority. To avoid endless
* cycling through events, the function sets the end goal within the
* event log that it will reach before it considers the subscription
* clean.
*
* @param[in] aSubHandler A pointer to the SubscriptionHandler for
* which we are attempting to create
* a `NotifyRequest`
*
* @param[in] aNotificationRequest A builder object encapsulating the
* request we are trying to build
*
* @param[out] aIsSubscriptionClean A boolean that is set to true
* when no further traits have data
* that needs to be processed by the
* SubscriptionHandler and to false
* otherwise.
*
* @param[out] aNeWriteInProgress A boolean that is set to true
* when there is data written in notify request
* to false otherwise.
*
* @retval #WEAVE_NO_ERROR On success.
*
* @retval other The processing failed within the subroutines. The
* errors may have resulted from state
* corruption, TDM errors, insufficient
* buffering, etc.
*/
WEAVE_ERROR NotificationEngine::BuildSingleNotifyRequestEventList(SubscriptionHandler * aSubHandler,
NotifyRequestBuilder & aNotifyRequest,
bool & aIsSubscriptionClean, bool & aNeWriteInProgress)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
aIsSubscriptionClean = true;
event_id_t initialEvents[kImportanceType_Last - kImportanceType_First + 1];
memcpy(initialEvents, aSubHandler->mSelfVendedEvents, sizeof(initialEvents));
int event_count = 0;
// events only enter the picture if the subscription handler is
// subscribed to events.
if (aSubHandler->mSubscribeToAllEvents)
{
// Verify that we have events to transmit
LoggingManagement & logger = LoggingManagement::GetInstance();
// If the logger is not valid or has not been initialized,
// skip the rest of processing
VerifyOrExit(logger.IsValid(), /* no-op */);
for (int i = 0; i < kImportanceType_Last - kImportanceType_First + 1; i++)
{
event_id_t tmp_id = logger.GetFirstEventID(static_cast<ImportanceType>(i + 1));
if (tmp_id > initialEvents[i])
{
WeaveLogProgress(DataManagement, "BuildSingleNotifyRequestEventList | Missing event_id range: { %u, %u };",
initialEvents[i], tmp_id - 1);
initialEvents[i] = tmp_id;
}
}
// Check whether we are in a middle of an upload
if (aSubHandler->mCurrentImportance == kImportanceType_Invalid)
{
// Upload is not underway. Check for new events, and set a checkpoint
aIsSubscriptionClean = aSubHandler->CheckEventUpToDate(logger);
if (!aIsSubscriptionClean)
{
// We have more events. snapshot last event IDs
aSubHandler->SetEventLogEndpoint(logger);
}
// initialize the next importance level to transfer
aSubHandler->mCurrentImportance = aSubHandler->FindNextImportanceForTransfer();
}
else
{
aSubHandler->mCurrentImportance = aSubHandler->FindNextImportanceForTransfer();
aIsSubscriptionClean = (aSubHandler->mCurrentImportance == kImportanceType_Invalid);
}
// proceed only if there are new events.
if (aIsSubscriptionClean)
{
ExitNow(); // subscription clean, move along
}
// Ensure we have a buffer and we've started EventList
err = aNotifyRequest.MoveToState(kNotifyRequestBuilder_BuildEventList);
// if we did not have enough space for event list at all,
// squash the error and exit immediately
if ((err == WEAVE_ERROR_NO_MEMORY) || (err == WEAVE_ERROR_BUFFER_TOO_SMALL))
{
err = WEAVE_NO_ERROR;
ExitNow();
}
SuccessOrExit(err);
while (aSubHandler->mCurrentImportance != kImportanceType_Invalid)
{
size_t i = static_cast<size_t>(aSubHandler->mCurrentImportance - kImportanceType_First);
err = logger.FetchEventsSince(*aNotifyRequest.GetWriter(), aSubHandler->mCurrentImportance,
aSubHandler->mSelfVendedEvents[i]);
if ((err == WEAVE_END_OF_TLV) || (err == WEAVE_ERROR_TLV_UNDERRUN) || (err == WEAVE_NO_ERROR))
{
// We have successfully reached the end of the log for
// the current importance. Advance to the next
// importance level.
err = WEAVE_NO_ERROR;
aSubHandler->mCurrentImportance = aSubHandler->FindNextImportanceForTransfer();
}
else if ((err == WEAVE_ERROR_BUFFER_TOO_SMALL) || (err == WEAVE_ERROR_NO_MEMORY))
{
for (int t = 0; t <= kImportanceType_Last - kImportanceType_First; t++)
{
if (aSubHandler->mSelfVendedEvents[t] > initialEvents[t])
{
event_count += aSubHandler->mSelfVendedEvents[t] - initialEvents[t];
}
}
if (event_count > 0)
{
aNeWriteInProgress = true;
}
// when first trait event is too big to fit in the packet, ignore that trait event.
if (!aNeWriteInProgress)
{
aSubHandler->mSelfVendedEvents[i]++;
WeaveLogDetail(DataManagement, "<NE:Run> trait event is too big so that it fails to fit in the packet!");
err = WEAVE_NO_ERROR;
}
else
{
// `FetchEventsSince` has filled the available space
// within the allowed buffer before it fit all the
// available events. This is an expected condition,
// so we do not propagate the error to higher levels;
// instead, we terminate the event processing for now
// (we will get another chance immediately afterwards,
// with a ew buffer) and do not advance the processing
// to the next importance level.
err = WEAVE_NO_ERROR;
ExitNow();
}
}
else
{
// All other errors are propagated to higher level.
// Exiting here and returning an error will lead to
// abandoning subscription.
ExitNow();
}
}
}
exit:
event_count = 0;
for (int i = 0; i <= kImportanceType_Last - kImportanceType_First; i++)
{
// There are many acceptable situations where the initial event id for
// an importance buffer is greater than the vended event id for the
// subscription. We know that we have not loaded any events from that
// importance into the current NotifyRequest, so we can skip over it
// for the purposes of the "Fetched events" log line.
if (aSubHandler->mSelfVendedEvents[i] > initialEvents[i])
{
event_count += aSubHandler->mSelfVendedEvents[i] - initialEvents[i];
WeaveLogProgress(DataManagement, "Fetched events [importance: %d, event_id: %u - %u]",
i + 1, initialEvents[i], aSubHandler->mSelfVendedEvents[i] - 1);
}
}
WeaveLogDetail(DataManagement, "Fetched %d events", event_count);
if (event_count > 0)
{
aNeWriteInProgress = true;
}
if (err != WEAVE_NO_ERROR)
{
WeaveLogError(DataManagement, "Error retrieving events, err = %d", err);
}
return err;
}
/**
* @brief
* Given the `SubscriptionHandler`, fill in the `DataList` element
* within the `NotifyRequest`
*
* The function will fill in a `NotifyRequest`'s `DataList`. If
* the property changes occupy more space than available in the
* underlying buffer, the function will only pack enough elements to
* fit within the buffer and adjust the state of the
* `SubscriptionHandler` to resume processing at the first
* unprocessed trait.
*
* @param[in] aSubHandler A pointer to the SubscriptionHandler for
* which we are attempting to create
* a `NotifyRequest`
*
* @param[in] aNotificationRequest A builder object encapsulating the
* request we are trying to build
*
* @param[out] aIsSubscriptionClean A boolean that is set to true
* when no further traits have data
* that needs to be processed by the
* SubscriptionHandler and to false
* otherwise.
*
* @param[out] aNeWriteInProgress A boolean that is set to true
* when there is data written in notify request
* to false otherwise.
*
* @retval #WEAVE_NO_ERROR On success.
*
* @retval other The processing failed within the subroutines. The
* errors may have resulted from state
* corruption, TDM errors, insufficient
* buffering, etc.
*/
WEAVE_ERROR NotificationEngine::BuildSingleNotifyRequestDataList(SubscriptionHandler * aSubHandler,
NotifyRequestBuilder & aNotifyRequest, bool & aIsSubscriptionClean,
bool & aNeWriteInProgress)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
bool packetIsFull = false;
SubscriptionHandler::TraitInstanceInfo * traitInfo =
aSubHandler->GetTraitInstanceInfoList() + aSubHandler->mCurProcessingTraitInstanceIdx;
while (aSubHandler->mCurProcessingTraitInstanceIdx < aSubHandler->GetNumTraitInstances())
{
if (traitInfo->IsDirty())
{
aIsSubscriptionClean = false;
TLVWriter writerCpy;
WeaveLogDetail(DataManagement, "<NE:Run> T%u is dirty", aSubHandler->mCurProcessingTraitInstanceIdx);
// Ensure we're in the DataList element. May allocate memory.
err = aNotifyRequest.MoveToState(kNotifyRequestBuilder_BuildDataList);
SuccessOrExit(err);
// Make a back-up of the writer so that we can rewind back if the next retrieval fails due to the packet getting full.
aNotifyRequest.Checkpoint(writerCpy);
// Retrieve data for this trait instance and clear its dirty flag.
err = RetrieveTraitInstanceData(aSubHandler, traitInfo, &aNotifyRequest, &packetIsFull);
VerifyOrExit(err == WEAVE_NO_ERROR,
WeaveLogError(DataManagement, "<NE:Run> Error retrieving data from trait, aborting"));
if (packetIsFull)
{
WeaveLogDetail(DataManagement, "<NE:Run> Packet got full!");
// Restore the writer
aNotifyRequest.Rollback(writerCpy);
// when first trait property is too big to fit in the packet, ignore that trait property.
if (!aNeWriteInProgress)
{
WeaveLogDetail(DataManagement, "<NE:Run> trait property is too big so that it fails to fit in the packet");
traitInfo->ClearDirty();
}
else
{
break;
}
}
else
{
aNeWriteInProgress = true;
}
}
aSubHandler->mCurProcessingTraitInstanceIdx++;
traitInfo++;
}
// Only do this if our sub handler is still valid at this point (which it may not be)
if (aSubHandler->GetNumTraitInstances())
{
aSubHandler->mCurProcessingTraitInstanceIdx %= aSubHandler->GetNumTraitInstances();
}
exit:
return err;
}
/**
* @brief
* Build and send a single notify request for a given subscription handler
*
* The function creates and sends a single `NotifyRequest` for a
* given `SubscriptionHandler`. If there are changes in the TDM or
* in the event log state, the function will allocate a buffer, fill
* it with trait and event data (as appropriate) and send it the
* buffer to the subscriber. If the data to be sent to the
* subscriber spans more than a single notify request, the function
* must be called multiple times to ensure that all the trait and
* event data is synchronized between publisher and subscriber; in
* that case, the function will adjust the internal state of the
* `SubscriptionHandler` such that the subsequent `NotifyRequest`s
* resume at a point where this request left off.
*
* The function prioritizes trait properties over events: the trait
* properties are serialized first and events are serialized into
* space leftover after the properties have been serialized.
*
* The function allocates at most one `PacketBuffer`. At the end of
* the function, either the ownership of this buffer is passed to the
* `WeaveMessageLayer` or the buffer is de-allocated.
*
* If the function encounters any error that's not a WEAVE_ERR_MEM,
* the function will abort the subscription.
*
* @param[in] aSubHandler A pointer to the SubscriptionHandler for
* which we are attempting to create
* a `NotifyRequest`
*
* @param[out] aSubscriptionHandled An output parameter used in
* tracking the iteration through
* subscriptions.
*
* @param[out] aIsSubscriptionClean An output parameter that is set
* to true if there is no processing
* left for this
* `SubscriptionHandler`, and to
* false otherwise.
*
* @retval #WEAVE_NO_ERROR On success.
*
* @retval #WEAVE_ERR_MEM The function could not allocate memory. The
* caller may need to abort its current
* iteration and restart processing under less
* memory pressure.
*/
WEAVE_ERROR NotificationEngine::BuildSingleNotifyRequest(SubscriptionHandler * aSubHandler, bool & aSubscriptionHandled,
bool & aIsSubscriptionClean)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer * buf = NULL;
TLVWriter writer;
NotifyRequestBuilder notifyRequest;
bool subClean;
bool neWriteInProgress = false;
uint32_t maxNotificationSize = 0;
uint32_t maxPayloadSize = 0;
aIsSubscriptionClean = true; // assume no work it to be done
// If we're picking up from where we left off last, don't assume the subscription will be clean nor handled completely in this
// evaluation round.
if (aSubHandler->mCurProcessingTraitInstanceIdx != 0)
{
aIsSubscriptionClean = false;
aSubscriptionHandled = false;
}
maxNotificationSize = aSubHandler->GetMaxNotificationSize();
err = aSubHandler->mBinding->AllocateRightSizedBuffer(buf, maxNotificationSize, WDM_MIN_NOTIFICATION_SIZE, maxPayloadSize);
SuccessOrExit(err);
// Create a notify request.
err = notifyRequest.Init(buf, &writer, aSubHandler, maxPayloadSize);
SuccessOrExit(err);
// Fill in the DataList. Allocation may take place
subClean = true;
err = BuildSingleNotifyRequestDataList(aSubHandler, notifyRequest, subClean, neWriteInProgress);
SuccessOrExit(err);
aIsSubscriptionClean &= subClean;
subClean = true;
#if WEAVE_CONFIG_EVENT_LOGGING_WDM_OFFLOAD
// Fill in the EventList. Allocation may take place
err = BuildSingleNotifyRequestEventList(aSubHandler, notifyRequest, subClean, neWriteInProgress);
SuccessOrExit(err);
#endif
aIsSubscriptionClean &= subClean;
// transition request builder to the Init state. If buffer was
// not allocated, then the function is a noop. Otherwise, the TLV
// elements get closed (through the NotificationRequest), and buf
// is non-null
err = notifyRequest.MoveToState(kNotifyRequestBuilder_Idle);
SuccessOrExit(err);
// TODO: JIRA-1419. change the NotifyRequest to provide a facility of buffer
// ownership transfer. At this point, perhaps it is of minor
// utility, but it we ever get tooling to track buffer ownership,
// it would be handy. As it is, at this point in the code, the
// request builder should be dead.
if (neWriteInProgress && buf)
{
WeaveLogDetail(DataManagement, "<NE:Run> Sending notify...");
err = SendNotify(buf, aSubHandler);
// NULL out the buf since we've handed it over to the message layer
buf = NULL;
VerifyOrExit(err == WEAVE_NO_ERROR, WeaveLogError(DataManagement, "<NE:Run> Error sending out notify!"));
}
exit:
// On any error, abort the subscription, and consider it handled.
if (err != WEAVE_NO_ERROR)
{
// abort subscription, squash error, signal to upper
// layers that the subscription is done
aSubHandler->TerminateSubscription(err, NULL, false);
aSubscriptionHandled = true;
err = WEAVE_NO_ERROR;
}
if (buf != NULL)
{
PacketBuffer::Free(buf);
}
return err;
}
#if WDM_ENABLE_SUBSCRIPTIONLESS_NOTIFICATION
WEAVE_ERROR NotificationEngine::SendSubscriptionlessNotification(Binding * const apBinding,
TraitPath *aPathList,
uint16_t aPathListSize)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
PacketBuffer * msgBuf = NULL;
ExchangeContext *ec = NULL;
uint32_t maxPayloadSize = 0;
VerifyOrExit(apBinding != NULL && aPathList != NULL,
err = WEAVE_ERROR_INVALID_ARGUMENT);
err = apBinding->AllocateRightSizedBuffer(msgBuf, WDM_MAX_NOTIFICATION_SIZE, WDM_MIN_NOTIFICATION_SIZE, maxPayloadSize);
SuccessOrExit(err);
// Build Notify Request for subscriptionless notification
err = BuildSubscriptionlessNotification(msgBuf, maxPayloadSize, aPathList, aPathListSize);
SuccessOrExit(err);
err = apBinding->NewExchangeContext(ec);
SuccessOrExit(err);
ec->AppState = this;
err = ec->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, kMsgType_SubscriptionlessNotification, msgBuf);
msgBuf = NULL;
SuccessOrExit(err);
ec->Close();
ec = NULL;
exit:
if (NULL != msgBuf)
{
PacketBuffer::Free(msgBuf);
msgBuf = NULL;
}
if (NULL != ec)
{
ec->Abort();
ec = NULL;
}
return err;
}
WEAVE_ERROR NotificationEngine::BuildSubscriptionlessNotification(PacketBuffer *aMsgBuf, uint32_t maxPayloadSize,
TraitPath *aPathList,
uint16_t aPathListSize)
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
TLVWriter writer;
NotifyRequestBuilder notifyRequest;
TraitDataSource *dataSource;
TraitPath *currPath;
TraitCatalogBase<TraitDataSource> * pubCatalog;
SchemaVersion schemaVersion;
VerifyOrExit(aPathList != NULL, err = WEAVE_ERROR_INVALID_ARGUMENT);
currPath = aPathList;
// Get a handle of the publisher catalog
pubCatalog = SubscriptionEngine::GetInstance()->mPublisherCatalog;
// Create a notify request.
err = notifyRequest.Init(aMsgBuf, &writer, NULL, maxPayloadSize);
SuccessOrExit(err);
// Ensure we're in the DataList element.
err = notifyRequest.MoveToState(kNotifyRequestBuilder_BuildDataList);
SuccessOrExit(err);
// Iterate through the trait path list and populate the notifyrequest with
// trait instance data.
for (uint16_t i = 0; i < aPathListSize; i++, currPath++)
{
TraitDataHandle traitHandle = currPath->mTraitDataHandle;
// Get the max version from the datasource
// Locate() can return an error if the sink has been removed from the catalog. In that case,
// skip this path
if (pubCatalog->Locate(traitHandle, &dataSource) == WEAVE_NO_ERROR)
{
schemaVersion = dataSource->GetSchemaEngine()->GetMaxVersion();
err = notifyRequest.WriteDataElement(traitHandle, kRootPropertyPathHandle, schemaVersion, NULL, 0, NULL, 0);
SuccessOrExit(err);
}
}
err = notifyRequest.MoveToState(kNotifyRequestBuilder_Idle);
SuccessOrExit(err);
exit:
return err;
}
#endif // WDM_ENABLE_SUBSCRIPTIONLESS_NOTIFICATION
void NotificationEngine::Run(System::Layer * aSystemLayer, void * aAppState, System::Error)
{
NotificationEngine * const pEngine = reinterpret_cast<NotificationEngine *>(aAppState);
pEngine->Run();
}
void NotificationEngine::ScheduleRun()
{
SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->ScheduleWork(Run, this);
}
void NotificationEngine::Run()
{
WEAVE_ERROR err = WEAVE_NO_ERROR;
uint32_t numSubscriptionsHandled = 0;
SubscriptionEngine * subEngine = SubscriptionEngine::GetInstance();
SubscriptionHandler * subHandler = subEngine->mHandlers + mCurSubscriptionHandlerIdx;
bool subscriptionHandled, isSubscriptionClean;
bool isClean = true;
bool isLocked = false;
// Lock before attempting to modify any of the shared data structures.
err = subEngine->Lock();
SuccessOrExit(err);
isLocked = true;
WeaveLogDetail(DataManagement, "<NE:Run> NotifiesInFlight = %u", mNumNotifiesInFlight);
while ((mNumNotifiesInFlight < WDM_PUBLISHER_MAX_NOTIFIES_IN_FLIGHT) &&
(numSubscriptionsHandled < SubscriptionEngine::kMaxNumSubscriptionHandlers))
{
subscriptionHandled = true;
// limit the prints to handlers that are in meaingful subscribing/notifying states.
if (subHandler->IsNotifying() || subHandler->IsSubscribing())
{
WeaveLogDetail(DataManagement, "<NE:Run> Eval Subscription: %u (state = %s, num-traits = %u)!",
mCurSubscriptionHandlerIdx, subHandler->GetStateStr(), subHandler->GetNumTraitInstances());
}
if (subHandler->IsNotifiable())
{
// This is needed because some error could trigger abort on subscription, which leads to destroy of the handler
subHandler->_AddRef();
err = BuildSingleNotifyRequest(subHandler, subscriptionHandled, isSubscriptionClean);
SuccessOrExit(err);
if (isSubscriptionClean)
{
// TODO: notification based on the event list state.
subHandler->OnNotifyProcessingComplete(false, NULL, 0);
}
subHandler->_Release();
}
if (subscriptionHandled)
{
numSubscriptionsHandled++;
}
else
{
WeaveLogDetail(DataManagement, "<NE:Run> Subscription %u not handled", mCurSubscriptionHandlerIdx);
numSubscriptionsHandled = 0;
}
mCurSubscriptionHandlerIdx = (mCurSubscriptionHandlerIdx + 1) % SubscriptionEngine::kMaxNumSubscriptionHandlers;
subHandler = subEngine->mHandlers + mCurSubscriptionHandlerIdx;
}
subHandler = subEngine->mHandlers;
isClean = true;
// We only wipe our granular dirty stores if all the subscriptions are clean. To do so, we iterate over
// all of them and check each of their dirty flags.
for (int i = 0; i < SubscriptionEngine::kMaxNumSubscriptionHandlers; i++)
{
if (subHandler->IsActive())
{
SubscriptionHandler::TraitInstanceInfo * traitInfo = subHandler->GetTraitInstanceInfoList();
for (size_t j = 0; j < subHandler->GetNumTraitInstances(); j++)
{
if (traitInfo->IsDirty())
{
WeaveLogDetail(DataManagement, "<NE:Run> S%u:T%u still dirty", i, j);
isClean = false;
break;
}
traitInfo++;
}
}
subHandler++;
}
if (isClean)
{
WeaveLogDetail(DataManagement, "<NE> Done processing!");
mGraphSolver.ClearDirty();
}
exit:
if (isLocked)
{
subEngine->Unlock();
}
return;
}