blob: 40f22f75af50e3702a71c1134f3b756b0d131ff6 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "local_decompressor_creator.h"
#include <lib/async/cpp/task.h>
#include <lib/sync/completion.h>
#include <lib/sys/cpp/component_context.h>
#include <lib/zx/channel.h>
#include <lib/zx/result.h>
#include <zircon/errors.h>
#include <zircon/time.h>
#include "src/storage/blobfs/compression/external_decompressor.h"
namespace blobfs {
class LambdaConnector : public DecompressorCreatorConnector {
public:
explicit LambdaConnector(std::function<zx_status_t(zx::channel)> callback);
// ExternalDecompressorCreatorConnector interface.
zx_status_t ConnectToDecompressorCreator(zx::channel remote_channel) override;
private:
std::function<zx_status_t(zx::channel)> callback_;
};
LambdaConnector::LambdaConnector(std::function<zx_status_t(zx::channel)> callback)
: callback_(std::move(callback)) {}
zx_status_t LambdaConnector::ConnectToDecompressorCreator(zx::channel remote_channel) {
return callback_(std::move(remote_channel));
}
zx::result<std::unique_ptr<LocalDecompressorCreator>> LocalDecompressorCreator::Create() {
std::unique_ptr<LocalDecompressorCreator> decompressor(new LocalDecompressorCreator());
decompressor->connector_ = std::make_unique<LambdaConnector>(
[decompressor = decompressor.get()](zx::channel remote_channel) {
return decompressor->RegisterChannel(std::move(remote_channel));
});
if (zx_status_t status = decompressor->loop_.StartThread(); status != ZX_OK) {
return zx::error(status);
}
return zx::ok(std::move(decompressor));
}
LocalDecompressorCreator::~LocalDecompressorCreator() {
sync_completion_t done;
// Unbind everything from the server thread and prevent future bindings.
ZX_ASSERT(ZX_OK == async::PostTask(loop_.dispatcher(), [this, &done]() {
this->shutting_down_ = true;
for (auto& binding : bindings_) {
binding.Close(ZX_ERR_CANCELED);
}
this->bindings_.clear();
sync_completion_signal(&done);
}));
sync_completion_wait(&done, ZX_TIME_INFINITE);
}
zx_status_t LocalDecompressorCreator::RegisterChannel(zx::channel channel) {
// Pushing binding management onto the server thread since the HLCPP bindings
// are not thread safe.
zx_status_t bind_status = ZX_OK;
sync_completion_t done;
zx_status_t post_status = async::PostTask(loop_.dispatcher(), [&]() mutable {
if (this->shutting_down_) {
bind_status = ZX_ERR_CANCELED;
} else {
bind_status = this->RegisterChannelOnServerThread(std::move(channel));
}
sync_completion_signal(&done);
});
if (post_status != ZX_OK) {
return post_status;
}
sync_completion_wait(&done, ZX_TIME_INFINITE);
return bind_status;
}
zx_status_t LocalDecompressorCreator::RegisterChannelOnServerThread(zx::channel channel) {
auto it = bindings_.begin();
while (it != bindings_.end()) {
if (!it->is_bound()) {
it = bindings_.erase(it);
} else {
++it;
}
}
// Add new binding.
bindings_.emplace_back(&decompressor_);
return bindings_.back().Bind(std::move(channel), loop_.dispatcher());
}
} // namespace blobfs