// blob: d41ae9709d47fa34db424549d9cbeb1bb34811af [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/bin/media/media_service/file_reader_impl.h"
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include "garnet/bin/media/util/file_channel.h"
#include "lib/fsl/tasks/message_loop.h"
#include "lib/fxl/files/file_descriptor.h"
#include "lib/fxl/logging.h"
namespace media {
// static
// Builds a FileReaderImpl for the file behind |file_channel| that serves
// |request| on behalf of |owner|. The channel is first converted to a file
// descriptor with FdFromChannel; the descriptor, request and owner are then
// handed to the constructor.
std::shared_ptr<FileReaderImpl> FileReaderImpl::Create(
    zx::channel file_channel,
    fidl::InterfaceRequest<SeekingReader> request,
    MediaServiceImpl* owner) {
  auto fd = FdFromChannel(std::move(file_channel));

  // NOTE(review): plain |new| is presumably used because the constructor is
  // not accessible to std::make_shared -- confirm against the header.
  std::shared_ptr<FileReaderImpl> reader(
      new FileReaderImpl(std::move(fd), std::move(request), owner));
  return reader;
}
// Takes ownership of |fd|, allocates a transfer buffer of kBufferSize bytes
// and determines the size of the file.
//
// Outcome, as observed through |result_| and |size_|:
//   - |fd| invalid:       result_ = NOT_FOUND,     size_ = kUnknownSize
//   - size query fails:   result_ = UNKNOWN_ERROR, size_ = kUnknownSize
//   - otherwise:          result_ = OK,            size_ = file size in bytes
FileReaderImpl::FileReaderImpl(fxl::UniqueFD fd,
                               fidl::InterfaceRequest<SeekingReader> request,
                               MediaServiceImpl* owner)
    : MediaServiceImpl::Product<SeekingReader>(this, std::move(request), owner),
      fd_(std::move(fd)),
      buffer_(kBufferSize) {
  // Give |size_| a defined value on every path: Describe reports it
  // unconditionally, including when the descriptor turned out to be invalid.
  size_ = kUnknownSize;

  if (!fd_.is_valid()) {
    result_ = MediaResult::NOT_FOUND;
    return;
  }

  result_ = MediaResult::OK;

  // Seeking to the end of the file yields its size. The file offset is left
  // at the end; ReadAt repositions it before any data is read.
  const off_t seek_result = lseek(fd_.get(), 0, SEEK_END);
  if (seek_result < 0) {
    // TODO(dalesat): More specific error code.
    result_ = MediaResult::UNKNOWN_ERROR;
    return;
  }

  size_ = static_cast<uint64_t>(seek_result);
}
// Cancels the outstanding socket wait, if there is one. A non-zero |wait_id_|
// identifies a wait that was registered with |this| as its closure
// (presumably cancelling keeps the waiter from calling back into a destroyed
// object -- confirm against the async waiter contract).
FileReaderImpl::~FileReaderImpl() {
  if (wait_id_ == 0) {
    // No wait is pending; nothing to cancel.
    return;
  }

  GetDefaultAsyncWaiter()->CancelWait(wait_id_);
}
// Reports the reader's current result code and the file size to |callback|.
void FileReaderImpl::Describe(const DescribeCallback& callback) {
  // The third callback argument is always true.
  // NOTE(review): presumably this is the "can seek" flag of SeekingReader --
  // confirm against the interface definition.
  const bool seekable = true;
  callback(result_, size_, seekable);
}
// Starts streaming the file from byte offset |position|.
//
// Any socket left over from a previous ReadAt is dropped (and its pending wait
// cancelled), the file is repositioned, a fresh socket pair is created and the
// first batch of data is written. On success |callback| receives
// MediaResult::OK and the consumer end of the socket; on any failure it
// receives the error code and an invalid socket. Errors detected here are
// stored in |result_| and therefore also reported by later calls.
void FileReaderImpl::ReadAt(uint64_t position, const ReadAtCallback& callback) {
  if (result_ != MediaResult::OK) {
    callback(result_, zx::socket());
    return;
  }

  // Checked only once the reader is known to be healthy: in an error state
  // |size_| does not describe the file, so comparing against it earlier could
  // abort a debug build instead of reporting the error through |callback|.
  FXL_DCHECK(position < size_);

  if (socket_) {
    // Abandon the previous stream before starting a new one.
    if (wait_id_ != 0) {
      GetDefaultAsyncWaiter()->CancelWait(wait_id_);
      wait_id_ = 0;
    }

    socket_.reset();
  }

  off_t seek_result = lseek(fd_.get(), static_cast<off_t>(position), SEEK_SET);
  if (seek_result < 0) {
    FXL_LOG(ERROR) << "seek failed, result " << seek_result << " errno "
                   << errno;
    // TODO(dalesat): More specific error code.
    result_ = MediaResult::UNKNOWN_ERROR;
    callback(result_, zx::socket());
    return;
  }

  zx::socket other_socket;
  zx_status_t status = zx::socket::create(0u, &socket_, &other_socket);
  if (status != ZX_OK) {
    FXL_LOG(ERROR) << "zx::socket::create failed, status " << status;
    // TODO(dalesat): More specific error code.
    result_ = MediaResult::UNKNOWN_ERROR;
    callback(result_, zx::socket());
    return;
  }

  // Start with an empty buffer so WriteToSocket reads from the new position.
  remaining_buffer_bytes_count_ = 0;
  reached_end_ = false;

  WriteToSocket();

  if (result_ != MediaResult::OK) {
    // Error occurred during WriteToSocket.
    FXL_LOG(ERROR) << "error occurred during WriteToSocket, result_ "
                   << result_;
    callback(result_, zx::socket());
    return;
  }

  callback(result_, std::move(other_socket));
}
// static
// Completion handler for the wait registered by WriteToSocket. |closure| is
// the FileReaderImpl that registered the wait; |pending| and |count| are not
// used. If the wait was cancelled or failed, the socket is dropped; otherwise
// writing resumes.
void FileReaderImpl::WriteToSocketStatic(zx_status_t status,
                                         zx_signals_t pending,
                                         uint64_t count,
                                         void* closure) {
  // |closure| is the |this| pointer handed to AsyncWait as a void*, so a
  // static_cast is the matching conversion back.
  FileReaderImpl* reader = static_cast<FileReaderImpl*>(closure);

  // The wait has completed, so there is nothing left to cancel.
  reader->wait_id_ = 0;

  if (status == ZX_ERR_CANCELED) {
    // Run loop has aborted...the app is shutting down.
    reader->socket_.reset();
    return;
  }

  if (status != ZX_OK) {
    // This status comes from the wait itself, not from a socket write.
    FXL_LOG(ERROR) << "async wait on socket failed, status " << status;
    reader->socket_.reset();
    return;
  }

  reader->WriteToSocket();
}
// Pumps file data into |socket_| until one of the following happens:
//   - there is nothing left to send (end of file, or ReadFromFile failed and
//     recorded the error in |result_|): |socket_| is closed;
//   - the socket cannot accept more data right now: an async wait is
//     registered and WriteToSocketStatic resumes the pump later;
//   - the consumer closed its end: |socket_| is closed, no error is recorded;
//   - the write fails for another reason: |socket_| is closed and |result_|
//     is set to UNKNOWN_ERROR.
void FileReaderImpl::WriteToSocket() {
  while (true) {
    // Refill the buffer once it has been fully written, unless the end of the
    // file has already been seen.
    if (remaining_buffer_bytes_count_ == 0 && !reached_end_) {
      ReadFromFile();
    }

    if (remaining_buffer_bytes_count_ == 0) {
      // Nothing more will be written for this stream. Close our end so the
      // consumer observes peer-closed once it has drained the data already
      // written, rather than waiting on a socket that will never be fed, and
      // so the handle is not held until the next ReadAt. No wait is pending
      // here: this point is reached either from ReadAt or from the wait
      // callback, which has already cleared |wait_id_|.
      socket_.reset();
      return;
    }

    FXL_DCHECK(remaining_buffer_bytes_ != nullptr);

    size_t byte_count;
    zx_status_t status =
        socket_.write(0u, remaining_buffer_bytes_,
                      remaining_buffer_bytes_count_, &byte_count);

    if (status == ZX_OK) {
      // Possibly a partial write; advance past what was accepted and retry.
      FXL_DCHECK(byte_count != 0);
      remaining_buffer_bytes_ += byte_count;
      remaining_buffer_bytes_count_ -= byte_count;
      continue;
    }

    if (status == ZX_ERR_SHOULD_WAIT) {
      // Resume from WriteToSocketStatic when the socket becomes writable or
      // the consumer closes its end.
      wait_id_ = GetDefaultAsyncWaiter()->AsyncWait(
          socket_.get(), ZX_SOCKET_WRITABLE | ZX_SOCKET_PEER_CLOSED,
          ZX_TIME_INFINITE, FileReaderImpl::WriteToSocketStatic, this);
      return;
    }

    if (status == ZX_ERR_PEER_CLOSED) {
      // Consumer end was closed. This is normal behavior, depending on what
      // the consumer is up to.
      socket_.reset();
      return;
    }

    FXL_LOG(ERROR) << "zx::socket::write failed, status " << status;
    socket_.reset();
    // TODO(dalesat): More specific error code.
    result_ = MediaResult::UNKNOWN_ERROR;
    return;
  }
}
void FileReaderImpl::ReadFromFile() {
FXL_DCHECK(buffer_.size() == kBufferSize);
FXL_DCHECK(!reached_end_);
ssize_t result =
fxl::ReadFileDescriptor(fd_.get(), buffer_.data(), kBufferSize);
if (result < 0) {
// TODO(dalesat): More specific error code.
result_ = MediaResult::UNKNOWN_ERROR;
return;
}
if (result < static_cast<ssize_t>(kBufferSize)) {
reached_end_ = true;
}
remaining_buffer_bytes_count_ = static_cast<size_t>(result);
remaining_buffer_bytes_ = buffer_.data();
}
} // namespace media