// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#pragma once

#include <fuchsia/overnet/protocol/cpp/fidl.h>
#include <fuchsia/overnet/protocol/cpp/overnet_internal.h>

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <ostream>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "lib/fidl/cpp/clone.h"
#include "src/connectivity/overnet/lib/environment/timer.h"
#include "src/connectivity/overnet/lib/environment/trace.h"
#include "src/connectivity/overnet/lib/labels/node_id.h"
#include "src/connectivity/overnet/lib/vocabulary/bandwidth.h"
#include "src/connectivity/overnet/lib/vocabulary/internal_list.h"
#include "src/connectivity/overnet/lib/vocabulary/optional.h"
#include "src/connectivity/overnet/lib/vocabulary/slice.h"

namespace overnet {
namespace routing_table_impl {

// Identifier for a link made of its two endpoint nodes plus a numeric label.
// Compared field-by-field by operator== and hashed by the std::hash
// specialization later in this file, so it can key unordered containers.
struct FullLinkLabel {
  NodeId from;  // Node at the originating end of the link.
  NodeId to;    // Node at the receiving end of the link.
  // Numeric label carried alongside the endpoints.
  // NOTE(review): presumably disambiguates multiple links between the same
  // pair of nodes and is chosen by `from` — confirm against the producers.
  uint64_t link_label;
};

// Two labels are equal only when both endpoints and the numeric label match.
// Fields are checked in declaration order, bailing out at the first mismatch.
inline bool operator==(const FullLinkLabel& a, const FullLinkLabel& b) {
  if (!(a.from == b.from)) {
    return false;
  }
  if (!(a.to == b.to)) {
    return false;
  }
  return a.link_label == b.link_label;
}

// Streams a label as "<from>-><to>#<link_label>" and returns the stream so
// insertions can be chained.
inline std::ostream& operator<<(std::ostream& out, FullLinkLabel lbl) {
  out << lbl.from;
  out << "->";
  out << lbl.to;
  out << "#";
  out << lbl.link_label;
  return out;
}

}  // namespace routing_table_impl
}  // namespace overnet

namespace std {
// Hash support so FullLinkLabel can be used as an unordered container key.
//
// The fields are folded together with an order-sensitive combine step instead
// of a bare XOR. XOR is commutative and self-cancelling, so with it the labels
// A->B#n and B->A#n would always share a hash value, and any label whose
// endpoints are equal would hash to just its link_label.
template <>
struct hash<overnet::routing_table_impl::FullLinkLabel> {
  // Returns a hash of (from, to, link_label); the order of the endpoints
  // affects the result.
  size_t operator()(
      const overnet::routing_table_impl::FullLinkLabel& id) const {
    size_t seed = static_cast<size_t>(id.from.Hash());
    seed = Combine(seed, static_cast<size_t>(id.to.Hash()));
    seed = Combine(seed, static_cast<size_t>(id.link_label));
    return seed;
  }

 private:
  // Mixes `value` into `seed` (same shape as boost::hash_combine). The shifts
  // of `seed` are what make the result depend on the order of combination.
  static size_t Combine(size_t seed, size_t value) {
    // Golden-ratio derived constant; truncated on targets with 32-bit size_t.
    const size_t kMix = static_cast<size_t>(0x9e3779b97f4a7c15ULL);
    return seed ^ (value + kMix + (seed << 6) + (seed >> 2));
  }
};
}  // namespace std

namespace overnet {

// Holds the node and link status records known to this process (nodes_,
// links_), a copy of that status guarded by shared_table_mu_ for readers, and
// a table of selected links per destination node (SelectedLinks) that is
// handed out through PollLinkUpdates().
//
// Members declared here without a body are defined out of line; their exact
// behavior is not visible from this header.
class RoutingTable {
 public:
  RoutingTable(NodeId root_node, Timer* timer, bool allow_threading);
  ~RoutingTable();
  RoutingTable(const RoutingTable&) = delete;
  RoutingTable& operator=(const RoutingTable&) = delete;

  // Fixed five minute interval.
  // NOTE(review): presumably the age after which a table entry that has not
  // been refreshed is dropped — confirm against the implementation file.
  static constexpr TimeDelta EntryExpiry() { return TimeDelta::FromMinutes(5); }

  // One entry of the selected-links table.
  struct SelectedLink {
    NodeId target_node;
    uint64_t link_id;
    uint32_t route_mss;

    // Member-wise equality.
    bool operator==(SelectedLink other) const {
      return target_node == other.target_node && link_id == other.link_id &&
             route_mss == other.route_mss;
    }
  };
  // Selected link keyed by node id.
  using SelectedLinks = std::unordered_map<NodeId, SelectedLink>;

  // Submits node and link status updates to the table. Defined out of line.
  // NOTE(review): flush_old_nodes presumably requests eviction of stale
  // entries as part of this update — confirm against the implementation.
  void ProcessUpdate(
      std::initializer_list<fuchsia::overnet::protocol::NodeStatus>
          node_updates,
      std::initializer_list<fuchsia::overnet::protocol::LinkStatus>
          link_updates,
      bool flush_old_nodes);

  // Non-blocking poll for a new selected-links table.
  //
  // If mu_ cannot be acquired immediately, returns false without calling f.
  // Otherwise, when the selected-links version differs from the version last
  // published, records it as published and calls f(selected_links_) while
  // mu_ is held. Returns true if no background processing thread is recorded
  // at that point, i.e. every change begun by prior update calls has
  // concluded.
  template <class F>
  bool PollLinkUpdates(F f) {
    // RAII guard: mu_ is released on every exit path, including an abnormal
    // exit from f.
    std::unique_lock<std::mutex> lock(mu_, std::try_to_lock);
    if (!lock.owns_lock()) {
      return false;
    }
    if (selected_links_version_ != published_links_version_) {
      published_links_version_ = selected_links_version_;
      f(selected_links_);
    }
    return !processing_changes_.has_value();
  }

  // Blocks the calling thread on cv_ until processing_changes_ is empty.
  void BlockUntilNoBackgroundUpdatesProcessing() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock,
             [this]() -> bool { return !processing_changes_.has_value(); });
  }

  // Returns the current gossip version, read under shared_table_mu_.
  uint64_t gossip_version() const {
    std::lock_guard lock{shared_table_mu_};
    return gossip_version_;
  }

  // Defined out of line.
  // NOTE(review): presumably sends the shared node/link status to `peer`,
  // optionally skipping `exclude_node`, and returns the gossip version that
  // was sent — confirm against the implementation.
  uint64_t SendUpdate(fuchsia::overnet::protocol::Peer_Proxy* peer,
                      Optional<NodeId> exclude_node) const;

  // Calls visitor once for every entry of the shared node status table.
  // The entries are cloned while shared_table_mu_ is held and the visitor is
  // invoked on the clones after the lock has been released, so the visitor
  // never runs under the lock.
  template <class F>
  void ForEachNodeMetric(F visitor) const {
    std::vector<fuchsia::overnet::protocol::NodeStatus> nodes_copy;
    {
      std::lock_guard lock{shared_table_mu_};
      for (const auto& m : shared_node_status_) {
        nodes_copy.emplace_back(fidl::Clone(m));
      }
    }
    for (const auto& m : nodes_copy) {
      visitor(m);
    }
  }

  // Request notification that the node table has been updated.
  // This may be called back on an arbitrary thread (unlike most of overnet).
  //
  // The callback is stored only if the table is still at last_seen_version;
  // otherwise it is not stored and goes out of scope when this call returns.
  void OnNodeTableUpdate(uint64_t last_seen_version, Callback<void> callback) {
    std::lock_guard lock{shared_table_mu_};
    if (gossip_version_ != last_seen_version) {
      // Forces callback to be called after lock is released.
      // NOTE(review): this relies on Callback<void> firing when it is
      // destroyed without having been invoked — confirm against Callback.
      return;
    }
    on_node_table_update_.emplace_back(std::move(callback));
  }

 private:
  const NodeId root_node_;
  Timer* const timer_;

  // A batch of node and link status records.
  struct StatusVecs {
    std::vector<fuchsia::overnet::protocol::NodeStatus> nodes;
    std::vector<fuchsia::overnet::protocol::LinkStatus> links;
    // True when the batch holds neither node nor link records.
    bool Empty() const { return nodes.empty() && links.empty(); }
    // Drops every record from the batch.
    void Clear() {
      nodes.clear();
      links.clear();
    }
  };
  // NOTE(review): presumably the updates accumulated since the last time
  // changes were applied — confirm against the implementation.
  StatusVecs change_log_;
  const bool allow_threading_;
  bool flush_requested_ = false;

  void ApplyChanges(TimeStamp now, const StatusVecs& changes, bool flush);
  SelectedLinks BuildForwardingTable();

  TimeStamp last_update_{TimeStamp::Epoch()};
  uint64_t path_finding_run_ = 0;

  // mu_ is held by PollLinkUpdates() while it reads the selected-links state
  // and processing_changes_; cv_ is waited on (with mu_) until
  // processing_changes_ becomes empty.
  std::mutex mu_;
  std::condition_variable cv_;
  // Background thread handle; empty when none is recorded.
  Optional<std::thread> processing_changes_;

  struct Node;

  // A link record together with its membership in the per-node link lists.
  struct Link {
    Link(TimeStamp now, fuchsia::overnet::protocol::LinkStatus initial_status,
         Node* from, Node* to)
        : status(std::move(initial_status)),
          last_updated(now),
          from_node(from),
          to_node(to) {}
    fuchsia::overnet::protocol::LinkStatus status;
    TimeStamp last_updated;
    // Hooks for Node::outgoing_links and Node::incoming_links respectively.
    InternalListNode<Link> outgoing_link;
    InternalListNode<Link> incoming_link;
    Node* const from_node;
    Node* const to_node;
  };

  // A node record, the links attached to it, and scratch state used while
  // computing paths.
  struct Node {
    Node(TimeStamp now, fuchsia::overnet::protocol::NodeStatus initial_status)
        : status(std::move(initial_status)), last_updated(now) {}
    fuchsia::overnet::protocol::NodeStatus status;
    TimeStamp last_updated;
    InternalList<Link, &Link::outgoing_link> outgoing_links;
    InternalList<Link, &Link::incoming_link> incoming_links;

    // Path finding temporary state.
    // Every field has an initializer so a freshly constructed Node never
    // exposes indeterminate values.
    uint64_t last_path_finding_run = 0;
    TimeDelta best_rtt{TimeDelta::Zero()};
    Node* best_from = nullptr;
    Link* best_link = nullptr;
    uint32_t mss = 0;
    bool queued = false;
    InternalListNode<Node> path_finding_node;
  };

  void RemoveOutgoingLinks(Node* node);
  void RemoveIncomingLinks(Node* node);

  std::unordered_map<NodeId, Node> nodes_;
  std::unordered_map<routing_table_impl::FullLinkLabel, Link> links_;

  // State below is accessed under shared_table_mu_ by the inline accessors
  // in this header (gossip_version, ForEachNodeMetric, OnNodeTableUpdate).
  mutable std::mutex shared_table_mu_;
  uint64_t gossip_version_ = 0;
  std::vector<fuchsia::overnet::protocol::NodeStatus> shared_node_status_;
  std::vector<fuchsia::overnet::protocol::LinkStatus> shared_link_status_;
  std::vector<Callback<void>> on_node_table_update_;

  // Selected-links table and its version counters; PollLinkUpdates() reports
  // the table to its caller whenever the two versions differ.
  uint64_t selected_links_version_ = 0;
  SelectedLinks selected_links_;
  uint64_t published_links_version_ = 0;
};

}  // namespace overnet
