[scenic][ui-input] Create fuchsia.ui.pointer::TouchSource as a GestureContender

Creates the TouchSource class which implements both
fuchsia::ui::pointer::TouchSource and GestureContender protocols.

Most functionality complete. Remaining work for follow-up CLs.

Does not yet wire up to anything.

Bug: 64379
Test: fx test input_unittests -- --gtest_filter=*TouchSource*

Change-Id: Ic8b04f8aceccf877f15b821217952bf126560eb5
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/499381
Commit-Queue: Mikael Pessa <mikaelpessa@google.com>
Reviewed-by: Alice Neels <neelsa@google.com>
diff --git a/src/ui/scenic/lib/input/BUILD.gn b/src/ui/scenic/lib/input/BUILD.gn
index 31a0b95..9f41c65 100644
--- a/src/ui/scenic/lib/input/BUILD.gn
+++ b/src/ui/scenic/lib/input/BUILD.gn
@@ -10,6 +10,7 @@
     "constants.h",
     "gesture_arena.cc",
     "gesture_arena.h",
+    "gesture_contender.cc",
     "gesture_contender.h",
     "gfx_legacy_contender.cc",
     "gfx_legacy_contender.h",
@@ -22,6 +23,8 @@
     "input_system.cc",
     "input_system.h",
     "internal_pointer_event.h",
+    "touch_source.cc",
+    "touch_source.h",
   ]
 
   deps = [
@@ -43,5 +46,8 @@
     "//zircon/system/ulib/trace",
   ]
 
-  public_deps = [ "//sdk/fidl/fuchsia.ui.pointerinjector" ]
+  public_deps = [
+    "//sdk/fidl/fuchsia.ui.pointer",
+    "//sdk/fidl/fuchsia.ui.pointerinjector",
+  ]
 }
diff --git a/src/ui/scenic/lib/input/gesture_arena.h b/src/ui/scenic/lib/input/gesture_arena.h
index 7789f3d..71dee04 100644
--- a/src/ui/scenic/lib/input/gesture_arena.h
+++ b/src/ui/scenic/lib/input/gesture_arena.h
@@ -16,9 +16,6 @@
 
 namespace scenic_impl::input {
 
-using ContenderId = uint64_t;
-static constexpr ContenderId kInvalidContenderId = 0;
-
 struct ContestResults {
   std::optional<ContenderId> winner;
   std::vector<ContenderId> losers;
diff --git a/src/ui/scenic/lib/input/gesture_contender.cc b/src/ui/scenic/lib/input/gesture_contender.cc
new file mode 100644
index 0000000..874eaae
--- /dev/null
+++ b/src/ui/scenic/lib/input/gesture_contender.cc
@@ -0,0 +1,14 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ui/scenic/lib/input/gesture_contender.h"
+
+namespace scenic_impl::input {
+
+StreamId NewStreamId() {
+  static StreamId next_id = 1;
+  return next_id++;
+}
+
+}  // namespace scenic_impl::input
diff --git a/src/ui/scenic/lib/input/gesture_contender.h b/src/ui/scenic/lib/input/gesture_contender.h
index 09c3da0..bd8fe8c 100644
--- a/src/ui/scenic/lib/input/gesture_contender.h
+++ b/src/ui/scenic/lib/input/gesture_contender.h
@@ -25,7 +25,13 @@
   kUndefined = 9
 };
 
-using StreamId = uint64_t;
+using ContenderId = uint32_t;
+constexpr ContenderId kInvalidContenderId = 0;
+
+// TODO(fxbug.dev/fxbug.dev/73600): Rename all instances of "stream" to "interaction".
+using StreamId = uint32_t;
+constexpr StreamId kInvalidStreamId = 0;
+StreamId NewStreamId();
 
 // Interface for a gesture disambiguation contender. All methods are called in response to
 // a GestureArena.
diff --git a/src/ui/scenic/lib/input/injector.cc b/src/ui/scenic/lib/input/injector.cc
index 94e4cd8..cb7dca39 100644
--- a/src/ui/scenic/lib/input/injector.cc
+++ b/src/ui/scenic/lib/input/injector.cc
@@ -62,11 +62,6 @@
   }
 }
 
-StreamId NewStreamId() {
-  static StreamId next_id = 1;
-  return next_id++;
-}
-
 namespace {
 
 InternalPointerEvent CreateCancelEvent(uint32_t device_id, uint32_t pointer_id, zx_koid_t context,
diff --git a/src/ui/scenic/lib/input/injector.h b/src/ui/scenic/lib/input/injector.h
index 290de7b..2268338 100644
--- a/src/ui/scenic/lib/input/injector.h
+++ b/src/ui/scenic/lib/input/injector.h
@@ -11,16 +11,13 @@
 
 #include <unordered_map>
 
+#include "src/ui/scenic/lib/input/gesture_contender.h"
 #include "src/ui/scenic/lib/input/helper.h"
 #include "src/ui/scenic/lib/input/internal_pointer_event.h"
 
 namespace scenic_impl {
 namespace input {
 
-using StreamId = uint64_t;
-constexpr StreamId kInvalidStreamId = 0;
-StreamId NewStreamId();
-
 // Non-FIDL-type struct for keeping client defined settings.
 struct InjectorSettings {
   fuchsia::ui::pointerinjector::DispatchPolicy dispatch_policy =
diff --git a/src/ui/scenic/lib/input/internal_pointer_event.h b/src/ui/scenic/lib/input/internal_pointer_event.h
index bc85792..15d3b07 100644
--- a/src/ui/scenic/lib/input/internal_pointer_event.h
+++ b/src/ui/scenic/lib/input/internal_pointer_event.h
@@ -29,6 +29,11 @@
     min = {extents[0][0], extents[0][1]};
     max = {extents[1][0], extents[1][1]};
   }
+
+  // Used to check for exact equality in TouchSource
+  inline bool operator==(const Extents& other) const {
+    return min == other.min && max == other.max;
+  }
 };
 
 // Viewport defines an arbitrary rectangle in the space of the injector context.
@@ -39,6 +44,13 @@
   Extents extents;
   // A transform defining the Viewport in relation to a context (a View).
   glm::mat4 context_from_viewport_transform = glm::mat4(1.f);
+
+  // Used to check for exact equality in TouchSource
+  inline bool operator==(const Viewport& other) const {
+    return extents == other.extents &&
+           context_from_viewport_transform == other.context_from_viewport_transform;
+  }
+  inline bool operator!=(const Viewport& other) const { return !(*this == other); }
 };
 
 // Pointer event representation to be used internally, uncoupled from FIDL types.
diff --git a/src/ui/scenic/lib/input/tests/BUILD.gn b/src/ui/scenic/lib/input/tests/BUILD.gn
index 29ef77e..258ceb7 100644
--- a/src/ui/scenic/lib/input/tests/BUILD.gn
+++ b/src/ui/scenic/lib/input/tests/BUILD.gn
@@ -27,6 +27,7 @@
     "pointer_capture_test.cc",
     "pointer_event_conversion_test.cc",
     "startup_tests.cc",
+    "touch_source_test.cc",
     "view_tree_input_integration_test.cc",
   ]
   deps = [
diff --git a/src/ui/scenic/lib/input/tests/touch_source_test.cc b/src/ui/scenic/lib/input/tests/touch_source_test.cc
new file mode 100644
index 0000000..099d6c4
--- /dev/null
+++ b/src/ui/scenic/lib/input/tests/touch_source_test.cc
@@ -0,0 +1,646 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ui/scenic/lib/input/touch_source.h"
+
+#include <lib/async-testing/test_loop.h>
+#include <lib/syslog/cpp/macros.h>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "lib/gtest/test_loop_fixture.h"
+
+namespace lib_ui_input_tests {
+namespace {
+
+using fup_EventPhase = fuchsia::ui::pointer::EventPhase;
+using fuchsia::ui::pointer::TouchResponseType;
+using scenic_impl::input::ContenderId;
+using scenic_impl::input::Extents;
+using scenic_impl::input::GestureResponse;
+using scenic_impl::input::InternalPointerEvent;
+using scenic_impl::input::Phase;
+using scenic_impl::input::StreamId;
+using scenic_impl::input::TouchSource;
+using scenic_impl::input::Viewport;
+
+constexpr StreamId kStreamId = 1;
+constexpr uint32_t kDeviceId = 2;
+constexpr uint32_t kPointerId = 3;
+
+namespace {
+
+fuchsia::ui::pointer::TouchResponse CreateResponse(TouchResponseType response_type) {
+  fuchsia::ui::pointer::TouchResponse response;
+  response.set_response_type(response_type);
+  return response;
+}
+
+void ExpectEqual(const fuchsia::ui::pointer::ViewParameters& view_parameters,
+                 const Viewport& viewport) {
+  EXPECT_THAT(view_parameters.viewport.min,
+              testing::ElementsAre(viewport.extents.min[0], viewport.extents.min[1]));
+  EXPECT_THAT(view_parameters.viewport.max,
+              testing::ElementsAre(viewport.extents.max[0], viewport.extents.max[1]));
+
+  const auto& mat = viewport.context_from_viewport_transform;
+  EXPECT_THAT(view_parameters.viewport_to_view_transform,
+              testing::ElementsAre(mat[0][0], mat[0][1], mat[0][2], mat[1][0], mat[1][1], mat[1][2],
+                                   mat[2][0], mat[2][1], mat[2][2]));
+}
+
+}  // namespace
+
+class TouchSourceTest : public gtest::TestLoopFixture {
+ protected:
+  void SetUp() override {
+    client_ptr_.set_error_handler([this](auto) { channel_closed_ = true; });
+
+    touch_event_source_.emplace(
+        client_ptr_.NewRequest(),
+        /*respond*/
+        [this](StreamId stream_id, const std::vector<GestureResponse>& responses) {
+          std::copy(responses.begin(), responses.end(),
+                    std::back_inserter(received_responses_[stream_id]));
+        },
+        /*error_handler*/ [this] { internal_error_handler_fired_ = true; });
+  }
+
+  bool internal_error_handler_fired_ = false;
+  bool channel_closed_ = false;
+  std::unordered_map<StreamId, std::vector<GestureResponse>> received_responses_;
+
+  fuchsia::ui::pointer::TouchSourcePtr client_ptr_;
+  std::optional<TouchSource> touch_event_source_;
+};
+
+TEST_F(TouchSourceTest, Watch_WithNoPendingMessages_ShouldNeverReturn) {
+  bool callback_triggered = false;
+  client_ptr_->Watch({}, [&callback_triggered](auto) { callback_triggered = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_FALSE(channel_closed_);
+  EXPECT_FALSE(callback_triggered);
+}
+
+TEST_F(TouchSourceTest, ErrorHandler_ShouldFire_OnClientDisconnect) {
+  EXPECT_FALSE(internal_error_handler_fired_);
+  client_ptr_.Unbind();
+  RunLoopUntilIdle();
+  EXPECT_TRUE(internal_error_handler_fired_);
+}
+
+TEST_F(TouchSourceTest, NonEmptyResponse_ForInitialWatch_ShouldCloseChannel) {
+  bool callback_triggered = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  client_ptr_->Watch(std::move(responses),
+                     [&callback_triggered](auto) { callback_triggered = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_TRUE(channel_closed_);
+  EXPECT_FALSE(callback_triggered);
+}
+
+TEST_F(TouchSourceTest, EmptyResponse_ForPointerEvent_ShouldCloseChannel) {
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  client_ptr_->Watch({}, [](auto events) { EXPECT_EQ(events.size(), 1u); });
+  RunLoopUntilIdle();
+
+  // Respond with an empty response table.
+  bool callback_triggered = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.push_back({});  // Empty response.
+  client_ptr_->Watch(std::move(responses),
+                     [&callback_triggered](auto) { callback_triggered = true; });
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_FALSE(callback_triggered);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, NonEmptyResponse_ForNonPointerEvent_ShouldCloseChannel) {
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  // This event expects an empty response table.
+  touch_event_source_->EndContest(kStreamId, /*awarded_win*/ true);
+  client_ptr_->Watch({}, [](auto events) { EXPECT_EQ(events.size(), 2u); });
+  RunLoopUntilIdle();
+
+  bool callback_triggered = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));  // Expected to be empty.
+  client_ptr_->Watch(std::move(responses),
+                     [&callback_triggered](auto) { callback_triggered = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_FALSE(callback_triggered);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, Watch_BeforeEvents_ShouldReturnOnFirstEvent) {
+  uint64_t num_events = 0;
+  client_ptr_->Watch({}, [&num_events](auto events) { num_events += events.size(); });
+
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_FALSE(channel_closed_);
+  EXPECT_EQ(num_events, 0u);
+
+  // Sending fidl message on first event, so expect the second one not to arrive.
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+
+  RunLoopUntilIdle();
+  EXPECT_TRUE(received_responses_.empty());
+  EXPECT_FALSE(channel_closed_);
+  EXPECT_EQ(num_events, 1u);
+
+  // Second event should arrive on next Watch() call.
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  client_ptr_->Watch(std::move(responses),
+                     [&num_events](auto events) { num_events += events.size(); });
+  RunLoopUntilIdle();
+  EXPECT_EQ(received_responses_.size(), 1u);
+  EXPECT_FALSE(channel_closed_);
+  EXPECT_EQ(num_events, 2u);
+}
+
+TEST_F(TouchSourceTest, Watch_ShouldAtMostReturn_TOUCH_MAX_EVENT_Events_PerCall) {
+  // Sending fidl message on first event, so expect the second one not to arrive.
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  for (size_t i = 0; i < fuchsia::ui::pointer::TOUCH_MAX_EVENT + 3; ++i) {
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                      /*is_end_of_stream*/ false);
+  }
+
+  client_ptr_->Watch(
+      {}, [](auto events) { ASSERT_EQ(events.size(), fuchsia::ui::pointer::TOUCH_MAX_EVENT); });
+  RunLoopUntilIdle();
+
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  for (size_t i = 0; i < fuchsia::ui::pointer::TOUCH_MAX_EVENT; ++i) {
+    responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  }
+
+  // The 4 events remaining in the queue should be delivered with the next Watch() call.
+  client_ptr_->Watch(std::move(responses), [](auto events) { EXPECT_EQ(events.size(), 4u); });
+  RunLoopUntilIdle();
+}
+
+TEST_F(TouchSourceTest, Watch_ResponseBeforeEvent_ShouldCloseChannel) {
+  // Initial call to Watch() should be empty since we can't respond to any events yet.
+  bool callback_triggered = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  client_ptr_->Watch(std::move(responses),
+                     [&callback_triggered](auto) { callback_triggered = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_FALSE(callback_triggered);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, Watch_MoreResponsesThanEvents_ShouldCloseChannel) {
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  client_ptr_->Watch({}, [](auto events) { EXPECT_EQ(events.size(), 1u); });
+  RunLoopUntilIdle();
+  EXPECT_FALSE(channel_closed_);
+
+  // Expecting one response. Send two.
+  bool callback_fired = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  client_ptr_->Watch(std::move(responses), [&callback_fired](auto) { callback_fired = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_FALSE(callback_fired);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, Watch_FewerResponsesThanEvents_ShouldCloseChannel) {
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+  client_ptr_->Watch({}, [](auto events) { EXPECT_EQ(events.size(), 2u); });
+  RunLoopUntilIdle();
+  EXPECT_FALSE(channel_closed_);
+
+  // Expecting two responses. Send one.
+  bool callback_fired = false;
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+  client_ptr_->Watch(std::move(responses), [&callback_fired](auto) { callback_fired = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_FALSE(callback_fired);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, Watch_CallingTwiceWithoutWaiting_ShouldCloseChannel) {
+  client_ptr_->Watch({}, [](auto) { EXPECT_FALSE(true); });
+  client_ptr_->Watch({}, [](auto) { EXPECT_FALSE(true); });
+  RunLoopUntilIdle();
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, MissingArgument_ShouldCloseChannel) {
+  uint64_t num_events = 0;
+  client_ptr_->Watch({}, [&num_events](auto events) { num_events += events.size(); });
+  RunLoopUntilIdle();
+  EXPECT_EQ(num_events, 0u);
+  EXPECT_FALSE(channel_closed_);
+
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd}, /*is_end_of_stream*/ false);
+  RunLoopUntilIdle();
+  EXPECT_EQ(num_events, 1u);
+  EXPECT_FALSE(channel_closed_);
+
+  std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+  responses.emplace_back();  // Empty response for pointer event should close channel.
+  client_ptr_->Watch(std::move(responses),
+                     [&num_events](auto events) { num_events += events.size(); });
+
+  RunLoopUntilIdle();
+  EXPECT_EQ(num_events, 1u);
+  EXPECT_TRUE(channel_closed_);
+}
+
+TEST_F(TouchSourceTest, UpdateResponse) {
+  {  // Complete a stream and respond HOLD to it.
+    client_ptr_->Watch({}, [](auto) {});
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd},
+                                      /*is_end_of_stream*/ false);
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                      /*is_end_of_stream*/ true);
+    RunLoopUntilIdle();
+
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    client_ptr_->Watch(std::move(responses), [](auto) {});
+    RunLoopUntilIdle();
+  }
+
+  {
+    bool callback_triggered = false;
+    client_ptr_->UpdateResponse(
+        fuchsia::ui::pointer::TouchInteractionId{
+            .device_id = kDeviceId,
+            .pointer_id = kPointerId,
+            .interaction_id = kStreamId,
+        },
+        CreateResponse(TouchResponseType::YES),
+        [&callback_triggered] { callback_triggered = true; });
+    RunLoopUntilIdle();
+    EXPECT_TRUE(callback_triggered);
+    EXPECT_FALSE(channel_closed_);
+  }
+}
+
+TEST_F(TouchSourceTest, UpdateResponse_UnknownStreamId_ShouldCloseChannel) {
+  bool callback_triggered = false;
+  client_ptr_->UpdateResponse(
+      fuchsia::ui::pointer::TouchInteractionId{
+          .device_id = 1,
+          .pointer_id = 1,
+          .interaction_id = 12153,  // Unknown stream id.
+      },
+      CreateResponse(TouchResponseType::YES), [&callback_triggered] { callback_triggered = true; });
+
+  RunLoopUntilIdle();
+  EXPECT_FALSE(callback_triggered);
+  EXPECT_TRUE(channel_closed_);
+  EXPECT_TRUE(received_responses_.empty());
+}
+
+TEST_F(TouchSourceTest, UpdateResponse_BeforeStreamEnd_ShouldCloseChannel) {
+  {  // Start a stream and respond to it.
+    bool callback_triggered = false;
+    client_ptr_->Watch({}, [&callback_triggered](auto) { callback_triggered = true; });
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd},
+                                      /*is_end_of_stream*/ false);
+    RunLoopUntilIdle();
+    EXPECT_TRUE(callback_triggered);
+
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    client_ptr_->Watch(std::move(responses), [](auto) {});
+    RunLoopUntilIdle();
+  }
+
+  {  // Try to reject the stream despite it not having ended.
+    bool callback_triggered = false;
+    client_ptr_->UpdateResponse(
+        fuchsia::ui::pointer::TouchInteractionId{
+            .device_id = 1,
+            .pointer_id = 1,
+            .interaction_id = kStreamId,
+        },
+        CreateResponse(TouchResponseType::YES),
+        [&callback_triggered] { callback_triggered = true; });
+    RunLoopUntilIdle();
+    EXPECT_FALSE(callback_triggered);
+    EXPECT_TRUE(channel_closed_);
+  }
+}
+
+TEST_F(TouchSourceTest, UpdateResponse_WhenLastResponseWasntHOLD_ShouldCloseChannel) {
+  {  // Start a stream and respond to it.
+    bool callback_triggered = false;
+    client_ptr_->Watch({}, [&callback_triggered](auto) { callback_triggered = true; });
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd},
+                                      /*is_end_of_stream*/ false);
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                      /*is_end_of_stream*/ true);
+    RunLoopUntilIdle();
+    EXPECT_TRUE(callback_triggered);
+
+    bool callback2_triggered = false;
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    // Respond with something other than HOLD.
+    responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+    responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+    client_ptr_->Watch(std::move(responses), [](auto) {});
+    RunLoopUntilIdle();
+  }
+
+  {
+    bool callback_triggered = false;
+    client_ptr_->UpdateResponse(
+        fuchsia::ui::pointer::TouchInteractionId{
+            .device_id = 1,
+            .pointer_id = 1,
+            .interaction_id = kStreamId,
+        },
+        CreateResponse(TouchResponseType::YES),
+        [&callback_triggered] { callback_triggered = true; });
+    RunLoopUntilIdle();
+    EXPECT_FALSE(callback_triggered);
+    EXPECT_TRUE(channel_closed_);
+  }
+}
+
+TEST_F(TouchSourceTest, UpdateResponse_WithHOLD_ShouldCloseChannel) {
+  {  // Start a stream and respond to it.
+    bool callback_triggered = false;
+    client_ptr_->Watch({}, [&callback_triggered](auto) { callback_triggered = true; });
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd},
+                                      /*is_end_of_stream*/ false);
+    touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                      /*is_end_of_stream*/ true);
+    RunLoopUntilIdle();
+    EXPECT_TRUE(callback_triggered);
+
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    client_ptr_->Watch(std::move(responses), [](auto) {});
+    RunLoopUntilIdle();
+  }
+
+  {  // Try to update the stream with a HOLD response.
+    bool callback_triggered = false;
+    client_ptr_->UpdateResponse(
+        fuchsia::ui::pointer::TouchInteractionId{
+            .device_id = 1,
+            .pointer_id = 1,
+            .interaction_id = kStreamId,
+        },
+        CreateResponse(TouchResponseType::HOLD),
+        [&callback_triggered] { callback_triggered = true; });
+    RunLoopUntilIdle();
+    EXPECT_FALSE(callback_triggered);
+    EXPECT_TRUE(channel_closed_);
+  }
+}
+
+TEST_F(TouchSourceTest, ViewportIsDeliveredCorrectly) {
+  Viewport viewport1;
+  viewport1.extents = std::array<std::array<float, 2>, 2>{{{0, 0}, {10, 10}}};
+  viewport1.context_from_viewport_transform = {
+      // clang-format off
+    1, 0, 0, 0,
+    0, 1, 0, 0,
+    0, 0, 1, 0,
+    0, 0, 0, 1
+      // clang-format on
+  };
+  Viewport viewport2;
+  viewport2.extents = std::array<std::array<float, 2>, 2>{{{-5, 1}, {100, 40}}};
+  viewport2.context_from_viewport_transform = {
+      // clang-format off
+    1, 2, 3, 0,
+    4, 5, 6, 0,
+    7, 8, 9, 0,
+    0, 0, 0, 1
+      // clang-format on
+  };
+
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kAdd, .viewport = viewport1},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange, .viewport = viewport1},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove, .viewport = viewport2},
+                                    /*is_end_of_stream*/ true);
+
+  client_ptr_->Watch({}, [&](auto events) {
+    ASSERT_EQ(events.size(), 3u);
+    EXPECT_TRUE(events[0].has_view_parameters());
+    EXPECT_TRUE(events[0].has_pointer_sample());
+
+    EXPECT_FALSE(events[1].has_view_parameters());
+    EXPECT_TRUE(events[1].has_pointer_sample());
+
+    EXPECT_TRUE(events[2].has_view_parameters());
+    EXPECT_TRUE(events[2].has_pointer_sample());
+
+    ExpectEqual(events[0].view_parameters(), viewport1);
+    // ExpectEqual(events[2].view_parameters(), viewport2);
+  });
+
+  RunLoopUntilIdle();
+}
+
+// Sends a full stream and observes that GestureResponses are as expected.
+TEST_F(TouchSourceTest, NormalStream) {
+  touch_event_source_->UpdateStream(
+      kStreamId, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                    /*is_end_of_stream*/ true);
+
+  EXPECT_TRUE(received_responses_.empty());
+
+  client_ptr_->Watch({}, [&](auto events) {
+    ASSERT_EQ(events.size(), 4u);
+    EXPECT_EQ(events[0].pointer_sample().phase(), fup_EventPhase::ADD);
+    EXPECT_EQ(events[1].pointer_sample().phase(), fup_EventPhase::CHANGE);
+    EXPECT_EQ(events[2].pointer_sample().phase(), fup_EventPhase::CHANGE);
+    EXPECT_EQ(events[3].pointer_sample().phase(), fup_EventPhase::REMOVE);
+
+    EXPECT_TRUE(events[0].has_timestamp());
+    EXPECT_TRUE(events[1].has_timestamp());
+    EXPECT_TRUE(events[2].has_timestamp());
+    EXPECT_TRUE(events[3].has_timestamp());
+
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::YES));
+
+    client_ptr_->Watch({std::move(responses)}, [](auto events) {
+      // These will be checked after EndContest() below, when the callback runs.
+      EXPECT_EQ(events.size(), 1u);
+      EXPECT_FALSE(events.at(0).has_pointer_sample());
+      EXPECT_TRUE(events.at(0).has_timestamp());
+      ASSERT_TRUE(events.at(0).has_interaction_result());
+
+      const auto& interaction_result = events.at(0).interaction_result();
+      EXPECT_EQ(interaction_result.interaction.interaction_id, kStreamId);
+      EXPECT_EQ(interaction_result.interaction.device_id, kDeviceId);
+      EXPECT_EQ(interaction_result.interaction.pointer_id, kPointerId);
+      EXPECT_EQ(interaction_result.status, fuchsia::ui::pointer::TouchInteractionStatus::GRANTED);
+    });
+  });
+
+  RunLoopUntilIdle();
+  EXPECT_EQ(received_responses_.size(), 1u);
+  EXPECT_THAT(received_responses_[kStreamId],
+              testing::ElementsAre(GestureResponse::kMaybe, GestureResponse::kHold,
+                                   GestureResponse::kHold, GestureResponse::kYes));
+
+  // Check winning conditions.
+  touch_event_source_->EndContest(kStreamId, /*awarded_win*/ true);
+  RunLoopUntilIdle();
+}
+
+// Sends a full legacy interaction (including UP and DOWN events) and observes that GestureResponses
+// are included for the extra events not seen by clients. Each filtered event should duplicate the
+// response of the previous event.
+TEST_F(TouchSourceTest, LegacyInteraction) {
+  touch_event_source_->UpdateStream(
+      kStreamId, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kDown}, /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kChange},
+                                    /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kUp}, /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                    /*is_end_of_stream*/ true);
+
+  EXPECT_TRUE(received_responses_.empty());
+
+  client_ptr_->Watch({}, [&](auto events) {
+    ASSERT_EQ(events.size(), 4u);
+    EXPECT_EQ(events[0].pointer_sample().phase(), fup_EventPhase::ADD);
+    EXPECT_EQ(events[1].pointer_sample().phase(), fup_EventPhase::CHANGE);
+    EXPECT_EQ(events[2].pointer_sample().phase(), fup_EventPhase::CHANGE);
+    EXPECT_EQ(events[3].pointer_sample().phase(), fup_EventPhase::REMOVE);
+
+    std::vector<fuchsia::ui::pointer::TouchResponse> responses;
+    responses.emplace_back(CreateResponse(TouchResponseType::MAYBE));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::HOLD));
+    responses.emplace_back(CreateResponse(TouchResponseType::YES));
+    client_ptr_->Watch({std::move(responses)}, [](auto events) {
+      // These will be checked after EndContest() below.
+      EXPECT_EQ(events.size(), 1u);
+      EXPECT_FALSE(events.at(0).has_pointer_sample());
+      EXPECT_TRUE(events.at(0).has_timestamp());
+      ASSERT_TRUE(events.at(0).has_interaction_result());
+
+      const auto& interaction_result = events.at(0).interaction_result();
+      EXPECT_EQ(interaction_result.interaction.interaction_id, kStreamId);
+      EXPECT_EQ(interaction_result.interaction.device_id, kDeviceId);
+      EXPECT_EQ(interaction_result.interaction.pointer_id, kPointerId);
+      EXPECT_EQ(interaction_result.status, fuchsia::ui::pointer::TouchInteractionStatus::GRANTED);
+    });
+  });
+
+  RunLoopUntilIdle();
+  EXPECT_EQ(received_responses_.size(), 1u);
+  EXPECT_THAT(
+      received_responses_.at(kStreamId),
+      testing::ElementsAre(GestureResponse::kMaybe, GestureResponse::kMaybe, GestureResponse::kHold,
+                           GestureResponse::kHold, GestureResponse::kHold, GestureResponse::kYes));
+
+  // Check losing conditions.
+  touch_event_source_->EndContest(kStreamId, /*awarded_win*/ true);
+  RunLoopUntilIdle();
+}
+
+TEST_F(TouchSourceTest, OnDestruction_ShouldExitOngoingContests) {
+  constexpr StreamId kStreamId2 = 2, kStreamId3 = 3, kStreamId4 = 4, kStreamId5 = 5, kStreamId6 = 6;
+
+  // Start a few streams.
+  touch_event_source_->UpdateStream(
+      kStreamId, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(
+      kStreamId2, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(
+      kStreamId3, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(
+      kStreamId4, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(
+      kStreamId5, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+  touch_event_source_->UpdateStream(
+      kStreamId6, {.device_id = kDeviceId, .pointer_id = kPointerId, .phase = Phase::kAdd},
+      /*is_end_of_stream*/ false);
+
+  // End streams 1-3.
+  touch_event_source_->UpdateStream(kStreamId, {.phase = Phase::kRemove},
+                                    /*is_end_of_stream*/ true);
+  touch_event_source_->UpdateStream(kStreamId2, {.phase = Phase::kRemove},
+                                    /*is_end_of_stream*/ true);
+  touch_event_source_->UpdateStream(kStreamId3, {.phase = Phase::kRemove},
+                                    /*is_end_of_stream*/ true);
+
+  // Award some wins and losses.
+  touch_event_source_->EndContest(kStreamId, /*awarded_win*/ true);
+  touch_event_source_->EndContest(kStreamId2, /*awarded_win*/ false);
+  touch_event_source_->EndContest(kStreamId4, /*awarded_win*/ true);
+  touch_event_source_->EndContest(kStreamId5, /*awarded_win*/ false);
+
+  // We now have streams in the following states:
+  // 1: Ended, Won
+  // 2: Ended, Lost
+  // 3: Ended, Undecided
+  // 4: Ongoing, Won
+  // 5: Ongoing, Lost
+  // 6: Ongoing, Undecided
+  //
+  // TouchSource should respond only to undecided streams on destruction.
+
+  EXPECT_TRUE(received_responses_.empty());
+
+  // Destroy the event source and observe proper cleanup.
+  touch_event_source_.reset();
+
+  EXPECT_EQ(received_responses_.size(), 2u);
+  EXPECT_THAT(received_responses_.at(kStreamId3), testing::ElementsAre(GestureResponse::kNo));
+  EXPECT_THAT(received_responses_.at(kStreamId6), testing::ElementsAre(GestureResponse::kNo));
+}
+
+}  // namespace
+}  // namespace lib_ui_input_tests
diff --git a/src/ui/scenic/lib/input/touch_source.cc b/src/ui/scenic/lib/input/touch_source.cc
new file mode 100644
index 0000000..e70b899
--- /dev/null
+++ b/src/ui/scenic/lib/input/touch_source.cc
@@ -0,0 +1,386 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ui/scenic/lib/input/touch_source.h"
+
+#include <lib/async/cpp/time.h>
+#include <lib/async/default.h>
+#include <lib/syslog/cpp/macros.h>
+#include <lib/trace/event.h>
+
+#include <unordered_map>
+
+#include "src/lib/fxl/macros.h"
+
+namespace scenic_impl::input {
+
+namespace {
+
+GestureResponse ConvertToGestureResponse(fuchsia::ui::pointer::TouchResponseType type) {
+  switch (type) {
+    case fuchsia::ui::pointer::TouchResponseType::YES:
+      return GestureResponse::kYes;
+    case fuchsia::ui::pointer::TouchResponseType::YES_PRIORITIZE:
+      return GestureResponse::kYesPrioritize;
+    case fuchsia::ui::pointer::TouchResponseType::NO:
+      return GestureResponse::kNo;
+    case fuchsia::ui::pointer::TouchResponseType::MAYBE:
+      return GestureResponse::kMaybe;
+    case fuchsia::ui::pointer::TouchResponseType::MAYBE_PRIORITIZE:
+      return GestureResponse::kMaybePrioritize;
+    case fuchsia::ui::pointer::TouchResponseType::MAYBE_SUPPRESS:
+      return GestureResponse::kMaybeSuppress;
+    case fuchsia::ui::pointer::TouchResponseType::MAYBE_PRIORITIZE_SUPPRESS:
+      return GestureResponse::kMaybePrioritizeSuppress;
+    case fuchsia::ui::pointer::TouchResponseType::HOLD:
+      return GestureResponse::kHold;
+    case fuchsia::ui::pointer::TouchResponseType::HOLD_SUPPRESS:
+      return GestureResponse::kHoldSuppress;
+    default:
+      return GestureResponse::kUndefined;
+  }
+}
+
+fuchsia::ui::pointer::EventPhase ConvertToEventPhase(Phase phase) {
+  switch (phase) {
+    case Phase::kAdd:
+      return fuchsia::ui::pointer::EventPhase::ADD;
+    case Phase::kChange:
+      return fuchsia::ui::pointer::EventPhase::CHANGE;
+    case Phase::kRemove:
+      return fuchsia::ui::pointer::EventPhase::REMOVE;
+    case Phase::kCancel:
+      return fuchsia::ui::pointer::EventPhase::CANCEL;
+    default:
+      // Never reached.
+      FX_CHECK(false) << "Unknown phase: " << phase;
+      return fuchsia::ui::pointer::EventPhase::CANCEL;
+  }
+}
+
+fuchsia::ui::pointer::TouchEvent NewTouchEvent(StreamId stream_id,
+                                               const InternalPointerEvent& event,
+                                               bool is_end_of_stream) {
+  fuchsia::ui::pointer::TouchEvent new_event;
+  new_event.set_timestamp(event.timestamp);
+  new_event.set_trace_flow_id(TRACE_NONCE());
+  {
+    fuchsia::ui::pointer::TouchPointerSample pointer;
+
+    pointer.set_phase(ConvertToEventPhase(event.phase));
+    pointer.set_position_in_viewport(
+        {event.position_in_viewport[0], event.position_in_viewport[1]});
+    pointer.set_interaction(fuchsia::ui::pointer::TouchInteractionId{
+        .device_id = event.device_id, .pointer_id = event.pointer_id, .interaction_id = stream_id});
+    new_event.set_pointer_sample(std::move(pointer));
+  }
+
+  return new_event;
+}
+
+fuchsia::ui::pointer::TouchEvent NewEndEvent(StreamId stream_id, uint32_t device_id,
+                                             uint32_t pointer_id, bool awarded_win) {
+  fuchsia::ui::pointer::TouchEvent new_event;
+  new_event.set_timestamp(async::Now(async_get_default_dispatcher()).get());
+  new_event.set_interaction_result(fuchsia::ui::pointer::TouchInteractionResult{
+      .interaction =
+          fuchsia::ui::pointer::TouchInteractionId{
+              .device_id = device_id, .pointer_id = pointer_id, .interaction_id = stream_id},
+      .status = awarded_win ? fuchsia::ui::pointer::TouchInteractionStatus::GRANTED
+                            : fuchsia::ui::pointer::TouchInteractionStatus::DENIED});
+  return new_event;
+}
+
+void AddViewParametersToEvent(fuchsia::ui::pointer::TouchEvent& event, const Viewport& viewport) {
+  event.set_view_parameters(
+      fuchsia::ui::pointer::ViewParameters{
+          .view =
+              fuchsia::ui::pointer::Rectangle{
+                  // TODO(fxbug.dev/73639): Add view bounds.
+              },
+          .viewport =
+              fuchsia::ui::pointer::Rectangle{
+                  .min = {{viewport.extents.min[0], viewport.extents.min[1]}},
+                  .max = {{viewport.extents.max[0], viewport.extents.max[1]}}},
+          .viewport_to_view_transform = {viewport.context_from_viewport_transform[0][0],
+                                         viewport.context_from_viewport_transform[0][1],
+                                         viewport.context_from_viewport_transform[0][2],
+                                         viewport.context_from_viewport_transform[1][0],
+                                         viewport.context_from_viewport_transform[1][1],
+                                         viewport.context_from_viewport_transform[1][2],
+                                         viewport.context_from_viewport_transform[2][0],
+                                         viewport.context_from_viewport_transform[2][1],
+                                         viewport.context_from_viewport_transform[2][2]}});
+}
+
+bool IsHold(GestureResponse response) {
+  switch (response) {
+    case GestureResponse::kHold:
+    case GestureResponse::kHoldSuppress:
+      return true;
+    default:
+      return false;
+  }
+}
+
+bool IsHold(fuchsia::ui::pointer::TouchResponseType response) {
+  switch (response) {
+    case fuchsia::ui::pointer::TouchResponseType::HOLD:
+    case fuchsia::ui::pointer::TouchResponseType::HOLD_SUPPRESS:
+      return true;
+    default:
+      return false;
+  }
+}
+
+}  // namespace
+
+TouchSource::TouchSource(fidl::InterfaceRequest<fuchsia::ui::pointer::TouchSource> event_provider,
+                         fit::function<void(StreamId, const std::vector<GestureResponse>&)> respond,
+                         fit::function<void()> error_handler)
+    : binding_(this, std::move(event_provider)),
+      respond_(std::move(respond)),
+      error_handler_(std::move(error_handler)) {
+  binding_.set_error_handler([this](zx_status_t) { error_handler_(); });
+}
+
+TouchSource::~TouchSource() {
+  // Cancel ongoing streams
+  for (const auto& [id, data] : ongoing_streams_) {
+    if (!data.was_won) {
+      respond_(id, {GestureResponse::kNo});
+    }
+  }
+}
+
+void TouchSource::UpdateStream(StreamId stream_id, const InternalPointerEvent& event,
+                               bool is_end_of_stream) {
+  const bool is_new_stream = ongoing_streams_.count(stream_id) == 0;
+  FX_CHECK(is_new_stream == (event.phase == Phase::kAdd)) << "Stream must only start with ADD.";
+  FX_CHECK(is_end_of_stream == (event.phase == Phase::kRemove || event.phase == Phase::kCancel));
+
+  if (is_new_stream) {
+    ongoing_streams_.try_emplace(
+        stream_id, StreamData{.device_id = event.device_id, .pointer_id = event.pointer_id});
+  }
+  auto& stream = ongoing_streams_.at(stream_id);
+
+  // Filter legacy events.
+  // TODO(fxbug.dev/53316): Remove when we no longer need to filter events.
+  ++stream.num_pointer_events;
+  if (event.phase == Phase::kDown || event.phase == Phase::kUp) {
+    FX_DCHECK(!is_end_of_stream);
+    FX_DCHECK(stream.num_pointer_events > 1);
+    stream.filtered_events.emplace(stream.num_pointer_events);
+    return;
+  }
+
+  auto out_event = NewTouchEvent(stream_id, event, is_end_of_stream);
+
+  if (is_new_stream) {
+    fuchsia::ui::pointer::TouchDeviceInfo device_info;
+    device_info.set_id(event.device_id);
+    out_event.set_device_info(std::move(device_info));
+  }
+
+  stream.stream_has_ended = is_end_of_stream;
+  const auto viewport = event.viewport;
+  if (current_viewport_ != viewport || is_first_event_) {
+    is_first_event_ = false;
+    current_viewport_ = viewport;
+    AddViewParametersToEvent(out_event, current_viewport_);
+  }
+
+  pending_events_.push({.stream_id = stream_id, .event = std::move(out_event)});
+  SendPendingIfWaiting();
+}
+
+void TouchSource::EndContest(StreamId stream_id, bool awarded_win) {
+  FX_DCHECK(ongoing_streams_.count(stream_id) != 0);
+  auto& stream = ongoing_streams_[stream_id];
+  stream.was_won = awarded_win;
+  pending_events_.push(
+      {.stream_id = stream_id,
+       .event = NewEndEvent(stream_id, stream.device_id, stream.pointer_id, awarded_win)});
+  SendPendingIfWaiting();
+
+  if (!awarded_win) {
+    ongoing_streams_.erase(stream_id);
+  }
+}
+
+zx_status_t TouchSource::ValidateResponses(
+    const std::vector<fuchsia::ui::pointer::TouchResponse>& responses,
+    const std::vector<ReturnTicket>& return_tickets, bool have_pending_callback) {
+  if (have_pending_callback) {
+    FX_LOGS(ERROR) << "TouchSource: Client called Watch twice without waiting for response.";
+    return ZX_ERR_BAD_STATE;
+  }
+
+  if (return_tickets.size() != responses.size()) {
+    FX_LOGS(ERROR)
+        << "TouchSource: Client called Watch with the wrong number of responses. Expected: "
+        << return_tickets.size() << " Received: " << responses.size();
+    return ZX_ERR_INVALID_ARGS;
+  }
+
+  for (size_t i = 0; i < responses.size(); ++i) {
+    const auto& response = responses.at(i);
+    if (!return_tickets.at(i).expects_response) {
+      if (!response.IsEmpty()) {
+        FX_LOGS(ERROR) << "TouchSource: Expected empty response, receive non-empty response";
+        return ZX_ERR_INVALID_ARGS;
+      }
+    } else {
+      if (!response.has_response_type()) {
+        FX_LOGS(ERROR) << "TouchSource: Response was missing arguments.";
+        return ZX_ERR_INVALID_ARGS;
+      }
+
+      if (ConvertToGestureResponse(response.response_type()) == GestureResponse::kUndefined) {
+        FX_LOGS(ERROR) << "TouchSource: Response " << i << " had unknown response type.";
+        return ZX_ERR_INVALID_ARGS;
+      }
+    }
+  }
+
+  return ZX_OK;
+}
+
+void TouchSource::Watch(std::vector<fuchsia::ui::pointer::TouchResponse> responses,
+                        WatchCallback callback) {
+  TRACE_DURATION("input", "TouchSource::Watch");
+  const zx_status_t error = ValidateResponses(
+      responses, return_tickets_, /*have_pending_callback*/ pending_callback_ != nullptr);
+  if (error != ZX_OK) {
+    CloseChannel(error);
+    return;
+  }
+
+  // De-interlace responses from different streams.
+  std::unordered_map<StreamId, std::vector<GestureResponse>> responses_per_stream;
+  size_t index = 0;
+  for (const auto& response : responses) {
+    if (response.has_trace_flow_id()) {
+      TRACE_FLOW_END("input", "received_response", response.trace_flow_id());
+    }
+
+    const auto [stream_id, expects_response] = return_tickets_.at(index++);
+    if (!expects_response) {
+      continue;
+    }
+
+    const GestureResponse gd_response = ConvertToGestureResponse(response.response_type());
+    responses_per_stream[stream_id].emplace_back(gd_response);
+    auto& stream = ongoing_streams_[stream_id];
+    stream.last_response = gd_response;
+
+    // TODO(fxbug.dev/53316): Remove when we no longer need to filter events.
+    // Duplicate the response for any subsequent filtered events.
+    ++stream.num_responses;
+    while (!stream.filtered_events.empty() &&
+           stream.num_responses == stream.filtered_events.front() - 1) {
+      ++stream.num_responses;
+      stream.filtered_events.pop();
+      responses_per_stream[stream_id].emplace_back(gd_response);
+    }
+  }
+
+  for (const auto& [stream_id, gd_responses] : responses_per_stream) {
+    respond_(stream_id, gd_responses);
+  }
+
+  pending_callback_ = std::move(callback);
+  return_tickets_.clear();
+  SendPendingIfWaiting();
+}
+
+zx_status_t TouchSource::ValidateUpdateResponse(
+    const fuchsia::ui::pointer::TouchInteractionId& stream_identifier,
+    const fuchsia::ui::pointer::TouchResponse& response,
+    const std::unordered_map<StreamId, StreamData>& ongoing_streams) {
+  const StreamId stream_id = stream_identifier.interaction_id;
+  if (ongoing_streams.count(stream_id) == 0) {
+    FX_LOGS(ERROR)
+        << "TouchSource: Attempted to UpdateResponse for unkown stream. Received stream id: "
+        << stream_id;
+    return ZX_ERR_BAD_STATE;
+  }
+
+  if (!response.has_response_type()) {
+    FX_LOGS(ERROR)
+        << "TouchSource: Can only UpdateResponse() called without response_type argument.";
+    return ZX_ERR_INVALID_ARGS;
+  }
+
+  if (IsHold(response.response_type())) {
+    FX_LOGS(ERROR) << "TouchSource: Can only UpdateResponse() with non-HOLD response.";
+    return ZX_ERR_INVALID_ARGS;
+  }
+
+  const auto& stream = ongoing_streams.at(stream_id);
+  if (!IsHold(stream.last_response)) {
+    FX_LOGS(ERROR) << "TouchSource: Can only UpdateResponse() if previous response was HOLD.";
+    return ZX_ERR_BAD_STATE;
+  }
+
+  if (!stream.stream_has_ended) {
+    FX_LOGS(ERROR) << "TouchSource: Can only UpdateResponse() for ended streams.";
+    return ZX_ERR_BAD_STATE;
+  }
+
+  return ZX_OK;
+}
+
+void TouchSource::UpdateResponse(fuchsia::ui::pointer::TouchInteractionId stream_identifier,
+                                 fuchsia::ui::pointer::TouchResponse response,
+                                 UpdateResponseCallback callback) {
+  TRACE_DURATION("input", "TouchSource::UpdateResponse");
+  const zx_status_t error = ValidateUpdateResponse(stream_identifier, response, ongoing_streams_);
+  if (error != ZX_OK) {
+    CloseChannel(error);
+    return;
+  }
+
+  if (response.has_trace_flow_id()) {
+    TRACE_FLOW_END("input", "received_response", response.trace_flow_id());
+  }
+
+  const StreamId stream_id = stream_identifier.interaction_id;
+  const GestureResponse converted_response = ConvertToGestureResponse(response.response_type());
+  ongoing_streams_.at(stream_id).last_response = converted_response;
+  respond_(stream_id, {converted_response});
+
+  callback();
+}
+
+void TouchSource::SendPendingIfWaiting() {
+  if (!pending_callback_ || pending_events_.empty()) {
+    return;
+  }
+  FX_DCHECK(return_tickets_.empty());
+
+  std::vector<fuchsia::ui::pointer::TouchEvent> events;
+  for (size_t i = 0; !pending_events_.empty() && i < fuchsia::ui::pointer::TOUCH_MAX_EVENT; ++i) {
+    auto [stream_id, event] = std::move(pending_events_.front());
+    TRACE_FLOW_BEGIN("input", "dispatch_event_to_client", event.trace_flow_id());
+
+    pending_events_.pop();
+    return_tickets_.push_back(
+        {.stream_id = stream_id, .expects_response = event.has_pointer_sample()});
+    events.emplace_back(std::move(event));
+  }
+  FX_DCHECK(!events.empty());
+  FX_DCHECK(events.size() == return_tickets_.size());
+
+  pending_callback_(std::move(events));
+  pending_callback_ = nullptr;
+}
+
+void TouchSource::CloseChannel(zx_status_t epitaph) {
+  // NOTE: Triggers destruction of this object.
+  binding_.Close(epitaph);
+}
+
+}  // namespace scenic_impl::input
diff --git a/src/ui/scenic/lib/input/touch_source.h b/src/ui/scenic/lib/input/touch_source.h
new file mode 100644
index 0000000..9a2723e
--- /dev/null
+++ b/src/ui/scenic/lib/input/touch_source.h
@@ -0,0 +1,113 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SRC_UI_SCENIC_LIB_INPUT_TOUCH_SOURCE_H_
+#define SRC_UI_SCENIC_LIB_INPUT_TOUCH_SOURCE_H_
+
+#include <fuchsia/ui/pointer/cpp/fidl.h>
+#include <lib/fidl/cpp/binding.h>
+#include <lib/fit/function.h>
+
+#include <queue>
+#include <unordered_map>
+
+#include "src/ui/scenic/lib/input/gesture_contender.h"
+
+namespace scenic_impl::input {
+
+// Implementation of the |fuchsia::ui::pointer::TouchSource| interface. One instance per
+// channel.
+class TouchSource : public GestureContender, public fuchsia::ui::pointer::TouchSource {
+ public:
+  // |respond_| must not destroy the TouchSource object.
+  TouchSource(fidl::InterfaceRequest<fuchsia::ui::pointer::TouchSource> event_provider,
+              fit::function<void(StreamId, const std::vector<GestureResponse>&)> respond,
+              fit::function<void()> error_handler);
+
+  ~TouchSource() override;
+
+  // |GestureContender|
+  void UpdateStream(StreamId stream_id, const InternalPointerEvent& event,
+                    bool is_end_of_stream) override;
+
+  // |GestureContender|
+  void EndContest(StreamId stream_id, bool awarded_win) override;
+
+  // |fuchsia::ui::pointer::TouchSource|
+  void Watch(std::vector<fuchsia::ui::pointer::TouchResponse> responses,
+             WatchCallback callback) override;
+
+  // |fuchsia::ui::pointer::TouchSource|
+  void UpdateResponse(fuchsia::ui::pointer::TouchInteractionId stream,
+                      fuchsia::ui::pointer::TouchResponse response,
+                      UpdateResponseCallback callback) override;
+
+  // TODO(fxbug.dev/64379): Implement ANR.
+
+ private:
+  struct StreamData {
+    uint32_t device_id = 0;
+    uint32_t pointer_id = 0;
+    bool stream_has_ended = false;
+    bool was_won = false;
+    GestureResponse last_response = GestureResponse::kUndefined;
+
+    // TODO(fxbug.dev/53316): Remove when we no longer need to filter events. Keeps indexes into
+    // duplicate events for legacy injectors.
+    uint64_t num_pointer_events = 0;
+    uint64_t num_responses = 0;
+    std::queue<uint64_t> filtered_events;
+  };
+
+  // Used to track expected responses from the client for each sent event.
+  struct ReturnTicket {
+    StreamId stream_id = kInvalidStreamId;
+    bool expects_response = false;
+  };
+
+  // Used to track events awaiting Watch() calls.
+  struct PendingEvent {
+    StreamId stream_id = kInvalidStreamId;
+    fuchsia::ui::pointer::TouchEvent event;
+  };
+
+  void SendPendingIfWaiting();
+
+  // Checks that the input is valid for the current state. If not valid it returns the error string
+  // to print and the epitaph to send on the channel when closing.
+  static zx_status_t ValidateResponses(
+      const std::vector<fuchsia::ui::pointer::TouchResponse>& responses,
+      const std::vector<ReturnTicket>& last_messages, bool have_pending_callback);
+  static zx_status_t ValidateUpdateResponse(
+      const fuchsia::ui::pointer::TouchInteractionId& stream_identifier,
+      const fuchsia::ui::pointer::TouchResponse& response,
+      const std::unordered_map<StreamId, StreamData>& ongoing_streams);
+
+  // Closes the fidl channel. This triggers the destruction of the TouchSource object through
+  // the error handler set in InputSystem. NOTE: No further method calls or member accesses should
+  // be made after CloseChannel(), since they might be made on a destroyed object.
+  void CloseChannel(zx_status_t epitaph);
+
+  bool is_first_event_ = true;
+  Viewport current_viewport_;
+
+  // Events waiting to be sent to client. Sent in batches of up to
+  // fuchsia::ui::pointer::TOUCH_MAX_EVENT events on each call to Watch().
+  std::queue<PendingEvent> pending_events_;
+  // When a vector of events is sent out in response to a Watch() call, the next Watch() call must
+  // contain responses matching the previous set of events. |return_tickets_| tracks the expected
+  // responses for the previous set of events.
+  std::vector<ReturnTicket> return_tickets_;
+
+  fidl::Binding<fuchsia::ui::pointer::TouchSource> binding_;
+  const fit::function<void(StreamId, const std::vector<GestureResponse>&)> respond_;
+  const fit::function<void()> error_handler_;
+
+  std::unordered_map<StreamId, StreamData> ongoing_streams_;
+  WatchCallback pending_callback_ = nullptr;
+};
+
+}  // namespace scenic_impl::input
+
+#endif  // SRC_UI_SCENIC_LIB_INPUT_TOUCH_SOURCE_H_