blob: b249c1c526fa37b925abe52ed862c6ee56dbae36 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/camera/bin/camera-gym/stream_cycler.h"
#include <lib/async-loop/default.h>
#include <lib/async/cpp/task.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/trace/event.h>
#include <stdint.h>
#include <zircon/types.h>
#include <sstream>
#include "src/camera/bin/camera-gym/moving_window.h"
#include "src/lib/fsl/handles/object_info.h"
namespace camera {
using Command = fuchsia::camera::gym::Command;
using SetConfigCommand = fuchsia::camera::gym::SetConfigCommand;
using AddStreamCommand = fuchsia::camera::gym::AddStreamCommand;
using SetCropCommand = fuchsia::camera::gym::SetCropCommand;
using SetResolutionCommand = fuchsia::camera::gym::SetResolutionCommand;
// Watch dog for general purpose catastrophic frame timeout. This is meant for detecting really bad
// things happening, such as sensor no longer producing frames.
constexpr zx::duration kCheckInterval = zx::msec(2000); // 2 seconds
constexpr zx::duration kFrameTimeout = zx::msec(5000); // 5 seconds
// Ratio controls how often ROI is moved.
// (1 = every frame, 2 = every other frame, etc)
constexpr uint32_t kRegionOfInterestFramesPerMoveRatio = 1;
// Sets the error handler on the provided interface to log an error and abort the process.
template <class T>
static void SetAbortOnError(fidl::InterfacePtr<T>& p, std::string message) {
p.set_error_handler([message](zx_status_t status) {
// FATAL severity causes abort to be called.
FX_PLOGS(FATAL, status) << message;
});
}
StreamCycler::StreamCycler(async_dispatcher_t* dispatcher, bool manual_mode)
: dispatcher_(dispatcher), manual_mode_(manual_mode) {
SetAbortOnError(watcher_, "fuchsia.camera3.DeviceWatcher disconnected.");
SetAbortOnError(allocator_, "fuchsia.sysmem.Allocator disconnected.");
SetAbortOnError(device_, "fuchsia.camera3.Device disconnected.");
}
StreamCycler::~StreamCycler() = default;
fpromise::result<std::unique_ptr<StreamCycler>, zx_status_t> StreamCycler::Create(
fuchsia::camera3::DeviceWatcherHandle watcher, fuchsia::sysmem::AllocatorHandle allocator,
async_dispatcher_t* dispatcher, bool manual_mode) {
auto cycler = std::unique_ptr<StreamCycler>(new StreamCycler(dispatcher, manual_mode));
zx_status_t status = cycler->watcher_.Bind(std::move(watcher), dispatcher);
if (status != ZX_OK) {
FX_PLOGS(ERROR, status);
return fpromise::error(status);
}
status = cycler->allocator_.Bind(std::move(allocator), dispatcher);
if (status != ZX_OK) {
FX_PLOGS(ERROR, status);
return fpromise::error(status);
}
cycler->watcher_->WatchDevices(
fit::bind_member(cycler.get(), &StreamCycler::WatchDevicesCallback));
return fpromise::ok(std::move(cycler));
}
void StreamCycler::SetHandlers(StreamCycler::AddCollectionHandler on_add_collection,
StreamCycler::RemoveCollectionHandler on_remove_collection,
StreamCycler::ShowBufferHandler on_show_buffer,
StreamCycler::MuteStateHandler on_mute_changed) {
add_collection_handler_ = std::move(on_add_collection);
remove_collection_handler_ = std::move(on_remove_collection);
show_buffer_handler_ = std::move(on_show_buffer);
mute_state_handler_ = std::move(on_mute_changed);
}
void StreamCycler::WatchDevicesCallback(std::vector<fuchsia::camera3::WatchDevicesEvent> events) {
for (auto& event : events) {
if (event.is_added()) {
// Connect to device.
// TODO(fxbug.dev/48506) Properly detect expected device id.
watcher_->ConnectToDevice(event.added(), device_.NewRequest(dispatcher_));
// Watch for mute changes.
device_->WatchMuteState(fit::bind_member(this, &StreamCycler::WatchMuteStateHandler));
// Fetch camera configurations
device_->GetConfigurations(
[this](std::vector<fuchsia::camera3::Configuration> configurations) {
configurations_ = std::move(configurations);
// Once we have the known camera configurations, default to the first configuration
// index. This is automatically chosen in the driver, so we do not need to ask for it.
// The callback for WatchCurrentConfiguration() will connect to all streams.
device_->WatchCurrentConfiguration(
fit::bind_member(this, &StreamCycler::WatchCurrentConfigurationCallback));
});
// Start watch dog.
async::PostDelayedTask(dispatcher_, fit::bind_member(this, &StreamCycler::CheckAllWatchdogs),
kCheckInterval);
}
}
// Hanging get.
watcher_->WatchDevices(fit::bind_member(this, &StreamCycler::WatchDevicesCallback));
}
void StreamCycler::WatchMuteStateHandler(bool software_muted, bool hardware_muted) {
muted_ = software_muted | hardware_muted;
mute_state_handler_(muted_);
device_->WatchMuteState(fit::bind_member(this, &StreamCycler::WatchMuteStateHandler));
// If we just un-muted, reset all watchdogs so we do not trip it.
if (!muted_) {
ResetAllWatchdogs();
}
}
void StreamCycler::ForceNextStreamConfiguration() {
uint32_t config_index = NextConfigIndex();
ZX_ASSERT(configurations_.size() > config_index);
ZX_ASSERT(!configurations_[config_index].streams.empty());
device_->SetCurrentConfiguration(config_index);
}
void StreamCycler::WatchCurrentConfigurationCallback(uint32_t config_index) {
// Remember the current device config_index AFTER it has been set.
current_config_index_ = config_index;
if (manual_mode_) {
// If manual mode, offer a done indication to command sequencer because the selection of a new
// device configuration just finished, but do not do anything else.
CommandSuccessNotify();
} else {
// If automatic mode, start all streams for current config, and start a timer to change to the
// next configuration after kDemoTime.
// Start connecting to all streams.
ConnectToAllStreams();
// After a specified demo period, set the next stream configuration, which will end up cutting
// off all existing streams.
if (auto_cycle_interval_) {
async::PostDelayedTask(
dispatcher_, [this]() { ForceNextStreamConfiguration(); }, *auto_cycle_interval_);
}
}
// Be ready for configuration changes.
device_->WatchCurrentConfiguration(
fit::bind_member(this, &StreamCycler::WatchCurrentConfigurationCallback));
}
void StreamCycler::ConnectToAllStreams() {
for (size_t i = 0; i < configurations_[current_config_index_].streams.size(); i++) {
ConnectToStream(current_config_index_, i);
}
}
void StreamCycler::ConnectToStream(uint32_t config_index, uint32_t stream_index) {
ZX_ASSERT(configurations_.size() > config_index);
ZX_ASSERT(configurations_[config_index].streams.size() > stream_index);
auto image_format = configurations_[config_index].streams[stream_index].image_format;
// Connect to specific stream
StreamInfo new_stream_info;
new_stream_info.last_received = zx::clock::get_monotonic(); // Initialize to current time
stream_infos_.emplace(stream_index, std::move(new_stream_info));
auto& stream = stream_infos_[stream_index].stream;
auto stream_request = stream.NewRequest(dispatcher_);
if (config_index == 1 || config_index == 2) {
stream_infos_[stream_index].source_highlight = 0;
}
// Allocate buffer collection
fuchsia::sysmem::BufferCollectionTokenPtr token_orig;
allocator_->AllocateSharedCollection(token_orig.NewRequest());
stream->SetBufferCollection(std::move(token_orig));
stream->WatchBufferCollection([this, image_format, config_index, stream_index,
&stream](fuchsia::sysmem::BufferCollectionTokenHandle token_back) {
ZX_ASSERT(image_format.coded_width > 0); // image_format must be reasonable.
ZX_ASSERT(image_format.coded_height > 0);
ZX_ASSERT(image_format.bytes_per_row > 0);
auto& stream_info = stream_infos_[stream_index];
if (add_collection_handler_) {
std::ostringstream oss;
oss << "c" << config_index << "s" << stream_index << ".data";
stream_info.add_collection_handler_returned_value =
add_collection_handler_(std::move(token_back), image_format, oss.str());
} else {
token_back.BindSync()->Close();
}
// Initialize the current image format and the current coded size.
stream_info.image_format = image_format;
if (manual_mode_) {
// If manual mode, offer a "ConnectToStream is done" indication to command sequencer.
CommandSuccessNotify();
}
// Begin watch dog deadline fresh for this stream.
ResetStreamWatchdog(stream_index);
// Kick start the stream
stream->GetNextFrame([this, stream_index](fuchsia::camera3::FrameInfo frame_info) {
OnNextFrame(stream_index, std::move(frame_info));
});
});
device_->ConnectToStream(stream_index, std::move(stream_request));
stream.set_error_handler(
[this, stream_index](zx_status_t status) { DisconnectStream(stream_index); });
}
void StreamCycler::OnNextFrame(uint32_t stream_index, fuchsia::camera3::FrameInfo frame_info) {
TRACE_DURATION("camera", "StreamCycler::OnNextFrame");
TRACE_FLOW_END("camera", "camera3::Stream::GetNextFrame",
fsl::GetKoid(frame_info.release_fence.get()));
// Reset the watchdog for this particular stream.
ResetStreamWatchdog(stream_index);
auto& stream_info = stream_infos_[stream_index];
if (show_buffer_handler_ && stream_info.add_collection_handler_returned_value) {
show_buffer_handler_(stream_info.add_collection_handler_returned_value.value(),
frame_info.buffer_index, std::move(frame_info.release_fence),
stream_info.highlight);
} else {
frame_info.release_fence.reset();
}
auto& stream = stream_infos_[stream_index].stream;
// Set the region of interest if appropriate.
ZX_ASSERT(configurations_.size() > current_config_index_);
auto& current_configuration = configurations_[current_config_index_];
ZX_ASSERT(current_configuration.streams.size() > stream_index);
auto& current_stream = current_configuration.streams[stream_index];
// If automatic mode, sequence through the crop region automatically.
if (current_stream.supports_crop_region && !manual_mode_) {
const uint32_t limit = kRegionOfInterestFramesPerMoveRatio;
static uint32_t count = 0;
++count;
if (count >= limit) {
count = 0;
auto region = moving_window_.NextWindow();
stream->WatchCropRegion([this, stream_index](std::unique_ptr<fuchsia::math::RectF> region) {
WatchCropRegionCallback(stream_index, std::move(region));
});
stream->SetCropRegion(std::make_unique<fuchsia::math::RectF>(region));
if (stream_infos_[stream_index].source_highlight) {
stream_infos_[stream_infos_[stream_index].source_highlight.value()].highlight = region;
}
}
}
stream->GetNextFrame([this, stream_index](fuchsia::camera3::FrameInfo frame_info) {
OnNextFrame(stream_index, std::move(frame_info));
});
}
void StreamCycler::WatchCropRegionCallback(uint32_t stream_index,
std::unique_ptr<fuchsia::math::RectF> region) {
CommandSuccessNotify();
}
void StreamCycler::CommandSuccessNotify() {
CommandStatusHandler command_status_handler = std::move(command_status_handler_);
if (command_status_handler) {
ZX_ASSERT(controller_dispatcher_ != nullptr);
async::PostTask(controller_dispatcher_,
[command_status_handler = std::move(command_status_handler)]() mutable {
fuchsia::camera::gym::Controller_SendCommand_Result result;
command_status_handler(result.WithResponse({}));
});
}
}
void StreamCycler::CommandFailureNotify(::fuchsia::camera::gym::CommandError status) {
CommandStatusHandler command_status_handler = std::move(command_status_handler_);
if (command_status_handler) {
ZX_ASSERT(controller_dispatcher_ != nullptr);
async::PostTask(controller_dispatcher_, [command_status_handler =
std::move(command_status_handler)]() mutable {
fuchsia::camera::gym::Controller_SendCommand_Result result;
command_status_handler(result.WithErr(::fuchsia::camera::gym::CommandError::OUT_OF_RANGE));
});
}
}
void StreamCycler::DisconnectStream(uint32_t stream_index) {
if (remove_collection_handler_) {
auto& stream_info = stream_infos_[stream_index];
if (stream_info.add_collection_handler_returned_value) {
remove_collection_handler_(stream_info.add_collection_handler_returned_value.value());
}
}
stream_infos_.erase(stream_index);
}
uint32_t StreamCycler::NextConfigIndex() {
ZX_ASSERT(configurations_.size() > 0);
ZX_ASSERT(current_config_index_ < configurations_.size());
return (current_config_index_ + 1) % configurations_.size();
}
void StreamCycler::ResetStreamWatchdog(uint32_t stream_index) {
auto& stream_info = stream_infos_[stream_index];
stream_info.last_received = zx::clock::get_monotonic();
}
void StreamCycler::ResetAllWatchdogs() {
for (auto& [stream_index, stream_info] : stream_infos_) {
stream_info.last_received = zx::clock::get_monotonic();
}
}
void StreamCycler::CheckAllWatchdogs() {
auto now = zx::clock::get_monotonic();
if (!muted_) {
for (const auto& [stream_index, info] : stream_infos_) {
ZX_ASSERT_MSG((now - info.last_received) < kFrameTimeout, "stream=%u", stream_index);
}
}
async::PostDelayedTask(dispatcher_, fit::bind_member(this, &StreamCycler::CheckAllWatchdogs),
kCheckInterval);
}
void StreamCycler::ExecuteCommand(Command command, CommandStatusHandler handler) {
async::PostTask(dispatcher_,
[this, command = std::move(command), handler = std::move(handler)]() mutable {
PostedExecuteCommand(std::move(command), std::move(handler));
});
}
void StreamCycler::PostedExecuteCommand(Command command, CommandStatusHandler handler) {
ZX_ASSERT(!command_status_handler_);
// TODO(b/180554943) - Allow for async in future. For now we only accept running one command at a
// time. Individual commands below are assume to simply kick off the command, but the command
// status handler must be called appropriately when the command has truly finished. This callback
// is done using either CommandSuccessNotify or CommandFailureNotify.
command_status_handler_ = std::move(handler);
switch (command.Which()) {
case Command::Tag::kSetConfig:
ExecuteSetConfigCommand(command.set_config());
break;
case Command::Tag::kAddStream:
ExecuteAddStreamCommand(command.add_stream());
break;
case Command::Tag::kSetCrop:
ExecuteSetCropCommand(command.set_crop());
break;
case Command::Tag::kSetResolution:
ExecuteSetResolutionCommand(command.set_resolution());
break;
default:
ZX_ASSERT(false);
}
}
// Actual execution of the "set-config" command.
void StreamCycler::ExecuteSetConfigCommand(SetConfigCommand& command) {
uint32_t config_index = command.config_id;
if (config_index >= configurations_.size()) {
FX_LOGS(INFO) << "MANUAL MODE: ERROR: config_index " << config_index << " out of range";
CommandFailureNotify(::fuchsia::camera::gym::CommandError::OUT_OF_RANGE);
return;
}
// TODO(b/180555616) - This is not a great check. It is possible for another client to change the
// configuration index for this camera device, and now "current_config_index_" is out-of-date, and
// the test could behave incorrectly. It is assumed that the engineer running this test knows
// exactly which clients have access to this camera device.
if (current_config_index_ == config_index) {
CommandSuccessNotify();
}
device_->SetCurrentConfiguration(config_index);
}
// Actual execution of the "add-stream" command.
void StreamCycler::ExecuteAddStreamCommand(AddStreamCommand& command) {
uint32_t stream_index = command.stream_id;
uint32_t config_index = current_config_index_;
ZX_ASSERT(config_index < configurations_.size());
if (stream_index >= configurations_[config_index].streams.size()) {
FX_LOGS(INFO) << "MANUAL MODE: ERROR: stream_index " << stream_index
<< " out of range for config_index " << config_index;
CommandFailureNotify(::fuchsia::camera::gym::CommandError::OUT_OF_RANGE);
return;
}
ConnectToStream(config_index, stream_index);
}
// Actual execution of the "set-crop" command.
void StreamCycler::ExecuteSetCropCommand(SetCropCommand& command) {
uint32_t stream_index = command.stream_id;
float x = command.x;
float y = command.y;
float width = command.width;
float height = command.height;
uint32_t config_index = current_config_index_;
ZX_ASSERT(config_index < configurations_.size());
if (stream_index >= configurations_[config_index].streams.size()) {
FX_LOGS(INFO) << "MANUAL MODE: ERROR: stream_index " << stream_index
<< " out of range for config_index " << config_index;
CommandFailureNotify(::fuchsia::camera::gym::CommandError::OUT_OF_RANGE);
return;
}
fuchsia::math::RectF region = {x, y, width, height};
// TODO(b/180555730) - What if the stream_info has not been created yet?
// What if the stream has not been connected yet?
auto& stream = stream_infos_[stream_index].stream;
stream->WatchCropRegion([this, stream_index](std::unique_ptr<fuchsia::math::RectF> region) {
WatchCropRegionCallback(stream_index, std::move(region));
});
stream->SetCropRegion(std::make_unique<fuchsia::math::RectF>(region));
}
// Actual execution of the "set-resolution" command.
void StreamCycler::ExecuteSetResolutionCommand(SetResolutionCommand& command) {
// TODO(fxb/60143) - Placeholder for set resolution command.
CommandSuccessNotify();
}
} // namespace camera