blob: 012a4e70e4b97f89ad880f5874164f3c955146d6 [file] [log] [blame]
#include "include/private/dvr/buffer_hub_queue_client.h"
#include <inttypes.h>
#include <log/log.h>
#include <poll.h>
#include <sys/epoll.h>
#include <array>
#include <pdx/default_transport/client_channel.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>
#define RETRY_EINTR(fnc_call) \
([&]() -> decltype(fnc_call) { \
decltype(fnc_call) result; \
do { \
result = (fnc_call); \
} while (result == -1 && errno == EINTR); \
return result; \
})()
using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
using android::pdx::Status;
namespace android {
namespace dvr {
BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
: Client{pdx::default_transport::ClientChannel::Create(
std::move(channel_handle))},
meta_size_(0),
buffers_(BufferHubQueue::kMaxQueueCapacity),
epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
fences_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0),
id_(-1) {
Initialize();
}
BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
: Client{pdx::default_transport::ClientChannelFactory::Create(
endpoint_path)},
meta_size_(0),
buffers_(BufferHubQueue::kMaxQueueCapacity),
epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
fences_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0),
id_(-1) {
Initialize();
}
void BufferHubQueue::Initialize() {
int ret = epoll_fd_.Create();
if (ret < 0) {
ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
strerror(-ret));
return;
}
epoll_event event = {.events = EPOLLIN | EPOLLET,
.data = {.u64 = static_cast<uint64_t>(
BufferHubQueue::kEpollQueueEventIndex)}};
ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
if (ret < 0) {
ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
strerror(-ret));
}
}
Status<void> BufferHubQueue::ImportQueue() {
auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
if (!status) {
ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
} else {
SetupQueue(status.get().meta_size_bytes, status.get().id);
return {};
}
}
void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
meta_size_ = meta_size_bytes;
id_ = id;
meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
if (auto status = CreateConsumerQueueHandle())
return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
else
return nullptr;
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
if (auto status = CreateConsumerQueueHandle())
return std::unique_ptr<ConsumerQueue>(
new ConsumerQueue(status.take(), true));
else
return nullptr;
}
Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
if (!status) {
ALOGE(
"BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
"%s",
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
}
return status;
}
bool BufferHubQueue::WaitForBuffers(int timeout) {
std::array<epoll_event, kMaxEvents> events;
// Loop at least once to check for hangups.
do {
ALOGD_IF(
TRACE,
"BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
id(), count(), capacity());
// If there is already a buffer then just check for hangup without waiting.
const int ret = epoll_fd_.Wait(events.data(), events.size(),
count() == 0 ? timeout : 0);
if (ret == 0) {
ALOGI_IF(TRACE,
"BufferHubQueue::WaitForBuffers: No events before timeout: "
"queue_id=%d",
id());
return count() != 0;
}
if (ret < 0 && ret != -EINTR) {
ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
strerror(-ret));
return false;
}
const int num_events = ret;
// A BufferQueue's epoll fd tracks N+1 events, where there are N events,
// one for each buffer, in the queue and one extra event for the queue
// client itself.
for (int i = 0; i < num_events; i++) {
int64_t index = static_cast<int64_t>(events[i].data.u64);
ALOGD_IF(TRACE,
"BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
index);
if (is_buffer_event_index(index)) {
HandleBufferEvent(static_cast<size_t>(index), events[i].events);
} else if (is_queue_event_index(index)) {
HandleQueueEvent(events[i].events);
} else {
ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
index);
}
}
} while (count() == 0 && capacity() > 0 && !hung_up());
return count() != 0;
}
void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
auto buffer = buffers_[slot];
if (!buffer) {
ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
return;
}
auto status = buffer->GetEventMask(poll_events);
if (!status) {
ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
return;
}
const int events = status.get();
if (events & EPOLLIN) {
const int ret = OnBufferReady(buffer, &fences_[slot]);
if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
// Only enqueue the buffer if it moves to or is already in the state
// requested in OnBufferReady(). If the buffer is busy this means that the
// buffer moved from released to posted when a new consumer was created
// before the ProducerQueue had a chance to regain it. This is a valid
// transition that we have to handle because edge triggered poll events
// latch the ready state even if it is later de-asserted -- don't enqueue
// or print an error log in this case.
if (ret != -EBUSY)
Enqueue(buffer, slot);
} else {
ALOGE(
"BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
"queue_id=%d buffer_id=%d: %s",
id(), buffer->id(), strerror(-ret));
}
} else if (events & EPOLLHUP) {
// This might be caused by producer replacing an existing buffer slot, or
// when BufferHubQueue is shutting down. For the first case, currently the
// epoll FD is cleaned up when the replacement consumer client is imported,
// we shouldn't detach again if |epollhub_pending_[slot]| is set.
ALOGW(
"BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
"buffer event fd: %d, EPOLLHUP pending: %d",
slot, buffer->event_fd(), int{epollhup_pending_[slot]});
if (epollhup_pending_[slot]) {
epollhup_pending_[slot] = false;
} else {
DetachBuffer(slot);
}
} else {
ALOGW(
"BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
"events=%d",
slot, events);
}
}
void BufferHubQueue::HandleQueueEvent(int poll_event) {
auto status = GetEventMask(poll_event);
if (!status) {
ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
return;
}
const int events = status.get();
if (events & EPOLLIN) {
// Note that after buffer imports, if |count()| still returns 0, epoll
// wait will be tried again to acquire the newly imported buffer.
auto buffer_status = OnBufferAllocated();
if (!buffer_status) {
ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
buffer_status.GetErrorMessage().c_str());
}
} else if (events & EPOLLHUP) {
ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
hung_up_ = true;
} else {
ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
}
}
int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
size_t slot) {
if (is_full()) {
// TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
// import buffer.
ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
capacity_);
return -E2BIG;
}
if (buffers_[slot] != nullptr) {
// Replace the buffer if the slot is preoccupied. This could happen when the
// producer side replaced the slot with a newly allocated buffer. Detach the
// buffer before setting up with the new one.
DetachBuffer(slot);
epollhup_pending_[slot] = true;
}
epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
if (ret < 0) {
ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
strerror(-ret));
return ret;
}
buffers_[slot] = buf;
capacity_++;
return 0;
}
int BufferHubQueue::DetachBuffer(size_t slot) {
auto& buf = buffers_[slot];
if (buf == nullptr) {
ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
return -EINVAL;
}
const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
if (ret < 0) {
ALOGE(
"BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
"%s",
strerror(-ret));
return ret;
}
buffers_[slot] = nullptr;
capacity_--;
return 0;
}
void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
size_t slot) {
if (count() == capacity_) {
ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
return;
}
// Set slot buffer back to vector.
// TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
// the limitation of the RingBuffer we are using. Would be better to refactor
// that.
BufferInfo buffer_info(slot, meta_size_);
buffer_info.buffer = buf;
// Swap metadata loaded during onBufferReady into vector.
std::swap(buffer_info.metadata, meta_buffer_tmp_);
available_buffers_.Append(std::move(buffer_info));
}
Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
int timeout, size_t* slot, void* meta, LocalHandle* fence) {
ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
if (!WaitForBuffers(timeout))
return ErrorStatus(ETIMEDOUT);
std::shared_ptr<BufferHubBuffer> buf;
BufferInfo& buffer_info = available_buffers_.Front();
*fence = std::move(fences_[buffer_info.slot]);
// Report current pos as the output slot.
std::swap(buffer_info.slot, *slot);
// Swap buffer from vector to be returned later.
std::swap(buffer_info.buffer, buf);
// Swap metadata from vector into tmp so that we can write out to |meta|.
std::swap(buffer_info.metadata, meta_buffer_tmp_);
available_buffers_.PopFront();
if (!buf) {
ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
return ErrorStatus(ENOBUFS);
}
if (meta) {
std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
reinterpret_cast<uint8_t*>(meta));
}
return {std::move(buf)};
}
ProducerQueue::ProducerQueue(size_t meta_size)
: ProducerQueue(meta_size, 0, 0, 0, 0) {}
ProducerQueue::ProducerQueue(LocalChannelHandle handle)
: BASE(std::move(handle)) {
auto status = ImportQueue();
if (!status) {
ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
status.GetErrorMessage().c_str());
Close(-status.error());
}
}
ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
uint64_t usage_clear_mask,
uint64_t usage_deny_set_mask,
uint64_t usage_deny_clear_mask)
: BASE(BufferHubRPC::kClientPath) {
auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
usage_deny_set_mask, usage_deny_clear_mask});
if (!status) {
ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
status.GetErrorMessage().c_str());
Close(-status.error());
return;
}
SetupQueue(status.get().meta_size_bytes, status.get().id);
}
int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
uint32_t layer_count, uint32_t format,
uint64_t usage, size_t* out_slot) {
if (out_slot == nullptr) {
ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
return -EINVAL;
}
if (is_full()) {
ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
capacity());
return -E2BIG;
}
const size_t kBufferCount = 1U;
Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
width, height, layer_count, format, usage, kBufferCount);
if (!status) {
ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
status.GetErrorMessage().c_str());
return -status.error();
}
auto buffer_handle_slots = status.take();
LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
"BufferHubRPC::ProducerQueueAllocateBuffers should "
"return one and only one buffer handle.");
// We only allocate one buffer at a time.
auto& buffer_handle = buffer_handle_slots[0].first;
size_t buffer_slot = buffer_handle_slots[0].second;
ALOGD_IF(TRACE,
"ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
buffer_handle.value());
*out_slot = buffer_slot;
return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
buffer_slot);
}
int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
size_t slot) {
ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
id(), buf->id(), slot);
// For producer buffer, we need to enqueue the newly added buffer
// immediately. Producer queue starts with all buffers in available state.
const int ret = BufferHubQueue::AddBuffer(buf, slot);
if (ret < 0)
return ret;
Enqueue(buf, slot);
return 0;
}
int ProducerQueue::DetachBuffer(size_t slot) {
auto status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
if (!status) {
ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
status.GetErrorMessage().c_str());
return -status.error();
}
return BufferHubQueue::DetachBuffer(slot);
}
Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
int timeout, size_t* slot, LocalHandle* release_fence) {
if (slot == nullptr || release_fence == nullptr) {
ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
slot, release_fence);
return ErrorStatus(EINVAL);
}
auto buffer_status =
BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
if (!buffer_status)
return buffer_status.error_status();
return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
}
int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* release_fence) {
ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
id(), buf->id());
auto buffer = std::static_pointer_cast<BufferProducer>(buf);
return buffer->Gain(release_fence);
}
ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
: BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
auto status = ImportQueue();
if (!status) {
ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
status.GetErrorMessage().c_str());
Close(-status.error());
}
auto import_status = ImportBuffers();
if (import_status) {
ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
import_status.get());
} else {
ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
import_status.GetErrorMessage().c_str());
}
}
Status<size_t> ConsumerQueue::ImportBuffers() {
auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
if (!status) {
ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
}
int ret;
int last_error = 0;
int imported_buffers = 0;
auto buffer_handle_slots = status.take();
for (auto& buffer_handle_slot : buffer_handle_slots) {
ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
buffer_handle_slot.first.value());
std::unique_ptr<BufferConsumer> buffer_consumer =
BufferConsumer::Import(std::move(buffer_handle_slot.first));
// Setup ignore state before adding buffer to the queue.
if (ignore_on_import_) {
ALOGD_IF(TRACE,
"ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
"buffer_id=%d",
buffer_consumer->id());
ret = buffer_consumer->SetIgnore(true);
if (ret < 0) {
ALOGE(
"ConsumerQueue::ImportBuffers: Failed to set ignored state on "
"imported buffer buffer_id=%d: %s",
buffer_consumer->id(), strerror(-ret));
last_error = ret;
}
}
ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
if (ret < 0) {
ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
strerror(-ret));
last_error = ret;
continue;
} else {
imported_buffers++;
}
}
if (imported_buffers > 0)
return {imported_buffers};
else
return ErrorStatus(-last_error);
}
int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
size_t slot) {
ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
id(), buf->id(), slot);
const int ret = BufferHubQueue::AddBuffer(buf, slot);
if (ret < 0)
return ret;
// Check to see if the buffer is already signaled. This is necessary to catch
// cases where buffers are already available; epoll edge triggered mode does
// not fire until and edge transition when adding new buffers to the epoll
// set.
const int kTimeoutMs = 0;
pollfd pfd{buf->event_fd(), POLLIN, 0};
const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
if (count < 0) {
const int error = errno;
ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
strerror(errno));
return -error;
}
if (count == 1)
HandleBufferEvent(slot, pfd.revents);
return 0;
}
Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
int timeout, size_t* slot, void* meta, size_t meta_size,
LocalHandle* acquire_fence) {
if (meta_size != meta_size_) {
ALOGE(
"ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
"does not match metadata size (%zu) for the queue.",
meta_size, meta_size_);
return ErrorStatus(EINVAL);
}
if (slot == nullptr || acquire_fence == nullptr) {
ALOGE(
"ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
"acquire_fence=%p",
slot, meta, acquire_fence);
return ErrorStatus(EINVAL);
}
auto buffer_status =
BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
if (!buffer_status)
return buffer_status.error_status();
return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
}
int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* acquire_fence) {
ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
id(), buf->id());
auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
}
Status<void> ConsumerQueue::OnBufferAllocated() {
auto status = ImportBuffers();
if (!status) {
ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
status.GetErrorMessage().c_str());
return ErrorStatus(status.error());
} else if (status.get() == 0) {
ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
return ErrorStatus(ENOBUFS);
} else {
ALOGD_IF(TRACE,
"ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
status.get());
return {};
}
}
} // namespace dvr
} // namespace android