blob: 8e30e73001a80136f83c5357c83a2a1a02fad3ec [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_ENCODER_SHIPPING_DISPATCHER_H_
#define COBALT_ENCODER_SHIPPING_DISPATCHER_H_
#include <map>
#include <memory>
#include <vector>
#include "./observation.pb.h"
#include "encoder/shipping_manager.h"
#include "third_party/clearcut/uploader.h"
#include "third_party/tensorflow_statusor/statusor.h"
#include "util/status.h"
namespace cobalt {
namespace encoder {
// ShippingDispatcher is a wrapper around the ShippingManager class. It allows
// dispatching to multiple different ShippingManagers so that we can send
// observations to different backends (current backends are a GKE GRPC, and a
// dummy Clearcut uploader). See ShippingManager for more details.
class ShippingDispatcher {
public:
// Registers a ShippingManager to be handled by the ShippingDispatcher. A
// particular |backend| should not be registered more than once, if it is,
// the last call to Register will take precedence.
void Register(ObservationMetadata::ShufflerBackend backend,
std::unique_ptr<ShippingManager> manager);
// Returns the list of ObservationMetadata::ShufflerBackends that have been
// registered with the ShippingDispatcher.
std::vector<ObservationMetadata::ShufflerBackend> RegisteredBackends();
// Starts the worker thread for all of the ShippingManagers. This method must
// be invoked exactly once.
void Start();
// Notifies all of the ShippingManagers that an observation may have been
// added to their ObservationStores.
void NotifyObservationsAdded();
// Register a request with all controlled ShippingManagers for an expedited
// send. The underlying ShippingManager's worker thread will use its
// |SendRetryer| to send all of the accumulated, unsent Observations as soon
// as possible but not sooner than |min_interval| seconds after the previous
// send operation has completed.
void RequestSendSoon();
// A version of RequestSendSoon() that provides feedback about the send.
// |send_callback| will be invoked with the result of the requested send
// attempt. More precisely, send_callback will be invoked after all of the
// ShippingManagers have attempted to send all of the Observations that were
// added prior to the invocation of RequestSendSoon(). It will be invoked
// with true if all such Observations were succesfully sent. It will be
// invoked with false if some Observations were not able to be sent, but
// the status of any particular Observation may not be determined. This
// is useful mainly in tests.
void RequestSendSoon(ShippingManager::SendCallback send_callback);
// Waits for |max_wait| seconds on each owned ShippingManager in sequence
// until each becomes idle.This method is most useful if it can be arranged
// that there are no concurrent invocations of AddObservation() (for example
// in a test) because such concurrent invocations may cause the idle state to
// never be entered.
void WaitUntilIdle(std::chrono::seconds max_wait);
// These diagnostic stats are mostly useful in a testing environment but
// may possibly prove useful in production also.
size_t NumSendAttempts();
size_t NumFailedAttempts();
util::Status last_send_status(ObservationMetadata::ShufflerBackend backend);
private:
friend class ShippingDispatcherTest;
tensorflow_statusor::StatusOr<ShippingManager*> manager(
ObservationMetadata::ShufflerBackend backend);
std::map<ObservationMetadata::ShufflerBackend,
std::unique_ptr<ShippingManager>>
shipping_managers_;
// RequestSendCallback is used in RequestSendSoon to make sure that the
// callback is called only after a specified number of invocations of |Call|.
class RequestSendCallback {
public:
~RequestSendCallback();
RequestSendCallback(ShippingManager::SendCallback cb,
size_t needed_callbacks);
void Call(bool success);
private:
const size_t needed_callbacks_;
size_t seen_callbacks_;
bool success_;
bool callback_called_;
ShippingManager::SendCallback cb_;
std::mutex mutex_;
};
};
} // namespace encoder
} // namespace cobalt
#endif // COBALT_ENCODER_SHIPPING_DISPATCHER_H_