// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/connectivity/overnet/lib/routing/routing_table.h"

#include <iostream>
#include <unordered_set>

#include "garnet/public/lib/fostr/fidl/fuchsia/overnet/protocol/formatting.h"
#include "src/connectivity/overnet/lib/protocol/varint.h"

using overnet::routing_table_impl::FullLinkLabel;

namespace overnet {

namespace {

// Appends a fidl::Clone() of every element of |from| to the end of |*to|.
// Elements of a std::initializer_list are const, so despite the name nothing
// is moved out of |from|; each element is cloned instead.
template <class T>
void MoveInto(std::initializer_list<T> from, std::vector<T>* to) {
  for (const T* src = from.begin(); src != from.end(); ++src) {
    to->emplace_back(fidl::Clone(*src));
  }
}

}  // namespace

// Constructs a routing table.
//   root_node:       stored as root_node_; BuildForwardingTable() computes
//                    routes starting from this node.
//   timer:           stored as timer_; ProcessUpdate() reads the current time
//                    from it.
//   allow_threading: stored as allow_threading_; ProcessUpdate() uses it to
//                    choose between handing work to processing_changes_ and
//                    running it inline on the caller.
RoutingTable::RoutingTable(NodeId root_node, Timer* timer, bool allow_threading)
    : root_node_(root_node), timer_(timer), allow_threading_(allow_threading) {}

// Waits for any in-flight change processing to finish, then tears down the
// link graph.
RoutingTable::~RoutingTable() {
  std::unique_lock<std::mutex> lock(mu_);
  if (processing_changes_) {
    // Take ownership of the worker thread and clear processing_changes_. The
    // worker loop in ProcessUpdate() checks processing_changes_ under mu_ and
    // returns (without detaching) once it finds it empty.
    std::thread pending_processing = std::move(*processing_changes_);
    processing_changes_.Reset();
    cv_.notify_all();
    // mu_ must be released before joining: the worker locks mu_ to publish
    // its results and to observe the reset above.
    lock.unlock();
    pending_processing.join();
  }

  // ApplyChanges() puts each link it creates on the outgoing list of its
  // from-node, so draining every node's outgoing list unlinks and erases
  // those links.
  for (auto& n : nodes_) {
    RemoveOutgoingLinks(&n.second);
  }
}

// Records a batch of node/link status updates and arranges for them to be
// applied to the routing table.
//
//   nodes, links:    status records to merge; each is cloned into change_log_.
//   flush_old_nodes: when true, requests that the next ApplyChanges() pass
//                    also expire stale nodes.
//
// If a previous batch is still pending or a processing thread is active, the
// new records are only queued; the active processing loop picks them up.
// Otherwise the queued changes are handed to a processing closure that runs
// either via processing_changes_ (allow_threading_ set) or inline on the
// calling thread.
void RoutingTable::ProcessUpdate(
    std::initializer_list<fuchsia::overnet::protocol::NodeStatus> nodes,
    std::initializer_list<fuchsia::overnet::protocol::LinkStatus> links,
    bool flush_old_nodes) {
  // Nothing to record and no flush requested: return without taking mu_.
  if (nodes.size() == 0 && links.size() == 0 && !flush_old_nodes)
    return;
  std::unique_lock<std::mutex> lock(mu_);
  last_update_ = timer_->Now();
  // Sampled before this call's data is queued: true only when no earlier
  // batch or flush request is still waiting to be processed.
  const bool was_empty = change_log_.Empty() && !flush_requested_;
  if (flush_old_nodes)
    flush_requested_ = true;
  MoveInto(nodes, &change_log_.nodes);
  MoveInto(links, &change_log_.links);
  // Earlier work is still queued; this call's records simply join it.
  if (!was_empty)
    return;
  // A processing thread is active; its loop below re-checks change_log_ and
  // flush_requested_ under mu_ before finishing, so it will see these records.
  if (processing_changes_)
    return;
  // The closure takes the queued changes, the flush flag and the timestamp by
  // value, plus the caller's current ScopedRenderer/ScopedSeverity so they can
  // be re-established wherever the closure runs.
  auto process_changes = [this, changes = std::move(change_log_),
                          flush = flush_requested_, now = last_update_,
                          renderer = ScopedRenderer::current(),
                          severity = ScopedSeverity::current()]() mutable {
    ScopedRenderer scoped_renderer(renderer);
    ScopedSeverity scoped_severity(severity);
    while (true) {
      // Both calls run before mu_ is acquired below.
      ApplyChanges(now, changes, flush);
      SelectedLinks new_selected_links = BuildForwardingTable();

      // Publish changes. If change-log has grown, restart update.
      std::lock_guard<std::mutex> lock(mu_);
      // Bump the version only when the selected links actually changed.
      if (selected_links_ != new_selected_links) {
        selected_links_.swap(new_selected_links);
        selected_links_version_++;
      }
      if (!processing_changes_) {
        // Indicates that the owning RoutingTable instance is in its destruction
        // sequence.
        // NOTE(review): processing_changes_ is also empty when this closure is
        // run inline (allow_threading_ false), so the inline path leaves
        // through this branch as well after a single pass.
        cv_.notify_all();
        return;
      } else if (change_log_.Empty() && !flush_requested_) {
        // No further work arrived: detach the thread and clear
        // processing_changes_ so a later ProcessUpdate() can start a new one.
        processing_changes_->detach();
        processing_changes_.Reset();
        cv_.notify_all();
        return;
      } else {
        // More work arrived while processing: take it and go around again.
        changes = std::move(change_log_);
        change_log_.Clear();
        flush = flush_requested_;
        flush_requested_ = false;
        now = last_update_;
      }
    }
  };
  if (allow_threading_) {
    // Presumably this starts a std::thread running the closure; the destructor
    // moves a std::thread out of processing_changes_ and joins it.
    processing_changes_.Reset(std::move(process_changes));
    cv_.notify_all();
  }
  // The queued state now belongs to the closure; reset the shared copies
  // (change_log_ was moved from above, so it is cleared explicitly).
  flush_requested_ = false;
  change_log_.Clear();
  if (!allow_threading_) {
    // Run inline, but without mu_ held: the closure locks mu_ itself.
    lock.unlock();
    process_changes();
  }
}

// Merges a batch of node and link status records into nodes_ / links_,
// optionally expires stale nodes, and republishes the shared gossip snapshot
// if any record carried a new version.
//
//   now:     timestamp stamped on every entry this batch creates or updates,
//            and the reference point for expiry.
//   changes: node and link status records to merge.
//   flush:   when true, every node other than the root whose last_updated is
//            at least EntryExpiry() older than |now| is removed together with
//            its incoming and outgoing links.
void RoutingTable::ApplyChanges(TimeStamp now, const StatusVecs& changes,
                                bool flush) {
  // Set whenever a record is accepted as new or newer; gates the publish step.
  bool new_gossip_version = false;

  // Update all metrics from changelogs.
  for (const auto& m : changes.nodes) {
    auto it = nodes_.find(NodeId(m.id));
    const char* log_verb = "uninteresting";
    if (it == nodes_.end()) {
      // Unknown node: create it, unless the record is a tombstone.
      if (m.version != fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
        new_gossip_version = true;
        nodes_.emplace(std::piecewise_construct,
                       std::forward_as_tuple(NodeId(m.id)),
                       std::forward_as_tuple(now, fidl::Clone(m)));
        log_verb = "new";
      }
    } else if (m.version > it->second.status.version) {
      // Known node: only strictly newer versions replace the stored status.
      new_gossip_version = true;
      it->second.status = fidl::Clone(m);
      it->second.last_updated = now;
      log_verb = "updated";
    }
    OVERNET_TRACE(DEBUG) << "NODE UPDATE: " << log_verb << " " << m;
  }
  for (const auto& m : changes.links) {
    auto report_drop = [&m](const char* why) {
      OVERNET_TRACE(DEBUG) << "Drop link info: from=" << m.from
                           << " to=" << m.to << " label=" << m.local_id
                           << " version=" << m.version << ": " << why;
    };
    // Cannot add a link if the relevant nodes are unknown.
    auto from_node = nodes_.find(NodeId(m.from));
    if (from_node == nodes_.end()) {
      report_drop("from node does not exist in routing table");
      continue;
    }
    auto to_node = nodes_.find(NodeId(m.to));
    if (to_node == nodes_.end()) {
      report_drop("to node does not exist in routing table");
      continue;
    }

    // Add or update the link.
    const FullLinkLabel key = {NodeId(m.from), NodeId(m.to), m.local_id};
    auto it = links_.find(key);
    if (it == links_.end()) {
      if (m.version == fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE) {
        // A tombstone for a link that was never recorded has nothing to
        // update. It must be handled here: comparing versions below would
        // dereference links_.end().
        report_drop("tombstone for unknown link");
        continue;  // Skip keep-alive.
      }
      new_gossip_version = true;
      it = links_
               .emplace(
                   std::piecewise_construct, std::forward_as_tuple(key),
                   std::forward_as_tuple(now, fidl::Clone(m),
                                         &from_node->second, &to_node->second))
               .first;
      // Register the new link with both of its endpoints.
      from_node->second.outgoing_links.PushBack(&it->second);
      to_node->second.incoming_links.PushBack(&it->second);
      OVERNET_TRACE(DEBUG) << "NEWLINK: " << m;
    } else if (m.version > it->second.status.version) {
      new_gossip_version = true;
      it->second.status = fidl::Clone(m);
      it->second.last_updated = now;
      OVERNET_TRACE(DEBUG) << "UPDATELINK: " << m;
    } else {
      report_drop("old version");
      continue;  // Skip keep-alive.
    }

    // Keep-alive the nodes.
    from_node->second.last_updated = now;
    to_node->second.last_updated = now;
  }

  // Remove anything old if we've been asked to.
  if (flush) {
    for (auto it = nodes_.begin(); it != nodes_.end();) {
      // An entry is stale once EntryExpiry() has elapsed since it was last
      // updated. The root node is never expired.
      if (it->first != root_node_ &&
          now >= it->second.last_updated + EntryExpiry()) {
        RemoveOutgoingLinks(&it->second);
        RemoveIncomingLinks(&it->second);
        it = nodes_.erase(it);
      } else {
        ++it;
      }
    }
    // NOTE(review): expiry on its own does not set new_gossip_version, so the
    // shared snapshot is not republished for removals alone - confirm that is
    // intended.
  }

  // Publish out to the application for propagation.
  if (new_gossip_version) {
    // Build the snapshot before taking shared_table_mu_ to keep the critical
    // section short.
    std::vector<fuchsia::overnet::protocol::NodeStatus> publish_node_status;
    std::vector<fuchsia::overnet::protocol::LinkStatus> publish_link_status;
    for (const auto& np : nodes_) {
      publish_node_status.push_back(fidl::Clone(np.second.status));
    }
    for (const auto& lp : links_) {
      publish_link_status.push_back(fidl::Clone(lp.second.status));
    }
    // Declared before the lock guard so that it is destroyed after the guard.
    std::vector<Callback<void>> notify_callbacks;
    std::lock_guard<std::mutex> mutex(shared_table_mu_);
    gossip_version_++;
    shared_node_status_.swap(publish_node_status);
    shared_link_status_.swap(publish_link_status);
    // Places complete list of notification callbacks into notify_callbacks,
    // which will be destroyed after mutex is released, forcing all callbacks to
    // be called.
    on_node_table_update_.swap(notify_callbacks);
  }
}

// Drains |node|'s outgoing-link list. Each link taken off the list is also
// removed from the incoming list of the node it points to, and its entry is
// erased from links_.
void RoutingTable::RemoveOutgoingLinks(Node* node) {
  for (;;) {
    Link* const doomed = node->outgoing_links.PopFront();
    if (!doomed) {
      break;  // List is empty.
    }
    doomed->to_node->incoming_links.Remove(doomed);
    // The key is built from copies of the link's fields, so it remains valid
    // while erase() removes the link's entry.
    const FullLinkLabel key{NodeId(doomed->status.from),
                            NodeId(doomed->status.to), doomed->status.local_id};
    links_.erase(key);
  }
}

// Drains |node|'s incoming-link list. Each link taken off the list is also
// removed from the outgoing list of the node it starts at, and its entry is
// erased from links_.
void RoutingTable::RemoveIncomingLinks(Node* node) {
  for (;;) {
    Link* const doomed = node->incoming_links.PopFront();
    if (!doomed) {
      break;  // List is empty.
    }
    doomed->from_node->outgoing_links.Remove(doomed);
    // The key is built from copies of the link's fields, so it remains valid
    // while erase() removes the link's entry.
    const FullLinkLabel key{NodeId(doomed->status.from),
                            NodeId(doomed->status.to), doomed->status.local_id};
    links_.erase(key);
  }
}

// Recomputes, for every node reachable from root_node_, which of the root's
// outgoing links should be used as the first hop towards it.
//
// Returns a SelectedLinks map keyed by destination node id. The map is empty
// if the root node is not yet in nodes_; the root itself and unreachable
// nodes get no entry.
RoutingTable::SelectedLinks RoutingTable::BuildForwardingTable() {
  OVERNET_TRACE(DEBUG) << "Rebuilding forwarding table";
  auto node_it = nodes_.find(root_node_);
  if (node_it == nodes_.end()) {
    OVERNET_TRACE(DEBUG) << "No root known";
    return SelectedLinks();  // Root node as yet unknown.
  }

  // Trace the metrics of every known node and link.
  for (const auto& n : nodes_) {
    OVERNET_TRACE(DEBUG) << n.first << " metrics\n" << n.second.status.metrics;
  }
  for (const auto& l : links_) {
    OVERNET_TRACE(DEBUG) << l.first << " metrics\n" << l.second.status.metrics;
  }

  // Each run gets a fresh id; a node whose last_path_finding_run equals it has
  // been reached during this run. The root starts reached, with zero rtt and
  // no mss restriction.
  ++path_finding_run_;
  node_it->second.last_path_finding_run = path_finding_run_;
  node_it->second.best_rtt = TimeDelta::Zero();
  node_it->second.mss = std::numeric_limits<uint32_t>::max();
  InternalList<Node, &Node::path_finding_node> todo;

  // Adds a node to the work list unless it is already on it.
  auto enqueue = [&todo](Node* node) {
    if (node->queued)
      return;
    node->queued = true;
    todo.PushBack(node);
  };

  // Reverse lookup from Node* to its id by linear scan over nodes_; used only
  // to build the trace message below. Falls back to NodeId(0) if not found.
  auto node_id_of = [this](const Node* n) {
    for (const auto& np : nodes_) {
      if (&np.second == n) {
        return np.first;
      }
    }
    return NodeId(0);
  };

  enqueue(&node_it->second);

  // Work-list relaxation: take a node, try each of its live outgoing links,
  // and re-queue any destination whose best known rtt improved.
  while (!todo.Empty()) {
    Node* src = todo.PopFront();
    src->queued = false;
    for (auto link : src->outgoing_links) {
      // Tombstoned links are never routed over.
      if (link->status.version ==
          fuchsia::overnet::protocol::METRIC_VERSION_TOMBSTONE)
        continue;
      // Candidate rtt to the link's far end: rtt to src, plus src's forwarding
      // time, plus the link's rtt. A missing metric counts as infinite, and
      // the total is capped at one hour.
      TimeDelta rtt = std::min(
          TimeDelta::FromHours(1),
          src->best_rtt +
              (src->status.metrics.has_forwarding_time()
                   ? TimeDelta::FromMicroseconds(
                         src->status.metrics.forwarding_time())
                   : TimeDelta::PositiveInf()) +
              (link->status.metrics.has_rtt()
                   ? TimeDelta::FromMicroseconds(link->status.metrics.rtt())
                   : TimeDelta::PositiveInf()));
      Node* dst = link->to_node;
      // For now we order by RTT.
      OVERNET_TRACE(DEBUG) << "RB[" << root_node_
                           << "]: src=" << node_id_of(src)
                           << " dst=" << node_id_of(dst)
                           << " dst->last_path_finding_run="
                           << dst->last_path_finding_run
                           << " path_finding_run=" << path_finding_run_
                           << " src->best_rtt=" << src->best_rtt
                           << " dst->best_rtt=" << dst->best_rtt
                           << " rtt=" << rtt << " src->mss=" << src->mss
                           << " link->mss="
                           << (link->status.metrics.has_mss()
                                   ? link->status.metrics.mss()
                                   : std::numeric_limits<uint32_t>::max());
      // Accept the path if dst has not been reached in this run, or if this
      // path is strictly faster than the best one found so far.
      if (dst->last_path_finding_run != path_finding_run_ ||
          dst->best_rtt > rtt) {
        dst->last_path_finding_run = path_finding_run_;
        dst->best_rtt = rtt;
        dst->best_from = src;
        dst->best_link = link;
        // Path mss is the minimum over the links on the path; a link without
        // an mss metric imposes no limit.
        dst->mss =
            std::min(src->mss, link->status.metrics.has_mss()
                                   ? link->status.metrics.mss()
                                   : std::numeric_limits<uint32_t>::max());
        enqueue(dst);
      }
    }
  }

  SelectedLinks selected_links;

  // For each reached node, follow best_from back towards the root to find the
  // first hop of its path.
  for (node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
    if (node_it->first == root_node_) {
      continue;
    }
    if (node_it->second.last_path_finding_run != path_finding_run_) {
      OVERNET_TRACE(DEBUG) << "RB[" << root_node_ << "]: " << node_it->first
                           << " unreachable";
      continue;  // Unreachable
    }
    Node* n = &node_it->second;
    // Stop at the node whose predecessor is the root; its best_link is the
    // root's outgoing link that starts the path.
    while (n->best_from->status.id != root_node_) {
      n = n->best_from;
    }
    Link* link = n->best_link;
    assert(link->status.from == root_node_);
    // Record the first-hop link's target and local id, with the mss of the
    // whole path to this destination.
    selected_links.emplace(node_it->first,
                           SelectedLink{link->status.to, link->status.local_id,
                                        node_it->second.mss});
  }

  return selected_links;
}

// Sends the currently published node and link status tables to |peer| and
// returns the gossip version of that snapshot.
//
// Nodes at version 0 are not sent, and neither is any link that touches such
// a node. The node matching |exclude_node| and the links originating from it
// are skipped as well. The whole send happens with shared_table_mu_ held.
uint64_t RoutingTable::SendUpdate(fuchsia::overnet::protocol::Peer_Proxy* peer,
                                  Optional<NodeId> exclude_node) const {
  OVERNET_TRACE(DEBUG) << root_node_ << " SendUpdate exclude=" << exclude_node;

  std::lock_guard<std::mutex> guard(shared_table_mu_);

  // Ids of the nodes found at version 0 during the node pass; consulted by
  // the link pass.
  std::unordered_set<NodeId> unversioned;
  const auto is_unversioned = [&unversioned](const NodeId& id) {
    return unversioned.count(id) > 0;
  };

  for (const auto& node_status : shared_node_status_) {
    OVERNET_TRACE(DEBUG) << "Consider node: " << node_status;
    if (node_status.version == 0) {
      unversioned.insert(NodeId(node_status.id));
    } else if (!(NodeId(node_status.id) == exclude_node)) {
      OVERNET_TRACE(DEBUG) << "Send: " << node_status;
      peer->UpdateNodeStatus(fidl::Clone(node_status));
    }
  }

  for (const auto& link_status : shared_link_status_) {
    OVERNET_TRACE(DEBUG) << "Consider link: " << link_status;
    const bool suppressed = NodeId(link_status.from) == exclude_node ||
                            is_unversioned(NodeId(link_status.from)) ||
                            is_unversioned(NodeId(link_status.to));
    if (!suppressed) {
      OVERNET_TRACE(DEBUG) << "Send: " << link_status;
      peer->UpdateLinkStatus(fidl::Clone(link_status));
    }
  }

  return gossip_version_;
}

}  // namespace overnet
