/*
 *
 *    Copyright (c) 2016-2017 Nest Labs, Inc.
 *    All rights reserved.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

/**
 *    @file
 *      This file implements the subscription client for the Weave
 *      Data Management (WDM) profile.
 *
 */

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS

#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif // __STDC_CONSTANT_MACROS

#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif // __STDC_LIMIT_MACROS

#include <Weave/Profiles/data-management/Current/WdmManagedNamespace.h>
#include <Weave/Profiles/data-management/DataManagement.h>

#include <Weave/Support/WeaveFaultInjection.h>
#include <Weave/Support/RandUtils.h>
#include <Weave/Support/FibonacciUtils.h>
#include <SystemLayer/SystemStats.h>

namespace nl {
namespace Weave {
namespace Profiles {
namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current) {

/**
 * Access-control delegate whose check unconditionally succeeds: every
 * (trait path, catalog) pair handed to it is reported as accessible.
 */
class AlwaysAcceptDataElementAccessControlDelegate : public IDataElementAccessControlDelegate
{
public:
    // TODO: make this access check more sophisticated about which
    // subscription-based notifications are allowed through.

    /**
     * @param[in] aTraitPath    Path being checked (not inspected).
     * @param[in] aCatalog      Sink catalog the path belongs to (not inspected).
     *
     * @return WEAVE_NO_ERROR, always.
     */
    WEAVE_ERROR DataElementAccessCheck(const TraitPath & aTraitPath, const TraitCatalogBase<TraitDataSink> & aCatalog)
    {
        const WEAVE_ERROR verdict = WEAVE_NO_ERROR;

        return verdict;
    }
};

// The constructor deliberately performs no work; member state is assigned
// by InitAsFree(), Reset() and Init().
SubscriptionClient::SubscriptionClient()
{
}

/**
 * Put the object into the Free state with a zero reference count and
 * restore every other member to its default via Reset().
 */
void SubscriptionClient::InitAsFree()
{
    mRefCount     = 0;
    mCurrentState = kState_Free;

    Reset();
}

/**
 * Restore the per-subscription members to their defaults.
 *
 * mCurrentState and mRefCount are not touched here; InitAsFree() sets them
 * before calling this method.
 */
void SubscriptionClient::Reset(void)
{
    // Pointers to collaborating objects and application hooks.
    mBinding = NULL;
    mEC = NULL;
    mAppState = NULL;
    mEventCallback = NULL;
    mResubscribePolicyCallback = NULL;
    mDataSinkCatalog = NULL;

    // Timeouts, identity and retry bookkeeping.
    mInactivityTimeoutDuringSubscribingMsec = kNoTimeOut;
    mLivenessTimeoutMsec = kNoTimeOut;
    mSubscriptionId = 0;
    mConfig = kConfig_Down;
    mRetryCounter = 0;

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    // WDM update state.
    mUpdateMutex = NULL;
    mUpdateInFlight = false;
    mMaxUpdateSize = 0;
    mUpdateRequestContext.Reset();
    mPendingSetState = kPendingSetEmpty;

    // Re-bind the path containers to their backing arrays.
    mPendingUpdateSet.Init(mPendingStore, ArraySize(mPendingStore));
    mInProgressUpdateList.Init(mInProgressStore, ArraySize(mInProgressStore));

    mUpdateRetryCounter = 0;
    mUpdateRetryScheduled = false;
    mUpdateFlushScheduled = false;
    mSuspendUpdateRetries = false;
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

#if WDM_ENABLE_PROTOCOL_CHECKS
    mPrevTraitDataHandle = -1;
#endif // WDM_ENABLE_PROTOCOL_CHECKS

    mPrevIsPartialChange = false;
}

/**
 * Bind this client to a Binding, an application callback and an optional
 * data sink catalog, then move it to the Initialized state.
 *
 * A reference is taken on the binding and one on the client itself. The
 * client registers BindingEventCallback as the binding's protocol-layer
 * callback so it is told about binding state changes.
 *
 * @param[in] apBinding         Binding to use; dereferenced unconditionally.
 * @param[in] apAppState        Opaque pointer stored for the event callback.
 * @param[in] aEventCallback    Application event callback.
 * @param[in] apCatalog         Data sink catalog; may be NULL.
 * @param[in] aInactivityTimeoutDuringSubscribingMsec
 *                              Stored inactivity timeout, in milliseconds.
 * @param[in] aUpdateMutex      Stored only when WEAVE_CONFIG_ENABLE_WDM_UPDATE
 *                              is enabled.
 *
 * @return WEAVE_NO_ERROR, or the error returned by mUpdateClient.Init() when
 *         WDM update support is enabled.
 */
WEAVE_ERROR SubscriptionClient::Init(Binding * const apBinding, void * const apAppState, EventCallback const aEventCallback,
                                     const TraitCatalogBase<TraitDataSink> * const apCatalog,
                                     const uint32_t aInactivityTimeoutDuringSubscribingMsec, IWeaveWDMMutex * aUpdateMutex)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    // A client being initialized is expected to hold no references yet.
    WeaveLogIfFalse(0 == mRefCount);

    // Take our reference on the binding before storing it.
    apBinding->AddRef();
    mBinding = apBinding;

    mAppState      = apAppState;
    mEventCallback = aEventCallback;

    // Have the binding report its state changes to this client.
    mBinding->SetProtocolLayerCallback(BindingEventCallback, this);

    // Casting away const on a NULL pointer yields NULL, so no separate
    // NULL branch is required.
    mDataSinkCatalog = const_cast<TraitCatalogBase<TraitDataSink> *>(apCatalog);

    mInactivityTimeoutDuringSubscribingMsec = aInactivityTimeoutDuringSubscribingMsec;

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    mUpdateMutex    = aUpdateMutex;
    mUpdateInFlight = false;
    mMaxUpdateSize  = 0;
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

    MoveToState(kState_Initialized);

    _AddRef();

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    err = mUpdateClient.Init(mBinding, this, UpdateEventCallback);
    SuccessOrExit(err);

    ConfigureUpdatableSinks();

exit:
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
    return err;
}

/**
 * Short label for the current client state, used in log messages.
 *
 * @return A static string naming the state when WEAVE_DETAIL_LOGGING is
 *         enabled; "N/A" otherwise, or when the state matches no case.
 */
const char * SubscriptionClient::GetStateStr() const
{
    const char * label = "N/A";

#if WEAVE_DETAIL_LOGGING
    switch (mCurrentState)
    {
    case kState_Free:
        label = "FREE";
        break;

    case kState_Initialized:
        label = "INIT";
        break;

    case kState_Subscribing:
        label = "SReq1";
        break;

    case kState_Subscribing_IdAssigned:
        label = "SReq2";
        break;

    case kState_SubscriptionEstablished_Idle:
        label = "ALIVE";
        break;

    case kState_SubscriptionEstablished_Confirming:
        label = "CONFM";
        break;

    case kState_Canceling:
        label = "CANCL";
        break;

    case kState_Resubscribe_Holdoff:
        label = "RETRY";
        break;

    case kState_Terminated:
        label = "TERM";
        break;
    }
#endif // WEAVE_DETAIL_LOGGING

    return label;
}

/**
 * Record a new client state and log the transition.
 *
 * @param[in] aTargetState  State to enter.
 */
void SubscriptionClient::MoveToState(const ClientState aTargetState)
{
    // Assign first so that GetStateStr() in the log line reports the new state.
    mCurrentState = aTargetState;

    WeaveLogDetail(DataManagement, "Client[%u] moving to [%5.5s] Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), mRefCount);

#if WEAVE_DETAIL_LOGGING
    // Let the engine log that a client slot has been freed.
    if (aTargetState == kState_Free)
    {
        SubscriptionEngine::GetInstance()->LogSubscriptionFreed();
    }
#endif // WEAVE_DETAIL_LOGGING
}

/**
 * @brief Turn on automatic resubscription by installing a retry policy.
 *
 * @param[in] aCallback     Optional callback that supplies the wait time
 *                          before the next retry. When NULL,
 *                          DefaultResubscribePolicyCallback is installed.
 */
void SubscriptionClient::EnableResubscribe(ResubscribePolicyCallback aCallback)
{
    ResubscribePolicyCallback policy = aCallback;

    if (NULL == policy)
    {
        policy = DefaultResubscribePolicyCallback;
    }

    mResubscribePolicyCallback = policy;
}

/**
 * @brief Turn off automatic resubscription.
 *
 * The retry policy callback is cleared. If the client is in the
 * Resubscribe_Holdoff state, the subscription timer is cancelled and the
 * subscription is aborted.
 */
void SubscriptionClient::DisableResubscribe(void)
{
    mResubscribePolicyCallback = NULL;

    if (kState_Resubscribe_Holdoff != mCurrentState)
    {
        // No retry is pending; nothing more to undo.
        return;
    }

    CancelSubscriptionTimer();

    // The application asked for this, so it is not notified.
    AbortSubscription();
}

/**
 * @brief Kick the resubscribe mechanism. This will initiate an immediate retry
 *
 * A client in the Resubscribe_Holdoff state has its timer cancelled and is
 * moved back to Initialized. The retry counter is then zeroed and, if the
 * client is in the Initialized or Resubscribe_Holdoff state,
 * SetRetryTimer(WEAVE_NO_ERROR) is called.
 */
void SubscriptionClient::ResetResubscribe(void)
{
    const bool retryPending = (kState_Resubscribe_Holdoff == mCurrentState);

    if (retryPending)
    {
        CancelSubscriptionTimer();
        MoveToState(kState_Initialized);
    }

    mRetryCounter = 0;

    switch (mCurrentState)
    {
    case kState_Initialized:
    case kState_Resubscribe_Holdoff:
        SetRetryTimer(WEAVE_NO_ERROR);
        break;

    default:
        // Any other state is left alone.
        break;
    }
}

void SubscriptionClient::IndicateActivity(void)
{
    // emit an OnSubscriptionActivity event
    InEventParam inParam;
    OutEventParam outParam;

    inParam.mSubscriptionActivity.mClient = this;
    mEventCallback(mAppState, kEvent_OnSubscriptionActivity, inParam, outParam);
}

/**
 * Report the subscription ID, if the client is in a state that has one.
 *
 * @param[out] apSubscriptionId     Receives mSubscriptionId on success and 0
 *                                  on failure. Dereferenced unconditionally.
 *
 * @retval WEAVE_NO_ERROR               The client is in the
 *                                      Subscribing_IdAssigned,
 *                                      SubscriptionEstablished_Idle,
 *                                      SubscriptionEstablished_Confirming or
 *                                      Canceling state.
 * @retval WEAVE_ERROR_INCORRECT_STATE  Any other state.
 */
WEAVE_ERROR SubscriptionClient::GetSubscriptionId(uint64_t * const apSubscriptionId)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    const bool idAvailable = (kState_Subscribing_IdAssigned == mCurrentState) ||
        (kState_SubscriptionEstablished_Idle == mCurrentState) ||
        (kState_SubscriptionEstablished_Confirming == mCurrentState) || (kState_Canceling == mCurrentState);

    // Start from a defined value so the failure path also writes the output.
    *apSubscriptionId = 0;

    VerifyOrExit(idAvailable, err = WEAVE_ERROR_INCORRECT_STATE);

    *apSubscriptionId = mSubscriptionId;

exit:
    WeaveLogFunctError(err);

    return err;
}

/**
 * Event handler that only logs the event identifier.
 *
 * @param[in]  aEvent       Event being reported; logged.
 * @param[in]  aInParam     Ignored.
 * @param[out] aOutParam    Ignored; left unmodified.
 */
void SubscriptionClient::DefaultEventHandler(EventID aEvent, const InEventParam & aInParam, OutEventParam & aOutParam)
{
    WeaveLogDetail(DataManagement, "%s event: %d", __func__, aEvent);

    // Neither parameter block is consulted.
    IgnoreUnusedVariable(aOutParam);
    IgnoreUnusedVariable(aInParam);
}

/**
 * @brief The default policy implementation will pick a random timeslot
 * with millisecond resolution over an ever increasing window,
 * following a fibonacci sequence up to WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX.
 * Average of the randomized wait time past the WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX
 * will be around one hour.
 * When the retry count resets to 0, the sequence starts from the beginning again.
 *
 * The selected wait time lies in [min, max), where max is the window for the
 * current step and min is WDM_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP
 * percent of max. If that range is empty, the wait time is exactly min.
 *
 * @param[in]  aAppState        Unused.
 * @param[in]  aInParam         Supplies the retry count (mNumRetries) and the
 *                              request type used in the log message.
 * @param[out] aOutIntervalMsec Receives the selected wait time in milliseconds.
 */

void SubscriptionClient::DefaultResubscribePolicyCallback(void * const aAppState, ResubscribeParam & aInParam,
                                                          uint32_t & aOutIntervalMsec)
{
    IgnoreUnusedVariable(aAppState);

    uint32_t fibonacciNum      = 0;
    uint32_t maxWaitTimeInMsec = 0;
    uint32_t waitTimeInMsec    = 0;
    uint32_t minWaitTimeInMsec = 0;

    if (aInParam.mNumRetries <= WDM_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
    {
        fibonacciNum      = GetFibonacciForIndex(aInParam.mNumRetries);
        maxWaitTimeInMsec = fibonacciNum * WDM_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
    }
    else
    {
        maxWaitTimeInMsec = WDM_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
    }

    if (maxWaitTimeInMsec != 0)
    {
        // Do the percentage arithmetic in 64 bits so that a large window
        // cannot overflow the intermediate product.
        const uint64_t scaledMin =
            (static_cast<uint64_t>(WDM_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP) * maxWaitTimeInMsec) / 100;

        // Never let the lower bound exceed the upper bound; otherwise the
        // unsigned subtraction below would wrap around.
        minWaitTimeInMsec = (scaledMin > maxWaitTimeInMsec) ? maxWaitTimeInMsec : static_cast<uint32_t>(scaledMin);

        const uint32_t windowInMsec = maxWaitTimeInMsec - minWaitTimeInMsec;

        waitTimeInMsec = minWaitTimeInMsec;

        // A zero-width window (e.g. a 100 percent minimum) must not reach the
        // modulo: dividing by zero is undefined behaviour.
        if (windowInMsec != 0)
        {
            waitTimeInMsec += GetRandU32() % windowInMsec;
        }
    }

    aOutIntervalMsec = waitTimeInMsec;

    WeaveLogDetail(DataManagement,
                   "Computing %s policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32
                   " ms",
                   aInParam.mRequestType == ResubscribeParam::kSubscription ? "resubscribe" : "update",
                   aInParam.mNumRetries, maxWaitTimeInMsec, waitTimeInMsec);

    return;
}

/**
 * Start (or continue starting) a subscription.
 *
 * Does nothing if a SubscribeRequest is already outstanding (Subscribing or
 * Subscribing_IdAssigned). Otherwise, when the binding is ready, the client
 * enters the appropriate subscribing state, sends a SubscribeRequest and
 * refreshes its timer; when the binding is not ready, binding preparation is
 * requested instead. Any failure terminates the subscription, with the
 * application callback not suppressed.
 */
void SubscriptionClient::_InitiateSubscription(void)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Hold a reference so that callbacks into the application cannot free us.
    _AddRef();

    // A request is already in progress: exit without reporting an error.
    VerifyOrExit(!(kState_Subscribing == mCurrentState || kState_Subscribing_IdAssigned == mCurrentState), /* no-op */);

    // States ordered after Initialized cannot start a new subscription.
    VerifyOrExit(mCurrentState <= kState_Initialized, err = WEAVE_ERROR_INCORRECT_STATE);

    mPrevIsPartialChange = false;

#if WDM_ENABLE_PROTOCOL_CHECKS
    mPrevTraitDataHandle = -1;
#endif // WDM_ENABLE_PROTOCOL_CHECKS

    if (!mBinding->IsReady())
    {
        // The binding is not usable yet; request its preparation.
        err = _PrepareBinding();
        SuccessOrExit(err);
    }
    else
    {
        // Initiators and counter-subscribers enter different subscribing states.
        MoveToState(IsInitiator() ? kState_Subscribing : kState_Subscribing_IdAssigned);

        // Form and send the SubscribeRequest over the binding.
        err = SendSubscribeRequest();
        SuccessOrExit(err);

        err = RefreshTimer();
        SuccessOrExit(err);
    }

exit:
    WeaveLogFunctError(err);

    if (err != WEAVE_NO_ERROR)
    {
        TerminateSubscription(err, NULL, false);
    }

    _Release();
}

/**
 * Make sure the binding is ready, being prepared, or asked to prepare.
 *
 * @retval WEAVE_NO_ERROR               The binding is ready, is already
 *                                      preparing, or RequestPrepare() succeeded.
 * @retval WEAVE_ERROR_INCORRECT_STATE  The binding is neither ready, nor
 *                                      preparable, nor preparing.
 * @return Any error returned by Binding::RequestPrepare().
 */
WEAVE_ERROR SubscriptionClient::_PrepareBinding()
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    _AddRef();

    if (!mBinding->IsReady())
    {
        if (mBinding->CanBePrepared())
        {
            // Deliver a PrepareRequested API event to the application through the
            // binding's callback. The binding later calls back into the
            // SubscriptionClient to signal that preparation finished (successfully
            // or not). That callback may arrive synchronously inside
            // RequestPrepare(), in which case _InitiateSubscription() recurses.
            err = mBinding->RequestPrepare();
            SuccessOrExit(err);
        }
        else
        {
            // The binding must then already be preparing. Once preparation
            // completes successfully, _InitiateSubscription() is called again.
            VerifyOrExit(mBinding->IsPreparing(), err = WEAVE_ERROR_INCORRECT_STATE);
        }
    }

exit:
    _Release();

    return err;
}

/**
 * Build a SubscribeRequest message and send it to the publisher.
 *
 * The application is first asked, through a
 * kEvent_OnSubscribeRequestPrepareNeeded event, for the request parameters
 * (timeouts, path list, event options and, for a counter-subscriber, the
 * subscription ID). The request is then encoded into a freshly allocated
 * PacketBuffer, the exchange context is replaced, and the message is sent
 * with the ExpectResponse flag.
 *
 * @retval WEAVE_NO_ERROR               The message was handed to the exchange context.
 * @retval WEAVE_ERROR_INCORRECT_STATE  The client is not in a subscribing state
 *                                      after the application callback returns.
 * @retval WEAVE_ERROR_INVALID_ARGUMENT A timeout supplied by the application
 *                                      exceeds kMaxTimeoutSec and is not kNoTimeOut.
 * @retval WEAVE_ERROR_NO_MEMORY        No PacketBuffer could be allocated.
 * @return Any error propagated from the builders, the TLV writer, the
 *         catalog/schema helpers, ReplaceExchangeContext() or SendMessage().
 */
WEAVE_ERROR SubscriptionClient::SendSubscribeRequest(void)
{
    WEAVE_ERROR err       = WEAVE_NO_ERROR;
    PacketBuffer * msgBuf = NULL;
    uint8_t msgType       = kMsgType_SubscribeRequest;
    InEventParam inSubscribeParam;
    OutEventParam outSubscribeParam;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    inSubscribeParam.Clear();
    outSubscribeParam.Clear();

    // Both path lists start out NULL so that the loops below can tell which
    // one, if any, the application filled in.
    outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList = NULL;
    outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList          = NULL;

    // Ask the application for the contents of the request.
    inSubscribeParam.mSubscribeRequestPrepareNeeded.mClient = this;
    mEventCallback(mAppState, kEvent_OnSubscribeRequestPrepareNeeded, inSubscribeParam, outSubscribeParam);

    // A counter-subscriber takes the subscription ID from the application.
    if (IsCounterSubscriber())
    {
        mSubscriptionId = outSubscribeParam.mSubscribeRequestPrepareNeeded.mSubscriptionId;
    }

    // The state is re-checked after the callback has returned, then the
    // application-supplied timeouts are validated.
    VerifyOrExit((kState_Subscribing == mCurrentState || kState_Subscribing_IdAssigned == mCurrentState), err = WEAVE_ERROR_INCORRECT_STATE);
    VerifyOrExit((outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin <= kMaxTimeoutSec) ||
                     (kNoTimeOut == outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin),
                 err = WEAVE_ERROR_INVALID_ARGUMENT);
    VerifyOrExit((outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax <= kMaxTimeoutSec) ||
                     (kNoTimeOut == outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax),
                 err = WEAVE_ERROR_INVALID_ARGUMENT);

    msgBuf = PacketBuffer::New();
    VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);

    {
        nl::Weave::TLV::TLVWriter writer;
        SubscribeRequest::Builder request;

        writer.Init(msgBuf);

        err = request.Init(&writer);
        SuccessOrExit(err);

        // Timeouts equal to kNoTimeOut are left out of the request.
        if (kNoTimeOut != outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin)
        {
            request.SubscribeTimeoutMin(outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMin);
        }
        if (kNoTimeOut != outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax)
        {
            request.SubscribeTimeoutMax(outSubscribeParam.mSubscribeRequestPrepareNeeded.mTimeoutSecMax);
        }
        if (IsCounterSubscriber())
        {
            request.SubscriptionID(mSubscriptionId);
        }

        // It's safe to bail out after a series of operation, for
        // SubscriptionRequest::Builder would internally turn to NOP after error is logged
        SuccessOrExit(err = request.GetError());

        // First pass over the application's paths: encode the path list.
        {
            PathList::Builder & pathList = request.CreatePathListBuilder();

            for (size_t i = 0; i < outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathListSize; ++i)
            {
                TraitDataSink * pDataSink;
                nl::Weave::TLV::TLVType dummyContainerType;
                SchemaVersionRange versionIntersection;
                VersionedTraitPath versionedTraitPath;

                // Applications can set either the versioned or non versioned path lists for now. We pick either
                // depending on which is non-NULL. If both are non-NULL, we then select the versioned list.
                // NOTE(review): if both lists are NULL while mPathListSize is non-zero, the else branch
                // dereferences a NULL mPathList — confirm callers always supply one of them.
                if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList)
                {
                    versionedTraitPath = outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList[i];
                }
                else
                {
                    versionedTraitPath.mTraitDataHandle =
                        outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mTraitDataHandle;
                    versionedTraitPath.mPropertyPathHandle =
                        outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mPropertyPathHandle;
                }

                // NOTE(review): Init() accepts a NULL catalog, and mDataSinkCatalog is dereferenced here
                // without a check — confirm a catalog is always present when paths are supplied.
                if (mDataSinkCatalog->Locate(versionedTraitPath.mTraitDataHandle, &pDataSink) != WEAVE_NO_ERROR)
                {
                    // Locate() can return an error if the sink has been removed from the catalog. In that case,
                    // continue to next entry
                    continue;
                }

                // Start the TLV Path
                err = writer.StartContainer(nl::Weave::TLV::AnonymousTag, nl::Weave::TLV::kTLVType_Path, dummyContainerType);
                SuccessOrExit(err);

                // Start, fill, and close the TLV Structure that contains ResourceID, ProfileID, and InstanceID
                err = mDataSinkCatalog->HandleToAddress(versionedTraitPath.mTraitDataHandle, writer,
                                                        versionedTraitPath.mRequestedVersionRange);

                if (err == WEAVE_ERROR_INVALID_ARGUMENT)
                {
                    // Ideally, this code will not be reached as HandleToAddress() should find the entry in the catalog.
                    // Otherwise, the earlier Locate() call would have continued.
                    // However, keeping this check here for consistency and code safety
                    // NOTE(review): this path skips the EndContainer() call below although StartContainer()
                    // has already succeeded for this entry — verify the resulting encoding is acceptable.
                    err = WEAVE_NO_ERROR;
                    continue;
                }

                SuccessOrExit(err);

                // Append zero or more TLV tags based on the Path Handle
                err = pDataSink->GetSchemaEngine()->MapHandleToPath(versionedTraitPath.mPropertyPathHandle, writer);
                SuccessOrExit(err);

                // Close the TLV Path
                err = writer.EndContainer(dummyContainerType);
                SuccessOrExit(err);
            }

            pathList.EndOfPathList();
            SuccessOrExit(err = pathList.GetError());
        }

        // Second pass over the same paths: encode one version-list entry per
        // path whose sink can still be located.
        {
            VersionList::Builder & versionList = request.CreateVersionListBuilder();

            for (size_t i = 0; i < outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathListSize; ++i)
            {
                TraitDataSink * pDataSink;
                VersionedTraitPath versionedTraitPath;

                // Same list selection as in the path-list pass above.
                if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList)
                {
                    versionedTraitPath = outSubscribeParam.mSubscribeRequestPrepareNeeded.mVersionedPathList[i];
                }
                else
                {
                    versionedTraitPath.mTraitDataHandle =
                        outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mTraitDataHandle;
                    versionedTraitPath.mPropertyPathHandle =
                        outSubscribeParam.mSubscribeRequestPrepareNeeded.mPathList[i].mPropertyPathHandle;
                }

                if (mDataSinkCatalog->Locate(versionedTraitPath.mTraitDataHandle, &pDataSink) != WEAVE_NO_ERROR)
                {
                    // Locate() can return an error if the sink has been removed from the catalog. In that case,
                    // continue to next entry
                    continue;
                }

                // A sink with a valid version reports it; otherwise a null entry is written.
                if (pDataSink->IsVersionValid())
                {
                    versionList.AddVersion(pDataSink->GetVersion());
                }
                else
                {
                    versionList.AddNull();
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
                    // For an updatable sink without a valid version, clear its potential-data-loss marker.
                    if (pDataSink->IsUpdatableDataSink() && ( ! pDataSink->IsVersionValid()))
                    {
                        TraitUpdatableDataSink *updatableSink = static_cast<TraitUpdatableDataSink *>(pDataSink);
                        ClearPotentialDataLoss(versionedTraitPath.mTraitDataHandle, *updatableSink);
                    }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

                }
            }

            versionList.EndOfVersionList();
            SuccessOrExit(err = versionList.GetError());
        }

        // Optional event subscription, with the list of last observed events
        // when the application supplied one.
        if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mNeedAllEvents)
        {
            request.SubscribeToAllEvents(true);

            if (outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize > 0)
            {

                EventList::Builder & eventList = request.CreateLastObservedEventIdListBuilder();

                for (size_t n = 0; n < outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventListSize; ++n)
                {
                    Event::Builder & event = eventList.CreateEventBuilder();
                    event.SourceId(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mSourceId)
                        .Importance(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mImportance)
                        .EventId(outSubscribeParam.mSubscribeRequestPrepareNeeded.mLastObservedEventList[n].mEventId)
                        .EndOfEvent();
                    SuccessOrExit(err = event.GetError());
                }
                eventList.EndOfEventList();
                SuccessOrExit(err = eventList.GetError());
            }
        }

        request.EndOfRequest();
        SuccessOrExit(err = request.GetError());

        err = writer.Finalize();
        SuccessOrExit(err);
    }

    // Switch to a fresh exchange context for this request.
    err = ReplaceExchangeContext();
    SuccessOrExit(err);

    // NOTE: State could be changed in sync error callback by message layer
    // Fault injection can alter the message type (test hook).
    WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_SendUnsupportedReqMsgType, msgType += 50);

#if 0 // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
    // TODO: SubscribeRequest::CheckSchemaValidity is correct and rejects empty PathLists;
    // but the loop above can encode an empty PathList, and
    // the parser in SubscriptionHandler::ParsePathVersionEventLists will accept it.
    // Need to fix this in a way that's backward compatible.
    {
        nl::Weave::TLV::TLVReader reader;
        SubscribeRequest::Parser request;
        reader.Init(msgBuf);

        err = reader.Next();
        SuccessOrExit(err);

        err = request.Init(reader);
        SuccessOrExit(err);

        err = request.CheckSchemaValidity();
        SuccessOrExit(err);
    }
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK

    // msgBuf is set to NULL right after the call, whatever its result, so the
    // exit path does not free a buffer that was handed to SendMessage().
    err    = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, msgType, msgBuf,
                              nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
    msgBuf = NULL;
    SuccessOrExit(err);

exit:
    WeaveLogFunctError(err);

    // Free the buffer if it was never handed to SendMessage().
    if (NULL != msgBuf)
    {
        PacketBuffer::Free(msgBuf);
        msgBuf = NULL;
    }

    return err;
}

/**
 * Configure the SubscriptionClient as an initiator (as opposed to
 * a counter-subscriber) and bring the subscription up if it is not.
 *
 * With retries enabled, the resubscribe mechanism is reset unless the
 * binding is currently preparing; without retries, the subscription is
 * initiated directly.
 */
void SubscriptionClient::InitiateSubscription(void)
{
    mConfig = kConfig_Initiator;

    if (!IsRetryEnabled())
    {
        _InitiateSubscription();
    }
    else if (!mBinding->IsPreparing())
    {
        ResetResubscribe();
    }
}

/**
 * Configure the SubscriptionClient as a counter-subscriber and start the
 * subscription.
 *
 * @param[in] aLivenessTimeoutSec   Liveness timeout in seconds; stored in
 *                                  milliseconds.
 */
void SubscriptionClient::InitiateCounterSubscription(const uint32_t aLivenessTimeoutSec)
{
    // The liveness timeout is supplied by the caller rather than being
    // established as part of the subscription setup.
    // NOTE(review): the multiplication is done in 32 bits — confirm callers
    // keep the value small enough not to wrap.
    const uint32_t livenessMsec = aLivenessTimeoutSec * 1000;

    mConfig              = kConfig_CounterSubscriber;
    mLivenessTimeoutMsec = livenessMsec;

    _InitiateSubscription();
}

/**
 * Take one more reference on this client.
 *
 * Reference count meaning:
 *   0        the client is free
 *   1        the client is in some phase of a subscription
 *   higher   a downcall into the message layer is in progress (which may
 *            call back, e.g. send error / connection broken), or a callback
 *            into the application layer is in progress
 */
void SubscriptionClient::_AddRef()
{
    // Only logs if the count is already at INT8_MAX; the increment still happens.
    WeaveLogIfFalse(mRefCount < INT8_MAX);

    mRefCount++;
}

/**
 * Drop one reference on this client.
 *
 * Releasing the last reference aborts any subscription, cleans up the
 * client's resources, moves it to the Free state and re-initializes all of
 * its state. Otherwise the reference count is simply decremented.
 */
void SubscriptionClient::_Release()
{
    // Only logs on a non-positive count; execution continues into the
    // decrement branch below.
    WeaveLogIfFalse(mRefCount > 0);

    // If releasing the last reference...
    if (1 == mRefCount)
    {
        // Just to be safe, call AbortSubscription() to ensure that the subscription
        // is properly terminated. If the state transition logic is correct everywhere
        // else in the code, the subscription will already have been terminated and
        // this call will be a no-op.
        AbortSubscription();

        // Clean up resources/state associated with the client object.
        _Cleanup();

        // Return the client to the Free state.
        // NOTE: mRefCount is set to zero here solely to satisfy automated tests that look for
        // a specific reference count in the "Moving to [ FREE]" log message.
        mRefCount = 0;
        MoveToState(kState_Free);

        // Re-initialize all state data.
        InitAsFree();

        SYSTEM_STATS_DECREMENT(nl::Weave::System::Stats::kWDM_NumSubscriptionClients);
    }
    else
    {
        // Other references remain; the client stays as it is.
        --mRefCount;
    }
}

/**
 * @return The binding stored in this client; NULL if none is set.
 */
Binding * SubscriptionClient::GetBinding() const
{
    Binding * const binding = mBinding;

    return binding;
}

/**
 * @return The peer node ID reported by the binding, or kNodeIdNotSpecified
 *         when the client has no binding.
 */
uint64_t SubscriptionClient::GetPeerNodeId(void) const
{
    if (NULL == mBinding)
    {
        return kNodeIdNotSpecified;
    }

    return mBinding->GetPeerNodeId();
}

/**
 * Discard the current exchange context, if any, and obtain a new one from
 * the binding.
 *
 * The new context is wired to this client's message, timeout and (with
 * reliable messaging) send-error handlers, and the application is told about
 * it through a kEvent_OnExchangeStart event.
 *
 * @return WEAVE_NO_ERROR on success, otherwise the error returned by
 *         Binding::NewExchangeContext().
 */
WEAVE_ERROR SubscriptionClient::ReplaceExchangeContext()
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    InEventParam startIn;
    OutEventParam startOut;

    // Hold a reference for the duration of the call so we are not freed.
    _AddRef();

    FlushExistingExchangeContext();

    err = mBinding->NewExchangeContext(mEC);
    SuccessOrExit(err);

    // Route the new context's events to this client.
    mEC->AppState          = this;
    mEC->OnMessageReceived = OnMessageReceivedFromLocallyInitiatedExchange;
    mEC->OnResponseTimeout = OnResponseTimeout;
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
    mEC->OnSendError = OnSendError;
    mEC->OnAckRcvd   = NULL;
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING

    startIn.mExchangeStart.mEC     = mEC;
    startIn.mExchangeStart.mClient = this;

    // NOTE: app layer is not supposed to change state/ref count in this callback
    mEventCallback(mAppState, kEvent_OnExchangeStart, startIn, startOut);

exit:
    WeaveLogFunctError(err);

    _Release();

    return err;
}

/**
 * Detach from the current exchange context, if there is one, and dispose of it.
 *
 * All handlers on the context are cleared first, so it can no longer call
 * back into this client.
 *
 * @param[in] aAbortNow     true to Abort() the context, false to Close() it.
 */
void SubscriptionClient::FlushExistingExchangeContext(const bool aAbortNow)
{
    if (NULL == mEC)
    {
        return;
    }

    mEC->AppState          = NULL;
    mEC->OnMessageReceived = NULL;
    mEC->OnResponseTimeout = NULL;
#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING
    mEC->OnSendError = NULL;
    mEC->OnAckRcvd   = NULL;
#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING

    if (!aAbortNow)
    {
        mEC->Close();
    }
    else
    {
        mEC->Abort();
    }

    mEC = NULL;
}

/**
 * Gracefully end a client subscription
 *
 * Gracefully terminates the client end of a subscription.  If subscription cancel
 * support is enabled, a SubscribeCancelRequest message is sent to the subscription
 * publisher and the system awaits a reply before terminating the subscription;
 * otherwise the subscription is immediately terminated in a similar manner to
 * AbortSubscription().  If a mutual subscription exists, the counter subscription
 * is terminated as well.
 *
 * While awaiting a response to a SubscribeCancelRequest, the \c SubscriptionClient
 * enters the \c Canceling state.
 *
 * Once the termination process begins, the \c SubscriptionClient object enters the
 * `Terminated` state and an \c OnSubscriptionTerminated event is delivered to
 * the application's event handler.  Note that, if cancel support is _not_ enabled,
 * the event handler may be called synchronously within the call to EndSubscription().
 *
 * After the application's event handler returns, the \c SubscriptionClient object enters
 * the `Initialized` state.   At this point the \c SubscriptionClient object may be used
 * to initiate another subscription, or it may be freed by calling the Free() method.
 *
 * @retval WEAVE_NO_ERROR               The subscription was terminated locally, or the
 *                                      SubscribeCancelRequest was sent. Always returned
 *                                      when cancel support is not enabled.
 * @retval WEAVE_ERROR_INCORRECT_STATE  Cancel support is enabled and the client is in a
 *                                      state from which a subscription cannot be ended.
 * @retval WEAVE_ERROR_NO_MEMORY        Cancel support is enabled and no PacketBuffer
 *                                      could be allocated.
 * @return With cancel support enabled, any error propagated from
 *         ReplaceExchangeContext(), the request builder, the TLV writer or SendMessage().
 */
WEAVE_ERROR SubscriptionClient::EndSubscription()
{
    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

#if WDM_ENABLE_SUBSCRIPTION_CANCEL

    WEAVE_ERROR err       = WEAVE_NO_ERROR;
    PacketBuffer * msgBuf = NULL;
    nl::Weave::TLV::TLVWriter writer;
    SubscribeCancelRequest::Builder request;

    // Make sure we're not freed by accident.
    _AddRef();

    mConfig = kConfig_Down;

    switch (mCurrentState)
    {
    case kState_Subscribing:
    case kState_Resubscribe_Holdoff:
        // fall through
    case kState_Subscribing_IdAssigned:

        // If the subscription is not fully established, simply terminate it without
        // informing the peer.
        TerminateSubscription(WEAVE_NO_ERROR, NULL, false);
        break;

    case kState_SubscriptionEstablished_Confirming:
        // forget we're in the middle of confirmation, as the outcome
        // has become irrelevant
        FlushExistingExchangeContext();

        // fall through

    case kState_SubscriptionEstablished_Idle:
        msgBuf = PacketBuffer::NewWithAvailableSize(request.kBaseMessageSubscribeId_PayloadLen);
        VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);

        err = ReplaceExchangeContext();
        SuccessOrExit(err);

        writer.Init(msgBuf);

        // Check the builder's initialization result explicitly, as
        // SendSubscribeRequest() does, instead of relying on a later
        // GetError() call to surface it.
        err = request.Init(&writer);
        SuccessOrExit(err);

        err = request.SubscriptionID(mSubscriptionId).EndOfRequest().GetError();
        SuccessOrExit(err);

        err = writer.Finalize();
        SuccessOrExit(err);

        // NOTE: State could be changed if there is a sync error callback from message layer
        // msgBuf is set to NULL right after the call, whatever its result,
        // so the exit path does not free a buffer handed to SendMessage().
        err    = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, kMsgType_SubscribeCancelRequest, msgBuf,
                                  nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
        msgBuf = NULL;
        SuccessOrExit(err);

        MoveToState(kState_Canceling);
        break;

    // Cancel is not supported in any other state
    default:
        ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
    }

exit:
    WeaveLogFunctError(err);

    // Free the buffer if it was never handed to SendMessage().
    if (NULL != msgBuf)
    {
        PacketBuffer::Free(msgBuf);
        msgBuf = NULL;
    }

    _Release();

    return err;

#else // WDM_ENABLE_SUBSCRIPTION_CANCEL

    // When Cancel support is not enabled, simply terminate the subscription without
    // informing the peer.
    mConfig = kConfig_Down;
    TerminateSubscription(WEAVE_NO_ERROR, NULL, false);
    return WEAVE_NO_ERROR;

#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL
}

/**
 * Let go of the resources held by this client.
 *
 * The binding, if any, has its protocol-layer callback cleared and its
 * reference released. When WDM update support is enabled, the update client
 * is shut down and CleanupUpdatableSinkTrait is applied to every entry of the
 * data sink catalog, if a catalog was supplied.
 */
void SubscriptionClient::_Cleanup(void)
{
    if (mBinding)
    {
        // Stop the binding from calling back into this client before the
        // reference is dropped.
        mBinding->SetProtocolLayerCallback(NULL, NULL);
        mBinding->Release();
        mBinding = NULL;
    }

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    mUpdateClient.Shutdown();

    // Init() accepts a NULL catalog, so there may be nothing to iterate over.
    if (NULL != mDataSinkCatalog)
    {
        mDataSinkCatalog->Iterate(CleanupUpdatableSinkTrait, this);
    }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
}

/**
 * Abort a client subscription
 *
 * Terminates the client end of a subscription, without notifying the subscription
 * publisher and without delivering an \c OnSubscriptionTerminated event to the
 * application's event handler.  If a mutual subscription exists, the counter
 * subscription is terminated as well.
 *
 * Upon calling AbortSubscription(), the \c SubscriptionClient object enters the
 * `Terminated` state. Once the termination process completes, the object enters
 * the `Initialized` state. Both transitions happen synchronously within the call
 * to AbortSubscription().
 *
 * After AbortSubscription() returns, the \c SubscriptionClient object may be used to
 * initiate another subscription, or it may be freed by calling the Free() method.
 */
void SubscriptionClient::AbortSubscription(void)
{
    // Abort completes synchronously, so the application is not told about the
    // termination through OnSubscriptionTerminated; trait handlers still are.
    const bool suppressAppCallback = true;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
                   SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Mark the client as down before terminating.
    // NOTE(review): presumably this makes ShouldSubscribe() false so no resubscribe is armed - confirm.
    mConfig = kConfig_Down;

    // Tear down whatever subscription is active or in progress, right now.
    TerminateSubscription(WEAVE_NO_ERROR, NULL, suppressAppCallback);
}

/**
 * Tear down the client side of the subscription.
 *
 * Does nothing when the client is in the Initialized or Terminated state.
 * Otherwise the in-progress exchange is aborted, the subscription timer is
 * cancelled and the client moves to Terminated.  If the client was anywhere
 * between Subscribing and Canceling, trait handlers (and, unless suppressed,
 * the application) are told about the termination.  Finally the client either
 * arms the resubscribe timer or returns to Initialized.
 *
 * @param[in] aReason              Error reported to the application and passed to the retry timer.
 * @param[in] aStatusReport        Optional status report whose fields are forwarded to the application; may be NULL.
 * @param[in] suppressAppCallback  When true, the OnSubscriptionTerminated application event is not delivered.
 */
void SubscriptionClient::TerminateSubscription(WEAVE_ERROR aReason, StatusReporting::StatusReport * aStatusReport, bool suppressAppCallback)
{
    // An idle or already terminated client has nothing to tear down.
    if (kState_Initialized == mCurrentState || kState_Terminated == mCurrentState)
    {
        return;
    }

    // Remember the state we came from; it decides which notifications go out below.
    const ClientState stateOnEntry = mCurrentState;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
                   SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Keep this object alive while callbacks run.
    _AddRef();

    // Drop any exchange that is still in flight.
    FlushExistingExchangeContext(true);

    // No timer may fire for a subscription that no longer exists.
    CancelSubscriptionTimer();

    MoveToState(kState_Terminated);

    const bool wasSubscribingOrLater = (stateOnEntry >= kState_Subscribing) && (stateOnEntry <= kState_Canceling);

    if (wasSubscribingOrLater)
    {
#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
        // Captured up front, before any callback below gets to run.
        const uint64_t savedSubscriptionId = mSubscriptionId;
        const uint64_t savedPeerNodeId     = mBinding->GetPeerNodeId();
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

        // Trait handlers always learn about the termination.
        if (NULL != mDataSinkCatalog)
        {
            mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventSubscriptionTerminated, NULL);
        }

        // The application learns about it only when the caller did not ask for silence.
        if (!suppressAppCallback && NULL != mEventCallback)
        {
            InEventParam eventIn;
            OutEventParam eventOut;

            eventIn.Clear();
            eventOut.Clear();

            eventIn.mSubscriptionTerminated.mReason    = aReason;
            eventIn.mSubscriptionTerminated.mClient    = this;
            eventIn.mSubscriptionTerminated.mWillRetry = (ShouldSubscribe() && IsRetryEnabled());

            if (NULL != aStatusReport)
            {
                eventIn.mSubscriptionTerminated.mIsStatusCodeValid = true;
                eventIn.mSubscriptionTerminated.mStatusProfileId   = aStatusReport->mProfileId;
                eventIn.mSubscriptionTerminated.mStatusCode        = aStatusReport->mStatusCode;
                eventIn.mSubscriptionTerminated.mAdditionalInfoPtr = &(aStatusReport->mAdditionalInfo);
            }

            mEventCallback(mAppState, kEvent_OnSubscriptionTerminated, eventIn, eventOut);
        }

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
        SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(savedPeerNodeId, savedSubscriptionId, true);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER
    }

    // Only proceed if we are still Terminated, i.e. nothing above moved the client elsewhere.
    if (kState_Terminated == mCurrentState)
    {
        const bool retryWanted = ShouldSubscribe() && IsRetryEnabled();

        if (retryWanted)
        {
            SetRetryTimer(aReason);
        }
        else
        {
            MoveToState(kState_Initialized);
            mRetryCounter   = 0;
            mSubscriptionId = 0;
        }
    }

    _Release();
}

/**
 * Arm the resubscribe hold-off timer.
 *
 * When a subscription is still wanted, retries are enabled and no hold-off is
 * already pending, the client moves to Resubscribe_Holdoff, asks the
 * resubscribe policy callback for a delay and starts a timer for it.  A
 * failure to start the timer is fatal: the client is marked down and the
 * subscription is terminated.
 *
 * @param[in] aReason  The error that triggered the retry; handed to the policy callback.
 */
void SubscriptionClient::SetRetryTimer(WEAVE_ERROR aReason)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    _AddRef();

    // The state test doubles as an "is a timer already pending?" check.
    if (ShouldSubscribe() && IsRetryEnabled() && (kState_Resubscribe_Holdoff != mCurrentState))
    {
        uint32_t timeoutMsec = 0;

        MoveToState(kState_Resubscribe_Holdoff);

        // Describe this retry to the policy callback, which fills in the delay.
        ResubscribeParam policyInput;
        policyInput.mNumRetries  = mRetryCounter;
        policyInput.mReason      = aReason;
        policyInput.mRequestType = ResubscribeParam::kSubscription;

        mResubscribePolicyCallback(mAppState, policyInput, timeoutMsec);

        System::Layer * const systemLayer =
            SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer;

        err = systemLayer->StartTimer(timeoutMsec, OnTimerCallback, this);
        SuccessOrExit(err);

        WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) timeout: %u",
                       SubscriptionEngine::GetInstance()->GetClientId(this),
                       GetStateStr(), __func__, mRefCount, timeoutMsec);
    }

exit:
    // Any failure here is fatal for the subscription.
    if (WEAVE_NO_ERROR != err)
    {
        mConfig = kConfig_Down;
        TerminateSubscription(err, NULL, false);
    }

    _Release();
}

void SubscriptionClient::CancelSubscriptionTimer(void)
{
    SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->CancelTimer(OnTimerCallback, this);
}

/**
 * Free a \c SubscriptionClient object.
 *
 * Frees the \c SubscriptionClient object.  If a subscription is active or in-progress, the
 * subscription is immediately terminated in a similar manner to calling AbortSubscription().
 * If any update requests are in progress, they are similarly aborted.
 *
 * The application is responsible for calling Free() exactly once during the lifetime of
 * a \c SubscriptionClient object. After Free() is called, no further references should be
 * made to the object.
 */
void SubscriptionClient::Free()
{
    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Sanity checks only: a client that is already free, or that has no
    // outstanding reference, indicates a double Free().  The conditions are
    // logged when false; execution continues regardless.
    WeaveLogIfFalse(kState_Free != mCurrentState);
    WeaveLogIfFalse(mRefCount > 0);

    // Abort the subscription if active.  AbortSubscription() suppresses the
    // OnSubscriptionTerminated callback to the application.
    AbortSubscription();

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    // Abort any update requests as well, reporting WEAVE_NO_ERROR as the reason.
    AbortUpdates(WEAVE_NO_ERROR);
#endif

    // Drop the application's reference.
    // If mRefCount == 1, _Release would decrement it to 0, call _Cleanup and move us to FREE state
    _Release();
}

/**
 * Protocol-layer callback invoked by the Binding associated with a client.
 *
 * @param[in]  aAppState  The SubscriptionClient that registered the callback.
 * @param[in]  aEvent     The binding event being reported.
 * @param[in]  aInParam   Event-specific input (failure reasons are read from here).
 * @param[out] aOutParam  Event-specific output; only touched by the default handler.
 */
void SubscriptionClient::BindingEventCallback(void * const aAppState, const Binding::EventType aEvent,
                                              const Binding::InEventParam & aInParam, Binding::OutEventParam & aOutParam)
{
    SubscriptionClient * const client = static_cast<SubscriptionClient *>(aAppState);

    // Hold a reference for as long as this handler is running.
    client->_AddRef();

    if (Binding::kEvent_BindingReady == aEvent)
    {
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
        // An update is waiting but nothing is on the wire: arm the update retry timer.
        if (client->IsUpdatePendingOrInProgress() && !client->IsUpdateInFlight())
        {
            client->StartUpdateRetryTimer(WEAVE_NO_ERROR);
        }
#endif
        // With the binding ready, send the subscribe request if one is still wanted.
        if ((kState_Initialized == client->mCurrentState) && client->ShouldSubscribe())
        {
            client->_InitiateSubscription();
        }
    }
    else if (Binding::kEvent_PrepareFailed == aEvent)
    {
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
        if (client->IsUpdatePendingOrInProgress())
        {
            client->StartUpdateRetryTimer(aInParam.PrepareFailed.Reason);
        }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

        // Preparation failed; if a subscription is still wanted, arm the
        // subscription retry timer.
        if ((kState_Initialized == client->mCurrentState) && client->ShouldSubscribe())
        {
            client->SetRetryTimer(aInParam.PrepareFailed.Reason);
        }
    }
    else if (Binding::kEvent_BindingFailed == aEvent)
    {
        // The binding broke, e.g. because the underlying connection closed or a
        // security session failed.

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
        // Cancel the in-progress Update request, then schedule a retry if one is pending.
        client->mUpdateClient.CancelUpdate();
        if (client->IsUpdatePendingOrInProgress())
        {
            client->StartUpdateRetryTimer(aInParam.BindingFailed.Reason);
        }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

        // A subscription that is in progress, established or being canceled
        // is terminated at once.  An idle client (Initialized) or one waiting
        // to resubscribe (Resubscribe_Holdoff) is left alone: the binding
        // gets re-prepared when it next tries to subscribe.
        if (client->IsInProgressOrEstablished() || (kState_Canceling == client->mCurrentState))
        {
            client->TerminateSubscription(aInParam.BindingFailed.Reason, NULL, false);
        }
    }
    else
    {
        // Everything else goes to the binding's default handler.
        Binding::DefaultEventHandler(aAppState, aEvent, aInParam, aOutParam);
    }

    client->_Release();
}

/**
 * System-layer timer trampoline: forwards the expiry to the client instance.
 *
 * @param[in] aSystemLayer  The system layer that fired the timer (unused).
 * @param[in] aAppState     The SubscriptionClient passed when the timer was started.
 */
void SubscriptionClient::OnTimerCallback(System::Layer * aSystemLayer, void * aAppState, System::Error)
{
    static_cast<SubscriptionClient *>(aAppState)->TimerEventHandler();
}

/**
 * Cancel the subscription timer and re-arm it according to the current state.
 *
 * - Subscribing / Subscribing_IdAssigned: armed with the inactivity timeout,
 *   unless that is kNoTimeOut.
 * - SubscriptionEstablished_Idle: armed with the liveness timeout, unless that
 *   is kNoTimeOut.  For an initiator on a WRM transport the timeout is
 *   shortened by a margin reserved for WRM retransmissions.
 * - SubscriptionEstablished_Confirming: no timer is armed.
 *
 * @retval WEAVE_NO_ERROR               On success.
 * @retval WEAVE_ERROR_INCORRECT_STATE  If called in any other state.
 * @retval WEAVE_ERROR_TIMEOUT          If the liveness timeout does not exceed the WRM margin.
 * @return The error from StartTimer() if the timer could not be started.
 */
WEAVE_ERROR SubscriptionClient::RefreshTimer(void)
{
    WEAVE_ERROR err      = WEAVE_NO_ERROR;
    bool isTimerNeeded   = false;
    uint32_t timeoutMsec = 0;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Cancel timer first
    CancelSubscriptionTimer();

    // Arm timer according to current state
    switch (mCurrentState)
    {
    case kState_Subscribing:
    case kState_Subscribing_IdAssigned:
        if (kNoTimeOut != mInactivityTimeoutDuringSubscribingMsec)
        {
            // Note that loss of range is not expected, as ExchangeManager::Timeout is indeed uint32_t
            timeoutMsec   = mInactivityTimeoutDuringSubscribingMsec;
            isTimerNeeded = true;

            WeaveLogDetail(DataManagement,
                           "Client[%u] [%5.5s] %s Ref(%d) Set inactivity time limit during subscribing to %" PRIu32 " msec",
                           SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount, timeoutMsec);
        }
        break;
    case kState_SubscriptionEstablished_Idle:
        if (kNoTimeOut != mLivenessTimeoutMsec)
        {
            timeoutMsec = mLivenessTimeoutMsec;
            isTimerNeeded = true;

#if WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING

            // If the subscription is over Weave Reliable Messaging and the local node
            // is the subscription initiator...
            if (IsInitiator() && mBinding->IsWRMTransport())
            {
                // Calculate margin to reserve for WRM activity, so we send out SubscribeConfirm earlier.
                const nl::Weave::WRMPConfig & defaultWRMPConfig = mBinding->GetDefaultWRMPConfig();

                // Do the multiplication in 64 bits and saturate the result.  A 32-bit
                // product could wrap around with excessive delays / retry counts, and a
                // wrapped (small) margin would wrongly pass the check below and arm the
                // liveness timer too late.
                const uint64_t wideMarginMsec =
                    (static_cast<uint64_t>(defaultWRMPConfig.mMaxRetrans) + 1) * defaultWRMPConfig.mInitialRetransTimeout;
                const uint32_t marginMsec =
                    (wideMarginMsec > UINT32_MAX) ? UINT32_MAX : static_cast<uint32_t>(wideMarginMsec);

                // If the margin is smaller than the desired liveness timeout, set a timer for the difference.
                // Otherwise, fail with an error.
                if (marginMsec < timeoutMsec)
                {
                    timeoutMsec = timeoutMsec - marginMsec;
                }
                else
                {
                    // This is a system configuration problem
                    WeaveLogDetail(DataManagement,
                                   "Client[%u] Liveness period (%" PRIu32 " msec) <= margin reserved for WRM (%" PRIu32 " msec)",
                                   SubscriptionEngine::GetInstance()->GetClientId(this), mLivenessTimeoutMsec, marginMsec);

                    ExitNow(err = WEAVE_ERROR_TIMEOUT);
                }
            }

#endif // WEAVE_CONFIG_ENABLE_RELIABLE_MESSAGING

            WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Set timer for liveness confirmation to %" PRIu32 " msec",
                           SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount, timeoutMsec);
        }
        break;
    case kState_SubscriptionEstablished_Confirming:
        // Do nothing
        break;
    default:
        ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
    }

    if (isTimerNeeded)
    {
        err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->StartTimer(timeoutMsec,
                                                                                                             OnTimerCallback, this);

        VerifyOrExit(WEAVE_SYSTEM_NO_ERROR == err, /* no-op */);
    }

exit:
    WeaveLogFunctError(err);

    return err;
}

/**
 * Handle expiry of the subscription timer.
 *
 * What happens depends on the current state:
 *  - Subscribing / Subscribing_IdAssigned: the subscribing phase timed out, so
 *    the subscription is terminated with WEAVE_ERROR_TIMEOUT.
 *  - SubscriptionEstablished_Idle: an initiator sends a SubscribeConfirmRequest
 *    and moves to SubscriptionEstablished_Confirming; a non-initiator terminates
 *    the subscription with WEAVE_ERROR_TIMEOUT.
 *  - Resubscribe_Holdoff: the retry counter is bumped, the client returns to
 *    Initialized and, if a subscription is still wanted, a new one is initiated.
 *
 * Any error raised along the way terminates the subscription.
 */
void SubscriptionClient::TimerEventHandler(void)
{
    WEAVE_ERROR err       = WEAVE_NO_ERROR;
    PacketBuffer * msgBuf = NULL;
    bool skipTimerCheck   = false;

    // Ignore the event if the object holds no reference or is in a state outside
    // the [kState_TimerTick_Begin, kState_TimerTick_End] range.  No reference was
    // taken on this path, so the flag makes the exit code skip the _Release().
    if ((0 == mRefCount) || (mCurrentState < kState_TimerTick_Begin) || (mCurrentState > kState_TimerTick_End))
    {
        skipTimerCheck = true;
        ExitNow();
    }

    // Make sure we're not freed by accident
    _AddRef();

    switch (mCurrentState)
    {
    case kState_Subscribing:
    case kState_Subscribing_IdAssigned:
        WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timeout for subscribing phase, abort",
                       SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);

        // The error is picked up at `exit`, where the subscription is terminated.
        ExitNow(err = WEAVE_ERROR_TIMEOUT);
        break;

    case kState_SubscriptionEstablished_Idle:
        if (IsInitiator())
        {
            WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Confirming liveness",
                           SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);

            // timeout for liveness check
            nl::Weave::TLV::TLVWriter writer;
            SubscribeConfirmRequest::Builder request;
            msgBuf = PacketBuffer::NewWithAvailableSize(request.kBaseMessageSubscribeId_PayloadLen);
            VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);

            // Obtain a fresh exchange context (mEC) for the confirm request.
            err = ReplaceExchangeContext();
            SuccessOrExit(err);

            // Encode the request: it carries only the subscription ID.
            writer.Init(msgBuf);
            request.Init(&writer);
            err = request.SubscriptionID(mSubscriptionId).EndOfRequest().GetError();
            SuccessOrExit(err);

            err = writer.Finalize();
            SuccessOrExit(err);

            // NOTE: State could be changed if there is a send error callback from message layer
            err    = mEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_WDM, kMsgType_SubscribeConfirmRequest, msgBuf,
                                      nl::Weave::ExchangeContext::kSendFlag_ExpectResponse);
            // The buffer is not ours to free after SendMessage(), whatever the outcome.
            msgBuf = NULL;
            SuccessOrExit(err);

            if (kState_SubscriptionEstablished_Idle == mCurrentState)
            {
                MoveToState(kState_SubscriptionEstablished_Confirming);
            }
            else
            {
                // state has changed, probably because some callback from message layer
                ExitNow(err = WEAVE_ERROR_INCORRECT_STATE);
            }
        }
        else
        {
            // We are not the initiator, so we cannot send out the subscribe confirm
            WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timeout",
                           SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);

            // abort the subscription as we've timed out
            ExitNow(err = WEAVE_ERROR_TIMEOUT);
        }
        break;

    case kState_Resubscribe_Holdoff:
        // The hold-off period is over: count the retry and go back to Initialized
        // before trying to subscribe again.
        mRetryCounter++;

        MoveToState(kState_Initialized);

        if (ShouldSubscribe())
        {
            _InitiateSubscription();
        }
        break;

    default:
        WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d) Timer event fired at wrong state, ignore",
                       SubscriptionEngine::GetInstance()->GetClientId(this), GetStateStr(), __func__, mRefCount);
        break;
    }

exit:
    WeaveLogFunctError(err);

    // Only non-NULL if we bailed out before handing the buffer to SendMessage().
    if (NULL != msgBuf)
    {
        PacketBuffer::Free(msgBuf);
        msgBuf = NULL;
    }

    // Every error in this handler ends the subscription (the app callback is not suppressed).
    if (err != WEAVE_NO_ERROR)
    {
        TerminateSubscription(err, NULL, false);
    }

    // Balance the _AddRef() above; the early-exit path never took it.
    if (!skipTimerCheck)
    {
        _Release();
    }
}

/**
 * Apply a data list to the data sink catalog of this client.
 *
 * Delegates to SubscriptionEngine::ProcessDataList() with an access-control
 * delegate that accepts every data element.  With WDM Update support, the call
 * runs under the update mutex; afterwards, if no update is in progress, the
 * pending update set is purged and the "no more pending" helper is invoked
 * when the purge emptied a previously non-empty set.
 *
 * @param[in] aReader  TLV reader handed on to SubscriptionEngine::ProcessDataList().
 *
 * @return WEAVE_NO_ERROR on success, otherwise the error from the engine.
 */
WEAVE_ERROR SubscriptionClient::ProcessDataList(nl::Weave::TLV::TLVReader & aReader)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    // Every data element is accepted; see the TODO on the delegate class.
    AlwaysAcceptDataElementAccessControlDelegate acceptAllDelegate;

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    // Keep the object alive and hold the update mutex until the exit path.
    _AddRef();
    LockUpdateMutex();
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

    err = SubscriptionEngine::ProcessDataList(aReader, mDataSinkCatalog, mPrevIsPartialChange, mPrevTraitDataHandle,
                                              acceptAllDelegate);
    SuccessOrExit(err);

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    if (!IsUpdateInProgress())
    {
        const bool hadPendingItems = (mPendingUpdateSet.GetNumItems() != 0);

        PurgePendingUpdate();

        // Signal only on the transition from "something pending" to "nothing pending".
        if (hadPendingItems && mPendingUpdateSet.IsEmpty())
        {
            NoMorePendingEventCbHelper();
        }
    }
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

exit:
#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
    UnlockUpdateMutex();
    _Release();
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

    return err;
}

/**
 * Handle a NotificationRequest addressed to this subscription.
 *
 * Notifications are only processed in the Subscribing, Subscribing_IdAssigned,
 * SubscriptionEstablished_Idle and SubscriptionEstablished_Confirming states;
 * in any other state the request is silently dropped.  For an accepted request
 * the timer is refreshed, the application and the trait data sinks are
 * notified, the data list (and, when deserialization is enabled, the event
 * list) is processed, and a success StatusReport is sent back on @p aEC.
 *
 * Processing stops without an error if a callback changes the client state.
 * Any error terminates the subscription.
 *
 * @param[in] aEC       Exchange the request arrived on.  It is closed here when
 *                      it is not the client's own exchange (mEC).
 * @param[in] aPktInfo  Unused.
 * @param[in] aMsgInfo  Unused.
 * @param[in] aPayload  The request payload.  It is always freed by this function.
 */
void SubscriptionClient::NotificationRequestHandler(nl::Weave::ExchangeContext * aEC, const nl::Inet::IPPacketInfo * aPktInfo,
                                                    const nl::Weave::WeaveMessageInfo * aMsgInfo, PacketBuffer * aPayload)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    InEventParam inParam;
    OutEventParam outParam;
    NotificationRequest::Parser notify;
    const ClientState StateWhenEntered = mCurrentState;
    nl::Weave::TLV::TLVReader reader;
    bool isDataListPresent = false;
#if WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
    bool isEventListPresent = false;
#endif
    uint8_t statusReportLen = 6;
    bool incomingEC = (mEC != aEC);
    PacketBuffer * msgBuf   = PacketBuffer::NewWithAvailableSize(statusReportLen);

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Make sure we're not freed by accident
    _AddRef();

    if (incomingEC)
    {
        // only re-configure if this is an incoming EC
        mBinding->AdjustResponseTimeout(aEC);
    }

    VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);

    // skip the first 6 bytes in status report, as they are reserved for the profile ID and status code
    msgBuf->SetDataLength(statusReportLen);

    switch (StateWhenEntered)
    {
    case kState_Subscribing:
    case kState_Subscribing_IdAssigned:
        // In these two states, all notifications must come in the same exchange context
        VerifyOrExit(aEC == mEC, err = WEAVE_ERROR_INCORRECT_STATE);

        // refresh inactivity monitor every time we receive a notification request
        err = RefreshTimer();
        SuccessOrExit(err);
        break;

    case kState_SubscriptionEstablished_Idle:
    case kState_SubscriptionEstablished_Confirming:

        // refresh inactivity monitor every time we receive a notification request
        err = RefreshTimer();
        SuccessOrExit(err);

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
        SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(mBinding->GetPeerNodeId(), mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

        break;

    // we are going to ignore any notification requests in other states
    default:
        ExitNow();
    }

    // Both parameter blocks are cleared before the first callback so the
    // application never sees an uninitialised OutEventParam.
    inParam.Clear();
    outParam.Clear();

    // The application callback is optional (TerminateSubscription() guards it
    // the same way), so every invocation below is protected by a NULL check.
    if (NULL != mEventCallback)
    {
        // Emit an OnSubscriptionActivity event
        inParam.mSubscriptionActivity.mClient = this;
        mEventCallback(mAppState, kEvent_OnSubscriptionActivity, inParam, outParam);

        inParam.Clear();
        outParam.Clear();
        inParam.mNotificationRequest.mEC      = aEC;
        inParam.mNotificationRequest.mMessage = aPayload;
        inParam.mNotificationRequest.mClient  = this;

        // NOTE: state could be changed in the callback to app layer
        mEventCallback(mAppState, kEvent_OnNotificationRequest, inParam, outParam);
    }

    // The catalog may be NULL (see TerminateSubscription()), so guard the dispatch.
    if (NULL != mDataSinkCatalog)
    {
        mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventNotifyRequestBegin, NULL);
    }

    // jump to Exit if the state has been changed in the callback to app layer
    VerifyOrExit(StateWhenEntered == mCurrentState, /* no-op */);

    // Position the reader on the first element; a payload that cannot even be
    // stepped into is reported as an error instead of being ignored.
    reader.Init(aPayload);
    err = reader.Next();
    SuccessOrExit(err);

    err = notify.Init(reader);
    SuccessOrExit(err);

#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
    // simple schema checking
    err = notify.CheckSchemaValidity();
    SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK

    // TODO: use the new GetReaderOnXYZ pattern to locate the data list, instead creating a data list parser object
    {
        DataList::Parser dataList;

        err = notify.GetDataList(&dataList);
        if (WEAVE_NO_ERROR == err)
        {
            isDataListPresent = true;

            // re-initialize the reader to point to individual data element (reuse to save stack depth).
            // Only done when the list exists, as the parser may not have been initialised otherwise.
            dataList.GetReader(&reader);
        }
        else if (WEAVE_END_OF_TLV == err)
        {
            // A missing data list is not an error.
            isDataListPresent = false;
            err               = WEAVE_NO_ERROR;
        }
        SuccessOrExit(err);
    }

    if (isDataListPresent)
    {
        err = ProcessDataList(reader);
        SuccessOrExit(err);
    }

#if WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION
    {
        EventList::Parser eventList;

        err = notify.GetEventList(&eventList);
        if (WEAVE_NO_ERROR == err)
        {
            isEventListPresent = true;

            // re-initialize the reader (reuse to save stack depth).
            // Only done when the list exists, as the parser may not have been initialised otherwise.
            eventList.GetReader(&reader);
        }
        else if (WEAVE_END_OF_TLV == err)
        {
            // A missing event list is not an error.
            isEventListPresent = false;
            err                = WEAVE_NO_ERROR;
        }
        SuccessOrExit(err);
    }

    if (isEventListPresent && (NULL != mEventCallback))
    {
        inParam.mEventStreamReceived.mReader = &reader;
        inParam.mEventStreamReceived.mClient = this;

        // Invoke our callback.
        mEventCallback(mAppState, kEvent_OnEventStreamReceived, inParam, outParam);
    }
#endif // WEAVE_CONFIG_SERIALIZATION_ENABLE_DESERIALIZATION

    // TODO: As I've commented in Weave PR#614, there is no support for event sink

    if (NULL != mEventCallback)
    {
        inParam.mNotificationProcessed.mClient = this;

        // NOTE: state could be changed in the callback to app layer
        mEventCallback(mAppState, kEvent_OnNotificationProcessed, inParam, outParam);
    }

    if (NULL != mDataSinkCatalog)
    {
        mDataSinkCatalog->DispatchEvent(TraitDataSink::kEventNotifyRequestEnd, NULL);
    }

    // jump to Exit if the state has been changed in the callback to app layer
    VerifyOrExit(StateWhenEntered == mCurrentState, /* no-op */);

    {
        // Fill in the 6 bytes reserved earlier: profile ID followed by status code.
        uint8_t * p = msgBuf->Start();
        nl::Weave::Encoding::LittleEndian::Write32(p, nl::Weave::Profiles::kWeaveProfile_Common);
        nl::Weave::Encoding::LittleEndian::Write16(p, nl::Weave::Profiles::Common::kStatus_Success);

        err    = aEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_Common, nl::Weave::Profiles::Common::kMsgType_StatusReport, msgBuf);
        // The buffer is not ours to free after SendMessage(), whatever the outcome.
        msgBuf = NULL;
        SuccessOrExit(err);
    }

exit:
    WeaveLogFunctError(err);

    if (NULL != msgBuf)
    {
        PacketBuffer::Free(msgBuf);
        msgBuf = NULL;
    }

    if (NULL != aPayload)
    {
        PacketBuffer::Free(aPayload);
        aPayload = NULL;
    }

    // If this is not a locally initiated exchange, always close the exchange
    if (incomingEC)
    {
        aEC->Close();
        aEC = NULL;
    }

    if (WEAVE_NO_ERROR != err)
    {
        // If we're not aborted yet, make a callback to app layer
        TerminateSubscription(err, NULL, false);
    }

    _Release();
}

#if WDM_ENABLE_SUBSCRIPTION_CANCEL
/**
 * Handle a SubscribeCancelRequest received for this subscription.
 *
 * A StatusReport is sent back on @p aEC: success when the request is authentic,
 * or "invalid subscription ID" when it does not come from the bound peer.  The
 * subscription is terminated (and the client marked down) when the cancel was
 * accepted or when any error occurred while answering.
 *
 * @param[in] aEC       Exchange the request arrived on; the StatusReport is sent on it.
 * @param[in] aPktInfo  Unused.
 * @param[in] aMsgInfo  Message information used to authenticate the sender.
 * @param[in] aPayload  The request payload.  It is neither read nor freed here.
 *                      NOTE(review): ownership presumably stays with the caller - confirm.
 */
void SubscriptionClient::CancelRequestHandler(nl::Weave::ExchangeContext * aEC, const nl::Inet::IPPacketInfo * aPktInfo,
                                              const nl::Weave::WeaveMessageInfo * aMsgInfo, PacketBuffer * aPayload)
{
    WEAVE_ERROR err         = WEAVE_NO_ERROR;
    uint8_t statusReportLen = 6;
    PacketBuffer * msgBuf   = PacketBuffer::NewWithAvailableSize(statusReportLen);
    uint8_t * p;
    bool canceled          = true;
    uint32_t statusProfile = nl::Weave::Profiles::kWeaveProfile_Common;
    uint16_t statusCode    = nl::Weave::Profiles::Common::kStatus_Success;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(this),
                   GetStateStr(), __func__, mRefCount);

    // Make sure we're not freed by accident
    _AddRef();

    VerifyOrExit(NULL != msgBuf, err = WEAVE_ERROR_NO_MEMORY);

    // Verify the cancel request is truly from the publisher.  If not, reject the request with
    // "invalid subscription id" to avoid revealing the existence of the subscription.
    if (!mBinding->IsAuthenticMessageFromPeer(aMsgInfo))
    {
        WeaveLogDetail(DataManagement, "Rejecting SubscribeCancelRequest from unauthorized source");
        canceled      = false;
        statusProfile = nl::Weave::Profiles::kWeaveProfile_WDM;
        statusCode    = kStatus_InvalidSubscriptionID;
    }

    // Encode the 6-byte StatusReport body: 32-bit profile ID, then 16-bit status code.
    p = msgBuf->Start();
    nl::Weave::Encoding::LittleEndian::Write32(p, statusProfile);
    nl::Weave::Encoding::LittleEndian::Write16(p, statusCode);
    msgBuf->SetDataLength(statusReportLen);

    err    = aEC->SendMessage(nl::Weave::Profiles::kWeaveProfile_Common, nl::Weave::Profiles::Common::kMsgType_StatusReport, msgBuf);
    // The buffer is not ours to free after SendMessage(), whatever the outcome.
    msgBuf = NULL;
    SuccessOrExit(err);

    // Proactively close the exchange.  This prevents the call to TerminateSubscription()
    // below from aborting the exchange, which, when using WRM, would prevent the
    // StatusReport message from being re-transmitted if necessary.
    FlushExistingExchangeContext();

exit:
    WeaveLogFunctError(err);

    // msgBuf is NULL on every path where the send was attempted; guard the
    // release like the other handlers in this file do.
    if (NULL != msgBuf)
    {
        PacketBuffer::Free(msgBuf);
        msgBuf = NULL;
    }

    // If the subscription was canceled, or if an error occurred while handing
    // the Cancel request, terminate the subscription and notify the application.
    if (canceled || WEAVE_NO_ERROR != err)
    {
        mConfig = kConfig_Down;
        TerminateSubscription(err, NULL, false);
    }

    _Release();
}
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL

/**
 * Exchange-layer callback reporting that sending a message failed.
 *
 * The client is recovered from the exchange's AppState.  Depending on its
 * state the failure either terminates the subscription with @p aErrorCode
 * (Subscribing, Subscribing_IdAssigned, SubscriptionEstablished_Confirming,
 * Canceling), is ignored (Resubscribe_Holdoff), or is treated as an
 * incorrect-state error (every other state).
 *
 * @param[in] aEC                  Exchange on which the send failed.
 * @param[in] aErrorCode           The send error.
 * @param[in] aMsgSpecificContext  Unused.
 */
void SubscriptionClient::OnSendError(ExchangeContext * aEC, WEAVE_ERROR aErrorCode, void * aMsgSpecificContext)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    IgnoreUnusedVariable(aMsgSpecificContext);
    SubscriptionClient * const client = reinterpret_cast<SubscriptionClient *>(aEC->AppState);

    // Set when the failure hit the subscribe request itself; termination is then unconditional.
    bool failedWhileSubscribing = false;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
                   SubscriptionEngine::GetInstance()->GetClientId(client),
                   client->GetStateStr(), __func__, client->mRefCount);

    // Hold a reference so the client survives this callback.
    client->_AddRef();

    switch (client->mCurrentState)
    {
    case kState_Subscribing:
    case kState_Subscribing_IdAssigned:
        // The subscribe request could not be delivered.
        failedWhileSubscribing = true;
        err                    = aErrorCode;
        break;

    case kState_SubscriptionEstablished_Confirming:
        // The confirm request failed; trying to cancel would be pointless, so just terminate.
    case kState_Canceling:
        err = aErrorCode;
        break;

    case kState_Resubscribe_Holdoff:
        // OnResponseTimeout() funnels into this function, possibly after an
        // earlier send error has already put us into hold-off.  The retry
        // timer is then set, and the timeout can be ignored.
        if (WEAVE_ERROR_TIMEOUT == aErrorCode)
        {
            err = WEAVE_NO_ERROR;
        }
        break;

    default:
        // This callback is not expected in any other state.
        err = WEAVE_ERROR_INCORRECT_STATE;
        break;
    }

    WeaveLogFunctError(err);

    if (failedWhileSubscribing || (WEAVE_NO_ERROR != err))
    {
        client->TerminateSubscription(err, NULL, false);
    }

    client->_Release();
}

/**
 * Exchange-layer callback reporting that no response arrived in time.
 *
 * Logs the event and hands it to OnSendError() as a WEAVE_ERROR_TIMEOUT.
 *
 * @param[in] aEC  Exchange whose response timed out.
 */
void SubscriptionClient::OnResponseTimeout(nl::Weave::ExchangeContext * aEC)
{
    SubscriptionClient * const client = reinterpret_cast<SubscriptionClient *>(aEC->AppState);

    // The client is only referenced by the log statement below.
    IgnoreUnusedVariable(client);

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)",
                   SubscriptionEngine::GetInstance()->GetClientId(client),
                   client->GetStateStr(), __func__, client->mRefCount);

    // A response timeout is handled exactly like a send error.
    OnSendError(aEC, WEAVE_ERROR_TIMEOUT, NULL);
}

void SubscriptionClient::OnMessageReceivedFromLocallyInitiatedExchange(nl::Weave::ExchangeContext * aEC,
                                                                       const nl::Inet::IPPacketInfo * aPktInfo,
                                                                       const nl::Weave::WeaveMessageInfo * aMsgInfo,
                                                                       uint32_t aProfileId, uint8_t aMsgType,
                                                                       PacketBuffer * aPayload)
{
    // Notification Requests during initial setup
    // Subscribe response
    // Status Report for Subscribe request
    // Status Report for Subscribe Cancel request
    // Status Report for Subscribe Confirm request

    WEAVE_ERROR err              = WEAVE_NO_ERROR;
    SubscriptionClient * pClient = reinterpret_cast<SubscriptionClient *>(aEC->AppState);
    bool terminateSubscription = false;
    bool retainExchangeContext = false;
    bool isStatusReportValid   = false;
    nl::Weave::Profiles::StatusReporting::StatusReport status;

    WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] %s Ref(%d)", SubscriptionEngine::GetInstance()->GetClientId(pClient),
                   pClient->GetStateStr(), __func__, pClient->mRefCount);

    // Make sure we're not freed by accident
    pClient->_AddRef();

    WeaveLogIfFalse(aEC == pClient->mEC);

    if ((nl::Weave::Profiles::kWeaveProfile_Common == aProfileId) &&
        (nl::Weave::Profiles::Common::kMsgType_StatusReport == aMsgType))
    {
        // Note that payload is not freed in this call to parse
        err = nl::Weave::Profiles::StatusReporting::StatusReport::parse(aPayload, status);
        SuccessOrExit(err);
        isStatusReportValid = true;
        WeaveLogDetail(DataManagement, "Received StatusReport %s", nl::StatusReportStr(status.mProfileId, status.mStatusCode));
    }

    switch (pClient->mCurrentState)
    {
    case kState_Subscribing:
    case kState_Subscribing_IdAssigned:
        if (isStatusReportValid)
        {
            ExitNow(err = WEAVE_ERROR_STATUS_REPORT_RECEIVED);
        }
        else if ((nl::Weave::Profiles::kWeaveProfile_WDM == aProfileId) && (kMsgType_NotificationRequest == aMsgType))
        {
            // notification request, don't close the exchange context, for more notification requests might arrive
            // through this same exchange context
            retainExchangeContext = true;
            pClient->NotificationRequestHandler(aEC, aPktInfo, aMsgInfo, aPayload);
            aPayload = NULL;
        }
        else if ((nl::Weave::Profiles::kWeaveProfile_WDM == aProfileId) && (kMsgType_SubscribeResponse == aMsgType))
        {
            // capture subscription ID and liveness timeout
            nl::Weave::TLV::TLVReader reader;
            reader.Init(aPayload);
            err = reader.Next();
            SuccessOrExit(err);

            SubscribeResponse::Parser response;
            err = response.Init(reader);
            SuccessOrExit(err);

#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
            // simple schema checking
            err = response.CheckSchemaValidity();
            SuccessOrExit(err);
#endif // WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK

            {
                uint64_t subscriptionId;
                err = response.GetSubscriptionID(&subscriptionId);
                SuccessOrExit(err);
                if (kState_Subscribing == pClient->mCurrentState)
                {
                    // capture subscription ID
                    pClient->mSubscriptionId = subscriptionId;
                }
                else
                {
                    // verify they are the same
                    VerifyOrExit(pClient->mSubscriptionId == subscriptionId, err = WEAVE_ERROR_INVALID_TLV_ELEMENT);
                }
            }

            if (kState_Subscribing == pClient->mCurrentState)
            {
                uint32_t livenessTimeoutSec;
                err = response.GetSubscribeTimeout(&livenessTimeoutSec);

                if (WEAVE_NO_ERROR == err)
                {
                    VerifyOrExit(livenessTimeoutSec <= kMaxTimeoutSec, err = WEAVE_ERROR_INVALID_TLV_ELEMENT);

                    // capture liveness timeout
                    pClient->mLivenessTimeoutMsec = livenessTimeoutSec * 1000;
                }
                else if (WEAVE_END_OF_TLV == err)
                {
                    err = WEAVE_NO_ERROR;
                }
                else
                {
                    ExitNow();
                }
            }

            // Subscribe response, move to alive-idle state (and close the exchange context)
            pClient->MoveToState(kState_SubscriptionEstablished_Idle);

            err = pClient->RefreshTimer();
            SuccessOrExit(err);

            // Release the response buffer before initiating any callbacks to reduce overall
            // buffer pressure.
            PacketBuffer::Free(aPayload);
            aPayload = NULL;

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
            SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(pClient->mBinding->GetPeerNodeId(), pClient->mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

            pClient->mRetryCounter = 0;

            {
                InEventParam inParam;
                OutEventParam outParam;

                // Emit an OnSubscriptionActivity event to the application.
                inParam.mSubscriptionActivity.mClient = pClient;
                pClient->mEventCallback(pClient->mAppState, kEvent_OnSubscriptionActivity, inParam, outParam);

                // Emit an OnSubscriptionEstablished event to the application.
                // Note that it's allowed to cancel or even abandon this subscription right inside this callback.
                inParam.mSubscriptionEstablished.mSubscriptionId = pClient->mSubscriptionId;
                inParam.mSubscriptionEstablished.mClient         = pClient;
                pClient->mEventCallback(pClient->mAppState, kEvent_OnSubscriptionEstablished, inParam, outParam);
            }

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
            pClient->LockUpdateMutex();
            if (false == pClient->IsUpdatePendingOrInProgress())
            {
                if (pClient->CheckForSinksWithDataLoss())
                {
                    ExitNow(err = WEAVE_ERROR_WDM_POTENTIAL_DATA_LOSS);
                }
            }
            pClient->UnlockUpdateMutex();
#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE

            ExitNow();
        }
        else
        {
            // protocol error
            ExitNow(err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);
        }
        break;

    case kState_SubscriptionEstablished_Confirming:

        // Verify the response is a status report.
        VerifyOrExit(isStatusReportValid, err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);

        // Verify that response is Success.
        VerifyOrExit(status.success(), err = WEAVE_ERROR_STATUS_REPORT_RECEIVED);

        // Close the exchange context and move back to idle state
        pClient->FlushExistingExchangeContext();
        pClient->MoveToState(kState_SubscriptionEstablished_Idle);

        WeaveLogDetail(DataManagement, "Client[%u] [%5.5s] liveness confirmed",
                       SubscriptionEngine::GetInstance()->GetClientId(pClient), pClient->GetStateStr());

        // Emit an OnSubscriptionActivity event
        {
            InEventParam inParam;
            OutEventParam outParam;
            inParam.mSubscriptionActivity.mClient = pClient;
            pClient->mEventCallback(pClient->mAppState, kEvent_OnSubscriptionActivity, inParam, outParam);
        }

        // Restart the subscription timer.
        err = pClient->RefreshTimer();
        SuccessOrExit(err);

#if WDM_ENABLE_SUBSCRIPTION_PUBLISHER
        SubscriptionEngine::GetInstance()->UpdateHandlerLiveness(pClient->mBinding->GetPeerNodeId(), pClient->mSubscriptionId);
#endif // WDM_ENABLE_SUBSCRIPTION_PUBLISHER

        break;

#if WDM_ENABLE_SUBSCRIPTION_CANCEL
    case kState_Canceling:

        // Verify the response is a status report.
        // NOTE: It doesn't really matter what status code we receive from the other end as
        // the subscription is being terminated regardless.
        VerifyOrExit(isStatusReportValid, err = WEAVE_ERROR_INVALID_MESSAGE_TYPE);
        terminateSubscription = true;

        break;
#endif // WDM_ENABLE_SUBSCRIPTION_CANCEL

    // We must not receive this callback in any other states
    default:
        WeaveLogDetail(DataManagement, "Received message in some wrong state, ignore");
        ExitNow();
    }

exit:
    WeaveLogFunctError(err);

    // If the exchange is over, close the exchange context.
    if (!retainExchangeContext)
    {
        pClient->FlushExistingExchangeContext();
    }

    // Terminate the subscription if indicated, or if an unexpected error occurred.
    // Pass the status report information to the application's OnSubscriptionTerminated
    // callback if its pertinent in this case.
    if (terminateSubscription || err != WEAVE_NO_ERROR)
    {
        pClient->TerminateSubscription(err, (err == WEAVE_ERROR_STATUS_REPORT_RECEIVED) ? &status : NULL, false);
    }

    // Free the message buffer if it hasn't been done already.  Note that in the case the
    // response was a status report, this must be done *after* the call to TerminateSubscription
    // as the StatusReport object that is passed to that method may contain a pointer into
    // the buffer.
    PacketBuffer::Free(aPayload);

    pClient->_Release();
}

#if WEAVE_CONFIG_ENABLE_WDM_UPDATE
/**
 * Arm the timer that triggers the next update attempt.
 *
 * Nothing happens when a retry is already scheduled, or when no resubscribe
 * policy callback is installed. Otherwise the policy callback is asked for
 * the delay to apply and OnUpdateTimerCallback is scheduled on the system
 * layer with that delay. Failing to start the timer is fatal (WeaveDie).
 *
 * @param[in] aReason  The error that prompted the retry; it is forwarded to
 *                     the policy callback. WEAVE_NO_ERROR resets the update
 *                     retry counter before the callback is consulted.
 */
void SubscriptionClient::StartUpdateRetryTimer(WEAVE_ERROR aReason)
{
    // A retry is already on the books: keep the existing timer.
    if (mUpdateRetryScheduled)
    {
        return;
    }

    if (NULL == mResubscribePolicyCallback)
    {
        WeaveLogDetail(DataManagement, "Update timed out with the retry policy disabled");
        return;
    }

    if (WEAVE_NO_ERROR == aReason)
    {
        mUpdateRetryCounter = 0;
    }

    // The policy callback sees the number of retries performed so far;
    // the counter is bumped for the next round.
    ResubscribeParam policyParam;
    policyParam.mNumRetries  = mUpdateRetryCounter;
    policyParam.mReason      = aReason;
    policyParam.mRequestType = ResubscribeParam::kUpdate;
    mUpdateRetryCounter++;

    uint32_t delayMsec = 0;
    mResubscribePolicyCallback(mAppState, policyParam, delayMsec);

    WeaveLogDetail(DataManagement, "Will send update in %" PRIu32 " msec", delayMsec);

    const WEAVE_ERROR timerErr =
        SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->StartTimer(delayMsec,
                                                                                                       OnUpdateTimerCallback, this);
    if (timerErr != WEAVE_NO_ERROR)
    {
        WeaveDie();
    }

    mUpdateRetryScheduled = true;
}

/**
 * Handle the expiration of the update retry timer.
 *
 * Marks the retry as no longer scheduled and, unless update retries are
 * currently suspended, forms and sends the next update request.
 */
void SubscriptionClient::UpdateTimerEventHandler()
{
    WeaveLogDetail(DataManagement, "%s", __func__);

    // The timer has fired, so there is no outstanding retry any more.
    mUpdateRetryScheduled = false;

    if (mSuspendUpdateRetries)
    {
        WeaveLogDetail(DataManagement, "Holding off updates");
    }
    else
    {
        FormAndSendUpdate();
    }
}

/**
 * System-layer timer trampoline: forwards the timer expiration to the
 * SubscriptionClient that was registered as the timer's application state.
 *
 * @param[in] aSystemLayer  Unused.
 * @param[in] aAppState     The SubscriptionClient passed to StartTimer.
 */
void SubscriptionClient::OnUpdateTimerCallback(System::Layer * aSystemLayer, void * aAppState, System::Error)
{
    static_cast<SubscriptionClient *>(aAppState)->UpdateTimerEventHandler();
}

/**
 * System-layer work callback that kicks off a scheduled update flush.
 *
 * Under the update mutex, checks whether a flush is still scheduled; if so,
 * clears the flag and starts the retry timer with WEAVE_NO_ERROR (i.e. with
 * the retry counter reset). The client is released before returning
 * (NOTE(review): presumably balancing a reference taken when the work item
 * was scheduled -- confirm against the scheduling site).
 *
 * @param[in] aSystemLayer  Unused.
 * @param[in] aAppState     The SubscriptionClient the work was scheduled for.
 */
void SubscriptionClient::OnUpdateScheduleWorkCallback(System::Layer * aSystemLayer, void * aAppState, System::Error)
{
    SubscriptionClient * const self = static_cast<SubscriptionClient *>(aAppState);

    self->LockUpdateMutex();

    // If the application has called AbortUpdates() or Free() in the meantime,
    // the flag is already cleared and there is nothing left to do.
    if (self->mUpdateFlushScheduled)
    {
        self->mUpdateFlushScheduled = false;

        // Start the timer with a retry count of zero, unless the timer has already
        // been scheduled. This costs a little, but lets the application insert a
        // one-off delay at the beginning of the sequence.
        self->StartUpdateRetryTimer(WEAVE_NO_ERROR);
    }

    self->UnlockUpdateMutex();
    self->_Release();
}

/**
 * Set the maximum update size.
 *
 * @param[in] aMaxSize  The requested maximum. Values that do not fit in
 *                      16 bits (greater than UINT16_MAX) are stored as 0.
 */
void SubscriptionClient::SetMaxUpdateSize(const uint32_t aMaxSize)
{
    if (aMaxSize <= UINT16_MAX)
    {
        mMaxUpdateSize = aMaxSize;
    }
    else
    {
        mMaxUpdateSize = 0;
    }
}

/**
 * Put the dispatched (in-progress) paths back into the pending set.
 *
 * Private paths are not copied; they are simply dropped when the in-progress
 * list is cleared at the end. Paths whose data sink can no longer be located
 * in the catalog are discarded as well.
 *
 * If the pending set was marked Empty and now holds items, it is moved to
 * Ready. On success the in-progress list and the update request context are
 * reset.
 *
 * @return WEAVE_NO_ERROR on success; otherwise the error returned by
 *         AddItemPendingUpdateSet, in which case the remaining in-progress
 *         items are left untouched.
 */
WEAVE_ERROR SubscriptionClient::MoveInProgressToPending(void)
{
    WEAVE_ERROR err       = WEAVE_NO_ERROR;
    uint32_t numMoved     = 0;
    TraitDataSink * sink  = NULL;
    TraitPath path;

    for (size_t idx = mInProgressUpdateList.GetFirstValidItem();
         idx < mInProgressUpdateList.GetPathStoreSize();
         idx = mInProgressUpdateList.GetNextValidItem(idx))
    {
        mInProgressUpdateList.GetItemAt(idx, path);

        // Private entries stay where they are for now; Clear() below removes them.
        if (mInProgressUpdateList.AreFlagsSet(idx, kFlag_Private))
        {
            continue;
        }

        // Locate() fails when the sink has been removed from the catalog;
        // such a path is dropped instead of being re-queued.
        if (WEAVE_NO_ERROR == mDataSinkCatalog->Locate(path.mTraitDataHandle, &sink))
        {
            err = AddItemPendingUpdateSet(path, sink->GetSchemaEngine());
            SuccessOrExit(err);

            numMoved++;
        }

        mInProgressUpdateList.RemoveItemAt(idx);
    }

    // Only promote Empty -> Ready. If the application is in the middle of adding
    // paths, it decides on its own when to call FlushUpdate.
    if ((kPendingSetEmpty == mPendingSetState) && (mPendingUpdateSet.GetNumItems() > 0))
    {
        SetPendingSetState(kPendingSetReady);
    }

    // Get rid of the private entries and of anything else still in the list.
    mInProgressUpdateList.Clear();

    mUpdateRequestContext.Reset();

exit:
    WeaveLogDetail(DataManagement, "Moved %" PRIu32 " items from InProgress to Pending; err %" PRId32 "", numMoved, err);

    return err;
}

/**
 * Transfer the pending set to the in-progress list, visiting the data sink
 * catalog so that paths end up grouped by trait instance.
 *
 * The in-progress list must be empty on entry (enforced with VerifyOrDie).
 * Afterwards the pending set is cleared and marked Empty.
 *
 * @return Always WEAVE_NO_ERROR; failures while adding individual items are
 *         only logged by the per-sink iteration callback.
 */
WEAVE_ERROR SubscriptionClient::MovePendingToInProgress(void)
{
    VerifyOrDie(mInProgressUpdateList.IsEmpty());

    // Without a catalog there is nothing to iterate over; the pending set is
    // still cleared below.
    if (NULL != mDataSinkCatalog)
    {
        mDataSinkCatalog->Iterate(MovePendingToInProgressUpdatableSinkTrait, this);
    }

    mPendingUpdateSet.Clear();
    SetPendingSetState(kPendingSetEmpty);

    return WEAVE_NO_ERROR;
}

void SubscriptionClient::MovePendingToInProgressUpdatableSinkTrait(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    SubscriptionClient * subClient = static_cast<SubscriptionClient *>(aContext);
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    int count = 0;

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    for (size_t i = subClient->mPendingUpdateSet.GetFirstValidItem(aDataHandle);
            i < subClient->mPendingUpdateSet.GetPathStoreSize();
            i = subClient->mPendingUpdateSet.GetNextValidItem(i, aDataHandle))
    {
        TraitPath traitPath;

        subClient->mPendingUpdateSet.GetItemAt(i, traitPath);

        err = subClient->mInProgressUpdateList.AddItem(traitPath);
        SuccessOrExit(err);
        count++;
    }

exit:
    WeaveLogDetail(DataManagement, "Moved %d items from Pending to InProgress; err %" PRId32 "", count, err);
    return;
}

/**
 * Remove every path marked as failed from the given path store and notify
 * the application about each non-private one.
 *
 * For each failed path whose sink can still be located, the sink's version
 * and update-required version are cleared and conditional update is turned
 * off before the path is removed. Failed paths whose sink is no longer in
 * the catalog are removed silently and are not counted.
 *
 * When the store is the pending set and it ends up empty, the pending set
 * state becomes Empty. When the store is the in-progress list, the update
 * request context is reset.
 *
 * Note that the application is allowed to call SetUpdated() from the
 * callback.
 *
 * @param[in]  aErr        The error reported to the application for each path.
 * @param[in]  aPathStore  The store to purge.
 * @param[out] aCount      The number of failed paths purged with a live sink.
 */
void SubscriptionClient::PurgeAndNotifyFailedPaths(WEAVE_ERROR aErr, TraitPathStore &aPathStore, size_t &aCount)
{
    TraitPath failedPath;

    aCount = 0;

    for (size_t idx = 0; idx < aPathStore.GetPathStoreSize(); idx++)
    {
        // Only in-use entries that are flagged as failed are of interest.
        if (!aPathStore.IsItemInUse(idx) || !aPathStore.IsItemFailed(idx))
        {
            continue;
        }

        // Capture the flag and the path before the entry is removed.
        const bool notifyApplication = !aPathStore.AreFlagsSet(idx, kFlag_Private);

        aPathStore.GetItemAt(idx, failedPath);

        TraitUpdatableDataSink * const sink = Locate(failedPath.mTraitDataHandle, mDataSinkCatalog);

        if (NULL == sink)
        {
            // Locate() returns NULL when the data sink has been removed from the
            // catalog: drop the stale entry and move on.
            aPathStore.RemoveItemAt(idx);
            continue;
        }

        sink->ClearVersion();
        sink->ClearUpdateRequiredVersion();
        sink->SetConditionalUpdate(false);

        aPathStore.RemoveItemAt(idx);

        if (notifyApplication)
        {
            UpdateCompleteEventCbHelper(failedPath,
                                        nl::Weave::Profiles::kWeaveProfile_Common,
                                        nl::Weave::Profiles::Common::kStatus_InternalError,
                                        aErr,
                                        false);
        }

        aCount++;
    }

    if ((&aPathStore == &mPendingUpdateSet) && aPathStore.IsEmpty())
    {
        SetPendingSetState(kPendingSetEmpty);
    }

    if (&aPathStore == &mInProgressUpdateList)
    {
        mUpdateRequestContext.Reset();
    }
}

/**
 * Add a path to the pending update set through AddItemDedup, and log the
 * outcome.
 *
 * @param[in] aItem          The trait path to add.
 * @param[in] aSchemaEngine  The schema engine of the trait, passed on to
 *                           AddItemDedup.
 *
 * @return The error code returned by AddItemDedup.
 */
WEAVE_ERROR SubscriptionClient::AddItemPendingUpdateSet(const TraitPath &aItem, const TraitSchemaEngine * const aSchemaEngine)
{
    const WEAVE_ERROR result = mPendingUpdateSet.AddItemDedup(aItem, aSchemaEngine);

    WeaveLogDetail(DataManagement, "%s t%u, p%u, err %d", __func__, aItem.mTraitDataHandle, aItem.mPropertyPathHandle, result);

    return result;
}

/**
 * Reset the potential-data-loss flag of an updatable sink, logging the
 * transition when the flag was actually set.
 *
 * @param[in] aTraitDataHandle  Handle of the trait instance (used for logging).
 * @param[in] aUpdatableSink    The sink whose flag is cleared.
 */
void SubscriptionClient::ClearPotentialDataLoss(TraitDataHandle aTraitDataHandle, TraitUpdatableDataSink &aUpdatableSink)
{
    const bool wasFlagged = aUpdatableSink.IsPotentialDataLoss();

    if (wasFlagged)
    {
        WeaveLogDetail(DataManagement, "Potential data loss cleared for traitDataHandle: %" PRIu16 ", trait %08x",
                       aTraitDataHandle, aUpdatableSink.GetSchemaEngine()->GetProfileId());
    }

    // The flag is cleared unconditionally; only the log line depends on its previous value.
    aUpdatableSink.SetPotentialDataLoss(false);
}

/**
 * Mark all pending paths of a trait instance as failed when the sink is in
 * the middle of a conditional update and the latest known version is newer
 * than the version the update requires.
 *
 * @param[in] aTraitDataHandle  Handle of the trait instance.
 * @param[in] aSink             The updatable sink of that trait instance.
 * @param[in] aLatestVersion    The latest known version of the trait instance.
 */
void SubscriptionClient::MarkFailedPendingPaths(TraitDataHandle aTraitDataHandle, TraitUpdatableDataSink &aSink, const DataVersion &aLatestVersion)
{
    // Unconditional updates never fail on a version change.
    if (!aSink.IsConditionalUpdate())
    {
        return;
    }

    if (!IsVersionNewer(aLatestVersion, aSink.GetUpdateRequiredVersion()))
    {
        return;
    }

    WeaveLogDetail(DataManagement,
                   "<MarkFailedPendingPaths> current version 0x%" PRIx64
                   ", valid: %d, updateRequiredVersion: 0x%" PRIx64
                   ", latest known version: 0x%" PRIx64,
                   aSink.GetVersion(),
                   aSink.IsVersionValid(),
                   aSink.GetUpdateRequiredVersion(),
                   aLatestVersion);

    mPendingUpdateSet.SetFailedTrait(aTraitDataHandle);
}

/**
 * Tell whether a notified path is covered by an update that is either in
 * progress or pending.
 *
 * When it is, and the trait's updatable sink can be located and is not
 * already flagged, the sink is marked as having a potential data loss.
 *
 * @param[in] aTraitDataHandle  Handle of the trait instance.
 * @param[in] aLeafPathHandle   Property path handle of the notified leaf.
 * @param[in] aSchemaEngine     Schema engine of the trait.
 *
 * @return true if the path is included in the in-progress list or in the
 *         pending set; false otherwise.
 */
bool SubscriptionClient::FilterNotifiedPath(TraitDataHandle aTraitDataHandle,
                                            PropertyPathHandle aLeafPathHandle,
                                            const TraitSchemaEngine * const aSchemaEngine)
{
    // The in-progress list is consulted first; the pending set only if needed.
    const bool isUnderUpdate =
        mInProgressUpdateList.Includes(TraitPath(aTraitDataHandle, aLeafPathHandle), aSchemaEngine) ||
        mPendingUpdateSet.Includes(TraitPath(aTraitDataHandle, aLeafPathHandle), aSchemaEngine);

    if (!isUnderUpdate)
    {
        return false;
    }

    TraitUpdatableDataSink * const sink = Locate(aTraitDataHandle, mDataSinkCatalog);

    if ((sink != NULL) && !sink->IsPotentialDataLoss())
    {
        sink->SetPotentialDataLoss(true);

        WeaveLogDetail(DataManagement, "Potential data loss set for traitDataHandle: %" PRIu16 ", trait %08x pathHandle: %" PRIu32 "",
                       aTraitDataHandle, aSchemaEngine->GetProfileId(), aLeafPathHandle);
    }

    return true;
}

/**
 * Acquire the update mutex, if one has been configured.
 * Without a mutex this is a no-op.
 */
void SubscriptionClient::LockUpdateMutex()
{
    if (NULL == mUpdateMutex)
    {
        return;
    }

    mUpdateMutex->Lock();
}

/**
 * Release the update mutex, if one has been configured.
 * Without a mutex this is a no-op.
 */
void SubscriptionClient::UnlockUpdateMutex()
{
    if (NULL == mUpdateMutex)
    {
        return;
    }

    mUpdateMutex->Unlock();
}

/**
 * Decide whether a failed update should be retried.
 *
 * @param[in] aErr              The error the update ended with.
 * @param[in] aStatusProfileId  Profile ID of the received status, if any.
 * @param[in] aStatusCode       Status code of the received status, if any.
 *
 * @return true for WEAVE_ERROR_TIMEOUT, and for a received status report in
 *         the Common profile carrying Busy or Timeout; false otherwise.
 */
bool SubscriptionClient::WillRetryUpdate(WEAVE_ERROR aErr, uint32_t aStatusProfileId, uint16_t aStatusCode)
{
    // No response at all: always worth another attempt.
    if (WEAVE_ERROR_TIMEOUT == aErr)
    {
        return true;
    }

    // The publisher answered, but only transient conditions are retried.
    const bool isTransientStatus = (aStatusCode == nl::Weave::Profiles::Common::kStatus_Busy) ||
                                   (aStatusCode == nl::Weave::Profiles::Common::kStatus_Timeout);

    return (WEAVE_ERROR_STATUS_REPORT_RECEIVED == aErr) &&
           (nl::Weave::Profiles::kWeaveProfile_Common == aStatusProfileId) &&
           isTransientStatus;
}

void SubscriptionClient::UpdateCompleteEventCbHelper(const TraitPath &aTraitPath,
                                                     uint32_t aStatusProfileId,
                                                     uint16_t aStatusCode,
                                                     WEAVE_ERROR aReason,
                                                     bool aWillRetry)
{
    InEventParam inParam;
    OutEventParam outParam;

    if (aReason == WEAVE_NO_ERROR &&
            false == (aStatusProfileId == nl::Weave::Profiles::kWeaveProfile_Common &&
                      aStatusCode == nl::Weave::Profiles::Common::kStatus_Success))
    {
        aReason = WEAVE_ERROR_STATUS_REPORT_RECEIVED;
    }

    inParam.Clear();
    outParam.Clear();
    inParam.mUpdateComplete.mClient = this;
    inParam.mUpdateComplete.mStatusProfileId = aStatusProfileId;
    inParam.mUpdateComplete.mStatusCode = aStatusCode;
    inParam.mUpdateComplete.mReason = aReason;
    inParam.mUpdateComplete.mTraitDataHandle = aTraitPath.mTraitDataHandle;
    inParam.mUpdateComplete.mPropertyPathHandle = aTraitPath.mPropertyPathHandle;
    inParam.mUpdateComplete.mWillRetry = aWillRetry;

    mEventCallback(mAppState, kEvent_OnUpdateComplete, inParam, outParam);
}

void SubscriptionClient::NoMorePendingEventCbHelper()
{
    InEventParam inParam;
    OutEventParam outParam;

    inParam.Clear();
    outParam.Clear();

    mEventCallback(mAppState, kEvent_OnNoMorePendingUpdates, inParam, outParam);
}

/**
 * Update the state of the pending update set, logging actual transitions.
 *
 * @param[in] aState  The new state.
 */
void SubscriptionClient::SetPendingSetState(PendingSetState aState)
{
    const bool isTransition = (mPendingSetState != aState);

    if (isTransition)
    {
        WeaveLogDetail(DataManagement, "PendingSetState %d -> %d", mPendingSetState, aState);
    }

    mPendingSetState = aState;
}

/**
 * Process the StatusReport received in response to an update request.
 *
 * The additional info of the status report, when present, is parsed as an
 * UpdateResponse carrying an optional version list and an optional status
 * list. Each path in the in-progress list is then matched, in order, with
 * the next entry of each list:
 *  - successful paths are removed and the sink's version bookkeeping is
 *    brought up to date;
 *  - paths failed with VersionMismatch are removed, the pending paths of the
 *    same trait instance are failed, and a resubscription is requested;
 *  - paths failed with a retryable status (see WillRetryUpdate) are kept;
 *  - any other failure removes the path and clears the sink's version.
 * The application is notified for every non-private path.
 *
 * Paths still in the in-progress list afterwards are moved back to the
 * pending set for a retry. If parsing fails, all remaining in-progress paths
 * are failed and purged instead, and a resubscription is requested.
 *
 * The update mutex is held for the duration of the call, and a reference to
 * the client is held because this method calls back into the application.
 *
 * @param[in] aReason   Not used by this method.
 * @param[in] apStatus  The received status report; it is dereferenced
 *                      unconditionally, so it must not be NULL.
 */
// TODO: Break this method down into smaller methods.
void SubscriptionClient::OnUpdateResponse(WEAVE_ERROR aReason, nl::Weave::Profiles::StatusReporting::StatusReport * apStatus)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    WEAVE_ERROR callbackerr;
    TraitPath traitPath;
    TraitUpdatableDataSink * updatableDataSink = NULL;
    UpdateResponse::Parser response;
    ReferencedTLVData additionalInfo;
    StatusList::Parser statusList;
    VersionList::Parser versionList;
    uint64_t versionCreated = 0;
    bool IsVersionListPresent = false;
    bool IsStatusListPresent = false;
    bool wholeRequestSucceeded = false;
    bool needToResubscribe = false;
    uint32_t profileID;
    uint16_t statusCode;
    nl::Weave::TLV::TLVReader reader;
    bool isPathSuccessful;
    bool isPathPrivate;
    bool willRetryPath;

    // This method invokes callbacks into the upper layer.
    _AddRef();

    LockUpdateMutex();

    additionalInfo = apStatus->mAdditionalInfo;
    ClearUpdateInFlight();

    if (mUpdateRequestContext.mIsPartialUpdate)
    {
        WeaveLogDetail(DataManagement, "Got StatusReport in the middle of a long update");
    }

    WeaveLogDetail(DataManagement, "Received StatusReport %s", nl::StatusReportStr(apStatus->mProfileId, apStatus->mStatusCode));
    WeaveLogDetail(DataManagement, "Received StatusReport additional info %u",
                   additionalInfo.theLength);

    if (apStatus->mProfileId == nl::Weave::Profiles::kWeaveProfile_Common &&
            apStatus->mStatusCode == nl::Weave::Profiles::Common::kStatus_Success)
    {
        // If the whole update has succeeded, the status list
        // is allowed to be empty.
        wholeRequestSucceeded = true;

        // Also reset the retry counter.
        mUpdateRetryCounter = 0;
    }

    // Default per-path status: the overall status of the report. It is
    // overridden per path when a status list is present.
    profileID = apStatus->mProfileId;
    statusCode = apStatus->mStatusCode;

    if (additionalInfo.theLength != 0)
    {
        reader.Init(additionalInfo.theData, additionalInfo.theLength);

        err = reader.Next();
        SuccessOrExit(err);

        err = response.Init(reader);
        SuccessOrExit(err);

#if WEAVE_CONFIG_DATA_MANAGEMENT_ENABLE_SCHEMA_CHECK
        err = response.CheckSchemaValidity();
        SuccessOrExit(err);
#endif

        // Both lists are optional: WEAVE_END_OF_TLV just means "not present".
        err = response.GetVersionList(&versionList);
        switch (err)
        {
        case WEAVE_NO_ERROR:
            IsVersionListPresent = true;
            break;
        case WEAVE_END_OF_TLV:
            err = WEAVE_NO_ERROR;
            break;
        default:
            ExitNow();
            break;
        }

        err = response.GetStatusList(&statusList);
        switch (err)
        {
        case WEAVE_NO_ERROR:
            IsStatusListPresent = true;
            break;
        case WEAVE_END_OF_TLV:
            err = WEAVE_NO_ERROR;
            break;
        default:
            ExitNow();
            break;
        }
    }

    if ((wholeRequestSucceeded) && !(IsStatusListPresent && IsVersionListPresent))
    {
        WeaveLogDetail(DataManagement, "<OnUpdateResponse> version/status list missing");
        ExitNow(err = WEAVE_ERROR_WDM_MALFORMED_UPDATE_RESPONSE);
    }

    // TODO: validate that the version and status lists are either empty or contain
    // the same number of items as the dispatched list

    for (size_t j = mInProgressUpdateList.GetFirstValidItem();
            j < mInProgressUpdateList.GetPathStoreSize();
            j = mInProgressUpdateList.GetNextValidItem(j))
    {
        if (IsVersionListPresent)
        {
            err = versionList.Next();
            SuccessOrExit(err);

            err = versionList.GetVersion(&versionCreated);
            SuccessOrExit(err);
        }

        if (IsStatusListPresent)
        {
            // A status list shorter than the dispatched list is a malformed
            // response: stop here rather than reading past its end.
            err = statusList.Next();
            SuccessOrExit(err);

            err = statusList.GetProfileIDAndStatusCode(&profileID, &statusCode);
            SuccessOrExit(err);
        }

        WEAVE_FAULT_INJECT(FaultInjection::kFault_WDM_UpdateResponseBusy,
                profileID = nl::Weave::Profiles::kWeaveProfile_Common;
                statusCode = nl::Weave::Profiles::Common::kStatus_Busy;
                wholeRequestSucceeded = false;
                );

        err = WEAVE_NO_ERROR;

        isPathSuccessful = (profileID == nl::Weave::Profiles::kWeaveProfile_Common &&
                            statusCode == nl::Weave::Profiles::Common::kStatus_Success);

        callbackerr = isPathSuccessful ? WEAVE_NO_ERROR : WEAVE_ERROR_STATUS_REPORT_RECEIVED;

        willRetryPath = WillRetryUpdate(callbackerr, profileID, statusCode);

        // Capture the flag and the path now: the item may be removed below,
        // before the application is notified.
        isPathPrivate = mInProgressUpdateList.AreFlagsSet(j, kFlag_Private);

        mInProgressUpdateList.GetItemAt(j, traitPath);

        updatableDataSink = Locate(traitPath.mTraitDataHandle, mDataSinkCatalog);

        if (updatableDataSink == NULL)
        {
            // Locate() can return NULL if the sink has been removed from the catalog. In that case, ignore this path
            WeaveLogDetail(DataManagement, "item: %zu, traitDataHandle: %" PRIu16 ", pathHandle: %" PRIu32 " potentially removed from the catalog",
                    j, traitPath.mTraitDataHandle, traitPath.mPropertyPathHandle);
            mInProgressUpdateList.RemoveItemAt(j);
            continue;
        }

        WeaveLogDetail(DataManagement, "item: %zu, profile: %" PRIu32 ", statusCode: 0x%" PRIx16 ", version 0x%" PRIx64 "",
                j, profileID, statusCode, versionCreated);
        WeaveLogDetail(DataManagement, "item: %zu, traitDataHandle: %" PRIu16 ", pathHandle: %" PRIu32 "",
                j, traitPath.mTraitDataHandle, traitPath.mPropertyPathHandle);

        if (isPathSuccessful)
        {
            mInProgressUpdateList.RemoveItemAt(j);

            if (updatableDataSink->IsConditionalUpdate())
            {
                if (updatableDataSink->IsVersionValid() &&
                        versionCreated > updatableDataSink->GetVersion() &&
                        updatableDataSink->GetVersion() >= updatableDataSink->GetUpdateStartVersion())
                {
                    updatableDataSink->SetVersion(versionCreated);
                }
                // If the same path is pending again, the next conditional update
                // must be based on the version just created.
                if (mPendingUpdateSet.IsPresent(traitPath))
                {
                    updatableDataSink->SetUpdateRequiredVersion(versionCreated);
                }
                else
                {
                    updatableDataSink->ClearUpdateRequiredVersion();
                    updatableDataSink->SetConditionalUpdate(false);
                }
            }

            if (updatableDataSink->IsPotentialDataLoss() &&
                    updatableDataSink->IsVersionValid() &&
                    versionCreated >= updatableDataSink->GetVersion() &&
                    updatableDataSink->GetVersion() >= updatableDataSink->GetUpdateStartVersion())
            {
                ClearPotentialDataLoss(traitPath.mTraitDataHandle, *updatableDataSink);
            }
        } // Success
        else
        {
            if (profileID == nl::Weave::Profiles::kWeaveProfile_WDM &&
                    statusCode == nl::Weave::Profiles::DataManagement::kStatus_VersionMismatch)
            {
                mInProgressUpdateList.RemoveItemAt(j);

                // Fail all pending ones as well for VersionMismatch and force resubscribe
                if (mPendingUpdateSet.IsTraitPresent(traitPath.mTraitDataHandle))
                {
                    mPendingUpdateSet.SetFailedTrait(traitPath.mTraitDataHandle);
                }
                updatableDataSink->ClearVersion();
                updatableDataSink->ClearUpdateRequiredVersion();
                updatableDataSink->SetConditionalUpdate(false);
                needToResubscribe = true;
            }
            else
            {
                // If the publisher is busy or has an internal timeout, retry later.
                // Else, throw away all updates in the trait instance.
                if (false == willRetryPath)
                {
                    mInProgressUpdateList.RemoveItemAt(j);

                    if (updatableDataSink->IsConditionalUpdate() &&
                            mPendingUpdateSet.IsTraitPresent(traitPath.mTraitDataHandle))
                    {
                        mPendingUpdateSet.SetFailedTrait(traitPath.mTraitDataHandle);
                        updatableDataSink->ClearUpdateRequiredVersion();
                        updatableDataSink->SetConditionalUpdate(false);
                    }

                    if (updatableDataSink->IsVersionValid())
                    {
                        WeaveLogDetail(DataManagement, "Clearing version for tdh %d, trait %08x",
                                traitPath.mTraitDataHandle,
                                updatableDataSink->GetSchemaEngine()->GetProfileId());

                        updatableDataSink->ClearVersion();
                        needToResubscribe = true;
                    }
                }
            }
        } // Not success

        if (false == isPathPrivate)
        {
            UpdateCompleteEventCbHelper(traitPath, profileID, statusCode, callbackerr, willRetryPath);
            // The application can call DiscardUpdates() from the callback. In that case,
            // the next item in the list will be invalid, and the loop will terminate.
            // Either this method or DiscardUpdates will trigger a resubscription.
        }
    } // for all paths in mInProgressUpdateList

exit:

    if (err != WEAVE_NO_ERROR)
    {
        size_t count;

        // If the loop above exited early for an error, the application
        // is notified for any remaining path by the following method.
        // These paths are not retried.
        mInProgressUpdateList.SetFailed();
        PurgeAndNotifyFailedPaths(err, mInProgressUpdateList, count);
        needToResubscribe = true;
    }
    else
    {
        // Whatever was not discarded above should be retried
        err = MoveInProgressToPending();
        if (err != WEAVE_NO_ERROR)
        {
            AbortUpdates(err);
        }
    }

    mUpdateRequestContext.Reset();

    PurgePendingUpdate();

    if (mPendingSetState == kPendingSetEmpty)
    {
        mUpdateRetryCounter = 0;

        NoMorePendingEventCbHelper();

        if (CheckForSinksWithDataLoss())
        {
            needToResubscribe = true;
        }
    }

    if (mPendingSetState == kPendingSetReady)
    {
        StartUpdateRetryTimer(wholeRequestSucceeded ? WEAVE_NO_ERROR : WEAVE_ERROR_STATUS_REPORT_RECEIVED);
    }

    // If we need to resubscribe, bring it down
    if (needToResubscribe && IsInProgressOrEstablished())
    {
        WeaveLogDetail(DataManagement, "UpdateResponse: triggering resubscription");
        TerminateSubscription(err, NULL, false);
    }

    UnlockUpdateMutex();

    WeaveLogFunctError(err);

    _Release();
}

/**
 * Handle an update request that completed with an error instead of a
 * response.
 *
 * This handler is optimized for the case that the request never reached the
 * responder: the application is told, for every non-private dispatched path,
 * that the update failed and will be retried; the dispatched paths are then
 * put back in the pending set and a retry is scheduled.
 *
 * The update mutex is held for the duration of the call, and a reference to
 * the client is held because this method calls back into the application.
 *
 * @param[in] aError  The error the update request ended with.
 */
void SubscriptionClient::OnUpdateNoResponse(WEAVE_ERROR aError)
{
    TraitPath dispatchedPath;

    _AddRef();

    LockUpdateMutex();

    ClearUpdateInFlight();

    // Notify the app for all dispatched paths; private ones are internal and
    // are not reported.
    for (size_t idx = mInProgressUpdateList.GetFirstValidItem();
         idx < mInProgressUpdateList.GetPathStoreSize();
         idx = mInProgressUpdateList.GetNextValidItem(idx))
    {
        if (mInProgressUpdateList.AreFlagsSet(idx, kFlag_Private))
        {
            continue;
        }

        mInProgressUpdateList.GetItemAt(idx, dispatchedPath);

        UpdateCompleteEventCbHelper(dispatchedPath,
                                    nl::Weave::Profiles::kWeaveProfile_Common,
                                    nl::Weave::Profiles::Common::kStatus_Success,
                                    aError,
                                    true);
    }

    // Move the paths from the in-progress list back to the pending set for all
    // trait instances.
    const WEAVE_ERROR moveErr = MoveInProgressToPending();
    if (WEAVE_NO_ERROR == moveErr)
    {
        PurgePendingUpdate();
    }
    else
    {
        AbortUpdates(moveErr);
    }

    if (!mPendingUpdateSet.IsEmpty())
    {
        StartUpdateRetryTimer(aError);
    }
    else
    {
        NoMorePendingEventCbHelper();
    }

    UnlockUpdateMutex();

    _Release();
}

void SubscriptionClient::UpdateEventCallback (void * const aAppState,
                                              UpdateClient::EventType aEvent,
                                              const UpdateClient::InEventParam & aInParam,
                                              UpdateClient::OutEventParam & aOutParam)
{
    SubscriptionClient * const pSubClient = reinterpret_cast<SubscriptionClient *>(aAppState);

    switch (aEvent)
    {
    case UpdateClient::kEvent_UpdateComplete:
        WeaveLogDetail(DataManagement, "UpdateComplete event: %d", aEvent);

        if (aInParam.UpdateComplete.Reason == WEAVE_NO_ERROR)
        {
            pSubClient->OnUpdateResponse(aInParam.UpdateComplete.Reason, aInParam.UpdateComplete.StatusReportPtr);
        }
        else
        {
            pSubClient->OnUpdateNoResponse(aInParam.UpdateComplete.Reason);
        }

        break;
    case UpdateClient::kEvent_UpdateContinue:
        WeaveLogDetail(DataManagement, "UpdateContinue event: %d", aEvent);
        pSubClient->ClearUpdateInFlight();
        pSubClient->FormAndSendUpdate();
        break;
    default:
        WeaveLogDetail(DataManagement, "Unknown UpdateClient event: %d", aEvent);
        break;
    }

    return;
}

/**
 * Adds a property path of an updatable sink to the set of paths pending to be
 * sent in an update request.
 *
 * @param[in]   aDataSink        The updatable sink that was mutated.
 * @param[in]   aPropertyHandle  The handle of the mutated property.
 * @param[in]   aIsConditional   True if the update is conditional.
 *
 * @retval WEAVE_ERROR_WDM_LOCAL_DATA_INCONSISTENT      A conditional update was requested
 *                                                      but the sink has no valid version.
 * @retval WEAVE_ERROR_WDM_INCONSISTENT_CONDITIONALITY  The trait instance is already part of
 *                                                      an update with different conditionality.
 * @retval other                                        Errors returned by the catalog lookup or
 *                                                      by AddItemPendingUpdateSet().
 * @retval WEAVE_NO_ERROR                               On success.
 */
WEAVE_ERROR SubscriptionClient::SetUpdated(TraitUpdatableDataSink * aDataSink, PropertyPathHandle aPropertyHandle, bool aIsConditional)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    TraitDataHandle sinkHandle;
    const TraitSchemaEngine * sinkSchema;
    bool traitAlreadyInUpdate = false;
    bool captureRequiredVersion = false;

    LockUpdateMutex();

    // Conditional mutations are rejected while the sink has no valid version.
    if (aIsConditional && !aDataSink->IsVersionValid())
    {
        err = WEAVE_ERROR_WDM_LOCAL_DATA_INCONSISTENT;
        WeaveLogDetail(DataManagement, "Rejected mutation with error %d", err);
        ExitNow();
    }

    sinkSchema = aDataSink->GetSchemaEngine();

    err = mDataSinkCatalog->Locate(aDataSink, sinkHandle);
    SuccessOrExit(err);

    traitAlreadyInUpdate = mPendingUpdateSet.IsTraitPresent(sinkHandle) ||
                           mInProgressUpdateList.IsTraitPresent(sinkHandle);

    // Mixing conditional and non-conditional updates in the same trait
    // is not supported.
    if (traitAlreadyInUpdate)
    {
        VerifyOrExit(aIsConditional == aDataSink->IsConditionalUpdate(), err = WEAVE_ERROR_WDM_INCONSISTENT_CONDITIONALITY);
    }

    // The required version is only recorded by the first conditional
    // mutation of a trait instance that is not in any update yet.
    captureRequiredVersion = aIsConditional && !traitAlreadyInUpdate;

    err = AddItemPendingUpdateSet(TraitPath(sinkHandle, aPropertyHandle), sinkSchema);
    SuccessOrExit(err);

    if (captureRequiredVersion)
    {
        aDataSink->SetUpdateRequiredVersion(aDataSink->GetVersion());
        WeaveLogDetail(DataManagement, "<SetUpdated> Set update required version to 0x%" PRIx64 "", aDataSink->GetUpdateRequiredVersion());
    }

    aDataSink->SetConditionalUpdate(aIsConditional);

    SetPendingSetState(kPendingSetOpen);

exit:

    UnlockUpdateMutex();

    return err;
}

/**
 * Tells the SubscriptionClient to empty the set of TraitPaths pending to be updated and abort the
 * update request that is in progress, if any.
 * This method can be invoked from any callback.
 */
void SubscriptionClient::DiscardUpdates()
{
    AbortUpdates(WEAVE_NO_ERROR);
}

/**
 * Empties the set of TraitPaths pending to be updated and aborts the
 * update request that is in progress, if any.
 * Calls the application callback for every path if the error code passed is not WEAVE_NO_ERROR.
 *
 * Note that this method is written to be reentrant. If this is called internally with an
 * error code, the application can in turn call SetUpdated()/FlushUpdate() or DiscardUpdates()
 * from any of the paths.
 * A call to SetUpdated() will add a path to mPendingUpdateSet as usual, and that path won't be
 * deleted by the AbortUpdates in progress.
 * If the application calls DiscardUpdates, another AbortUpdates will start executing; this will
 * mark any new paths as failed and empty the stores without further callbacks. When the original
 * AbortUpdates resumes, it will find no more paths to notify the application about.
 *
 * @param[in]   aErr    The WEAVE_ERROR to notify the application with.
 */
void SubscriptionClient::AbortUpdates(WEAVE_ERROR aErr)
{
    size_t numPending = 0;
    size_t numInProgress = 0;

    LockUpdateMutex();

    // Reset the flag only once the update mutex is held, like the rest of the update
    // state modified here. RefreshUpdatableSinkTrait() sets it again below for every
    // updatable sink that is part of a pending or in-progress update.
    mResubscribeNeeded = false;

    // Stop any scheduled retry or flush, and cancel the request in flight.
    SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->CancelTimer(OnUpdateTimerCallback, this);
    mUpdateRetryScheduled = false;

    mUpdateFlushScheduled = false;

    ClearUpdateInFlight();

    mUpdateClient.CancelUpdate();

    if (mDataSinkCatalog)
    {
        mDataSinkCatalog->Iterate(RefreshUpdatableSinkTrait, this);
    }

    if (mResubscribeNeeded && IsInProgressOrEstablished())
    {
        TerminateSubscription(WEAVE_NO_ERROR, NULL, false);
    }

    if (aErr == WEAVE_NO_ERROR)
    {
        // No error code: just empty both path stores.
        numPending = mPendingUpdateSet.GetNumItems();
        mPendingUpdateSet.Clear();
        SetPendingSetState(kPendingSetEmpty);

        numInProgress = mInProgressUpdateList.GetNumItems();
        mInProgressUpdateList.Clear();
    }
    else
    {
        // There is an error code: notify the application for the failed paths.
        //
        // Note that the application can call DiscardUpdates() or SetUpdated()/FlushUpdate()
        // from any callback.
        // SetUpdated() can re-add paths to the pending list.
        // FlushUpdate() can re-start the retry timer.
        // A call to DiscardUpdates() does nothing because all paths are already marked as failed,
        // unless SetUpdated() has been called by a callback for an earlier element.

        mPendingUpdateSet.SetFailed();
        mInProgressUpdateList.SetFailed();
        PurgeAndNotifyFailedPaths(aErr, mPendingUpdateSet, numPending);
        PurgeAndNotifyFailedPaths(aErr, mInProgressUpdateList, numInProgress);
    }

    // The counters are size_t; convert them explicitly to the type that
    // the PRIu32 conversion specifiers expect.
    WeaveLogDetail(DataManagement, "Discarded %" PRIu32 " pending  and %" PRIu32 " inProgress paths",
            static_cast<uint32_t>(numPending), static_cast<uint32_t>(numInProgress));

    UnlockUpdateMutex();

    return;
}

void SubscriptionClient::RefreshUpdatableSinkTrait(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    SubscriptionClient * subClient = static_cast<SubscriptionClient *>(aContext);
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);
    TraitUpdatableDataSink * updatableDataSink = NULL;
    bool refreshTraitInstance = false;

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    updatableDataSink = static_cast<TraitUpdatableDataSink *>(dataSink);

    if (subClient->mPendingUpdateSet.IsTraitPresent(aDataHandle))
    {
        refreshTraitInstance = true;
    }

    if (subClient->mInProgressUpdateList.IsTraitPresent(aDataHandle))
    {
        refreshTraitInstance = true;
    }

    if (refreshTraitInstance)
    {
        updatableDataSink->SetConditionalUpdate(false);
        updatableDataSink->ClearUpdateRequiredVersion();
        updatableDataSink->ClearVersion();
        subClient->mResubscribeNeeded = true;
    }

exit:
    return;
}

/**
 * Tells the SubscriptionClient to stop retrying update requests.
 * This lets the application suspend updates for a period of time
 * without discarding all metadata.
 * Updates and retries are resumed when FlushUpdate is called.
 * If an update is in flight when this is called, it is not canceled;
 * should it fail, it will not be retried until FlushUpdate
 * is called again.
 */
void SubscriptionClient::SuspendUpdateRetries()
{
    LockUpdateMutex();

    // Cancel the retry timer and record that no retry is scheduled.
    SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->CancelTimer(OnUpdateTimerCallback, this);
    mUpdateRetryScheduled = false;

    // Only log when the flag actually changes.
    if (!mSuspendUpdateRetries)
    {
        WeaveLogDetail(DataManagement, "%s false -> true", __func__);

        mSuspendUpdateRetries = true;
    }

    UnlockUpdateMutex();
}

/**
 * Fail all conditional pending paths that have become obsolete and
 * notify the application.
 *
 * The catalog is iterated with PurgePendingUpdatableSinkTrait(), then the paths
 * marked as failed are purged from the pending set with
 * WEAVE_ERROR_WDM_VERSION_MISMATCH. If the purge reports any deleted path while
 * the subscription is in progress or established, the subscription is terminated
 * with the same error.
 *
 * @return WEAVE_NO_ERROR; the current implementation never sets another value.
 */
WEAVE_ERROR SubscriptionClient::PurgePendingUpdate()
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    size_t numPendingPathsDeleted = 0;

    // The item count is converted explicitly so that the argument matches
    // the %d conversion specifier.
    WeaveLogDetail(DataManagement, "PurgePendingUpdate: numItems before: %d",
            static_cast<int>(mPendingUpdateSet.GetNumItems()));

    // Nothing to purge if the pending set is empty.
    VerifyOrExit(mPendingUpdateSet.GetNumItems() > 0, );

    if (mDataSinkCatalog)
    {
        mDataSinkCatalog->Iterate(PurgePendingUpdatableSinkTrait, this);
    }

    PurgeAndNotifyFailedPaths(WEAVE_ERROR_WDM_VERSION_MISMATCH, mPendingUpdateSet, numPendingPathsDeleted);

    if ((numPendingPathsDeleted > 0) && IsInProgressOrEstablished())
    {
        TerminateSubscription(WEAVE_ERROR_WDM_VERSION_MISMATCH, NULL, false);
    }

exit:
    WeaveLogDetail(DataManagement, "PurgePendingUpdate: numItems after: %d",
            static_cast<int>(mPendingUpdateSet.GetNumItems()));

    return err;
}

void SubscriptionClient::PurgePendingUpdatableSinkTrait(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    SubscriptionClient * subClient = static_cast<SubscriptionClient *>(aContext);
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);
    TraitUpdatableDataSink * updatableDataSink = NULL;

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    updatableDataSink = static_cast<TraitUpdatableDataSink *>(dataSink);

    if (updatableDataSink->IsVersionValid())
    {
        subClient->MarkFailedPendingPaths(aDataHandle, *updatableDataSink, updatableDataSink->GetVersion());
    }

exit:
    return;
}

void SubscriptionClient::SetUpdateStartVersions(void)
{
    TraitPath traitPath;
    TraitUpdatableDataSink *updatableSink;

    for (size_t i = mInProgressUpdateList.GetFirstValidItem();
            i < mInProgressUpdateList.GetPathStoreSize();
            i = mInProgressUpdateList.GetNextValidItem(i))
    {
        mInProgressUpdateList.GetItemAt(i, traitPath);

        updatableSink = Locate(traitPath.mTraitDataHandle, mDataSinkCatalog);
        if (NULL != updatableSink)
        {
            updatableSink->SetUpdateStartVersion();
        }
    }
}

/**
 * Encodes one update request payload from the in-progress update list and
 * sends it through the UpdateClient.
 *
 * If the encoder stops before the end of the in-progress list, the request is
 * sent as a partial update and the request index is increased for the next one.
 * If no data element was encoded, nothing is sent and the UpdateClient is told
 * to cancel the update.
 *
 * @return WEAVE_NO_ERROR on success; otherwise the error returned by the buffer
 *         allocation, by the encoder or by UpdateClient::SendUpdate().
 */
WEAVE_ERROR SubscriptionClient::SendSingleUpdateRequest(void)
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;
    PacketBuffer * payloadBuf = NULL;
    uint32_t payloadLimit = 0;
    uint32_t updateSizeLimit = GetMaxUpdateSize();
    UpdateEncoder::Context encodeCtx;

    err = mUpdateClient.mpBinding->AllocateRightSizedBuffer(payloadBuf, updateSizeLimit, WDM_MIN_UPDATE_SIZE, payloadLimit);
    SuccessOrExit(err);

    mUpdateRequestContext.mIsPartialUpdate = false;

    // Where to encode, and what to encode from.
    encodeCtx.mBuf = payloadBuf;
    encodeCtx.mMaxPayloadSize = payloadLimit;
    encodeCtx.mInProgressUpdateList = &mInProgressUpdateList;
    encodeCtx.mDataSinkCatalog = mDataSinkCatalog;
    encodeCtx.mExpiryTimeMicroSecond = 0;

    // Where the previous request (if any) left off.
    encodeCtx.mUpdateRequestIndex = mUpdateRequestContext.mUpdateRequestIndex;
    encodeCtx.mItemInProgress = mUpdateRequestContext.mItemInProgress;
    encodeCtx.mNextDictionaryElementPathHandle = mUpdateRequestContext.mNextDictionaryElementPathHandle;

    err = mUpdateEncoder.EncodeRequest(encodeCtx);
    SuccessOrExit(err);

    mUpdateRequestContext.mNextDictionaryElementPathHandle = encodeCtx.mNextDictionaryElementPathHandle;

    if (encodeCtx.mItemInProgress < mInProgressUpdateList.GetPathStoreSize())
    {
        // This is a PartialUpdateRequest; increase the index for the next one
        mUpdateRequestContext.mUpdateRequestIndex++;
        mUpdateRequestContext.mIsPartialUpdate = true;
    }

    // Nothing was encoded: there is no request to send.
    if (!(encodeCtx.mNumDataElementsAddedToPayload > 0))
    {
        mUpdateClient.CancelUpdate();
        ExitNow();
    }

    if (!mUpdateRequestContext.mIsPartialUpdate)
    {
        // TODO: Should this happen at the first PartialUpdateRequest, or at the final UpdateRequest?
        SetUpdateStartVersions();
    }

    WeaveLogDetail(DataManagement, "Sending %sUpdateRequest with %" PRIu16 " DEs",
            mUpdateRequestContext.mIsPartialUpdate ? "Partial" : "",
            encodeCtx.mNumDataElementsAddedToPayload);

    // TODO: SetUpdateInFlight is here instead of after SendUpdate
    // to be able to inject timeouts; must improve this..
    SetUpdateInFlight();

    // The buffer pointer is cleared right after the call, whatever the result,
    // so that the cleanup code below does not free it.
    err = mUpdateClient.SendUpdate(mUpdateRequestContext.mIsPartialUpdate, payloadBuf, encodeCtx.mUpdateRequestIndex == 0);
    payloadBuf = NULL;
    SuccessOrExit(err);

    mUpdateRequestContext.mItemInProgress = encodeCtx.mItemInProgress;

exit:

    // Release the buffer if it is still held by this function.
    if (payloadBuf != NULL)
    {
        PacketBuffer::Free(payloadBuf);
        payloadBuf = NULL;
    }

    if (WEAVE_ERROR_BUFFER_TOO_SMALL == err)
    {
        WeaveLogDetail(DataManagement, "illegal oversized trait property is too big to fit in the packet");
    }

    return err;
}

/**
 * Forms and sends the next update request, unless one is already in flight.
 *
 * If the binding is ready, the pending set is moved to the in-progress list
 * (when the latter is empty and the pending set is ready) and a single update
 * request is sent. If the binding is neither ready nor being prepared, its
 * preparation is started instead.
 * Any failure is handed to OnUpdateNoResponse().
 */
void SubscriptionClient::FormAndSendUpdate()
{
    WEAVE_ERROR err = WEAVE_NO_ERROR;

    LockUpdateMutex();

    if (IsUpdateInFlight())
    {
        WeaveLogDetail(DataManagement, "Update request in flight");
    }
    else
    {
        WeaveLogDetail(DataManagement, "Eval Subscription: (state = %s)!", GetStateStr());

        if (mBinding->IsReady())
        {
            if (mInProgressUpdateList.IsEmpty() && mPendingSetState == kPendingSetReady)
            {
                MovePendingToInProgress();
            }

            err = SendSingleUpdateRequest();

            if (err == WEAVE_NO_ERROR)
            {
                WeaveLogDetail(DataManagement, "Done update processing!");
            }
        }
        else if (!mBinding->IsPreparing())
        {
            err = _PrepareBinding();
        }
    }

    if (err != WEAVE_NO_ERROR)
    {
        // If anything failed, the UpdateRequest payload was not sent.
        // Move paths back to pending and retry later.
        OnUpdateNoResponse(err);
    }

    UnlockUpdateMutex();

    WeaveLogFunctError(err);
}

/**
 * Signals that the application has finished mutating all TraitUpdatableDataSinks.
 * Unless a previous update exchange is in progress, the client will
 * take all data marked as updated and send it to the responder in one update request.
 * This method can be called from any thread.
 *
 * This overload is equivalent to FlushUpdate(false): a retry that has been
 * scheduled in the future is left in place. The FlushUpdate(bool aForce) overload,
 * when passed true, causes the update to be sent immediately even in that case.
 *
 * @return WEAVE_NO_ERROR in case of success; other WEAVE_ERROR codes in case of failure.
 */
WEAVE_ERROR SubscriptionClient::FlushUpdate()
{
    const bool kDoNotForce = false;

    return FlushUpdate(kDoNotForce);
}
/**
 * Signals that the application has finished mutating all TraitUpdatableDataSinks
 * and asks for the update to be sent from a scheduled work item.
 * Retries suspended with SuspendUpdateRetries() are resumed.
 *
 * Nothing is scheduled if the pending set is not ready, if an update is already
 * in flight, or if a retry or a flush is already scheduled (a scheduled retry is
 * ignored when aForce is true). If the pending set is empty, the application is
 * notified that there are no more pending paths.
 *
 * @param[in] aForce    If true, causes the update to be sent immediately even if
 *                      a retry has been scheduled in the future.
 *
 * @return WEAVE_NO_ERROR in case of success; other WEAVE_ERROR codes in case of failure.
 */
WEAVE_ERROR SubscriptionClient::FlushUpdate(bool aForce)
{
    WEAVE_ERROR err                  = WEAVE_NO_ERROR;
    bool pendingSetIsEmpty           = false;

    WeaveLogDetail(DataManagement, "%s", __func__);

    LockUpdateMutex();

    mSuspendUpdateRetries = false;

    // An open pending set is closed here: it becomes ready to be sent.
    if (mPendingSetState == kPendingSetOpen)
    {
        SetPendingSetState(kPendingSetReady);
    }

    VerifyOrExit(mPendingSetState == kPendingSetReady,
            WeaveLogDetail(DataManagement, "%s: PendingSetState: %d; err = %s", __func__, mPendingSetState, nl::ErrorStr(err)));

    VerifyOrExit(false == IsUpdateInFlight(),
            WeaveLogDetail(DataManagement, "%s: update already in flight", __func__));

    if (aForce)
    {
        // Mark the timer as not running, so the ScheduleWork handler will reset it and
        // start an immediate attempt.
        mUpdateRetryScheduled = false;
    }

    VerifyOrExit(false == mUpdateRetryScheduled, );

    VerifyOrExit(false == mUpdateFlushScheduled, );

    // Tell the Weave thread to try sending the update immediately
    err = SubscriptionEngine::GetInstance()->GetExchangeManager()->MessageLayer->SystemLayer->ScheduleWork(OnUpdateScheduleWorkCallback, this);
    SuccessOrExit(err);

    // Take a reference for the scheduled work item.
    // NOTE(review): presumably released by OnUpdateScheduleWorkCallback - confirm.
    _AddRef();
    mUpdateFlushScheduled = true;

exit:
    // Sample the state while the update mutex is still held; the callback itself
    // is invoked after the mutex has been released.
    pendingSetIsEmpty = (mPendingSetState == kPendingSetEmpty);

    UnlockUpdateMutex();

    if (pendingSetIsEmpty)
    {
        NoMorePendingEventCbHelper();
    }

    return err;
}

/**
 * Iterates over the TraitDataSink catalog looking for updatable sinks that
 * report a potential data loss. The version and the update metadata of such
 * sinks are cleared by CheckForSinksWithDataLossIteratorCb().
 *
 * @return true if at least one such sink was found, which means that the
 *         subscription needs to be re-established; false otherwise, including
 *         when there is no catalog.
 */
bool SubscriptionClient::CheckForSinksWithDataLoss()
{
    bool needToResubscribe = false;

    // Same guard as the other catalog iterations in this class:
    // without a catalog there are no sinks to check.
    if (mDataSinkCatalog)
    {
        mDataSinkCatalog->Iterate(CheckForSinksWithDataLossIteratorCb, &needToResubscribe);
    }

    return needToResubscribe;
}

void SubscriptionClient::CheckForSinksWithDataLossIteratorCb(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);
    TraitUpdatableDataSink * updatableDataSink = NULL;
    bool *needToResubscribe = static_cast<bool *>(aContext);

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    updatableDataSink = static_cast<TraitUpdatableDataSink *>(dataSink);

    if (updatableDataSink->IsPotentialDataLoss())
    {
        WeaveLogDetail(DataManagement, "Need to resubscribe for potential data loss in TDH %d, trait %08x",
                aDataHandle,
                updatableDataSink->GetSchemaEngine()->GetProfileId());

        updatableDataSink->ClearVersion();
        updatableDataSink->ClearUpdateRequiredVersion();
        updatableDataSink->SetConditionalUpdate(false);
        *needToResubscribe = true;
    }

exit:
    return;
}

/**
 * This method should be called when the TraitDataSink catalog
 * has been modified.
 */
void SubscriptionClient::OnCatalogChanged()
{
    ConfigureUpdatableSinks();
}

/**
 * Iterates over the TraitDataSink catalog, with the update mutex held,
 * to configure updatable sinks. Does nothing if there is no catalog.
 */
void SubscriptionClient::ConfigureUpdatableSinks()
{
    LockUpdateMutex();

    if (mDataSinkCatalog != NULL)
    {
        mDataSinkCatalog->Iterate(ConfigureUpdatableSinkTrait, this);
    }

    UnlockUpdateMutex();
}

void SubscriptionClient::ConfigureUpdatableSinkTrait(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    SubscriptionClient * subClient = static_cast<SubscriptionClient *>(aContext);
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);
    TraitUpdatableDataSink * updatableDataSink = NULL;

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    updatableDataSink = static_cast<TraitUpdatableDataSink *>(dataSink);

    if (updatableDataSink->GetSubscriptionClient() != subClient)
    {
        updatableDataSink->SetSubscriptionClient(subClient);
        updatableDataSink->SetUpdateEncoder(&subClient->mUpdateEncoder);
        updatableDataSink->ClearUpdateRequiredVersion();
        updatableDataSink->ClearUpdateStartVersion();
        updatableDataSink->SetConditionalUpdate(false);
        updatableDataSink->SetPotentialDataLoss(false);
    }

    exit:
        return;
}

void SubscriptionClient::CleanupUpdatableSinkTrait(void * aDataSink, TraitDataHandle aDataHandle, void * aContext)
{
    TraitUpdatableDataSink * updatableDataSink = NULL;
    TraitDataSink * dataSink = static_cast<TraitDataSink *>(aDataSink);

    VerifyOrExit(dataSink->IsUpdatableDataSink() == true, /* no error */);

    updatableDataSink = static_cast<TraitUpdatableDataSink *>(dataSink);

    updatableDataSink->SetSubscriptionClient(NULL);
    updatableDataSink->SetUpdateEncoder(NULL);

exit:
    return;
}

/**
 * Puts the update request context back to its initial state: first item,
 * no dictionary element in progress, request index zero, not a partial update.
 */
void SubscriptionClient::UpdateRequestContext::Reset()
{
    mIsPartialUpdate = false;
    mUpdateRequestIndex = 0;

    mItemInProgress = 0;
    mNextDictionaryElementPathHandle = kNullPropertyPathHandle;
}

#endif // WEAVE_CONFIG_ENABLE_WDM_UPDATE
}; // namespace WeaveMakeManagedNamespaceIdentifier(DataManagement, kWeaveManagedNamespaceDesignation_Current)
}; // namespace Profiles
}; // namespace Weave
}; // namespace nl
