[Scenic][SCN-713] Direct YUV support

This is a working (though hacky in places) implementation of direct YUV support: on a UMA platform, a host image is imported directly as device memory and used as the source for a sampler, with no copies involved.
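
A rough sketch of the import path (simplified from the host_image.cc change
below; assumes "memory" is a gfx::Memory wrapping a host VMO, "vk_device" and
"recycler" come from the session's resource context, and error handling is
omitted):

  // On a UMA platform, Memory::GetGpuMem() exposes the host VMO as
  // vk::DeviceMemory (it returns null otherwise), so the NV12 image is
  // adopted rather than copied.
  escher::GpuMemPtr gpu_memory = memory->GetGpuMem();
  escher::ImageInfo info;
  info.format = vk::Format::eG8B8R82Plane420Unorm;  // NV12
  info.width = image_info.width;
  info.height = image_info.height;
  info.usage = vk::ImageUsageFlagBits::eSampled;
  info.tiling = vk::ImageTiling::eLinear;
  vk::Image vk_image = escher::image_utils::CreateVkImage(vk_device, info);
  escher::ImagePtr image = escher::impl::NaiveImage::AdoptVkImage(
      recycler, info, vk_image, gpu_memory);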

However, it does so by punching a not-so-pretty hole through Escher, and it relies on quite a bit of hardcoded behavior to maintain some optimality. Further refinements are documented in a series of Jira bugs noted in the comments (e.g., ES-199, ES-200, ES-201, SCN-1380, SCN-1387, SCN-1403).

Tests: a new host_image_unittest, a new scenic_pixel_test, and an extended shader_program_unittest (which also serves as an integration test of the pipeline caching), all run on multiple devices and as part of CQ; plus a manual test of use_media_decoder and manual checks for Vulkan validation errors.

SCN-713 #done

WAS-Change-Id: I2557909245af036bba8d3b8ac9441dd05b6b92bd

Direct YUV test flow with sampler

WAS-Change-Id: I1e30f65e84ac13e90475df321909c5ccf0010df0
Change-Id: I8956205f73482b47589c28a6c170a93ab0884b17
diff --git a/garnet/examples/media/use_media_decoder/BUILD.gn b/garnet/examples/media/use_media_decoder/BUILD.gn
index 1e8fae3..e415688 100644
--- a/garnet/examples/media/use_media_decoder/BUILD.gn
+++ b/garnet/examples/media/use_media_decoder/BUILD.gn
@@ -42,6 +42,9 @@
 
 executable("use_media_decoder") {
   testonly = true
+
+  output_name = "use_media_decoder"
+
   sources = [
     "main.cc",
   ]
diff --git a/garnet/examples/ui/shadertoy/service/renderer.cc b/garnet/examples/ui/shadertoy/service/renderer.cc
index 3482f01..7d90728 100644
--- a/garnet/examples/ui/shadertoy/service/renderer.cc
+++ b/garnet/examples/ui/shadertoy/service/renderer.cc
@@ -130,7 +130,7 @@
 
     channel_image_info[i].imageLayout = vk::ImageLayout::eShaderReadOnlyOptimal;
     channel_image_info[i].imageView = channel_texture->vk_image_view();
-    channel_image_info[i].sampler = channel_texture->vk_sampler();
+    channel_image_info[i].sampler = channel_texture->sampler()->vk();
 
     writes[i].dstSet = descriptor_set;
     writes[i].dstArrayElement = 0;
@@ -205,8 +205,8 @@
                                                  1, channels);
   uploader.Submit();
 
-  return fxl::MakeRefCounted<escher::Texture>(
-      escher()->resource_recycler(), std::move(image), vk::Filter::eNearest);
+  return escher::Texture::New(escher()->resource_recycler(), std::move(image),
+                              vk::Filter::eNearest);
 }
 
 Renderer::Params::Params()
diff --git a/garnet/lib/media/test/codec_client.cc b/garnet/lib/media/test/codec_client.cc
index 4a77af9..a1ab748 100644
--- a/garnet/lib/media/test/codec_client.cc
+++ b/garnet/lib/media/test/codec_client.cc
@@ -2,10 +2,10 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include <lib/media/test/codec_client.h>
+#include "lib/media/test/codec_client.h"
 
-#include <lib/async/cpp/task.h>
-#include <src/lib/fxl/logging.h>
+#include "lib/async/cpp/task.h"
+#include "src/lib/fxl/logging.h"
 
 namespace {
 
@@ -28,7 +28,8 @@
 
 }  // namespace
 
-CodecClient::CodecClient(async::Loop* loop, fidl::InterfaceHandle<fuchsia::sysmem::Allocator> sysmem)
+CodecClient::CodecClient(
+    async::Loop* loop, fidl::InterfaceHandle<fuchsia::sysmem::Allocator> sysmem)
     : loop_(loop), dispatcher_(loop_->dispatcher()) {
   // We haven't created a channel yet, but that's fine, and we want the error
   // handler set up before any error can possibly be generated by the channel so
@@ -124,12 +125,13 @@
 
   FXL_CHECK(input_constraints_->has_packet_count_for_server_recommended());
   FXL_CHECK(input_constraints_->has_packet_count_for_server_max());
-  uint32_t packet_count_for_client = std::max(
-    kMinExtraInputPacketsForClient,
-    input_constraints_->packet_count_for_client_min());
+  uint32_t packet_count_for_client =
+      std::max(kMinExtraInputPacketsForClient,
+               input_constraints_->packet_count_for_client_min());
   uint32_t packet_count_for_server =
       input_constraints_->packet_count_for_server_recommended();
-  if (packet_count_for_client > input_constraints_->packet_count_for_client_max()) {
+  if (packet_count_for_client >
+      input_constraints_->packet_count_for_client_max()) {
     FXL_LOG(FATAL) << "server can't accomodate "
                       "kMinExtraInputPacketsForClient - not "
                       "using server - exiting";
@@ -138,10 +140,10 @@
   uint32_t input_packet_count;
   fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
   if (!ConfigurePortBufferCollection(
-      false, kInputBufferLifetimeOrdinal,
-      input_constraints_->buffer_constraints_version_ordinal(),
-      packet_count_for_server, packet_count_for_client, &input_packet_count,
-      &input_buffer_collection_, &buffer_collection_info)) {
+          false, kInputBufferLifetimeOrdinal,
+          input_constraints_->buffer_constraints_version_ordinal(),
+          packet_count_for_server, packet_count_for_client, &input_packet_count,
+          &input_buffer_collection_, &buffer_collection_info)) {
     FXL_LOG(FATAL) << "ConfigurePortBufferCollection failed (input)";
   }
 
@@ -149,8 +151,10 @@
   input_free_bits_.resize(input_packet_count, true);
   all_input_buffers_.reserve(input_packet_count);
   for (uint32_t i = 0; i < input_packet_count; i++) {
-    std::unique_ptr<CodecBuffer> local_buffer =
-      CodecBuffer::CreateFromVmo(i, std::move(buffer_collection_info.buffers[i].vmo), buffer_collection_info.buffers[i].vmo_usable_start, buffer_collection_info.settings.buffer_settings.size_bytes, true);
+    std::unique_ptr<CodecBuffer> local_buffer = CodecBuffer::CreateFromVmo(
+        i, std::move(buffer_collection_info.buffers[i].vmo),
+        buffer_collection_info.buffers[i].vmo_usable_start,
+        buffer_collection_info.settings.buffer_settings.size_bytes, true);
     if (!local_buffer) {
       FXL_LOG(FATAL) << "CodecBuffer::CreateFromVmo() failed";
     }
@@ -177,7 +181,8 @@
     fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken>*
         out_codec_sysmem_token) {
   fuchsia::sysmem::BufferCollectionSyncPtr buffer_collection;
-  fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken> codec_sysmem_token;
+  fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken>
+      codec_sysmem_token;
 
   // Create client_token which will get converted into out_buffer_collection.
   fuchsia::sysmem::BufferCollectionTokenSyncPtr client_token;
@@ -185,29 +190,30 @@
       client_token_request = client_token.NewRequest();
 
   // Create codec_sysmem_token that'll get returned via out_codec_sysmem_token.
-  client_token->Duplicate(
-      std::numeric_limits<uint32_t>::max(), codec_sysmem_token.NewRequest());
+  client_token->Duplicate(std::numeric_limits<uint32_t>::max(),
+                          codec_sysmem_token.NewRequest());
 
   // client_token gets converted into a buffer_collection.
   //
-  // Start client_token connection and start converting it into a BufferCollection,
-  // so we can Sync() the previous Duplicate().
+  // Start client_token connection and start converting it into a
+  // BufferCollection, so we can Sync() the previous Duplicate().
   PostToFidlThread(
       [this, client_token_request = std::move(client_token_request),
        client_token = client_token.Unbind(),
        buffer_collection_request = buffer_collection.NewRequest()]() mutable {
-    if (!sysmem_) {
-      return;
-    }
-    sysmem_->AllocateSharedCollection(std::move(client_token_request));
-    // codec_sysmem_token will be known to sysmem by the time client_token
-    // closure is seen by sysmem, which in turn is before
-    // buffer_collection_request will be hooked up, which is why
-    // buffer_collection->Sync() completion below is enough to prove that sysmem
-    // knows about codec_sysmem_token before codec_sysmem_token is sent to the
-    // codec.
-    sysmem_->BindSharedCollection(std::move(client_token), std::move(buffer_collection_request));
-  });
+        if (!sysmem_) {
+          return;
+        }
+        sysmem_->AllocateSharedCollection(std::move(client_token_request));
+        // codec_sysmem_token will be known to sysmem by the time client_token
+        // closure is seen by sysmem, which in turn is before
+        // buffer_collection_request will be hooked up, which is why
+        // buffer_collection->Sync() completion below is enough to prove that
+        // sysmem knows about codec_sysmem_token before codec_sysmem_token is
+        // sent to the codec.
+        sysmem_->BindSharedCollection(std::move(client_token),
+                                      std::move(buffer_collection_request));
+      });
 
   // After Sync() completes its round trip, we know that sysmem knows about
   // codec_sysmem_token (causally), which is important because we'll shortly
@@ -215,7 +221,8 @@
   // a different sysmem channel.
   zx_status_t sync_status = buffer_collection->Sync();
   if (sync_status != ZX_OK) {
-    FXL_LOG(FATAL) << "buffer_collection->Sync() failed - status: " << sync_status;
+    FXL_LOG(FATAL) << "buffer_collection->Sync() failed - status: "
+                   << sync_status;
   }
 
   *out_buffer_collection = std::move(buffer_collection);
@@ -241,11 +248,13 @@
   zx_status_t call_status = buffer_collection->WaitForBuffersAllocated(
       &allocate_status, &result_buffer_collection_info);
   if (call_status != ZX_OK) {
-    FXL_LOG(ERROR) << "WaitForBuffersAllocated returned failure - status: " << call_status;
+    FXL_LOG(ERROR) << "WaitForBuffersAllocated returned failure - status: "
+                   << call_status;
     return false;
   }
   if (allocate_status != ZX_OK) {
-    FXL_LOG(ERROR) << "WaitForBuffersAllocated allocation failed - status: " << allocate_status;
+    FXL_LOG(ERROR) << "WaitForBuffersAllocated allocation failed - status: "
+                   << allocate_status;
     return false;
   }
 
@@ -318,7 +327,9 @@
     // We're allowed to forget format any time there's a stream change, so we
     // do.  This isn't critical for this test code, but it's closer to how a
     // real client will likely track the output format on a per-stream basis.
-    ZX_ASSERT(!last_output_format_ || last_output_format_->stream_lifetime_ordinal() == output_stream_lifetime_ordinal_);
+    ZX_ASSERT(!last_output_format_ ||
+              last_output_format_->stream_lifetime_ordinal() ==
+                  output_stream_lifetime_ordinal_);
     output_stream_lifetime_ordinal_ = output_stream_lifetime_ordinal;
     last_output_format_ = nullptr;
     // We intentionally don't reset is_packet_since_last_format_.
@@ -419,8 +430,7 @@
   ZX_ASSERT(packet->has_stream_lifetime_ordinal());
   ZX_ASSERT(packet->has_start_offset());
   ZX_ASSERT(packet->has_valid_length_bytes());
-  // timestamp_ish field is optional
-  // start_access_unit field is optional
+  // timestamp_ish and start_access_unit fields are optional
   // known_end_access_unit is optional
   fuchsia::media::Packet local_packet = fidl::Clone(*packet);
   {  // scope lock
@@ -501,8 +511,8 @@
     // The main mechanism used to detect that the server isn't sending output
     // too soon is output_config_action_pending_.  In contrast, the client code
     // in this example permits itself to send RecycleOutputPacket() after the
-    // client has already seen OnOutputConstraints() with action required true, even
-    // though the client could stop itself from doing so as a potential
+    // client has already seen OnOutputConstraints() with action required true,
+    // even though the client could stop itself from doing so as a potential
     // optimization.  The client is allowed to send RecycleOutputPacket() up
     // until the implied ReleaseOutputBuffers() at the start of
     // SetOutputSettings().  Between then and a given packet_index becoming
@@ -512,8 +522,8 @@
     // Because of the client allowing itself to send RecycleOutputPacket() for a
     // while longer than fundamentally necessary, we delay upkeep on
     // output_free_bits_ until here.  This upkeep isn't really fundamentally
-    // necessary between OnOutputConstraints() with action required true and the last
-    // AddOutputBuffer() as part of output re-configuration, but ... this
+    // necessary between OnOutputConstraints() with action required true and the
+    // last AddOutputBuffer() as part of output re-configuration, but ... this
     // explicit delayed upkeep _may_ help illustrate how it's acceptable for a
     // client to let the completion end of output processing send
     // RecycleOutputPacket() as long as all those will be sent before
@@ -547,13 +557,14 @@
       // Free the old output buffers, if any.
       all_output_buffers_.clear();
 
-      // Here is where we snap which exact constraints version we'll actually use.
+      // Here is where we snap which exact constraints version we'll actually
+      // use.
       //
       // For a client that's doing output buffer re-config on the FIDL thread
       // during OnOutputConstraints with action required true, this will always
-      // just be the constraints being presently received.  But this example shows
-      // how to drive the codec in a protocol-valid way without being forced to
-      // perform buffer re-configuration on the FIDL thread.
+      // just be the constraints being presently received.  But this example
+      // shows how to drive the codec in a protocol-valid way without being
+      // forced to perform buffer re-configuration on the FIDL thread.
       ZX_ASSERT(output_constraints_action_pending_);
       ZX_ASSERT(last_required_output_constraints_);
       ZX_ASSERT(last_output_constraints_);
@@ -575,10 +586,11 @@
     ZX_ASSERT(buffer_constraints.has_packet_count_for_server_recommended());
     uint32_t packet_count_for_server =
         buffer_constraints.packet_count_for_server_recommended();
-    uint32_t packet_count_for_client = std::max(
-        kMinExtraOutputPacketsForClient,
-        buffer_constraints.packet_count_for_client_min());
-    if (packet_count_for_client > buffer_constraints.packet_count_for_client_max()) {
+    uint32_t packet_count_for_client =
+        std::max(kMinExtraOutputPacketsForClient,
+                 buffer_constraints.packet_count_for_client_min());
+    if (packet_count_for_client >
+        buffer_constraints.packet_count_for_client_max()) {
       FXL_LOG(FATAL) << "server can't accomodate "
                         "kMinExtraOutputPacketsForClient - not "
                         "using server - exiting";
@@ -587,10 +599,10 @@
     uint32_t packet_count;
     fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
     if (!ConfigurePortBufferCollection(
-        true, new_output_buffer_lifetime_ordinal,
-        buffer_constraints.buffer_constraints_version_ordinal(),
-        packet_count_for_server, packet_count_for_client,
-        &packet_count, &output_buffer_collection_, &buffer_collection_info)) {
+            true, new_output_buffer_lifetime_ordinal,
+            buffer_constraints.buffer_constraints_version_ordinal(),
+            packet_count_for_server, packet_count_for_client, &packet_count,
+            &output_buffer_collection_, &buffer_collection_info)) {
       FXL_LOG(FATAL) << "ConfigurePortBufferCollection failed (output)";
     }
 
@@ -603,7 +615,7 @@
         std::unique_ptr<CodecBuffer> buffer = CodecBuffer::CreateFromVmo(
             i, std::move(buffer_collection_info.buffers[i].vmo),
             buffer_collection_info.buffers[i].vmo_usable_start,
-            buffer_collection_info.settings.buffer_settings.size_bytes, false);
+            buffer_collection_info.settings.buffer_settings.size_bytes, true);
         if (!buffer) {
           FXL_LOG(FATAL) << "CodecBuffer::Allocate() failed (output)";
         }
@@ -620,11 +632,13 @@
     }  // ~lock
 
     // We're ready to receive output.
-    PostToFidlThread([this, output_buffer_lifetime_ordinal = new_output_buffer_lifetime_ordinal]{
+    PostToFidlThread([this, output_buffer_lifetime_ordinal =
+                                new_output_buffer_lifetime_ordinal] {
       if (!codec_) {
         return;
       }
-      if (output_buffer_lifetime_ordinal != current_output_buffer_lifetime_ordinal_) {
+      if (output_buffer_lifetime_ordinal !=
+          current_output_buffer_lifetime_ordinal_) {
         return;
       }
       codec_->CompleteOutputBufferPartialSettings(
@@ -639,9 +653,9 @@
       // config that's action-required.
 
       if (snapped_constraints->buffer_constraints()
-               .buffer_constraints_version_ordinal() >=
+              .buffer_constraints_version_ordinal() >=
           last_required_output_constraints_->buffer_constraints()
-               .buffer_constraints_version_ordinal()) {
+              .buffer_constraints_version_ordinal()) {
         // Good.  The client is caught up.  The output_config_action_pending_
         // can become false here, but may very shortly become true again if
         // another OnOutputConstraints() is received after we release the lock
@@ -699,7 +713,8 @@
   settings.set_packet_count_for_client(packet_count_for_client);
 
   fuchsia::sysmem::BufferCollectionSyncPtr buffer_collection;
-  fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken> codec_sysmem_token;
+  fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken>
+      codec_sysmem_token;
   if (!CreateAndSyncBufferCollection(&buffer_collection, &codec_sysmem_token)) {
     FXL_LOG(FATAL) << "CreateAndSyncBufferCollection failed (output)";
     return false;
@@ -708,7 +723,11 @@
   settings.set_sysmem_token(std::move(codec_sysmem_token));
 
   fuchsia::sysmem::BufferCollectionConstraints constraints;
-  constraints.usage.cpu = is_output ? fuchsia::sysmem::cpuUsageReadOften : fuchsia::sysmem::cpuUsageWriteOften;
+  // TODO(SCN-1388): Hardcoded to read/write to allow direct Vulkan import on
+  // UMA platforms.
+  constraints.usage.cpu =
+      fuchsia::sysmem::cpuUsageReadOften | fuchsia::sysmem::cpuUsageWriteOften;
+
   // TODO(dustingreen): Make this more flexible once we're more flexible on
   // frame_count on output of decoder.
   constraints.min_buffer_count_for_camping = packet_count_for_client;
@@ -730,10 +749,10 @@
   // protected memory.
   constraints.buffer_memory_constraints.secure_permitted = false;
 
-  // Despite being a consumer of output uncompressed video frames (when
-  // decoding video and is_output), for now we intentionally don't constrain to
-  // the PixelFormatType(s) that we can consume, and instead fail later if we
-  // get something unexpected on output.  That's just easier than plumbing
+  // Despite being a consumer of output uncompressed video frames (when decoding
+  // video and is_output), for now we intentionally don't constrain to the
+  // PixelFormatType(s) that we can consume, and instead fail later if we get
+  // something unexpected on output. That's just easier than plumbing
   // PixelFormatType(s) to here for now.
   ZX_DEBUG_ASSERT(constraints.image_format_constraints_count == 0);
 
@@ -751,33 +770,34 @@
   buffer_collection->SetConstraints(true, std::move(constraints));
 
   fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
   // This borrows buffer_collection during the call.
   if (!WaitForSysmemBuffersAllocated(&buffer_collection,
                                      &buffer_collection_info)) {
-    FXL_LOG(FATAL) << "WaitForSysmemBuffersAllocated failed";
+    FXL_LOG(FATAL) << "WaitForSysmemBuffer"
+                      "sAllocated failed";
     return false;
   }
 
-  packet_count = std::max(
-    packet_count, buffer_collection_info.buffer_count);
+  packet_count = std::max(packet_count, buffer_collection_info.buffer_count);
 
   fuchsia::sysmem::BufferCollectionPtr buffer_collection_ptr;
 
-  // For the Bind() we probably don't strictly need to be on FIDL thread, so
-  // do the Bind() here.  This does mean we need to set the error handler
-  // before the Bind() however.
-  buffer_collection_ptr.set_error_handler([is_output](zx_status_t status){
-    FXL_LOG(FATAL) << "BufferCollection failed - status: " << status << " is_output: " << is_output;
+  // For the Bind() we probably don't strictly need to be on FIDL thread, so do
+  // the Bind() here.  This does mean we need to set the error handler before
+  // the Bind() however.
+  buffer_collection_ptr.set_error_handler([is_output](zx_status_t status) {
+    FXL_LOG(FATAL) << "BufferCollection failed - "
+                      "status: "
+                   << status << " is_output: " << is_output;
   });
 
   // This implicitly converts buffer_collection from
-  // fidl::SynchronousInterfacePtr<> to fidl::InterfaceHandle<>, which is
-  // what we want; we're moving handling of the BufferCollection from this
-  // thread to the FIDL thread.
-  zx_status_t bind_status = buffer_collection_ptr.Bind(
-      std::move(buffer_collection), dispatcher_);
+  // fidl::SynchronousInterfacePtr<> to fidl::InterfaceHandle<>, which is what
+  // we want; we're moving handling of the BufferCollection from this thread to
+  // the FIDL thread.
+  zx_status_t bind_status =
+      buffer_collection_ptr.Bind(std::move(buffer_collection), dispatcher_);
   if (bind_status != ZX_OK) {
-    FXL_LOG(FATAL) << "buffer_collection_ptr.Bind() failed - status: " << bind_status << " is_output: " << is_output;
+    FXL_LOG(FATAL) << "buffer_collection_ptr.Bind() "
+                      "failed - status: "
+                   << bind_status << " is_output: " << is_output;
     return false;
   }
 
@@ -811,7 +835,8 @@
     std::unique_lock<std::mutex> lock(lock_);
 
     if (!shared_constraints->has_stream_lifetime_ordinal()) {
-      FXL_LOG(FATAL) << "StreamOutputConstraints missing stream_lifetime_ordinal";
+      FXL_LOG(FATAL)
+          << "StreamOutputConstraints missing stream_lifetime_ordinal";
     }
     uint64_t stream_lifetime_ordinal =
         shared_constraints->stream_lifetime_ordinal();
@@ -822,14 +847,15 @@
                last_output_constraints_->buffer_constraints()
                    .has_buffer_constraints_version_ordinal()));
     uint64_t previous_buffer_constraints_version_ordinal =
-        last_output_constraints_ ? last_output_constraints_->buffer_constraints()
-                                   .buffer_constraints_version_ordinal()
-                            : 0;
+        last_output_constraints_
+            ? last_output_constraints_->buffer_constraints()
+                  .buffer_constraints_version_ordinal()
+            : 0;
     ZX_ASSERT(shared_constraints->has_buffer_constraints() &&
               shared_constraints->buffer_constraints()
                   .has_buffer_constraints_version_ordinal());
     if (shared_constraints->buffer_constraints()
-             .buffer_constraints_version_ordinal() <
+            .buffer_constraints_version_ordinal() <
         previous_buffer_constraints_version_ordinal) {
       FXL_LOG(FATAL)
           << "broken server sent badly ordered buffer constraints ordinals";
@@ -881,8 +907,7 @@
   if (!shared_format->has_stream_lifetime_ordinal()) {
     FXL_LOG(FATAL) << "StreamOutputFormat missing stream_lifetime_ordinal";
   }
-  uint64_t stream_lifetime_ordinal =
-      shared_format->stream_lifetime_ordinal();
+  uint64_t stream_lifetime_ordinal = shared_format->stream_lifetime_ordinal();
   TrackOutputStreamLifetimeOrdinal(stream_lifetime_ordinal);
 
   if (is_format_since_last_packet_) {
@@ -891,8 +916,7 @@
     // logically part of each output packet, with the optimization that we only
     // send output format when it changes, in between packets, to avoid needing
     // to send output format with every packet.
-    FXL_LOG(FATAL)
-        << "broken server sent two OnOutputFormat() in a row";
+    FXL_LOG(FATAL) << "broken server sent two OnOutputFormat() in a row";
   }
   if (!shared_format->has_stream_lifetime_ordinal()) {
     FXL_LOG(FATAL) << "OnOutputFormat !has_stream_lifetime_ordinal()";
@@ -921,11 +945,13 @@
     if (!local_packet->has_stream_lifetime_ordinal()) {
       FXL_LOG(FATAL) << "Packet missing stream_lifetime_ordinal";
     }
-    uint64_t stream_lifetime_ordinal =
-        local_packet->stream_lifetime_ordinal();
+    uint64_t stream_lifetime_ordinal = local_packet->stream_lifetime_ordinal();
     TrackOutputStreamLifetimeOrdinal(stream_lifetime_ordinal);
-    if (!last_output_format_ || last_output_format_->stream_lifetime_ordinal() != stream_lifetime_ordinal) {
-      FXL_LOG(FATAL) << "OnOutputFormat required before OnOutputPacket, per-stream";
+    if (!last_output_format_ ||
+        last_output_format_->stream_lifetime_ordinal() !=
+            stream_lifetime_ordinal) {
+      FXL_LOG(FATAL)
+          << "OnOutputFormat required before OnOutputPacket, per-stream";
     }
 
     std::unique_ptr<CodecOutput> output = std::make_unique<CodecOutput>(
diff --git a/garnet/lib/media/test/frame_sink_view.cc b/garnet/lib/media/test/frame_sink_view.cc
index 9e1c525..e7632e9 100644
--- a/garnet/lib/media/test/frame_sink_view.cc
+++ b/garnet/lib/media/test/frame_sink_view.cc
@@ -4,9 +4,8 @@
 
 #include "lib/media/test/frame_sink_view.h"
 
-#include "lib/media/test/frame_sink.h"
-
 #include <lib/media/codec_impl/fourcc.h>
+#include <lib/media/test/frame_sink.h>
 #include <lib/ui/scenic/cpp/commands.h>
 
 #include <iomanip>
diff --git a/garnet/lib/ui/gfx/gfx_system.cc b/garnet/lib/ui/gfx/gfx_system.cc
index f9164d0..8ba6b48 100644
--- a/garnet/lib/ui/gfx/gfx_system.cc
+++ b/garnet/lib/ui/gfx/gfx_system.cc
@@ -106,13 +106,14 @@
   // the display manager API to present frames directly, instead of using
   // Vulkan swapchains.
   escher::VulkanDeviceQueues::Params device_queues_params(
-      {{
-           VK_KHR_EXTERNAL_MEMORY_EXTENSION_NAME,
-           VK_FUCHSIA_EXTERNAL_MEMORY_EXTENSION_NAME,
-           VK_KHR_EXTERNAL_SEMAPHORE_EXTENSION_NAME,
-           VK_FUCHSIA_EXTERNAL_SEMAPHORE_EXTENSION_NAME,
-           VK_FUCHSIA_BUFFER_COLLECTION_EXTENSION_NAME,
-       },
+      {{VK_KHR_EXTERNAL_MEMORY_EXTENSION_NAME,
+        VK_FUCHSIA_EXTERNAL_MEMORY_EXTENSION_NAME,
+        VK_KHR_EXTERNAL_SEMAPHORE_EXTENSION_NAME,
+        VK_FUCHSIA_EXTERNAL_SEMAPHORE_EXTENSION_NAME,
+        VK_FUCHSIA_BUFFER_COLLECTION_EXTENSION_NAME,
+        VK_KHR_MAINTENANCE1_EXTENSION_NAME, VK_KHR_BIND_MEMORY_2_EXTENSION_NAME,
+        VK_KHR_GET_MEMORY_REQUIREMENTS_2_EXTENSION_NAME,
+        VK_KHR_SAMPLER_YCBCR_CONVERSION_EXTENSION_NAME},
        surface_,
        escher::VulkanDeviceQueues::Params::kDisableQueueFilteringForPresent});
   vulkan_device_queues_ =
diff --git a/garnet/lib/ui/gfx/resources/host_image.cc b/garnet/lib/ui/gfx/resources/host_image.cc
index 7e8ccf2..ae5dc13 100644
--- a/garnet/lib/ui/gfx/resources/host_image.cc
+++ b/garnet/lib/ui/gfx/resources/host_image.cc
@@ -10,6 +10,12 @@
 #include "garnet/lib/ui/gfx/resources/memory.h"
 #include "garnet/lib/ui/gfx/util/image_formats.h"
 #include "lib/images/cpp/images.h"
+#include "src/ui/lib/escher/impl/naive_image.h"
+
+namespace {
+// TODO(SCN-1387): This number needs to be queried via sysmem or vulkan.
+constexpr uint32_t kYuvStrideRequirement = 64;
+}  // namespace
 
 namespace scenic_impl {
 namespace gfx {
@@ -95,6 +101,36 @@
     return nullptr;
   }
 
+  if (image_info.pixel_format == fuchsia::images::PixelFormat::NV12 &&
+      image_info.stride % kYuvStrideRequirement == 0) {
+    // If we are not on a UMA platform, GetGpuMem will return a null pointer.
+    auto gpu_memory = memory->GetGpuMem();
+    if (gpu_memory) {
+      escher::ImageInfo escher_image_info;
+      escher_image_info.format = vk::Format::eG8B8R82Plane420Unorm;
+      escher_image_info.width = image_info.width;
+      escher_image_info.height = image_info.height;
+      escher_image_info.sample_count = 1;
+      escher_image_info.usage = vk::ImageUsageFlagBits::eSampled;
+      escher_image_info.tiling = vk::ImageTiling::eLinear;
+      escher_image_info.is_mutable = false;
+      // TODO(SCN-1012): This code assumes that Memory::GetGpuMem() will only
+      // return device local memory.
+      escher_image_info.memory_flags = vk::MemoryPropertyFlagBits::eDeviceLocal;
+
+      vk::Image vk_image = escher::image_utils::CreateVkImage(
+          session->resource_context().vk_device, escher_image_info);
+      auto escher_image = escher::impl::NaiveImage::AdoptVkImage(
+          session->resource_context().escher_resource_recycler,
+          escher_image_info, vk_image, gpu_memory);
+      auto host_image = fxl::AdoptRef(
+          new HostImage(session, id, std::move(memory), std::move(escher_image),
+                        memory_offset, image_info));
+      host_image->is_directly_mapped_ = true;
+      return host_image;
+    }
+  }
+
   // TODO(SCN-141): Support non-minimal strides for all formats.  For now, NV12
   // is ok because it will have image_conversion_function_ and for formats with
   // image_conversion_function_, the stride is really only the input data stride
@@ -118,6 +154,12 @@
 }
 
 bool HostImage::UpdatePixels(escher::BatchGpuUploader* gpu_uploader) {
+  if (is_directly_mapped_) {
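+    // The image memory is shared directly with the host, so there is no
+    // staging copy to upload; report that no update work was performed.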
+    return false;
+  }
+
   if (!gpu_uploader) {
     FXL_LOG(WARNING) << "No BatchGpuUploader, cannot UpdatePixels.";
     return true;
diff --git a/garnet/lib/ui/gfx/resources/host_image.h b/garnet/lib/ui/gfx/resources/host_image.h
index 9c29088..7ff1dd2 100644
--- a/garnet/lib/ui/gfx/resources/host_image.h
+++ b/garnet/lib/ui/gfx/resources/host_image.h
@@ -43,10 +43,11 @@
 
   void Accept(class ResourceVisitor* visitor) override;
 
-  // Re-upload host memory contents to GPU memory. Returns true if contents were
-  // updated.
+  bool IsDirectlyMapped() const { return is_directly_mapped_; }
 
  protected:
+  // Re-upload host memory contents to GPU memory. Returns true if contents were
+  // updated.
   bool UpdatePixels(escher::BatchGpuUploader* gpu_uploader) override;
 
  private:
@@ -68,6 +69,8 @@
   fuchsia::images::ImageInfo image_format_;
   escher::image_utils::ImageConversionFunction image_conversion_function_ =
       nullptr;
+
+  bool is_directly_mapped_ = false;
 };
 
 }  // namespace gfx
diff --git a/garnet/lib/ui/gfx/resources/image.cc b/garnet/lib/ui/gfx/resources/image.cc
index b5851cb..539c32d 100644
--- a/garnet/lib/ui/gfx/resources/image.cc
+++ b/garnet/lib/ui/gfx/resources/image.cc
@@ -17,7 +17,7 @@
     ResourceType::kImage | ResourceType::kImageBase, "Image"};
 
 Image::Image(Session* session, ResourceId id, const ResourceTypeInfo& type_info)
-    : ImageBase(session, id, Image::kTypeInfo) {
+    : ImageBase(session, id, type_info) {
   FXL_DCHECK(type_info.IsKindOf(Image::kTypeInfo));
 }
 
diff --git a/garnet/lib/ui/gfx/resources/material.cc b/garnet/lib/ui/gfx/resources/material.cc
index d29d69c..1041a81 100644
--- a/garnet/lib/ui/gfx/resources/material.cc
+++ b/garnet/lib/ui/gfx/resources/material.cc
@@ -9,6 +9,34 @@
 #include "garnet/lib/ui/gfx/resources/image_base.h"
 #include "garnet/lib/ui/gfx/resources/image_pipe.h"
 
+namespace {
+
+// TODO(SCN-1380): This is a hack that temporarily avoids memory and performance
+// issues involving immutable samplers. It relies on the fact that Scenic is
+// currently a singleton service with a single vk::Device, with no user-provided
+// parameters that affect sampler construction other than image format. SCN-1380
+// covers the real task to solve this problem, which involves design work both
+// at the Scenic and Escher levels.
+escher::SamplerPtr CreateNV12Sampler(escher::ResourceRecycler* recycler) {
+  static vk::Device last_device;
+  static escher::SamplerPtr last_sampler;
+
+  if (recycler->vulkan_context().device == last_device && last_sampler)
+    return last_sampler;
+
+  if (last_sampler) {
+    FXL_LOG(WARNING) << "YUV Sampler was not successfully cached, memory "
+                        "footprint will increase.";
+  }
+
+  last_device = recycler->vulkan_context().device;
+  last_sampler = fxl::MakeRefCounted<escher::Sampler>(
+      recycler, vk::Format::eG8B8R82Plane420Unorm, vk::Filter::eLinear);
+  return last_sampler;
+}
+
+}  // namespace
+
 namespace scenic_impl {
 namespace gfx {
 
@@ -43,9 +71,23 @@
   if (!escher_texture || escher_image != escher_texture->image()) {
     escher::TexturePtr texture;
     if (escher_image) {
-      texture = fxl::MakeRefCounted<escher::Texture>(
-          session()->resource_context().escher_resource_recycler, escher_image,
-          vk::Filter::eLinear);
+      auto recycler = session()->resource_context().escher_resource_recycler;
+      // TODO(SCN-1403): Technically, eG8B8R82Plane420Unorm is not enough to
+      // assume NV12, but it's currently the only format we support at the
+      // sampler level.
+      if (escher_image->format() == vk::Format::eG8B8R82Plane420Unorm) {
+        texture = fxl::MakeRefCounted<escher::Texture>(
+            recycler, CreateNV12Sampler(recycler), escher_image);
+      } else {
+        texture =
+            escher::Texture::New(recycler, escher_image, vk::Filter::eLinear);
+        // TODO(ES-199, ES-200): Reusing samplers is just good policy, but it is
+        // required for immutable samplers because, until these bugs are fixed,
+        // Escher will keep these samplers around forever.
+        FXL_DCHECK(!texture->sampler()->is_immutable())
+            << "Immutable samplers need to be cached to avoid unbounded memory "
+               "consumption";
+      }
     }
     escher_material_->SetTexture(std::move(texture));
   }
diff --git a/garnet/lib/ui/gfx/resources/memory.cc b/garnet/lib/ui/gfx/resources/memory.cc
index 8ce6dca8..928807b 100644
--- a/garnet/lib/ui/gfx/resources/memory.cc
+++ b/garnet/lib/ui/gfx/resources/memory.cc
@@ -16,8 +16,6 @@
 // x86 platforms, vk::Buffers come out of a separate memory pool. These helper
 // functions help make sure that there is a single valid memory pool, for
 // both images and buffers, by creating a dummy representative buffer/image.
-#if __x86_64__
-
 uint32_t GetBufferMemoryBits(vk::Device device) {
   static vk::Device cached_device;
   static uint32_t cached_bits;
@@ -76,8 +74,6 @@
   return cached_bits;
 }
 
-#endif  // __x86_64__
-
 }  // namespace
 
 namespace scenic_impl {
@@ -185,20 +181,20 @@
 
   auto vk_physical_device = session()->resource_context().vk_physical_device;
 
-  uint32_t memoryTypeBits = handle_properties.memoryTypeBits;
+  uint32_t memory_type_bits = handle_properties.memoryTypeBits;
 
 // TODO(SCN-1368): This code should be unnecessary once we have a code flow that
 // understands how the memory is expected to be used.
 #if __x86_64__
-  memoryTypeBits &= GetBufferMemoryBits(vk_device);
-  memoryTypeBits &= GetImageMemoryBits(vk_device);
-  FXL_CHECK(memoryTypeBits != 0)
+  memory_type_bits &= GetBufferMemoryBits(vk_device);
+  memory_type_bits &= GetImageMemoryBits(vk_device);
+  FXL_CHECK(memory_type_bits != 0)
       << "This platform does not have a single memory pool that is valid for "
          "both images and buffers. Please fix SCN-1368.";
 #endif  // __x86_64__
 
   uint32_t memory_type_index = escher::impl::GetMemoryTypeIndex(
-      vk_physical_device, memoryTypeBits, required_flags);
+      vk_physical_device, memory_type_bits, required_flags);
 
   vk::PhysicalDeviceMemoryProperties memory_types =
       vk_physical_device.getMemoryProperties();
@@ -251,5 +247,25 @@
   return the_clone;
 }
 
+uint32_t Memory::HasSharedMemoryPools(vk::Device device,
+                                      vk::PhysicalDevice physical_device) {
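+  // A memory type that is both device-local and host-visible, and that is
+  // valid for both buffer and image allocations, indicates a UMA-style pool
+  // (see the helpers above and SCN-1368).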
+  vk::MemoryPropertyFlags required_flags =
+      vk::MemoryPropertyFlagBits::eDeviceLocal |
+      vk::MemoryPropertyFlagBits::eHostVisible;
+
+  uint32_t memory_type_bits =
+      GetBufferMemoryBits(device) & GetImageMemoryBits(device);
+
+  uint32_t memory_type_index = escher::impl::GetMemoryTypeIndex(
+      physical_device, memory_type_bits, required_flags);
+
+  vk::PhysicalDeviceMemoryProperties memory_types =
+      physical_device.getMemoryProperties();
+  return memory_type_index < memory_types.memoryTypeCount;
+}
+
 }  // namespace gfx
 }  // namespace scenic_impl
diff --git a/garnet/lib/ui/gfx/resources/memory.h b/garnet/lib/ui/gfx/resources/memory.h
index d1199e9..ab2c4d7 100644
--- a/garnet/lib/ui/gfx/resources/memory.h
+++ b/garnet/lib/ui/gfx/resources/memory.h
@@ -6,6 +6,7 @@
 #define GARNET_LIB_UI_GFX_RESOURCES_MEMORY_H_
 
 #include <fuchsia/ui/gfx/cpp/fidl.h>
+
 #include <vulkan/vulkan.hpp>
 
 #include "garnet/lib/ui/gfx/resources/resource.h"
@@ -61,6 +62,11 @@
   // |Resource|
   void Accept(ResourceVisitor* visitor) override;
 
+  // This function is used for tests, so they can easily detect if they should
+  // bother trying to test UMA memory flows.
+  static uint32_t HasSharedMemoryPools(vk::Device device,
+                                       vk::PhysicalDevice physical_device);
+
  private:
   Memory(Session* session, ResourceId id, ::fuchsia::ui::gfx::MemoryArgs args);
 
diff --git a/garnet/lib/ui/gfx/tests/BUILD.gn b/garnet/lib/ui/gfx/tests/BUILD.gn
index 6d13da6..c0b5405 100644
--- a/garnet/lib/ui/gfx/tests/BUILD.gn
+++ b/garnet/lib/ui/gfx/tests/BUILD.gn
@@ -84,6 +84,7 @@
     "gfx_command_applier_unittest.cc",
     "hardware_layer_assignment_unittest.cc",
     "hittest_global_unittest.cc",
+    "host_image_unittest.cc",
     "image_pipe_unittest.cc",
     "import_unittest.cc",
     "memory_unittest.cc",
@@ -132,6 +133,7 @@
   include_dirs = [ "//src/ui/lib/escher" ]
   deps = [
     ":testing_deps",
+    "//garnet/lib/ui/yuv:yuv",
     "//garnet/public/lib/fsl",
     "//garnet/public/lib/gtest",
     "//garnet/testing/views",
diff --git a/garnet/lib/ui/gfx/tests/host_image_unittest.cc b/garnet/lib/ui/gfx/tests/host_image_unittest.cc
new file mode 100644
index 0000000..ac475cf
--- /dev/null
+++ b/garnet/lib/ui/gfx/tests/host_image_unittest.cc
@@ -0,0 +1,195 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "garnet/lib/ui/gfx/resources/host_image.h"
+
+#include "garnet/lib/ui/gfx/tests/session_test.h"
+#include "garnet/lib/ui/gfx/tests/vk_session_test.h"
+#include "gtest/gtest.h"
+#include "lib/images/cpp/images.h"
+#include "lib/ui/scenic/cpp/commands.h"
+#include "src/ui/lib/escher/test/gtest_vulkan.h"
+#include "src/ui/lib/escher/vk/vulkan_device_queues.h"
+
+using namespace escher;
+
+namespace {
+
+const uint32_t kVmoSize = 65536;
+// If you change the size of this buffer, make sure that the YUV test in
+// scenic_pixel_test.cc is also updated. Unlike this unit test,
+// scenic_pixel_test.cc has no way to confirm that it is going through the
+// direct-to-GPU path.
+// TODO(SCN-1387): This number needs to be queried via sysmem or vulkan.
+const uint32_t kSize = 64;
+const uint32_t kMemoryId = 1;
+const uint32_t kImageId = 2;
+const uint32_t kImagePipeId = 3;
+
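+// Wraps an ImageFactory and counts the images it creates, so tests can tell
+// whether an image was allocated through the factory or imported directly
+// from host memory.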
+class ImageFactoryListener : public ImageFactory {
+ public:
+  ImageFactoryListener(ImageFactory* factory) : factory_(factory) {}
+  ImagePtr NewImage(const ImageInfo& info, GpuMemPtr* out_ptr = nullptr) {
+    ++images_created_;
+    return factory_->NewImage(info, out_ptr);
+  }
+
+  uint32_t images_created_ = 0;
+  ImageFactory* factory_;
+};
+
+}  // namespace
+
+namespace scenic_impl {
+namespace gfx {
+namespace test {
+
+class HostImageTest : public VkSessionTest {
+ public:
+  void OnSessionContextCreated(SessionContext* context) override {
+    ASSERT_FALSE(listener);
+    listener =
+        std::make_unique<ImageFactoryListener>(context->escher_image_factory);
+    context->escher_image_factory = listener.get();
+  }
+
+  std::unique_ptr<ImageFactoryListener> listener;
+};
+
+VK_TEST_F(HostImageTest, FindResource) {
+  zx::vmo vmo;
+  zx_status_t status = zx::vmo::create(kVmoSize, 0u, &vmo);
+  ASSERT_EQ(ZX_OK, status);
+
+  ASSERT_TRUE(Apply(
+      scenic::NewCreateMemoryCmd(kMemoryId, std::move(vmo), kVmoSize,
+                                 fuchsia::images::MemoryType::HOST_MEMORY)));
+
+  fuchsia::images::ImageInfo image_info{
+      .width = kSize,
+      .height = kSize,
+      .stride = static_cast<uint32_t>(
+          kSize * images::StrideBytesPerWidthPixel(
+                      fuchsia::images::PixelFormat::BGRA_8)),
+      .pixel_format = fuchsia::images::PixelFormat::BGRA_8,
+  };
+
+  ASSERT_TRUE(
+      Apply(scenic::NewCreateImageCmd(kImageId, kMemoryId, 0, image_info)));
+
+  fidl::InterfacePtr<fuchsia::images::ImagePipe> image_pipe;
+  ASSERT_TRUE(Apply(
+      scenic::NewCreateImagePipeCmd(kImagePipeId, image_pipe.NewRequest())));
+
+  // Host images should be findable as their concrete sub-class.
+  auto host_image_resource = FindResource<HostImage>(kImageId);
+  EXPECT_TRUE(host_image_resource);
+  // Host images should also be findable as their base class (i.e., Image).
+  auto image_resource = FindResource<Image>(kImageId);
+  EXPECT_TRUE(image_resource);
+  // Memory should not be findable as the same base class.
+  auto memory_as_image_resource = FindResource<Image>(kMemoryId);
+  EXPECT_FALSE(memory_as_image_resource);
+  // Image pipes should not be findable as the Image class (even though they are
+  // an ImageBase, the next class down).
+  auto image_pipe_as_image_resource = FindResource<Image>(kImagePipeId);
+  EXPECT_FALSE(image_pipe_as_image_resource);
+}
+
+VK_TEST_F(HostImageTest, RgbaImport) {
+  zx::vmo vmo;
+  zx_status_t status = zx::vmo::create(kVmoSize, 0u, &vmo);
+  ASSERT_EQ(ZX_OK, status);
+
+  ASSERT_TRUE(Apply(
+      scenic::NewCreateMemoryCmd(kMemoryId, std::move(vmo), kVmoSize,
+                                 fuchsia::images::MemoryType::HOST_MEMORY)));
+
+  fuchsia::images::ImageInfo image_info{
+      .width = kSize,
+      .height = kSize,
+      .stride = static_cast<uint32_t>(
+          kSize * images::StrideBytesPerWidthPixel(
+                      fuchsia::images::PixelFormat::BGRA_8)),
+      .pixel_format = fuchsia::images::PixelFormat::BGRA_8,
+  };
+
+  ASSERT_TRUE(
+      Apply(scenic::NewCreateImageCmd(kImageId, kMemoryId, 0, image_info)));
+
+  auto image_resource = FindResource<HostImage>(kImageId);
+  ASSERT_TRUE(image_resource);
+
+  EXPECT_FALSE(image_resource->IsDirectlyMapped());
+  // Before updating pixels, image resources should never return a valid Escher
+  // image.
+  EXPECT_FALSE(image_resource->GetEscherImage());
+  // Updating shouldn't crash when passed a null gpu_uploader, but it should
+  // also keep the image dirty, because the copy from CPU to GPU memory has not
+  // occurred yet.
+  image_resource->UpdateEscherImage(nullptr);
+  // Because we did not provide a valid batch uploader, the image is still dirty
+  // and in need of an update. Until that succeeds, GetEscherImage() should not
+  // return a valid image.
+  EXPECT_FALSE(image_resource->GetEscherImage());
+  // A backing image should have been constructed through the image factory.
+  EXPECT_EQ(1u, listener->images_created_);
+}
+
+VK_TEST_F(HostImageTest, YuvImportOnUmaPlatform) {
+  auto vulkan_queues = CreateVulkanDeviceQueues();
+  auto device = vulkan_queues->vk_device();
+  auto physical_device = vulkan_queues->vk_physical_device();
+
+  if (!Memory::HasSharedMemoryPools(device, physical_device)) {
+    FXL_LOG(INFO)
+        << "Could not find UMA compatible memory pool, aborting test.";
+    return;
+  }
+
+  zx::vmo vmo;
+  zx_status_t status = zx::vmo::create(kVmoSize, 0u, &vmo);
+  ASSERT_EQ(ZX_OK, status);
+
+  ASSERT_TRUE(Apply(
+      scenic::NewCreateMemoryCmd(kMemoryId, std::move(vmo), kVmoSize,
+                                 fuchsia::images::MemoryType::HOST_MEMORY)));
+
+  fuchsia::images::ImageInfo image_info{
+      .width = kSize,
+      .height = kSize,
+      .stride = static_cast<uint32_t>(
+          kSize *
+          images::StrideBytesPerWidthPixel(fuchsia::images::PixelFormat::NV12)),
+      .pixel_format = fuchsia::images::PixelFormat::NV12,
+  };
+
+  ASSERT_TRUE(
+      Apply(scenic::NewCreateImageCmd(kImageId, kMemoryId, 0, image_info)));
+
+  auto image_resource = FindResource<HostImage>(kImageId);
+  ASSERT_TRUE(image_resource);
+
+  EXPECT_TRUE(image_resource->IsDirectlyMapped());
+  // Even directly-mapped images don't return a valid Escher image until at
+  // least one call to UpdatePixels has occurred.
+  EXPECT_FALSE(image_resource->GetEscherImage());
+  // Updating should be a no-op, so it shouldn't crash when passed a null
+  // gpu_uploader, but it should also remove the dirty bit, meaning there is no
+  // additional work to do.
+  image_resource->UpdateEscherImage(nullptr);
+  // Despite not updating, the resource should have a valid Escher image, since
+  // we mapped it directly with zero copies.
+  EXPECT_TRUE(image_resource->GetEscherImage());
+  // The images should have been constructed directly, not through the image
+  // factory.
+  EXPECT_EQ(0u, listener->images_created_);
+}
+
+}  // namespace test
+}  // namespace gfx
+}  // namespace scenic_impl
diff --git a/garnet/lib/ui/gfx/tests/scenic_pixel_test.cc b/garnet/lib/ui/gfx/tests/scenic_pixel_test.cc
index be8703d..fba25b0 100644
--- a/garnet/lib/ui/gfx/tests/scenic_pixel_test.cc
+++ b/garnet/lib/ui/gfx/tests/scenic_pixel_test.cc
@@ -20,6 +20,7 @@
 #include <lib/sys/cpp/testing/test_with_environment.h>
 #include <lib/ui/scenic/cpp/session.h>
 #include <lib/ui/scenic/cpp/view_token_pair.h>
+#include <lib/images/cpp/images.h>
 #include <src/lib/fxl/logging.h>
 #include <zircon/status.h>
 
@@ -35,11 +36,18 @@
 #include "src/ui/lib/escher/impl/vulkan_utils.h"
 #include "src/ui/lib/escher/hmd/pose_buffer.h"
 #include "src/ui/lib/escher/test/gtest_vulkan.h"
+#include "garnet/lib/ui/yuv/yuv.h"
 
 namespace {
 
 constexpr char kEnvironment[] = "ScenicPixelTest";
 constexpr zx::duration kTimeout = zx::sec(15);
+// If you change the size of YUV buffers, make sure that the YUV test in
+// host_image_unittest.cc is also updated. Unlike that unit test,
+// scenic_pixel_test.cc has no way to confirm that it is going through the
+// direct-to-GPU path.
+// TODO(SCN-1387): This number needs to be queried via sysmem or vulkan.
+constexpr uint32_t kYuvSize = 64;
 
 // These tests need Scenic and RootPresenter at minimum, which expand to the
 // dependencies below. Using |TestWithEnvironment|, we use
@@ -173,6 +181,73 @@
 
   EXPECT_GT(histogram[scenic::BackgroundView::kBackgroundColor], 0u);
   histogram.erase(scenic::BackgroundView::kBackgroundColor);
+  // This assert is written this way so that, when it fails, it prints out all
+  // the unexpected colors
+  EXPECT_EQ((std::map<scenic::Color, size_t>){}, histogram)
+      << "Unexpected colors";
+}
+
+TEST_F(ScenicPixelTest, NV12Texture) {
+  scenic::BackgroundView view(CreatePresentationContext());
+  fuchsia::images::ImageInfo image_info{
+      .width = kYuvSize,
+      .height = kYuvSize,
+      .stride = static_cast<uint32_t>(
+          kYuvSize *
+          images::StrideBytesPerWidthPixel(fuchsia::images::PixelFormat::NV12)),
+      .pixel_format = fuchsia::images::PixelFormat::NV12,
+  };
+
+  uint32_t num_pixels = image_info.width * image_info.height;
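+  // NV12 stores a full-resolution Y plane followed by a half-resolution
+  // interleaved UV plane, so the image needs 3/2 bytes per pixel.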
+  uint64_t image_vmo_bytes = images::ImageSize(image_info);
+  EXPECT_EQ((3 * num_pixels) / 2, image_vmo_bytes);
+
+  zx::vmo image_vmo;
+  zx_status_t status = zx::vmo::create(image_vmo_bytes, 0, &image_vmo);
+  EXPECT_EQ(ZX_OK, status);
+  uint8_t* vmo_base;
+  status = zx::vmar::root_self()->map(0, image_vmo, 0, image_vmo_bytes,
+                                      ZX_VM_PERM_WRITE | ZX_VM_PERM_READ,
+                                      reinterpret_cast<uintptr_t*>(&vmo_base));
+  EXPECT_EQ(ZX_OK, status);
+
+  static const uint8_t kYValue = 110;
+  static const uint8_t kUValue = 192;
+  static const uint8_t kVValue = 192;
+
+  // Set all the Y pixels at full res.
+  for (uint32_t i = 0; i < num_pixels; ++i) {
+    vmo_base[i] = kYValue;
+  }
+
+  // Set all the UV pixels pairwise at half res.
+  for (uint32_t i = num_pixels; i < num_pixels + num_pixels / 2; i += 2) {
+    vmo_base[i] = kUValue;
+    vmo_base[i + 1] = kVValue;
+  }
+
+  view.SetHostImage(std::move(image_vmo), image_vmo_bytes, image_info);
+  RunUntilPresent(&view);
+
+  fuchsia::ui::scenic::ScreenshotData screenshot = TakeScreenshot();
+
+  EXPECT_GT(screenshot.info.width, 0u);
+  EXPECT_GT(screenshot.info.height, 0u);
+
+  // We could assert on each pixel individually, but a histogram might give us a
+  // more meaningful failure.
+  std::map<scenic::Color, size_t> histogram = scenic::Histogram(screenshot);
+
+  uint8_t bgra[4];
+  yuv::YuvToBgra(kYValue, kUValue, kVValue, bgra);
+  scenic::Color color(bgra[2], bgra[1], bgra[0], bgra[3]);
+  EXPECT_GT(histogram[color], 0u);
+  histogram.erase(color);
+
+  // This assert is written this way so that, when it fails, it prints out all
+  // the unexpected colors
   EXPECT_EQ((std::map<scenic::Color, size_t>){}, histogram)
       << "Unexpected colors";
 }
diff --git a/garnet/lib/ui/gfx/tests/session_test.h b/garnet/lib/ui/gfx/tests/session_test.h
index d3d2bdd..56791a9 100644
--- a/garnet/lib/ui/gfx/tests/session_test.h
+++ b/garnet/lib/ui/gfx/tests/session_test.h
@@ -5,11 +5,10 @@
 #ifndef GARNET_LIB_UI_GFX_TESTS_SESSION_TEST_H_
 #define GARNET_LIB_UI_GFX_TESTS_SESSION_TEST_H_
 
-#include "garnet/lib/ui/gfx/tests/error_reporting_test.h"
-
 #include <lib/fit/function.h>
 
 #include "garnet/lib/ui/gfx/engine/session.h"
+#include "garnet/lib/ui/gfx/tests/error_reporting_test.h"
 #include "garnet/lib/ui/gfx/tests/mocks.h"
 #include "garnet/lib/ui/scenic/event_reporter.h"
 #include "garnet/lib/ui/scenic/tests/scenic_test.h"
diff --git a/garnet/lib/ui/gfx/tests/vk_session_test.cc b/garnet/lib/ui/gfx/tests/vk_session_test.cc
index 73c56ca..f1d61eb 100644
--- a/garnet/lib/ui/gfx/tests/vk_session_test.cc
+++ b/garnet/lib/ui/gfx/tests/vk_session_test.cc
@@ -97,6 +97,8 @@
   session_context.escher_image_factory = image_factory_.get();
   session_context.release_fence_signaller = release_fence_signaller_.get();
 
+  OnSessionContextCreated(&session_context);
+
   return std::make_unique<SessionForTest>(1, std::move(session_context), this,
                                           error_reporter());
 }
diff --git a/garnet/lib/ui/gfx/tests/vk_session_test.h b/garnet/lib/ui/gfx/tests/vk_session_test.h
index b42540c..31a4201 100644
--- a/garnet/lib/ui/gfx/tests/vk_session_test.h
+++ b/garnet/lib/ui/gfx/tests/vk_session_test.h
@@ -26,6 +26,10 @@
   // |SessionTest|
   std::unique_ptr<SessionForTest> CreateSession() override;
 
+  // This function provides a mechanism for tests to inject their own objects
+  // into the SessionContext before construction.
+  virtual void OnSessionContextCreated(SessionContext* context) {}
+
  private:
   std::unique_ptr<escher::Escher> escher_;
   std::unique_ptr<escher::ImageFactoryAdapter> image_factory_;
diff --git a/garnet/lib/ui/yuv/yuv.cc b/garnet/lib/ui/yuv/yuv.cc
index 806c6cb..9e300bd 100644
--- a/garnet/lib/ui/yuv/yuv.cc
+++ b/garnet/lib/ui/yuv/yuv.cc
@@ -4,11 +4,14 @@
 
 #include "garnet/lib/ui/yuv/yuv.h"
 
+#include <algorithm>
+#include <cmath>
+
 namespace {
 
-uint8_t clip(int in) {
-  uint32_t out = in < 0 ? 0 : (uint32_t)in;
-  return out > 255 ? 255 : (out & 0xff);
+uint8_t NormalizedFloatToUnsignedByte(float in) {
+  int32_t out = static_cast<int32_t>(std::roundf(in * 255.0f));
+  return static_cast<uint8_t>(std::clamp(out, 0, 255));
 }
 
 }  // namespace
@@ -17,13 +20,29 @@
 
 // Letting compiler decide whether to inline, for now.
 void YuvToBgra(uint8_t y_raw, uint8_t u_raw, uint8_t v_raw, uint8_t* bgra) {
-  int32_t y = 298 * (static_cast<int32_t>(y_raw) - 16);
+  // Convert from encoded space to normalized space assuming eItuNarrow.
+  int32_t y = static_cast<int32_t>(y_raw) - 16;
   int32_t u = static_cast<int32_t>(u_raw) - 128;
   int32_t v = static_cast<int32_t>(v_raw) - 128;
-  bgra[0] = clip(((y + 516 * u + 128) / 256));            // blue
-  bgra[1] = clip(((y - 208 * v - 100 * u + 128) / 256));  // green
-  bgra[2] = clip(((y + 409 * v + 128) / 256));            // red
-  bgra[3] = 0xff;                                         // alpha
+
+  // Note: Normally, we would clamp here. But some drivers do not clamp in the
+  // middle of their implementation, and this function is used for pixel tests.
+  float fy = static_cast<float>(y) / 219.0f;
+  float fu = static_cast<float>(u) / 224.0f;
+  float fv = static_cast<float>(v) / 224.0f;
+
+  // Convert from YUV to RGB using the coefficients for eYcbcr709.
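+  // The constants follow from the BT.709 coefficients Kr = 0.2126,
+  // Kg = 0.7152, Kb = 0.0722: 1.5748 = 2 - 2*Kr, 1.8556 = 2 - 2*Kb, and the
+  // green terms below are Kb*(2 - 2*Kb)/Kg and Kr*(2 - 2*Kr)/Kg.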
+  float r = fy + 1.5748f * fv;
+  float g = fy - (0.13397432f / 0.7152f) * fu - (0.33480248f / 0.7152f) * fv;
+  float b = fy + 1.8556f * fu;
+
+  bgra[0] = NormalizedFloatToUnsignedByte(b);  // blue
+  bgra[1] = NormalizedFloatToUnsignedByte(g);  // green
+  bgra[2] = NormalizedFloatToUnsignedByte(r);  // red
+  bgra[3] = 0xff;                              // alpha
 }
 
 }  // namespace yuv
diff --git a/garnet/testing/views/background_view.cc b/garnet/testing/views/background_view.cc
index 327a2b0..8a044f4 100644
--- a/garnet/testing/views/background_view.cc
+++ b/garnet/testing/views/background_view.cc
@@ -4,8 +4,8 @@
 
 #include "garnet/testing/views/background_view.h"
 
-#include <src/lib/fxl/logging.h>
 #include <lib/ui/gfx/cpp/math.h>
+#include <src/lib/fxl/logging.h>
 #include <zircon/status.h>
 
 namespace scenic {
@@ -30,6 +30,16 @@
   view_.AddChild(background_node_);
 }
 
+void BackgroundView::SetHostImage(zx::vmo vmo, uint64_t size,
+                                  fuchsia::images::ImageInfo info) {
+  Memory memory(&session_, std::move(vmo), size,
+                fuchsia::images::MemoryType::HOST_MEMORY);
+  Image image(&session_, memory.id(), 0, info);
+  Material background_material(&session_);
+  background_material.SetTexture(image);
+  background_node_.SetMaterial(background_material);
+}
+
 void BackgroundView::set_present_callback(
     Session::PresentCallback present_callback) {
   present_callback_ = std::move(present_callback);
diff --git a/garnet/testing/views/background_view.h b/garnet/testing/views/background_view.h
index 62345dbb..4a7fb13 100644
--- a/garnet/testing/views/background_view.h
+++ b/garnet/testing/views/background_view.h
@@ -32,6 +32,9 @@
   // |TestView|
   void set_present_callback(Session::PresentCallback present_callback) override;
 
+  void SetHostImage(zx::vmo vmo, uint64_t size,
+                    fuchsia::images::ImageInfo info);
+
  protected:
   Session* session() { return &session_; }
   View* view() { return &view_; }
diff --git a/sdk/lib/ui/scenic/cpp/host_memory.cc b/sdk/lib/ui/scenic/cpp/host_memory.cc
index b4fb450..d857185a 100644
--- a/sdk/lib/ui/scenic/cpp/host_memory.cc
+++ b/sdk/lib/ui/scenic/cpp/host_memory.cc
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-#include <lib/ui/scenic/cpp/host_memory.h>
+#include "lib/ui/scenic/cpp/host_memory.h"
 
 #include <lib/ui/scenic/cpp/commands.h>
 #include <lib/zx/vmar.h>
@@ -25,9 +25,10 @@
   auto data = std::make_shared<HostData>(local_vmo, 0u, size);
 
   // Drop rights before we transfer the VMO to the session manager.
-  // TODO(MA-492): Now that host-local memory may be concurrently used as
-  // device-local memory on UMA platforms, we need to keep all permissions on
-  // the duplicated vmo handle, until Vulkan can import read-only memory.
+  // TODO(MA-492, SCN-1388): Now that host-local memory may be concurrently used
+  // as device-local memory on UMA platforms, we need to keep all permissions on
+  // the duplicated vmo handle, until Vulkan can import read-only memory, and/or
+  // we know how to negotiate through sysmem for the proper permissions.
   zx::vmo remote_vmo;
   status = local_vmo.replace(ZX_RIGHT_SAME_RIGHTS, &remote_vmo);
   ZX_ASSERT_MSG(status == ZX_OK, "replace rights failed: status=%d", status);
diff --git a/src/ui/lib/escher/BUILD.gn b/src/ui/lib/escher/BUILD.gn
index 04ee4a6..26984de 100644
--- a/src/ui/lib/escher/BUILD.gn
+++ b/src/ui/lib/escher/BUILD.gn
@@ -468,6 +468,8 @@
     "vk/render_pass.cc",
     "vk/render_pass.h",
     "vk/render_pass_info.h",
+    "vk/sampler.cc",
+    "vk/sampler.h",
     "vk/shader_module.cc",
     "vk/shader_module.h",
     "vk/shader_module_template.cc",
diff --git a/src/ui/lib/escher/escher.cc b/src/ui/lib/escher/escher.cc
index 6f3f7ca..832bb7e 100644
--- a/src/ui/lib/escher/escher.cc
+++ b/src/ui/lib/escher/escher.cc
@@ -110,9 +110,9 @@
       NewMeshManager(command_buffer_pool(), transfer_command_buffer_pool(),
                      gpu_allocator(), gpu_uploader(), resource_recycler());
   pipeline_layout_cache_ =
-      std::make_unique<impl::PipelineLayoutCache>(resource_recycler()),
+      std::make_unique<impl::PipelineLayoutCache>(resource_recycler());
   render_pass_cache_ =
-      std::make_unique<impl::RenderPassCache>(resource_recycler()),
+      std::make_unique<impl::RenderPassCache>(resource_recycler());
   framebuffer_allocator_ = std::make_unique<impl::FramebufferAllocator>(
       resource_recycler(), render_pass_cache_.get());
   shader_program_factory_ = std::make_unique<DefaultShaderProgramFactory>(
@@ -141,6 +141,7 @@
   render_pass_cache_.reset();
   pipeline_layout_cache_.reset();
   mesh_manager_.reset();
+  descriptor_set_allocators_.clear();
 
   // ResourceRecyclers must be released before the CommandBufferSequencer is,
   // since they register themselves with it.
@@ -201,9 +202,8 @@
                               vk::ImageAspectFlags aspect_mask,
                               bool use_unnormalized_coordinates) {
   TRACE_DURATION("gfx", "Escher::NewTexture (from image)");
-  return fxl::MakeRefCounted<Texture>(resource_recycler(), std::move(image),
-                                      filter, aspect_mask,
-                                      use_unnormalized_coordinates);
+  return Texture::New(resource_recycler(), std::move(image), filter,
+                      aspect_mask, use_unnormalized_coordinates);
 }
 
 BufferPtr Escher::NewBuffer(vk::DeviceSize size,
@@ -228,9 +228,8 @@
                        .usage = usage_flags};
   ImagePtr image =
       gpu_allocator()->AllocateImage(resource_recycler(), image_info);
-  return fxl::MakeRefCounted<Texture>(resource_recycler(), std::move(image),
-                                      filter, aspect_flags,
-                                      use_unnormalized_coordinates);
+  return Texture::New(resource_recycler(), std::move(image), filter,
+                      aspect_flags, use_unnormalized_coordinates);
 }
 
 TexturePtr Escher::NewAttachmentTexture(vk::Format format, uint32_t width,
@@ -277,6 +276,10 @@
   // TODO(ES-103): The correct solution is not to use multiple Frames per frame.
   if (requested_type != CommandBuffer::Type::kTransfer) {
     for (auto& pair : descriptor_set_allocators_) {
+      // TODO(ES-199): Nothing calls Clear() on the DescriptorSetAllocators, so
+      // their internal allocations are currently able to grow without bound.
+      // DescriptorSets are not managed by ResourceRecyclers, so just
+      // adding a call to Clear() here would be dangerous.
       pair.second->BeginFrame();
     }
   }
@@ -297,11 +300,14 @@
 }
 
 impl::DescriptorSetAllocator* Escher::GetDescriptorSetAllocator(
-    const impl::DescriptorSetLayout& layout) {
+    const impl::DescriptorSetLayout& layout,
+    const SamplerPtr& immutable_sampler) {
   TRACE_DURATION("gfx", "escher::Escher::GetDescriptorSetAllocator");
   static_assert(sizeof(impl::DescriptorSetLayout) == 32,
                 "hash code below must be updated");
   Hasher h;
+  if (immutable_sampler)
+    h.struc(immutable_sampler->vk());
   h.u32(layout.sampled_image_mask);
   h.u32(layout.storage_image_mask);
   h.u32(layout.uniform_buffer_mask);
@@ -319,7 +325,12 @@
   }
 
   TRACE_DURATION("gfx", "escher::Escher::GetDescriptorSetAllocator[creation]");
-  auto new_allocator = new impl::DescriptorSetAllocator(vk_device(), layout);
+  auto new_allocator =
+      new impl::DescriptorSetAllocator(vk_device(), layout, immutable_sampler);
+
+  // TODO(ES-200): This hash table never decreases in size. Users of Escher that
+  // generate unique descriptor set layouts (e.g., with immutable samplers) can
+  // cause this system to cache unbounded amounts of memory.
   descriptor_set_allocators_.emplace_hint(it, hash, new_allocator);
   return new_allocator;
 }
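A sketch of the caching invariant this introduces (illustrative only; |escher|, |layout|, and |yuv_sampler| are placeholders): the same DescriptorSetLayout paired with different immutable samplers must resolve to distinct allocators, because the sampler is baked into the resulting vk::DescriptorSetLayout bindings.

    auto* plain = escher->GetDescriptorSetAllocator(layout, nullptr);
    auto* again = escher->GetDescriptorSetAllocator(layout, nullptr);
    auto* yuv = escher->GetDescriptorSetAllocator(layout, yuv_sampler);
    FXL_DCHECK(plain == again);  // cache hit: same layout, same (null) sampler
    FXL_DCHECK(plain != yuv);    // the sampler participates in the hash key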
diff --git a/src/ui/lib/escher/escher.h b/src/ui/lib/escher/escher.h
index 6654d5d..af9a7c5 100644
--- a/src/ui/lib/escher/escher.h
+++ b/src/ui/lib/escher/escher.h
@@ -101,7 +101,8 @@
   uint64_t GetNumGpuBytesAllocated();
 
   impl::DescriptorSetAllocator* GetDescriptorSetAllocator(
-      const impl::DescriptorSetLayout& layout);
+      const impl::DescriptorSetLayout& layout,
+      const SamplerPtr& immutable_sampler);
 
   // Do periodic housekeeping.  This is called by Renderer::EndFrame(), so you
   // don't need to call it if your application is constantly rendering.
diff --git a/src/ui/lib/escher/impl/compute_shader.cc b/src/ui/lib/escher/impl/compute_shader.cc
index 6466072..fd59cd3 100644
--- a/src/ui/lib/escher/impl/compute_shader.cc
+++ b/src/ui/lib/escher/impl/compute_shader.cc
@@ -197,7 +197,7 @@
   for (uint32_t i = 0; i < textures.size(); ++i) {
     descriptor_set_writes_[i].dstSet = descriptor_set;
     descriptor_image_info_[i].imageView = textures[i]->vk_image_view();
-    descriptor_image_info_[i].sampler = textures[i]->vk_sampler();
+    descriptor_image_info_[i].sampler = textures[i]->sampler()->vk();
     command_buffer->KeepAlive(textures[i]);
   }
   for (uint32_t i = 0; i < buffers.size(); ++i) {
diff --git a/src/ui/lib/escher/impl/debug_print.cc b/src/ui/lib/escher/impl/debug_print.cc
index 2235a69..30fee2b 100644
--- a/src/ui/lib/escher/impl/debug_print.cc
+++ b/src/ui/lib/escher/impl/debug_print.cc
@@ -271,13 +271,13 @@
 std::ostream& operator<<(std::ostream& str,
                          const impl::PipelineLayoutSpec& spec) {
   str << "==============PipelineLayoutSpec[\n\tattribute_mask: " << std::hex
-      << spec.attribute_mask
-      << "\n\trender_target_mask: " << spec.render_target_mask
-      << "\n\tnum_push_constant_ranges: " << spec.num_push_constant_ranges
-      << "\n\tdescriptor_set_mask: " << spec.descriptor_set_mask;
-  ForEachBitIndex(spec.descriptor_set_mask, [&](uint32_t index) {
+      << spec.attribute_mask()
+      << "\n\trender_target_mask: " << spec.render_target_mask()
+      << "\n\tnum_push_constant_ranges: " << spec.num_push_constant_ranges()
+      << "\n\tdescriptor_set_mask: " << spec.descriptor_set_mask();
+  ForEachBitIndex(spec.descriptor_set_mask(), [&](uint32_t index) {
     str << "\n=== index: " << index << " "
-        << spec.descriptor_set_layouts[index];
+        << spec.descriptor_set_layouts(index);
   });
 
   return str << "\n]";
diff --git a/src/ui/lib/escher/material/material.cc b/src/ui/lib/escher/material/material.cc
index 6020592..a68e01d 100644
--- a/src/ui/lib/escher/material/material.cc
+++ b/src/ui/lib/escher/material/material.cc
@@ -20,7 +20,7 @@
   texture_ = texture;
   if (texture) {
     image_view_ = texture_->vk_image_view();
-    sampler_ = texture_->vk_sampler();
+    sampler_ = texture_->sampler()->vk();
   } else {
     image_view_ = nullptr;
     sampler_ = nullptr;
diff --git a/src/ui/lib/escher/paper/paper_render_funcs.cc b/src/ui/lib/escher/paper/paper_render_funcs.cc
index c9a6530..7910eef 100644
--- a/src/ui/lib/escher/paper/paper_render_funcs.cc
+++ b/src/ui/lib/escher/paper/paper_render_funcs.cc
@@ -68,7 +68,10 @@
 
   // TODO(ES-158): this assumes that all meshes in this render-queue pass are
   // drawn exactly the same way.  We will need something better soon.
-  cb->SetShaderProgram(context->shader_program());
+  cb->SetShaderProgram(context->shader_program(),
+                       mesh_data->texture->sampler()->is_immutable()
+                           ? mesh_data->texture->sampler()
+                           : nullptr);
 
   // For each instance, set up per-instance state and draw.
   for (uint32_t i = 0; i < instance_count; ++i) {
diff --git a/src/ui/lib/escher/resources/resource_type_info.h b/src/ui/lib/escher/resources/resource_type_info.h
index 226d787..9762932 100644
--- a/src/ui/lib/escher/resources/resource_type_info.h
+++ b/src/ui/lib/escher/resources/resource_type_info.h
@@ -18,14 +18,15 @@
   // Concrete subclasses.
   kImage = 1 << 2,
   kImageView = 1 << 3,
-  kTexture = 1 << 4,
-  kFramebuffer = 1 << 5,
-  kBuffer = 1 << 6,
-  kMesh = 1 << 7,
-  kRenderPass = 1 << 8,
-  kPipelineLayout = 1 << 9,
-  kShaderProgram = 1 << 10,
-  kFrame = 1 << 11,
+  kSampler = 1 << 4,
+  kTexture = 1 << 5,
+  kFramebuffer = 1 << 6,
+  kBuffer = 1 << 7,
+  kMesh = 1 << 8,
+  kRenderPass = 1 << 9,
+  kPipelineLayout = 1 << 10,
+  kShaderProgram = 1 << 11,
+  kFrame = 1 << 12,
 
   // Resources defined in escher::impl namespace.
   kImplModelPipelineCache = 1 << 27,
diff --git a/src/ui/lib/escher/test/vk/shader_program_unittest.cc b/src/ui/lib/escher/test/vk/shader_program_unittest.cc
index 4142c4e..3b29ecc 100644
--- a/src/ui/lib/escher/test/vk/shader_program_unittest.cc
+++ b/src/ui/lib/escher/test/vk/shader_program_unittest.cc
@@ -10,6 +10,7 @@
 #include "src/ui/lib/escher/shape/mesh.h"
 #include "src/ui/lib/escher/test/gtest_escher.h"
 #include "src/ui/lib/escher/test/vk/vulkan_tester.h"
+#include "src/ui/lib/escher/util/image_utils.h"
 #include "src/ui/lib/escher/util/string_utils.h"
 #include "src/ui/lib/escher/vk/command_buffer.h"
 #include "src/ui/lib/escher/vk/shader_module_template.h"
@@ -19,6 +20,9 @@
 namespace {
 using namespace escher;
 
+// TODO(SCN-1387): This number needs to be queried via sysmem or Vulkan.
+const uint32_t kYuvSize = 64;
+
 class ShaderProgramTest : public ::testing::Test, public VulkanTester {
  protected:
   const MeshPtr& ring_mesh1() const { return ring_mesh1_; }
@@ -211,7 +215,6 @@
   EXPECT_EQ(depth_readonly_pipeline, ObtainGraphicsPipeline(cb));
 
   // Changing to a mesh with a different layout results in a different pipeline.
-  // pipeline.
   mesh = sphere_mesh();
   ab = &mesh->attribute_buffer(0);
 
@@ -229,6 +232,50 @@
   EXPECT_NE(depth_readonly_pipeline, ObtainGraphicsPipeline(cb));
   EXPECT_NE(vk::Pipeline(), ObtainGraphicsPipeline(cb));
 
+  vk::Pipeline last_pipeline = ObtainGraphicsPipeline(cb);
+
+  // Switching to an immutable sampler changes the pipeline.
+  ImageInfo info;
+  info.width = kYuvSize;
+  info.height = kYuvSize;
+  info.format = vk::Format::eG8B8R82Plane420Unorm;
+  info.usage = vk::ImageUsageFlagBits::eSampled;
+  info.is_mutable = false;
+
+  auto yuv_image = escher->image_cache()->NewImage(info);
+  auto yuv_texture = escher->NewTexture(yuv_image, vk::Filter::eLinear);
+
+  EXPECT_TRUE(yuv_texture->sampler()->is_immutable());
+
+  cb->SetShaderProgram(program, yuv_texture->sampler());
+  EXPECT_NE(last_pipeline, ObtainGraphicsPipeline(cb));
+  EXPECT_NE(vk::Pipeline(), ObtainGraphicsPipeline(cb));
+
+  auto yuv_pipeline = ObtainGraphicsPipeline(cb);
+  last_pipeline = ObtainGraphicsPipeline(cb);
+
+  // Using the same sampler does not change the pipeline.
+  cb->SetShaderProgram(program, yuv_texture->sampler());
+  EXPECT_EQ(last_pipeline, ObtainGraphicsPipeline(cb));
+  EXPECT_NE(vk::Pipeline(), ObtainGraphicsPipeline(cb));
+
+  last_pipeline = ObtainGraphicsPipeline(cb);
+
+  // Using a different sampler does cause the pipeline to change, because
+  // immutable samplers require custom descriptor set layouts, and pipelines
+  // are bound to specific descriptor set layouts at construction time.
+  cb->SetShaderProgram(program, noise_texture->sampler());
+  EXPECT_NE(last_pipeline, ObtainGraphicsPipeline(cb));
+  EXPECT_NE(vk::Pipeline(), ObtainGraphicsPipeline(cb));
+
+  last_pipeline = ObtainGraphicsPipeline(cb);
+
+  // Using the previous YUV sampler reuses the old pipeline.
+  cb->SetShaderProgram(program, yuv_texture->sampler());
+  EXPECT_NE(last_pipeline, ObtainGraphicsPipeline(cb));
+  EXPECT_NE(vk::Pipeline(), ObtainGraphicsPipeline(cb));
+  EXPECT_EQ(yuv_pipeline, ObtainGraphicsPipeline(cb));
+
   cb->EndRenderPass();
 
   // TODO(ES-83): ideally only submitted CommandBuffers would need to be
diff --git a/src/ui/lib/escher/third_party/granite/vk/command_buffer.cc b/src/ui/lib/escher/third_party/granite/vk/command_buffer.cc
index 0cd6b74..d8ce049 100644
--- a/src/ui/lib/escher/third_party/granite/vk/command_buffer.cc
+++ b/src/ui/lib/escher/third_party/granite/vk/command_buffer.cc
@@ -295,10 +295,10 @@
 
   b->image.fp.imageLayout = vk_layout;
   b->image.fp.imageView = texture->vk_float_view();
-  b->image.fp.sampler = texture->vk_sampler();
+  b->image.fp.sampler = texture->sampler()->vk();
   b->image.integer.imageLayout = vk_layout;
   b->image.integer.imageView = texture->vk_integer_view();
-  b->image.integer.sampler = texture->vk_sampler();
+  b->image.integer.sampler = texture->sampler()->vk();
   set->uids[binding] = texture->uid();
   // TODO(ES-83): if we reify Samplers as a separate resource type,
   // then use the sampler's uid instead of the texture.
@@ -394,9 +394,9 @@
     // The push constants were invalidated (perhaps by being explicitly set, or
     // perhaps by a change in the descriptor set layout; it doesn't matter).
     uint32_t num_ranges =
-        current_pipeline_layout_->spec().num_push_constant_ranges;
+        current_pipeline_layout_->spec().num_push_constant_ranges();
     for (unsigned i = 0; i < num_ranges; ++i) {
-      auto& range = current_pipeline_layout_->spec().push_constant_ranges[i];
+      auto& range = current_pipeline_layout_->spec().push_constant_ranges()[i];
       vk().pushConstants(current_vk_pipeline_layout_, range.stageFlags,
                          range.offset, range.size,
                          bindings_.push_constant_data + range.offset);
@@ -442,7 +442,7 @@
 void CommandBuffer::FlushDescriptorSets() {
   TRACE_DURATION("gfx", "escher::CommandBuffer::FlushDescriptorSets");
   auto& spec = current_pipeline_layout_->spec();
-  uint32_t sets_to_flush = spec.descriptor_set_mask & dirty_descriptor_sets_;
+  uint32_t sets_to_flush = spec.descriptor_set_mask() & dirty_descriptor_sets_;
   ForEachBitIndex(sets_to_flush, [this](uint32_t set_index) {
     FlushDescriptorSet(set_index);
   });
@@ -452,7 +452,7 @@
 
 void CommandBuffer::FlushDescriptorSet(uint32_t set_index) {
   const impl::DescriptorSetLayout& set_layout =
-      current_pipeline_layout_->spec().descriptor_set_layouts[set_index];
+      current_pipeline_layout_->spec().descriptor_set_layouts(set_index);
 
   const auto set_bindings = GetDescriptorSetBindings(set_index);
   uint32_t num_dynamic_offsets = 0;
@@ -774,12 +774,16 @@
                       value, aspect);
 }
 
-void CommandBuffer::SetShaderProgram(ShaderProgram* program) {
+void CommandBuffer::SetShaderProgram(ShaderProgram* program,
+                                     const SamplerPtr& immutable_sampler) {
   // TODO(ES-83): checking the uid() isn't really necessary since we're using
   // ref-counted pointers... a pointer comparison would be enough.  This is a
   // general difference between Escher and the original Granite code; we should
   // come up with a general design philosophy and stick to it.
-  if (current_program_ && current_program_->uid() == program->uid()) {
+  if (current_program_ && current_program_->uid() == program->uid() &&
+      current_pipeline_layout_ &&
+      current_pipeline_layout_->spec().immutable_sampler() ==
+          immutable_sampler) {
     return;
   }
 
@@ -797,7 +801,8 @@
   SetDirty(kDirtyPipelineBit | kDirtyDynamicBits);
 
   auto old_pipeline_layout = current_pipeline_layout_;
-  current_pipeline_layout_ = program->pipeline_layout();
+  current_pipeline_layout_ = program->ObtainPipelineLayout(immutable_sampler);
+
   current_vk_pipeline_layout_ = current_pipeline_layout_->vk();
   impl_->KeepAlive(current_pipeline_layout_);
 
@@ -810,8 +815,8 @@
 
     // If the push constant layout changes, all descriptor sets
     // are invalidated.
-    if (new_spec.push_constant_layout_hash !=
-        old_spec.push_constant_layout_hash) {
+    if (new_spec.push_constant_layout_hash() !=
+        old_spec.push_constant_layout_hash()) {
       dirty_descriptor_sets_ = ~0u;
       SetDirty(kDirtyPushConstantsBit);
     } else {
diff --git a/src/ui/lib/escher/third_party/granite/vk/command_buffer.h b/src/ui/lib/escher/third_party/granite/vk/command_buffer.h
index 6a32075..e02d29d 100644
--- a/src/ui/lib/escher/third_party/granite/vk/command_buffer.h
+++ b/src/ui/lib/escher/third_party/granite/vk/command_buffer.h
@@ -224,10 +224,18 @@
   void SetToDefaultState(DefaultState state);
 
   // Set the ShaderProgram that will be used to obtain the VkPipeline to be used
-  // by the next draw-call or compute dispatch.
-  void SetShaderProgram(ShaderProgram* program);
-  void SetShaderProgram(const ShaderProgramPtr& program) {
-    SetShaderProgram(program.get());
+  // by the next draw-call or compute dispatch. If a non-null Sampler is
+  // passed in, that sampler will be used as the immutable sampler for every
+  // combined image sampler binding in the associated PipelineLayout.
+  //
+  // TODO(ES-202): This code-flow assumes that ShaderPrograms source from, at
+  // most, a single sampler. This is a blocking bug for implementing, e.g.,
+  // ES-159.
+  void SetShaderProgram(ShaderProgram* program,
+                        const SamplerPtr& immutable_sampler = nullptr);
+  void SetShaderProgram(const ShaderProgramPtr& program,
+                        const SamplerPtr& immutable_sampler = nullptr) {
+    SetShaderProgram(program.get(), immutable_sampler);
   }
 
   // Set the viewport.  Must be called within a render pass.
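As a usage sketch (mirroring paper_render_funcs.cc above; |cb|, |program|, and |texture| are placeholders), a caller only forwards the sampler when it is immutable:

    const escher::SamplerPtr& sampler = texture->sampler();
    cb->SetShaderProgram(program,
                         sampler->is_immutable() ? sampler : nullptr);
    // ... bind the texture and issue draw calls as usual ...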
diff --git a/src/ui/lib/escher/third_party/granite/vk/command_buffer_pipeline_state.cc b/src/ui/lib/escher/third_party/granite/vk/command_buffer_pipeline_state.cc
index c9d6f86..75b5b0f 100644
--- a/src/ui/lib/escher/third_party/granite/vk/command_buffer_pipeline_state.cc
+++ b/src/ui/lib/escher/third_party/granite/vk/command_buffer_pipeline_state.cc
@@ -71,8 +71,10 @@
 vk::Pipeline CommandBufferPipelineState::FlushGraphicsPipeline(
     const PipelineLayout* pipeline_layout, ShaderProgram* program) {
   Hasher h;
+  h.u64(pipeline_layout->spec().hash().val);
+
   active_vertex_bindings_ = 0;
-  uint32_t attribute_mask = pipeline_layout->spec().attribute_mask;
+  uint32_t attribute_mask = pipeline_layout->spec().attribute_mask();
   ForEachBitIndex(attribute_mask, [&](uint32_t bit) {
     h.u32(bit);
     active_vertex_bindings_ |= 1u << vertex_attributes_[bit].binding;
@@ -136,7 +138,7 @@
         render_pass->GetColorAttachmentForSubpass(current_subpass, i);
 
     if (subpass_color_attachment.attachment != VK_ATTACHMENT_UNUSED &&
-        (pipeline_layout_spec.render_target_mask & (1u << i))) {
+        (pipeline_layout_spec.render_target_mask() & (1u << i))) {
       static_assert(VulkanLimits::kNumColorAttachments * 4 <=
                         sizeof(static_state.color_write_mask) * 8,
                     "not enough bits for color mask.");
@@ -288,7 +290,7 @@
       vertex_input_bindings[VulkanLimits::kNumVertexBuffers];
   InitPipelineVertexInputStateCreateInfo(
       &vertex_input_info, vertex_input_attribs, vertex_input_bindings,
-      pipeline_layout_spec.attribute_mask, vertex_attributes_,
+      pipeline_layout_spec.attribute_mask(), vertex_attributes_,
       vertex_bindings_);
 
   // Input assembly
diff --git a/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.cc b/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.cc
index a8f5f3e..ff18620 100644
--- a/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.cc
+++ b/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.cc
@@ -45,49 +45,31 @@
     // TODO(ES-83): don't ask for an allocator if the set is masked?
     // Would be nice, but then we wouldn't have a layout available for the
     // skipped sets.
-    descriptor_set_allocators_[i] =
-        escher()->GetDescriptorSetAllocator(spec.descriptor_set_layouts[i]);
+    descriptor_set_allocators_[i] = escher()->GetDescriptorSetAllocator(
+        spec.descriptor_set_layouts(i), spec.immutable_sampler());
     set_layouts[i] = descriptor_set_allocators_[i]->vk_layout();
-    if (spec.descriptor_set_mask & (1u << i)) {
+    if (spec.descriptor_set_mask() & (1u << i)) {
+      // When creating a layout via vk::Device::CreatePipelineLayout(), Vulkan
+      // uses the index within the array of vk::DescriptorSetLayouts as the
+      // index of that descriptor set.  In other words, if your GLSL code
+      // mentions (descriptor_set = 3, binding = 2), then the corresponding
+      // vk::DescriptorSetLayout *must* have been at index 3 of the array passed
+      // to CreatePipelineLayout().
+      //
+      // So this counter has to equal the index of the highest-referenced
+      // descriptor set, plus one.
       num_set_layouts = i + 1;
     }
   }
-
-  unsigned num_ranges = 0;
-  vk::PushConstantRange ranges[EnumCount<ShaderStage>()];
-
-  for (auto& range : spec.push_constant_ranges) {
-    if (range.size != 0) {
-      bool unique = true;
-      for (unsigned i = 0; i < num_ranges; i++) {
-        // Try to merge equivalent ranges for multiple stages.
-        // TODO(ES-83): what to do about overlapping ranges?  Should DCHECK?
-        if (ranges[i].offset == range.offset && ranges[i].size == range.size) {
-          unique = false;
-          ranges[i].stageFlags |= range.stageFlags;
-          break;
-        }
-      }
-
-      if (unique)
-        ranges[num_ranges++] = range;
-    }
-  }
-
-  static_assert(sizeof(spec_.push_constant_ranges) == sizeof(ranges), "");
-  FXL_DCHECK(num_ranges < EnumCount<ShaderStage>());
-  memcpy(spec_.push_constant_ranges, ranges, num_ranges * sizeof(ranges[0]));
-  spec_.num_push_constant_ranges = num_ranges;
-
   vk::PipelineLayoutCreateInfo info;
   if (num_set_layouts) {
     info.setLayoutCount = num_set_layouts;
     info.pSetLayouts = set_layouts;
   }
 
-  if (num_ranges) {
-    info.pushConstantRangeCount = num_ranges;
-    info.pPushConstantRanges = ranges;
+  if (spec.num_push_constant_ranges()) {
+    info.pushConstantRangeCount = spec.num_push_constant_ranges();
+    info.pPushConstantRanges = spec.push_constant_ranges().data();
   }
 
   pipeline_layout_ =
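To make the set-index comment above concrete, with a hypothetical shader:

    // GLSL (hypothetical):
    //   layout(set = 3, binding = 2) uniform sampler2D tex;
    //
    // Then set_layouts[3] must describe that set when calling
    // vk::Device::createPipelineLayout(), and num_set_layouts must be 4,
    // even if sets 0..2 are unused placeholder layouts.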
diff --git a/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.h b/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.h
index 7840db1..0df86fb 100644
--- a/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.h
+++ b/src/ui/lib/escher/third_party/granite/vk/pipeline_layout.h
@@ -31,43 +31,105 @@
 #include "src/ui/lib/escher/forward_declarations.h"
 #include "src/ui/lib/escher/resources/resource.h"
 #include "src/ui/lib/escher/third_party/granite/vk/descriptor_set_layout.h"
+#include "src/ui/lib/escher/util/bit_ops.h"
 #include "src/ui/lib/escher/util/debug_print.h"
 #include "src/ui/lib/escher/util/enum_count.h"
-#include "src/ui/lib/escher/util/hash.h"
+#include "src/ui/lib/escher/util/hashable.h"
+#include "src/ui/lib/escher/util/hasher.h"
+#include "src/ui/lib/escher/vk/sampler.h"
 #include "src/ui/lib/escher/vk/shader_stage.h"
 #include "src/ui/lib/escher/vk/vulkan_limits.h"
 
 namespace escher {
 namespace impl {
 
-// Aggregate the ShaderModuleResourceLayouts of all ShaderModules that are used
-// to create a pipeline.
-struct PipelineLayoutSpec {
-  uint32_t attribute_mask = 0;
+// Aggregate the ShaderModuleResourceLayouts of all ShaderModules that are
+// used to create a pipeline.
+struct PipelineLayoutSpec : public Hashable {
+ public:
+  static constexpr size_t kMaxPushConstantRanges = EnumCount<ShaderStage>();
+
+  PipelineLayoutSpec(
+      uint32_t attribute_mask, uint32_t render_target_mask,
+      const std::array<DescriptorSetLayout, VulkanLimits::kNumDescriptorSets>&
+          layouts,
+      const std::array<vk::PushConstantRange, kMaxPushConstantRanges>& ranges,
+      uint32_t num_ranges, SamplerPtr immutable_sampler)
+      : immutable_sampler_(immutable_sampler),
+        attribute_mask_(attribute_mask),
+        render_target_mask_(render_target_mask),
+        descriptor_set_layouts_(layouts),
+        num_push_constant_ranges_(num_ranges),
+        push_constant_ranges_(ranges) {
+
+    FXL_DCHECK(num_ranges <= kMaxPushConstantRanges);
+    for (uint32_t i = 0; i < VulkanLimits::kNumDescriptorSets; ++i) {
+      if (descriptor_set_layouts_[i].stages) {
+        descriptor_set_mask_ |= 1u << i;
+      }
+    }
+
+    // Compute a hash to quickly decide whether all descriptor sets must be
+    // invalidated.
+    Hasher h;
+    for (uint32_t i = 0; i < num_ranges; ++i) {
+      h.struc(push_constant_ranges_[i]);
+    }
+    push_constant_layout_hash_ = h.value();
+  }
+
+  const SamplerPtr& immutable_sampler() const { return immutable_sampler_; }
+
+  uint32_t attribute_mask() const { return attribute_mask_; }
+  uint32_t render_target_mask() const { return render_target_mask_; }
+
+  uint32_t descriptor_set_mask() const { return descriptor_set_mask_; }
+  const DescriptorSetLayout&
+  descriptor_set_layouts(uint32_t index) const {
+    return descriptor_set_layouts_[index];
+  }
+  uint32_t num_push_constant_ranges() const {
+    return num_push_constant_ranges_;
+  }
+  const std::array<vk::PushConstantRange, kMaxPushConstantRanges>&
+  push_constant_ranges() const {
+    return push_constant_ranges_;
+  }
+
+  Hash push_constant_layout_hash() const { return push_constant_layout_hash_; }
+
+  bool operator==(const PipelineLayoutSpec& other) const;
+
+ private:
+  // |Hashable|
+  Hash GenerateHash() const override;
+
+  SamplerPtr immutable_sampler_;
+  uint32_t attribute_mask_ = 0;
   // TODO(ES-83): document.
-  uint32_t render_target_mask = 0;
-  DescriptorSetLayout descriptor_set_layouts[VulkanLimits::kNumDescriptorSets] =
-      {};
-  vk::PushConstantRange push_constant_ranges[EnumCount<ShaderStage>()] = {};
-  uint32_t num_push_constant_ranges = 0;
-  uint32_t descriptor_set_mask = 0;
+  uint32_t render_target_mask_ = 0;
+  uint32_t descriptor_set_mask_ = 0;
+  std::array<DescriptorSetLayout, VulkanLimits::kNumDescriptorSets>
+      descriptor_set_layouts_ = {};
+  uint32_t num_push_constant_ranges_ = 0;
+  std::array<vk::PushConstantRange, kMaxPushConstantRanges>
+      push_constant_ranges_ = {};
 
   // Allows quick comparison to decide whether the push constant ranges have
   // changed.  If so, all descriptor sets are invalidated.
   // TODO(ES-83): I remember reading why this is necessary... we should
   // make note of the section of the Vulkan spec that requires this.
-  Hash push_constant_layout_hash = {0};
-
-  bool operator==(const PipelineLayoutSpec& other) const;
+  Hash push_constant_layout_hash_ = {0};
 };
 
 // TODO(ES-83): extend downward to enclose PipelineLayout.  Cannot do this yet
 // because there is already a PipelineLayout in impl/vk.
 }  // namespace impl
 
-// A PipelineLayout encapsulates a VkPipelineLayout object, as well as an array
-// of DescriptorSetAllocators that are configured to allocate descriptor sets
-// that match the sets required, at each index, by pipelines with this layout.
+// A PipelineLayout encapsulates a VkPipelineLayout object, as well as an
+// array of DescriptorSetAllocators that are configured to allocate descriptor
+// sets that match the sets required, at each index, by pipelines with this
+// layout.
 //
 // TODO(ES-83): does this need to be a Resource?  If these are always
 // reffed by pipelines that use them, then it should suffice to keep those
@@ -93,7 +155,9 @@
 
  private:
   vk::PipelineLayout pipeline_layout_;
-  impl::PipelineLayoutSpec spec_;
+  // This PipelineLayoutSpec will be used for hashes and equality tests, so it
+  // should match the construction parameter and not be mutated.
+  const impl::PipelineLayoutSpec spec_;
   impl::DescriptorSetAllocator*
       descriptor_set_allocators_[VulkanLimits::kNumDescriptorSets] = {};
 };
@@ -104,15 +168,33 @@
 
 inline bool impl::PipelineLayoutSpec::operator==(
     const impl::PipelineLayoutSpec& other) const {
-  return attribute_mask == other.attribute_mask &&
-         render_target_mask == other.render_target_mask &&
-         descriptor_set_mask == other.descriptor_set_mask &&
-         push_constant_layout_hash == other.push_constant_layout_hash &&
-         num_push_constant_ranges == other.num_push_constant_ranges &&
-         0 == memcmp(descriptor_set_layouts, other.descriptor_set_layouts,
-                     sizeof(descriptor_set_layouts)) &&
-         0 == memcmp(push_constant_ranges, other.push_constant_ranges,
-                     sizeof(push_constant_ranges));
+  return immutable_sampler_ == other.immutable_sampler_ &&
+         attribute_mask_ == other.attribute_mask_ &&
+         render_target_mask_ == other.render_target_mask_ &&
+         descriptor_set_mask_ == other.descriptor_set_mask_ &&
+         descriptor_set_layouts_ == other.descriptor_set_layouts_ &&
+         num_push_constant_ranges_ == other.num_push_constant_ranges_ &&
+         push_constant_ranges_ == other.push_constant_ranges_ &&
+         push_constant_layout_hash_ == other.push_constant_layout_hash_;
+}
+
+inline Hash impl::PipelineLayoutSpec::GenerateHash() const {
+  Hasher h;
+
+  h.struc(immutable_sampler_);
+  h.u32(attribute_mask_);
+  h.u32(render_target_mask_);
+
+  h.u32(descriptor_set_mask_);
+  ForEachBitIndex(descriptor_set_mask_, [&](uint32_t index) {
+    h.struc(descriptor_set_layouts_[index]);
+  });
+
+  // Instead of hashing the push constant ranges again, just hash the hash of
+  // the push constant ranges that was already computed in the constructor.
+  h.u64(push_constant_layout_hash_.val);
+
+  return h.value();
 }
 
 ESCHER_DEBUG_PRINTABLE(impl::PipelineLayoutSpec);
diff --git a/src/ui/lib/escher/third_party/granite/vk/shader_utils.cc b/src/ui/lib/escher/third_party/granite/vk/shader_utils.cc
index 1ecbdaf..3e936d9 100644
--- a/src/ui/lib/escher/third_party/granite/vk/shader_utils.cc
+++ b/src/ui/lib/escher/third_party/granite/vk/shader_utils.cc
@@ -29,6 +29,7 @@
 
 #include "src/ui/lib/escher/third_party/granite/vk/pipeline_layout.h"
 #include "src/ui/lib/escher/util/enum_cast.h"
+#include "src/ui/lib/escher/util/hasher.h"
 #include "src/ui/lib/escher/vk/shader_module.h"
 #include "third_party/spirv-cross/spirv_cross.hpp"
 
@@ -125,21 +126,29 @@
   }
 }
 
-void GeneratePipelineLayoutSpec(
+PipelineLayoutSpec GeneratePipelineLayoutSpec(
     const std::array<ShaderModulePtr, EnumCount<ShaderStage>()>& shader_modules,
-    PipelineLayoutSpec* spec) {
+    const SamplerPtr& immutable_sampler) {
+  uint32_t attribute_mask = 0;
   if (auto& vertex_module = shader_modules[EnumCast(ShaderStage::kVertex)]) {
-    spec->attribute_mask =
+    attribute_mask =
         vertex_module->shader_module_resource_layout().attribute_mask;
   }
+  uint32_t render_target_mask = 0;
   if (auto& fragment_module =
           shader_modules[EnumCast(ShaderStage::kFragment)]) {
-    spec->render_target_mask =
+    render_target_mask =
         fragment_module->shader_module_resource_layout().render_target_mask;
   }
 
-  spec->descriptor_set_mask = 0;
+  std::array<DescriptorSetLayout, VulkanLimits::kNumDescriptorSets>
+      descriptor_set_layouts;
 
+  // Store the initial push constant ranges locally, then de-dup further down.
+  std::array<vk::PushConstantRange, PipelineLayoutSpec::kMaxPushConstantRanges>
+      raw_ranges = {};
+  std::array<vk::PushConstantRange, PipelineLayoutSpec::kMaxPushConstantRanges>
+      push_constant_ranges = {};
   for (uint32_t i = 0; i < EnumCount<ShaderStage>(); ++i) {
     auto& module = shader_modules[i];
     if (!module)
@@ -148,7 +157,7 @@
     auto& module_layout = module->shader_module_resource_layout();
 
     for (uint32_t set = 0; set < VulkanLimits::kNumDescriptorSets; ++set) {
-      impl::DescriptorSetLayout& pipe_dsl = spec->descriptor_set_layouts[set];
+      impl::DescriptorSetLayout& pipe_dsl = descriptor_set_layouts[set];
       const impl::DescriptorSetLayout& mod_dsl = module_layout.sets[set];
 
       pipe_dsl.sampled_image_mask |= mod_dsl.sampled_image_mask;
@@ -161,17 +170,36 @@
       pipe_dsl.stages |= mod_dsl.stages;
     }
 
-    spec->push_constant_ranges[i].stageFlags =
-        ShaderStageToFlags(ShaderStage(i));
-    spec->push_constant_ranges[i].offset = module_layout.push_constant_offset;
-    spec->push_constant_ranges[i].size = module_layout.push_constant_range;
+    raw_ranges[i].stageFlags = ShaderStageToFlags(ShaderStage(i));
+    raw_ranges[i].offset = module_layout.push_constant_offset;
+    raw_ranges[i].size = module_layout.push_constant_range;
   }
 
-  for (uint32_t i = 0; i < VulkanLimits::kNumDescriptorSets; ++i) {
-    if (spec->descriptor_set_layouts[i].stages) {
-      spec->descriptor_set_mask |= 1u << i;
+  unsigned num_ranges = 0;
+  for (auto& range : raw_ranges) {
+    if (range.size != 0) {
+      bool unique = true;
+      for (unsigned i = 0; i < num_ranges; i++) {
+        // Try to merge equivalent ranges for multiple stages.
+        // TODO(ES-83): what to do about overlapping ranges?  Should DCHECK?
+        if (push_constant_ranges[i].offset == range.offset &&
+            push_constant_ranges[i].size == range.size) {
+          unique = false;
+          push_constant_ranges[i].stageFlags |= range.stageFlags;
+          break;
+        }
+      }
+
+      if (unique)
+        push_constant_ranges[num_ranges++] = range;
     }
   }
+
+  uint32_t num_push_constant_ranges = num_ranges;
+
+  return PipelineLayoutSpec(attribute_mask, render_target_mask,
+                            descriptor_set_layouts, push_constant_ranges,
+                            num_push_constant_ranges, immutable_sampler);
 }
 
 }  // namespace impl
diff --git a/src/ui/lib/escher/third_party/granite/vk/shader_utils.h b/src/ui/lib/escher/third_party/granite/vk/shader_utils.h
index c5b5306..745d9257 100644
--- a/src/ui/lib/escher/third_party/granite/vk/shader_utils.h
+++ b/src/ui/lib/escher/third_party/granite/vk/shader_utils.h
@@ -29,6 +29,7 @@
 #include <array>
 
 #include "src/ui/lib/escher/util/enum_count.h"
+#include "src/ui/lib/escher/vk/sampler.h"
 #include "src/ui/lib/escher/vk/shader_module.h"
 #include "src/ui/lib/escher/vk/shader_stage.h"
 
@@ -46,9 +47,9 @@
 // Generate a PipelineLayoutSpec using each non-null ShaderStage's
 // ShaderModuleResourceLayout.
 struct PipelineLayoutSpec;
-void GeneratePipelineLayoutSpec(
+PipelineLayoutSpec GeneratePipelineLayoutSpec(
     const std::array<ShaderModulePtr, EnumCount<ShaderStage>()>& shader_modules,
-    PipelineLayoutSpec* spec_out);
+    const SamplerPtr& immutable_sampler);
 
 }  // namespace impl
 }  // namespace escher
diff --git a/src/ui/lib/escher/util/depth_to_color.cc b/src/ui/lib/escher/util/depth_to_color.cc
index a75d966..90000f0 100644
--- a/src/ui/lib/escher/util/depth_to_color.cc
+++ b/src/ui/lib/escher/util/depth_to_color.cc
@@ -56,9 +56,9 @@
   ImagePtr tmp_image = image_factory_->NewImage(
       {vk::Format::eR8G8B8A8Unorm, width, width, 1,
        image_flags | vk::ImageUsageFlagBits::eStorage});
-  TexturePtr tmp_texture = fxl::MakeRefCounted<Texture>(
-      escher_->resource_recycler(), tmp_image, vk::Filter::eNearest,
-      vk::ImageAspectFlagBits::eColor, true);
+  TexturePtr tmp_texture =
+      Texture::New(escher_->resource_recycler(), tmp_image,
+                   vk::Filter::eNearest, vk::ImageAspectFlagBits::eColor, true);
   command_buffer->TransitionImageLayout(tmp_image, vk::ImageLayout::eUndefined,
                                         vk::ImageLayout::eGeneral);
 
diff --git a/src/ui/lib/escher/util/image_utils.cc b/src/ui/lib/escher/util/image_utils.cc
index f99e1017..992f1f2 100644
--- a/src/ui/lib/escher/util/image_utils.cc
+++ b/src/ui/lib/escher/util/image_utils.cc
@@ -112,7 +112,8 @@
   create_info.usage = info.usage;
   create_info.sharingMode = vk::SharingMode::eExclusive;
   create_info.initialLayout = vk::ImageLayout::eUndefined;
-  create_info.flags = vk::ImageCreateFlagBits::eMutableFormat;
+  create_info.flags = info.is_mutable ? vk::ImageCreateFlagBits::eMutableFormat
+                                      : vk::ImageCreateFlags();
   return create_info;
 }
 
diff --git a/src/ui/lib/escher/vk/image.h b/src/ui/lib/escher/vk/image.h
index ea724b6..b37f70f 100644
--- a/src/ui/lib/escher/vk/image.h
+++ b/src/ui/lib/escher/vk/image.h
@@ -23,11 +23,13 @@
   vk::MemoryPropertyFlags memory_flags =
       vk::MemoryPropertyFlagBits::eDeviceLocal;
   vk::ImageTiling tiling = vk::ImageTiling::eOptimal;
+  bool is_mutable = true;
 
   bool operator==(const ImageInfo& other) const {
     return format == other.format && width == other.width &&
            height == other.height && sample_count == other.sample_count &&
-           usage == other.usage && memory_flags == other.memory_flags;
+           usage == other.usage && memory_flags == other.memory_flags &&
+           is_mutable == other.is_mutable;
   }
 
   // Transient images are neither loaded nor stored by render passes.  Instead
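A sketch of an ImageInfo for a sampled NV12-style image, matching the YUV setup in shader_program_unittest.cc above; leaving is_mutable false keeps vk::ImageCreateFlagBits::eMutableFormat out of the create flags (see image_utils.cc above). The 64x64 size is an illustrative assumption:

    escher::ImageInfo info;
    info.format = vk::Format::eG8B8R82Plane420Unorm;
    info.width = 64;   // illustrative; see SCN-1387 for sizing constraints
    info.height = 64;
    info.usage = vk::ImageUsageFlagBits::eSampled;
    info.is_mutable = false;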
diff --git a/src/ui/lib/escher/vk/image_view.cc b/src/ui/lib/escher/vk/image_view.cc
index 0852a5d..9ccaf07 100644
--- a/src/ui/lib/escher/vk/image_view.cc
+++ b/src/ui/lib/escher/vk/image_view.cc
@@ -18,7 +18,7 @@
                                             ResourceType::kImageView);
 
 ImageView::ImageView(ResourceRecycler* resource_recycler, ImagePtr image,
-                     vk::ImageAspectFlags aspect_mask)
+                     vk::ImageAspectFlags aspect_mask, void* extension_data)
     : Resource(resource_recycler),
       image_(std::move(image)),
       width_(image_->width()),
@@ -39,6 +39,7 @@
   }
 
   vk::ImageViewCreateInfo view_info;
+  view_info.pNext = extension_data;
   view_info.viewType = vk::ImageViewType::e2D;
   view_info.subresourceRange.baseMipLevel = 0;
   view_info.subresourceRange.levelCount = 1;
diff --git a/src/ui/lib/escher/vk/image_view.h b/src/ui/lib/escher/vk/image_view.h
index 8bf8396..d166dca 100644
--- a/src/ui/lib/escher/vk/image_view.h
+++ b/src/ui/lib/escher/vk/image_view.h
@@ -19,9 +19,12 @@
   // Construct an ImageView, which encapsulates a newly-created VkImageView.
   // |aspect_mask| is used to create the VkImageView, and |resource_recycler|
   // guarantees that the underlying Vulkan resources are not destroyed while
-  // still referenced by a pending command buffer.
+  // still referenced by a pending command buffer. |extension_data| is used
+  // as the pNext pointer when creating the vk::ImageView; it is not retained
+  // after construction, so it may point to a temporary.
   ImageView(ResourceRecycler* resource_recycler, ImagePtr image,
-            vk::ImageAspectFlags aspect_mask = vk::ImageAspectFlags());
+            vk::ImageAspectFlags aspect_mask = vk::ImageAspectFlags(),
+            void* extension_data = nullptr);
   ~ImageView() override;
 
   static ImageViewPtr New(ImagePtr image, vk::ImageAspectFlags aspect_mask =
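A sketch of threading a Sampler's YCbCr conversion info into an ImageView through the new parameter, so the view and the sampler are created with matching pNext chains (|recycler|, |image|, and |yuv_sampler| are placeholders):

    escher::ImageViewPtr view = fxl::MakeRefCounted<escher::ImageView>(
        recycler, std::move(image), vk::ImageAspectFlags(),
        yuv_sampler->GetExtensionData());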
diff --git a/src/ui/lib/escher/vk/impl/descriptor_set_allocator.cc b/src/ui/lib/escher/vk/impl/descriptor_set_allocator.cc
index 79e90a2..a15c389 100644
--- a/src/ui/lib/escher/vk/impl/descriptor_set_allocator.cc
+++ b/src/ui/lib/escher/vk/impl/descriptor_set_allocator.cc
@@ -10,25 +10,32 @@
 namespace escher {
 namespace impl {
 
-DescriptorSetAllocator::DescriptorSetAllocator(vk::Device device,
-                                               DescriptorSetLayout layout)
-    : cache_(device, layout) {}
+DescriptorSetAllocator::DescriptorSetAllocator(
+    vk::Device device, DescriptorSetLayout layout,
+    const SamplerPtr& immutable_sampler)
+    : cache_(device, layout, immutable_sampler) {}
 
-DescriptorSetAllocator::PoolPolicy::PoolPolicy(vk::Device device,
-                                               DescriptorSetLayout layout)
-    : vk_device_(device), layout_(layout) {
+DescriptorSetAllocator::PoolPolicy::PoolPolicy(
+    vk::Device device, DescriptorSetLayout layout,
+    const SamplerPtr& immutable_sampler)
+    : vk_device_(device),
+      layout_(layout),
+      immutable_sampler_(immutable_sampler) {
   FXL_DCHECK(layout.IsValid());
 
   std::array<vk::DescriptorSetLayoutBinding, VulkanLimits::kNumBindings>
       bindings;
   size_t num_bindings = 0;
 
+  bool has_sampled_image = false;
   for (uint32_t i = 0; i < VulkanLimits::kNumBindings; i++) {
     uint32_t index_mask = 1u << i;
 
     if (index_mask & layout.sampled_image_mask) {
-      bindings[num_bindings++] = {i, vk::DescriptorType::eCombinedImageSampler,
-                                  1, layout.stages, nullptr};
+      has_sampled_image = true;
+      bindings[num_bindings++] = {
+          i, vk::DescriptorType::eCombinedImageSampler, 1, layout.stages,
+          immutable_sampler ? &(immutable_sampler->vk()) : nullptr};
       pool_sizes_.push_back({vk::DescriptorType::eCombinedImageSampler, 0});
       continue;
     }
@@ -73,6 +80,13 @@
     }
   }
 
+  if (immutable_sampler && has_sampled_image) {
+    // TODO(ES-199): Leaving this log in for now, so we can detect when systems
+    // are OOMing due to ES-199. For most use cases, this log will trigger once.
+    FXL_LOG(INFO) << "Allocating immutable descriptor set layout, sampler = "
+                  << immutable_sampler->vk();
+  }
+
   vk::DescriptorSetLayoutCreateInfo info;
   if (num_bindings) {
     info.bindingCount = num_bindings;
diff --git a/src/ui/lib/escher/vk/impl/descriptor_set_allocator.h b/src/ui/lib/escher/vk/impl/descriptor_set_allocator.h
index ea86d54..eb9364a 100644
--- a/src/ui/lib/escher/vk/impl/descriptor_set_allocator.h
+++ b/src/ui/lib/escher/vk/impl/descriptor_set_allocator.h
@@ -10,6 +10,7 @@
 
 #include "src/ui/lib/escher/third_party/granite/vk/descriptor_set_layout.h"
 #include "src/ui/lib/escher/util/hash_cache.h"
+#include "src/ui/lib/escher/vk/sampler.h"
 
 namespace escher {
 namespace impl {
@@ -19,7 +20,8 @@
 // with FramesUntilEviction == 2.
 class DescriptorSetAllocator {
  public:
-  DescriptorSetAllocator(vk::Device device, DescriptorSetLayout layout);
+  DescriptorSetAllocator(vk::Device device, DescriptorSetLayout layout,
+                         const SamplerPtr& immutable_sampler = nullptr);
 
   void BeginFrame() { cache_.BeginFrame(); }
   void Clear() { cache_.Clear(); }
@@ -54,7 +56,8 @@
   // time. Each block is associated with a separate vk::DescriptorPool.
   class PoolPolicy {
    public:
-    PoolPolicy(vk::Device device, DescriptorSetLayout layout);
+    PoolPolicy(vk::Device device, DescriptorSetLayout layout,
+               const SamplerPtr& immutable_sampler);
     ~PoolPolicy();
 
     void InitializePoolObjectBlock(CacheItem* objects, size_t block_index,
@@ -82,6 +85,8 @@
 
     std::vector<vk::DescriptorPoolSize> pool_sizes_;
     std::map<size_t, vk::DescriptorPool> pools_;
+
+    SamplerPtr immutable_sampler_;
   };
 
   HashCache<CacheItem, PoolPolicy, 2> cache_;
diff --git a/src/ui/lib/escher/vk/impl/pipeline_layout_cache.cc b/src/ui/lib/escher/vk/impl/pipeline_layout_cache.cc
index 1ed894c..737bcd4 100644
--- a/src/ui/lib/escher/vk/impl/pipeline_layout_cache.cc
+++ b/src/ui/lib/escher/vk/impl/pipeline_layout_cache.cc
@@ -16,20 +16,14 @@
 
 const PipelineLayoutPtr& PipelineLayoutCache::ObtainPipelineLayout(
     const PipelineLayoutSpec& spec) {
-  Hasher h;
-  h.struc(spec.descriptor_set_layouts);
-  h.struc(spec.push_constant_ranges);
-  h.u32(spec.attribute_mask);
-
-  Hash hash = h.value();
-  auto it = layouts_.find(hash);
+  auto it = layouts_.find(spec.hash());
   if (it != end(layouts_)) {
     FXL_DCHECK(it->second->spec() == spec);
     return it->second;
   }
 
   auto pair = layouts_.insert(std::make_pair(
-      hash, fxl::MakeRefCounted<PipelineLayout>(recycler_, spec)));
+      spec.hash(), fxl::MakeRefCounted<PipelineLayout>(recycler_, spec)));
   return pair.first->second;
 }
 
diff --git a/src/ui/lib/escher/vk/impl/pipeline_layout_cache.h b/src/ui/lib/escher/vk/impl/pipeline_layout_cache.h
index 86a002c..c6726cc 100644
--- a/src/ui/lib/escher/vk/impl/pipeline_layout_cache.h
+++ b/src/ui/lib/escher/vk/impl/pipeline_layout_cache.h
@@ -22,6 +22,8 @@
   const PipelineLayoutPtr& ObtainPipelineLayout(
       const PipelineLayoutSpec& layout);
 
+  // TODO(ES-201): There is currently no eviction policy for this cache, as
+  // Escher never calls this function.
   void Clear();
 
  private:
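The cache contract, sketched (illustrative; |cache| and |spec| are placeholders): identical specs, including the immutable sampler, resolve to the same entry, and because Clear() is never called (ES-201), entries persist for the cache's lifetime.

    const escher::PipelineLayoutPtr& a = cache->ObtainPipelineLayout(spec);
    const escher::PipelineLayoutPtr& b = cache->ObtainPipelineLayout(spec);
    FXL_DCHECK(a == b);  // keyed on spec.hash(); second call is a cache hit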
diff --git a/src/ui/lib/escher/vk/sampler.cc b/src/ui/lib/escher/vk/sampler.cc
new file mode 100644
index 0000000..7b07a8b
--- /dev/null
+++ b/src/ui/lib/escher/vk/sampler.cc
@@ -0,0 +1,82 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ui/lib/escher/vk/sampler.h"
+
+#include "src/ui/lib/escher/impl/vulkan_utils.h"
+#include "src/ui/lib/escher/resources/resource_recycler.h"
+
+namespace escher {
+
+const ResourceTypeInfo Sampler::kTypeInfo("Sampler", ResourceType::kResource,
+                                          ResourceType::kSampler);
+
+Sampler::Sampler(ResourceRecycler* resource_recycler, vk::Format format,
+                 vk::Filter filter, bool use_unnormalized_coordinates)
+    : Resource(resource_recycler), is_immutable_(false) {
+  auto device = resource_recycler->vulkan_context().device;
+
+  // TODO(SCN-1403): eG8B8R82Plane420Unorm is not enough to assume NV12, but
+  // it's currently the only format we support at the sampler level.
+  if (format == vk::Format::eG8B8R82Plane420Unorm) {
+    vk::SamplerYcbcrConversionCreateInfo ycbcr_create_info;
+    ycbcr_create_info.pNext = nullptr;
+    ycbcr_create_info.format = format;
+    ycbcr_create_info.ycbcrModel = vk::SamplerYcbcrModelConversion::eYcbcr709;
+    ycbcr_create_info.ycbcrRange = vk::SamplerYcbcrRange::eItuNarrow;
+    ycbcr_create_info.components = {
+        vk::ComponentSwizzle::eIdentity,  // R
+        vk::ComponentSwizzle::eIdentity,  // G
+        vk::ComponentSwizzle::eIdentity,  // B
+        vk::ComponentSwizzle::eIdentity,  // A
+    };
+    ycbcr_create_info.xChromaOffset = vk::ChromaLocation::eCositedEven;
+    ycbcr_create_info.yChromaOffset = vk::ChromaLocation::eCositedEven;
+    ycbcr_create_info.chromaFilter = filter;
+    ycbcr_create_info.forceExplicitReconstruction = VK_FALSE;
+
+    ycbcr_conversion_ =
+        ESCHER_CHECKED_VK_RESULT(device.createSamplerYcbcrConversion(
+            ycbcr_create_info, nullptr,
+            resource_recycler->vulkan_context().loader));
+    is_immutable_ = true;
+  }
+
+  vk::SamplerCreateInfo sampler_info;
+  sampler_info.pNext = GetExtensionData();
+  sampler_info.magFilter = filter;
+  sampler_info.minFilter = filter;
+
+  sampler_info.anisotropyEnable = false;
+  sampler_info.maxAnisotropy = 1.0;
+  sampler_info.unnormalizedCoordinates = use_unnormalized_coordinates;
+  sampler_info.compareEnable = VK_FALSE;
+  sampler_info.compareOp = vk::CompareOp::eAlways;
+  sampler_info.mipLodBias = 0.0f;
+  sampler_info.minLod = 0.0f;
+  sampler_info.maxLod = 0.0f;
+  if (use_unnormalized_coordinates) {
+    sampler_info.mipmapMode = vk::SamplerMipmapMode::eNearest;
+    sampler_info.addressModeU = vk::SamplerAddressMode::eClampToEdge;
+    sampler_info.addressModeV = vk::SamplerAddressMode::eClampToEdge;
+    sampler_info.addressModeW = vk::SamplerAddressMode::eClampToEdge;
+  } else {
+    sampler_info.mipmapMode = vk::SamplerMipmapMode::eLinear;
+    sampler_info.addressModeU = vk::SamplerAddressMode::eRepeat;
+    sampler_info.addressModeV = vk::SamplerAddressMode::eRepeat;
+    sampler_info.addressModeW = vk::SamplerAddressMode::eRepeat;
+  }
+
+  sampler_ = ESCHER_CHECKED_VK_RESULT(vk_device().createSampler(sampler_info));
+}
+
+Sampler::~Sampler() {
+  if (is_immutable_) {
+    vk_device().destroySamplerYcbcrConversion(ycbcr_conversion_.conversion,
+                                              nullptr, vulkan_context().loader);
+  }
+  vk_device().destroySampler(sampler_);
+}
+
+}  // namespace escher
diff --git a/src/ui/lib/escher/vk/sampler.h b/src/ui/lib/escher/vk/sampler.h
new file mode 100644
index 0000000..17cb69d
--- /dev/null
+++ b/src/ui/lib/escher/vk/sampler.h
@@ -0,0 +1,49 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SRC_UI_LIB_ESCHER_VK_SAMPLER_H_
+#define SRC_UI_LIB_ESCHER_VK_SAMPLER_H_
+
+#include "src/ui/lib/escher/forward_declarations.h"
+#include "src/ui/lib/escher/resources/resource.h"
+
+namespace escher {
+
+// Wraps a vk::Sampler object, and exposes the extension data used to construct
+// it, so that the same extension data can be used in other contexts (e.g., when
+// creating vk::ImageView objects).
+class Sampler : public Resource {
+ public:
+  static const ResourceTypeInfo kTypeInfo;
+  const ResourceTypeInfo& type_info() const override { return kTypeInfo; }
+
+  Sampler(ResourceRecycler* resource_recycler, vk::Format format,
+          vk::Filter filter, bool use_unnormalized_coordinates = false);
+  ~Sampler() override;
+
+  const vk::Sampler& vk() const { return sampler_; }
+
+  // If this sampler is immutable, it can only be used with a descriptor
+  // set/pipeline that has been pre-configured with this sampler.
+  bool is_immutable() const { return is_immutable_; }
+
+  // If this sampler has extension data, then any ImageViews that use this
+  // sampler must be initialized with the same extension data.
+  void* GetExtensionData() {
+    return is_immutable_ ? &ycbcr_conversion_ : nullptr;
+  }
+
+ private:
+  vk::Sampler sampler_;
+  vk::SamplerYcbcrConversionInfo ycbcr_conversion_;
+  bool is_immutable_;
+
+  FXL_DISALLOW_COPY_AND_ASSIGN(Sampler);
+};
+
+typedef fxl::RefPtr<Sampler> SamplerPtr;
+
+}  // namespace escher
+
+#endif  // SRC_UI_LIB_ESCHER_VK_SAMPLER_H_
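A sketch of the intended sharing pattern (see the warning added to Texture::New() below): build one immutable YUV Sampler up front and reuse it for every texture that samples that format. Format and filter choices here are illustrative assumptions:

    escher::SamplerPtr yuv_sampler = fxl::MakeRefCounted<escher::Sampler>(
        escher->resource_recycler(), vk::Format::eG8B8R82Plane420Unorm,
        vk::Filter::eLinear);
    // Reuse |yuv_sampler| across many Texture objects instead of letting
    // Texture::New() mint a new immutable sampler per texture.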
diff --git a/src/ui/lib/escher/vk/shader_program.cc b/src/ui/lib/escher/vk/shader_program.cc
index 25a76d0..2b0eb25 100644
--- a/src/ui/lib/escher/vk/shader_program.cc
+++ b/src/ui/lib/escher/vk/shader_program.cc
@@ -64,75 +64,71 @@
   for (auto& pipeline : graphics_pipelines_) {
     device.destroyPipeline(pipeline.second);
   }
-  if (compute_pipeline_) {
-    device.destroyPipeline(compute_pipeline_);
-  }
 }
 
 ShaderProgram::ShaderProgram(ResourceManager* manager) : Resource(manager) {}
 
 void ShaderProgram::OnShaderModuleUpdated(ShaderModule* shader_module) {
-  if (pipeline_layout_) {
-    // We must keep the obsolete pipelines for just as long as if this object's
-    // ref-count hit zero.  The easiest way to do this is to move them into
-    // another ShaderProgram and immediately deref it.
-    auto keep_alive = new ShaderProgram(owner());
-    keep_alive->KeepAlive(sequence_number());
-    keep_alive->pipeline_layout_ = std::move(pipeline_layout_);
-    keep_alive->graphics_pipelines_ = std::move(graphics_pipelines_);
-    keep_alive->compute_pipeline_ = compute_pipeline_;
-
-    // Clear values so that they will be lazily recreated when requested.
-    pipeline_layout_ = nullptr;
-    graphics_pipelines_.clear();
-    compute_pipeline_ = vk::Pipeline();
-
-    // Allow the ref-count to immediately hit zero.
-    fxl::AdoptRef(keep_alive);
-  }
+  ClearPipelineLayout();
+  ClearPipelineStash();
 }
 
-void ShaderProgram::ObtainPipelineLayout() {
+void ShaderProgram::ClearPipelineLayout() {
+  if (!pipeline_layout_)
+    return;
+
+  // We must keep the obsolete pipeline layout alive for just as long as if
+  // this object's ref-count had hit zero.  The easiest way to do this is to
+  // move it into another ShaderProgram and immediately deref it.
+  auto keep_alive = new ShaderProgram(owner());
+  keep_alive->KeepAlive(sequence_number());
+  keep_alive->pipeline_layout_ = std::move(pipeline_layout_);
+  pipeline_layout_ = nullptr;
+
+  // Allow the ref-count to immediately hit zero.
+  fxl::AdoptRef(keep_alive);
+}
+
+void ShaderProgram::ClearPipelineStash() {
+  if (graphics_pipelines_.empty())
+    return;
+
+  // We must keep the obsolete pipelines alive for just as long as if this
+  // object's ref-count had hit zero.  The easiest way to do this is to move
+  // them into another ShaderProgram and immediately deref it.
+  auto keep_alive = new ShaderProgram(owner());
+  keep_alive->KeepAlive(sequence_number());
+  keep_alive->graphics_pipelines_ = std::move(graphics_pipelines_);
+  graphics_pipelines_.clear();
+
+  // Allow the ref-count to immediately hit zero.
+  fxl::AdoptRef(keep_alive);
+}
+
+PipelineLayout* ShaderProgram::ObtainPipelineLayout(
+    const SamplerPtr& immutable_sampler) {
   TRACE_DURATION("gfx", "escher::ShaderProgram::ObtainPipelineLayout");
-  FXL_DCHECK(graphics_pipelines_.empty());
-  FXL_DCHECK(!compute_pipeline_);
+  // If we already have a pipeline layout, and the immutable sampler matches,
+  // just use that.
+  if (pipeline_layout_ &&
+      pipeline_layout_->spec().immutable_sampler() == immutable_sampler) {
+    return pipeline_layout_.get();
+  }
+
+  // Clear out the old pipeline layout, if there is one.
+  ClearPipelineLayout();
+
   FXL_DCHECK(!pipeline_layout_);
 
-  impl::PipelineLayoutSpec spec;
-  impl::GeneratePipelineLayoutSpec(shader_modules_, &spec);
+  impl::PipelineLayoutSpec spec =
+      impl::GeneratePipelineLayoutSpec(shader_modules_, immutable_sampler);
 
-  // Compute a hash to quickly decide whether all descriptor sets must be
-  // invalidated.
-  Hasher h;
-  h.struc(spec.push_constant_ranges);
-  spec.push_constant_layout_hash = h.value();
-
+  // TODO(ES-201): This code assumes we only need to cache a single pipeline
+  // layout per shader program. Immutable samplers ruin that assumption.
   pipeline_layout_ =
       escher()->pipeline_layout_cache()->ObtainPipelineLayout(spec);
 
-  // Unlike graphics pipelines, which may have multiple variants depending on
-  // e.g. stencil buffer configuration, there is only ever one variant of a
-  // compute shader.  Therefore, it can be created immediately.
-  if (auto& compute_module = GetModuleForStage(ShaderStage::kCompute)) {
-    TRACE_DURATION("gfx",
-                   "escher::ShaderProgram::ObtainPipelineLayout [create "
-                   "compute pipeline]");
-    // Need to revisit when the Vulkan spec allows compute shaders to be used
-    // within render passes.
-    FXL_DCHECK(!GetModuleForStage(ShaderStage::kVertex));
-    FXL_DCHECK(!GetModuleForStage(ShaderStage::kGeometry));
-    FXL_DCHECK(!GetModuleForStage(ShaderStage::kFragment));
-    FXL_DCHECK(!GetModuleForStage(ShaderStage::kTessellationControl));
-    FXL_DCHECK(!GetModuleForStage(ShaderStage::kTessellationEvaluation));
-
-    vk::ComputePipelineCreateInfo info;
-    info.layout = pipeline_layout_->vk();
-    info.stage.stage = vk::ShaderStageFlagBits::eCompute;
-    info.stage.module = compute_module->vk();
-    info.stage.pName = "main";
-    compute_pipeline_ = ESCHER_CHECKED_VK_RESULT(
-        vk_device().createComputePipeline(nullptr, info));
-  }
+  return pipeline_layout_.get();
 }
 
 }  // namespace escher
diff --git a/src/ui/lib/escher/vk/shader_program.h b/src/ui/lib/escher/vk/shader_program.h
index 985dd60..6180651 100644
--- a/src/ui/lib/escher/vk/shader_program.h
+++ b/src/ui/lib/escher/vk/shader_program.h
@@ -10,6 +10,7 @@
 #include "src/ui/lib/escher/util/enum_count.h"
 #include "src/ui/lib/escher/util/hash.h"
 #include "src/ui/lib/escher/util/hash_map.h"
+#include "src/ui/lib/escher/vk/sampler.h"
 #include "src/ui/lib/escher/vk/shader_module.h"
 
 namespace escher {
@@ -44,8 +45,13 @@
   // NOTE: The following public methods are called by the CommandBuffer
   // implementation, and are not useful to Escher clients.
 
-  // Return the pipeline layout common to all pipeline variants of this program.
-  PipelineLayout* pipeline_layout();
+  // Return the pipeline layout for this program, specialized for the optional
+  // immutable sampler passed in (pass a null SamplerPtr to opt out).
+  //
+  // TODO(ES-202): This code-flow assumes that ShaderPrograms source from, at
+  // most, a single sampler. This is a blocking bug for implementing, e.g.,
+  // ES-159.
+  PipelineLayout* ObtainPipelineLayout(const SamplerPtr& immutable_sampler);
 
   // Return the module corresponding to the specified shader stage, or nullptr
   // if the program has no shader for that stage (e.g. many graphics programs
@@ -63,29 +69,26 @@
   ShaderProgram(ResourceRecycler* resource_recycler,
                 ShaderModulePtr shader_module);
 
-  void ObtainPipelineLayout();
-
-  // Used by OnShaderModuleUpdated() as an easy way to have the ResourceRecycler
-  // keep the obsolete pipelines alive until safe to destroy them.
+  // Used by ClearPipelineLayout() and ClearPipelineStash() as an easy
+  // way to have the ResourceRecycler keep the obsolete pipelines alive until
+  // safe to destroy them.
   explicit ShaderProgram(ResourceManager* owner);
   void OnShaderModuleUpdated(ShaderModule* shader_module) override;
+  void ClearPipelineLayout();
+  void ClearPipelineStash();
 
   std::array<ShaderModulePtr, EnumCount<ShaderStage>()> shader_modules_;
+
+  // TODO(ES-201): These are effectively strong references to vk::Pipelines;
+  // this object is responsible for destroying them when they are no longer
+  // needed. During normal execution (i.e., without a shader refresh) this
+  // cache is never cleared.
   HashMap<Hash, vk::Pipeline> graphics_pipelines_;
-  vk::Pipeline compute_pipeline_;
 
   PipelineLayoutPtr pipeline_layout_;
 };
 
 // Inline function definitions.
-
-inline PipelineLayout* ShaderProgram::pipeline_layout() {
-  if (!pipeline_layout_) {
-    ObtainPipelineLayout();
-  }
-  return pipeline_layout_.get();
-}
-
 inline const ShaderModulePtr& ShaderProgram::GetModuleForStage(
     ShaderStage stage) const {
   FXL_DCHECK(stage != ShaderStage::kEnumCount);
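A hedged usage sketch of the new entry point, as a CommandBuffer might invoke it when flushing descriptor state; the surrounding variables (texture, program) are illustrative, and only the calls that appear elsewhere in this CL (Texture::sampler(), Sampler::is_immutable(), ShaderProgram::ObtainPipelineLayout()) are taken as given:

    // Illustrative call site, not part of this CL: pass the bound texture's
    // sampler only when it is immutable; otherwise opt out with a null
    // SamplerPtr so the program returns its ordinary layout.
    SamplerPtr immutable_sampler;
    if (texture->sampler()->is_immutable()) {
      immutable_sampler = texture->sampler();
    }
    PipelineLayout* layout = program->ObtainPipelineLayout(immutable_sampler);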
diff --git a/src/ui/lib/escher/vk/texture.cc b/src/ui/lib/escher/vk/texture.cc
index cf7fc54..8f87868 100644
--- a/src/ui/lib/escher/vk/texture.cc
+++ b/src/ui/lib/escher/vk/texture.cc
@@ -15,44 +15,29 @@
                                           ResourceType::kImageView,
                                           ResourceType::kTexture);
 
-Texture::Texture(ResourceRecycler* resource_recycler, ImagePtr image,
-                 vk::Filter filter, vk::ImageAspectFlags aspect_mask,
-                 bool use_unnormalized_coordinates)
-    : ImageView(resource_recycler, std::move(image), aspect_mask) {
-  vk::SamplerCreateInfo sampler_info = {};
-  sampler_info.magFilter = filter;
-  sampler_info.minFilter = filter;
-
-  sampler_info.anisotropyEnable = false;
-  sampler_info.maxAnisotropy = 1.0;
-  sampler_info.unnormalizedCoordinates = use_unnormalized_coordinates;
-  sampler_info.compareEnable = VK_FALSE;
-  sampler_info.compareOp = vk::CompareOp::eAlways;
-  sampler_info.mipLodBias = 0.0f;
-  sampler_info.minLod = 0.0f;
-  sampler_info.maxLod = 0.0f;
-  if (use_unnormalized_coordinates) {
-    sampler_info.mipmapMode = vk::SamplerMipmapMode::eNearest;
-    sampler_info.addressModeU = vk::SamplerAddressMode::eClampToEdge;
-    sampler_info.addressModeV = vk::SamplerAddressMode::eClampToEdge;
-    sampler_info.addressModeW = vk::SamplerAddressMode::eClampToEdge;
-  } else {
-    sampler_info.mipmapMode = vk::SamplerMipmapMode::eLinear;
-    sampler_info.addressModeU = vk::SamplerAddressMode::eRepeat;
-    sampler_info.addressModeV = vk::SamplerAddressMode::eRepeat;
-    sampler_info.addressModeW = vk::SamplerAddressMode::eRepeat;
-  }
-  sampler_ = ESCHER_CHECKED_VK_RESULT(vk_device().createSampler(sampler_info));
-}
-
-Texture::~Texture() { vk_device().destroySampler(sampler_); }
-
 TexturePtr Texture::New(ResourceRecycler* resource_recycler, ImagePtr image,
                         vk::Filter filter, vk::ImageAspectFlags aspect_mask,
                         bool use_unnormalized_coordinates) {
-  return fxl::MakeRefCounted<Texture>(resource_recycler, std::move(image),
-                                      filter, aspect_mask,
-                                      use_unnormalized_coordinates);
+  SamplerPtr sampler = fxl::MakeRefCounted<Sampler>(
+      resource_recycler, image->format(), filter, use_unnormalized_coordinates);
+
+  if (sampler->is_immutable()) {
+    FXL_LOG(WARNING)
+        << "An immutable sampler was created using Texture::New. If "
+           "this happens over and over again, the system will likely OOM. "
+           "Build a separate immutable Sampler object and share it across "
+           "multiple Texture objects.";
+  }
+
+  return fxl::MakeRefCounted<Texture>(resource_recycler, std::move(sampler),
+                                      image, aspect_mask);
 }
 
+Texture::Texture(ResourceRecycler* recycler, SamplerPtr sampler, ImagePtr image,
+                 vk::ImageAspectFlags aspect_mask)
+    : ImageView(recycler, image, aspect_mask, sampler->GetExtensionData()),
+      sampler_(std::move(sampler)) {}
+
+Texture::~Texture() {}
+
 }  // namespace escher
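The warning above prescribes a sharing pattern; here is a minimal sketch of it using the constructors this CL introduces. It assumes the caller already holds a resource_recycler and two images of the same YUV format (image_a and image_b are placeholders):

    // Sketch of the recommended pattern: create one immutable Sampler for the
    // format and share it across every Texture that samples that format,
    // rather than letting Texture::New mint a fresh sampler per texture.
    SamplerPtr shared_sampler = fxl::MakeRefCounted<Sampler>(
        resource_recycler, image_a->format(), vk::Filter::eLinear,
        /*use_unnormalized_coordinates=*/false);
    TexturePtr tex_a = fxl::MakeRefCounted<Texture>(
        resource_recycler, shared_sampler, image_a);
    TexturePtr tex_b = fxl::MakeRefCounted<Texture>(
        resource_recycler, shared_sampler, image_b);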
diff --git a/src/ui/lib/escher/vk/texture.h b/src/ui/lib/escher/vk/texture.h
index 2dacf28..3cbbf13 100644
--- a/src/ui/lib/escher/vk/texture.h
+++ b/src/ui/lib/escher/vk/texture.h
@@ -8,6 +8,7 @@
 #include "src/ui/lib/escher/forward_declarations.h"
 #include "src/ui/lib/escher/resources/resource.h"
 #include "src/ui/lib/escher/vk/image_view.h"
+#include "src/ui/lib/escher/vk/sampler.h"
 
 namespace escher {
 
@@ -16,17 +17,15 @@
   static const ResourceTypeInfo kTypeInfo;
   const ResourceTypeInfo& type_info() const override { return kTypeInfo; }
 
+  Texture(ResourceRecycler* recycler, SamplerPtr sampler, ImagePtr image,
+          vk::ImageAspectFlags aspect_mask = vk::ImageAspectFlagBits::eColor);
+  ~Texture() override;
+
   // Construct a new Texture, which encapsulates a newly-created VkImageView and
   // VkSampler.  |aspect_mask| is used to create the VkImageView, and |filter|
   // and |use_unnormalized_coordinates| are used to create the VkSampler.
   // |resource_recycler| guarantees that the underlying Vulkan resources are not
   // destroyed while still referenced by a pending command buffer.
-  Texture(ResourceRecycler* resource_recycler, ImagePtr image,
-          vk::Filter filter,
-          vk::ImageAspectFlags aspect_mask = vk::ImageAspectFlagBits::eColor,
-          bool use_unnormalized_coordinates = false);
-  ~Texture() override;
-
   static TexturePtr New(
       ResourceRecycler* resource_recycler, ImagePtr image, vk::Filter filter,
       vk::ImageAspectFlags aspect_mask = vk::ImageAspectFlagBits::eColor,
@@ -34,10 +33,10 @@
 
   vk::Image vk_image() const { return image()->vk(); }
   vk::ImageView vk_image_view() const { return vk(); }
-  vk::Sampler vk_sampler() const { return sampler_; }
+  const SamplerPtr& sampler() const { return sampler_; }
 
  private:
-  vk::Sampler sampler_;
+  SamplerPtr sampler_;
 
   FXL_DISALLOW_COPY_AND_ASSIGN(Texture);
 };
diff --git a/src/ui/lib/escher/vk/vulkan_context.h b/src/ui/lib/escher/vk/vulkan_context.h
index 060e85a..b8955fa 100644
--- a/src/ui/lib/escher/vk/vulkan_context.h
+++ b/src/ui/lib/escher/vk/vulkan_context.h
@@ -15,6 +15,7 @@
   const vk::Instance instance;
   const vk::PhysicalDevice physical_device;
   const vk::Device device;
+  const vk::DispatchLoaderDynamic loader;
   // Queue that supports both graphics and compute.
   const vk::Queue queue;
   const uint32_t queue_family_index;
@@ -23,11 +24,13 @@
   const uint32_t transfer_queue_family_index;
 
   VulkanContext(vk::Instance instance, vk::PhysicalDevice physical_device,
-                vk::Device device, vk::Queue queue, uint32_t queue_family_index,
+                vk::Device device, vk::DispatchLoaderDynamic loader,
+                vk::Queue queue, uint32_t queue_family_index,
                 vk::Queue transfer_queue, uint32_t transfer_queue_family_index)
       : instance(instance),
         physical_device(physical_device),
         device(device),
+        loader(loader),
         queue(queue),
         queue_family_index(queue_family_index),
         transfer_queue(transfer_queue),
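The new loader field exists because vulkan.hpp routes extension entry points through a dispatcher argument. A hedged example of the kind of call that needs it; the YCbCr-conversion call is illustrative of the pattern, not something this hunk itself performs:

    // Illustrative only: extension functions absent from the static dispatch
    // table take the dynamic loader as their final argument.
    VulkanContext context = device_queues->GetVulkanContext();
    vk::SamplerYcbcrConversionCreateInfo create_info;  // fields elided
    vk::SamplerYcbcrConversion conversion = ESCHER_CHECKED_VK_RESULT(
        context.device.createSamplerYcbcrConversionKHR(create_info, nullptr,
                                                       context.loader));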
diff --git a/src/ui/lib/escher/vk/vulkan_device_queues.cc b/src/ui/lib/escher/vk/vulkan_device_queues.cc
index 65ef6ee7..cefe733 100644
--- a/src/ui/lib/escher/vk/vulkan_device_queues.cc
+++ b/src/ui/lib/escher/vk/vulkan_device_queues.cc
@@ -12,7 +12,7 @@
 namespace escher {
 
 template <typename FuncT>
-static FuncT GetDeviceProcAddr(vk::Device device, const char *func_name) {
+static FuncT GetDeviceProcAddr(vk::Device device, const char* func_name) {
   FuncT func = reinterpret_cast<FuncT>(device.getProcAddr(func_name));
   FXL_CHECK(func) << "failed to find function address for: " << func_name;
   return func;
@@ -26,7 +26,7 @@
       max_image_height(props.limits.maxImageDimension2D) {}
 
 VulkanDeviceQueues::ProcAddrs::ProcAddrs(
-    vk::Device device, const std::set<std::string> &extension_names) {
+    vk::Device device, const std::set<std::string>& extension_names) {
   if (extension_names.find(VK_KHR_SWAPCHAIN_EXTENSION_NAME) !=
       extension_names.end()) {
     GET_DEVICE_PROC_ADDR(CreateSwapchainKHR);
@@ -49,8 +49,8 @@
 
 SuitablePhysicalDeviceAndQueueFamilies
 FindSuitablePhysicalDeviceAndQueueFamilies(
-    const VulkanInstancePtr &instance,
-    const VulkanDeviceQueues::Params &params) {
+    const VulkanInstancePtr& instance,
+    const VulkanDeviceQueues::Params& params) {
   auto physical_devices = ESCHER_CHECKED_VK_RESULT(
       instance->vk_instance().enumeratePhysicalDevices());
 
@@ -64,7 +64,7 @@
                                    vk::QueueFlagBits::eGraphics |
                                    vk::QueueFlagBits::eCompute;
 
-  for (auto &physical_device : physical_devices) {
+  for (auto& physical_device : physical_devices) {
     // Look for a physical device that has all required extensions.
     if (!VulkanDeviceQueues::ValidateExtensions(
             physical_device, params.extension_names,
@@ -166,8 +166,8 @@
   queue_info[1].queueCount = 1;
   queue_info[1].pQueuePriorities = &kQueuePriority;
 
-  std::vector<const char *> extension_names;
-  for (auto &extension : params.extension_names) {
+  std::vector<const char*> extension_names;
+  for (auto& extension : params.extension_names) {
     extension_names.push_back(extension.c_str());
   }
 
@@ -272,11 +272,11 @@
 // Helper for ValidateExtensions().
 static bool ValidateExtension(
     vk::PhysicalDevice device, const std::string name,
-    const std::vector<vk::ExtensionProperties> &base_extensions,
-    const std::set<std::string> &required_layer_names) {
+    const std::vector<vk::ExtensionProperties>& base_extensions,
+    const std::set<std::string>& required_layer_names) {
   auto found =
       std::find_if(base_extensions.begin(), base_extensions.end(),
-                   [&name](const vk::ExtensionProperties &extension) {
+                   [&name](const vk::ExtensionProperties& extension) {
                      return !strncmp(extension.extensionName, name.c_str(),
                                      VK_MAX_EXTENSION_NAME_SIZE);
                    });
@@ -285,7 +285,7 @@
 
   // Didn't find the extension in the base list of extensions.  Perhaps it is
   // implemented in a layer.
-  for (auto &layer_name : required_layer_names) {
+  for (auto& layer_name : required_layer_names) {
     auto layer_extensions = ESCHER_CHECKED_VK_RESULT(
         device.enumerateDeviceExtensionProperties(layer_name));
     FXL_LOG(INFO) << "Looking for Vulkan device extension: " << name
@@ -293,7 +293,7 @@
 
     auto found =
         std::find_if(layer_extensions.begin(), layer_extensions.end(),
-                     [&name](vk::ExtensionProperties &extension) {
+                     [&name](vk::ExtensionProperties& extension) {
                        return !strncmp(extension.extensionName, name.c_str(),
                                        VK_MAX_EXTENSION_NAME_SIZE);
                      });
@@ -306,12 +306,12 @@
 
 bool VulkanDeviceQueues::ValidateExtensions(
     vk::PhysicalDevice device,
-    const std::set<std::string> &required_extension_names,
-    const std::set<std::string> &required_layer_names) {
+    const std::set<std::string>& required_extension_names,
+    const std::set<std::string>& required_layer_names) {
   auto extensions =
       ESCHER_CHECKED_VK_RESULT(device.enumerateDeviceExtensionProperties());
 
-  for (auto &name : required_extension_names) {
+  for (auto& name : required_extension_names) {
     if (!ValidateExtension(device, name, extensions, required_layer_names)) {
       FXL_LOG(WARNING) << "Vulkan has no device extension named: " << name;
       return false;
@@ -322,7 +322,7 @@
 
 VulkanContext VulkanDeviceQueues::GetVulkanContext() const {
   return escher::VulkanContext(instance_->vk_instance(), vk_physical_device(),
-                               vk_device(), vk_main_queue(),
+                               vk_device(), dispatch_loader(), vk_main_queue(),
                                vk_main_queue_family(), vk_transfer_queue(),
                                vk_transfer_queue_family());
 }
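For reference, the GET_DEVICE_PROC_ADDR macro used in the ProcAddrs constructor above pairs with the GetDeviceProcAddr template shown earlier in this file. Its expansion is not quoted in this CL; the following is an assumed sketch of what it plausibly does:

    // Assumed expansion, shown for illustration only: resolve the extension
    // entry point by name and store it in the same-named ProcAddrs member.
    #define GET_DEVICE_PROC_ADDR(NAME) \
      NAME = GetDeviceProcAddr<PFN_vk##NAME>(device, "vk" #NAME)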