blob: 98d9724521e829f1f48f579f11f85296e8b9a2c5 [file] [log] [blame]
//
// experimental/detail/channel_service.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
#define ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include "asio/detail/config.hpp"
#include "asio/associated_cancellation_slot.hpp"
#include "asio/cancellation_type.hpp"
#include "asio/detail/mutex.hpp"
#include "asio/detail/op_queue.hpp"
#include "asio/execution_context.hpp"
#include "asio/experimental/detail/channel_message.hpp"
#include "asio/experimental/detail/channel_receive_op.hpp"
#include "asio/experimental/detail/channel_send_op.hpp"
#include "asio/experimental/detail/has_signature.hpp"
#include "asio/detail/push_options.hpp"
namespace asio {
namespace experimental {
namespace detail {
template <typename Mutex>
class channel_service
: public asio::detail::execution_context_service_base<
channel_service<Mutex> >
{
public:
// Possible states for a channel end.
enum state
{
buffer = 0,
waiter = 1,
block = 2,
closed = 3
};
// The base implementation type of all channels.
struct base_implementation_type
{
// Default constructor.
base_implementation_type()
: receive_state_(block),
send_state_(block),
max_buffer_size_(0),
next_(0),
prev_(0)
{
}
// The current state of the channel.
state receive_state_ : 16;
state send_state_ : 16;
// The maximum number of elements that may be buffered in the channel.
std::size_t max_buffer_size_;
// The operations that are waiting on the channel.
asio::detail::op_queue<channel_operation> waiters_;
// Pointers to adjacent channel implementations in linked list.
base_implementation_type* next_;
base_implementation_type* prev_;
// The mutex type to protect the internal implementation.
mutable Mutex mutex_;
};
// The implementation for a specific value type.
template <typename Traits, typename... Signatures>
struct implementation_type;
// Constructor.
channel_service(execution_context& ctx);
// Destroy all user-defined handler objects owned by the service.
void shutdown();
// Construct a new channel implementation.
void construct(base_implementation_type& impl, std::size_t max_buffer_size);
// Destroy a channel implementation.
template <typename Traits, typename... Signatures>
void destroy(implementation_type<Traits, Signatures...>& impl);
// Move-construct a new channel implementation.
template <typename Traits, typename... Signatures>
void move_construct(implementation_type<Traits, Signatures...>& impl,
implementation_type<Traits, Signatures...>& other_impl);
// Move-assign from another channel implementation.
template <typename Traits, typename... Signatures>
void move_assign(implementation_type<Traits, Signatures...>& impl,
channel_service& other_service,
implementation_type<Traits, Signatures...>& other_impl);
// Get the capacity of the channel.
std::size_t capacity(
const base_implementation_type& impl) const ASIO_NOEXCEPT;
// Determine whether the channel is open.
bool is_open(const base_implementation_type& impl) const ASIO_NOEXCEPT;
// Reset the channel to its initial state.
template <typename Traits, typename... Signatures>
void reset(implementation_type<Traits, Signatures...>& impl);
// Close the channel.
template <typename Traits, typename... Signatures>
void close(implementation_type<Traits, Signatures...>& impl);
// Cancel all operations associated with the channel.
template <typename Traits, typename... Signatures>
void cancel(implementation_type<Traits, Signatures...>& impl);
// Cancel the operation associated with the channel that has the given key.
template <typename Traits, typename... Signatures>
void cancel_by_key(implementation_type<Traits, Signatures...>& impl,
void* cancellation_key);
// Determine whether a value can be read from the channel without blocking.
bool ready(const base_implementation_type& impl) const ASIO_NOEXCEPT;
// Synchronously send a new value into the channel.
template <typename Message, typename Traits,
typename... Signatures, typename... Args>
bool try_send(implementation_type<Traits, Signatures...>& impl,
ASIO_MOVE_ARG(Args)... args);
// Synchronously send a number of new values into the channel.
template <typename Message, typename Traits,
typename... Signatures, typename... Args>
std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl,
std::size_t count, ASIO_MOVE_ARG(Args)... args);
// Asynchronously send a new value into the channel.
template <typename Traits, typename... Signatures,
typename Handler, typename IoExecutor>
void async_send(implementation_type<Traits, Signatures...>& impl,
ASIO_MOVE_ARG2(typename implementation_type<
Traits, Signatures...>::payload_type) payload,
Handler& handler, const IoExecutor& io_ex)
{
typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);
// Allocate and construct an operation to wrap the handler.
typedef channel_send_op<
typename implementation_type<Traits, Signatures...>::payload_type,
Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(ASIO_MOVE_CAST2(typename implementation_type<
Traits, Signatures...>::payload_type)(payload), handler, io_ex);
// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<op_cancellation<Traits, Signatures...> >(
this, &impl);
}
ASIO_HANDLER_CREATION((this->context(), *p.p,
"channel", &impl, 0, "async_send"));
start_send_op(impl, p.p);
p.v = p.p = 0;
}
// Synchronously receive a value from the channel.
template <typename Traits, typename... Signatures, typename Handler>
bool try_receive(implementation_type<Traits, Signatures...>& impl,
ASIO_MOVE_ARG(Handler) handler);
// Asynchronously send a new value into the channel.
// Asynchronously receive a value from the channel.
template <typename Traits, typename... Signatures,
typename Handler, typename IoExecutor>
void async_receive(implementation_type<Traits, Signatures...>& impl,
Handler& handler, const IoExecutor& io_ex)
{
typename associated_cancellation_slot<Handler>::type slot
= asio::get_associated_cancellation_slot(handler);
// Allocate and construct an operation to wrap the handler.
typedef channel_receive_op<
typename implementation_type<Traits, Signatures...>::payload_type,
Handler, IoExecutor> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(handler, io_ex);
// Optionally register for per-operation cancellation.
if (slot.is_connected())
{
p.p->cancellation_key_ =
&slot.template emplace<op_cancellation<Traits, Signatures...> >(
this, &impl);
}
ASIO_HANDLER_CREATION((this->context(), *p.p,
"channel", &impl, 0, "async_receive"));
start_receive_op(impl, p.p);
p.v = p.p = 0;
}
private:
// Helper function object to handle a closed notification.
template <typename Payload, typename Signature>
struct complete_receive
{
explicit complete_receive(channel_receive<Payload>* op)
: op_(op)
{
}
template <typename... Args>
void operator()(ASIO_MOVE_ARG(Args)... args)
{
op_->complete(
channel_message<Signature>(0,
ASIO_MOVE_CAST(Args)(args)...));
}
channel_receive<Payload>* op_;
};
// Destroy a base channel implementation.
void base_destroy(base_implementation_type& impl);
// Helper function to start an asynchronous put operation.
template <typename Traits, typename... Signatures>
void start_send_op(implementation_type<Traits, Signatures...>& impl,
channel_send<typename implementation_type<
Traits, Signatures...>::payload_type>* send_op);
// Helper function to start an asynchronous get operation.
template <typename Traits, typename... Signatures>
void start_receive_op(implementation_type<Traits, Signatures...>& impl,
channel_receive<typename implementation_type<
Traits, Signatures...>::payload_type>* receive_op);
// Helper class used to implement per-operation cancellation.
template <typename Traits, typename... Signatures>
class op_cancellation
{
public:
op_cancellation(channel_service* s,
implementation_type<Traits, Signatures...>* impl)
: service_(s),
impl_(impl)
{
}
void operator()(cancellation_type_t type)
{
if (!!(type &
(cancellation_type::terminal
| cancellation_type::partial
| cancellation_type::total)))
{
service_->cancel_by_key(*impl_, this);
}
}
private:
channel_service* service_;
implementation_type<Traits, Signatures...>* impl_;
};
// Mutex to protect access to the linked list of implementations.
asio::detail::mutex mutex_;
// The head of a linked list of all implementations.
base_implementation_type* impl_list_;
};
// The implementation for a specific value type.
template <typename Mutex>
template <typename Traits, typename... Signatures>
struct channel_service<Mutex>::implementation_type : base_implementation_type
{
// The traits type associated with the channel.
typedef typename Traits::template rebind<Signatures...>::other traits_type;
// Type of an element stored in the buffer.
typedef typename conditional<
has_signature<
typename traits_type::receive_cancelled_signature,
Signatures...
>::value,
typename conditional<
has_signature<
typename traits_type::receive_closed_signature,
Signatures...
>::value,
channel_payload<Signatures...>,
channel_payload<
Signatures...,
typename traits_type::receive_closed_signature
>
>::type,
typename conditional<
has_signature<
typename traits_type::receive_closed_signature,
Signatures...,
typename traits_type::receive_cancelled_signature
>::value,
channel_payload<
Signatures...,
typename traits_type::receive_cancelled_signature
>,
channel_payload<
Signatures...,
typename traits_type::receive_cancelled_signature,
typename traits_type::receive_closed_signature
>
>::type
>::type payload_type;
// Move from another buffer.
void buffer_move_from(implementation_type& other)
{
buffer_ = ASIO_MOVE_CAST(
typename traits_type::template container<
payload_type>::type)(other.buffer_);
other.buffer_clear();
}
// Get number of buffered elements.
std::size_t buffer_size() const
{
return buffer_.size();
}
// Push a new value to the back of the buffer.
void buffer_push(payload_type payload)
{
buffer_.push_back(ASIO_MOVE_CAST(payload_type)(payload));
}
// Push new values to the back of the buffer.
std::size_t buffer_push_n(std::size_t count, payload_type payload)
{
std::size_t i = 0;
for (; i < count && buffer_.size() < this->max_buffer_size_; ++i)
buffer_.push_back(payload);
return i;
}
// Get the element at the front of the buffer.
payload_type buffer_front()
{
return ASIO_MOVE_CAST(payload_type)(buffer_.front());
}
// Pop a value from the front of the buffer.
void buffer_pop()
{
buffer_.pop_front();
}
// Clear all buffered values.
void buffer_clear()
{
buffer_.clear();
}
private:
// Buffered values.
typename traits_type::template container<payload_type>::type buffer_;
};
// The implementation for a void value type.
template <typename Mutex>
template <typename Traits, typename R>
struct channel_service<Mutex>::implementation_type<Traits, R()>
: channel_service::base_implementation_type
{
// The traits type associated with the channel.
typedef typename Traits::template rebind<R()>::other traits_type;
// Type of an element stored in the buffer.
typedef typename conditional<
has_signature<
typename traits_type::receive_cancelled_signature,
R()
>::value,
typename conditional<
has_signature<
typename traits_type::receive_closed_signature,
R()
>::value,
channel_payload<R()>,
channel_payload<
R(),
typename traits_type::receive_closed_signature
>
>::type,
typename conditional<
has_signature<
typename traits_type::receive_closed_signature,
R(),
typename traits_type::receive_cancelled_signature
>::value,
channel_payload<
R(),
typename traits_type::receive_cancelled_signature
>,
channel_payload<
R(),
typename traits_type::receive_cancelled_signature,
typename traits_type::receive_closed_signature
>
>::type
>::type payload_type;
// Construct with empty buffer.
implementation_type()
: buffer_(0)
{
}
// Move from another buffer.
void buffer_move_from(implementation_type& other)
{
buffer_ = other.buffer_;
other.buffer_ = 0;
}
// Get number of buffered elements.
std::size_t buffer_size() const
{
return buffer_;
}
// Push a new value to the back of the buffer.
void buffer_push(payload_type)
{
++buffer_;
}
// Push new values to the back of the buffer.
std::size_t buffer_push_n(std::size_t count, payload_type)
{
std::size_t available = this->max_buffer_size_ - buffer_;
count = (count < available) ? count : available;
buffer_ += count;
return count;
}
// Get the element at the front of the buffer.
payload_type buffer_front()
{
return payload_type(channel_message<R()>(0));
}
// Pop a value from the front of the buffer.
void buffer_pop()
{
--buffer_;
}
// Clear all values from the buffer.
void buffer_clear()
{
buffer_ = 0;
}
private:
// Number of buffered "values".
std::size_t buffer_;
};
} // namespace detail
} // namespace experimental
} // namespace asio
#include "asio/detail/pop_options.hpp"
#include "asio/experimental/detail/impl/channel_service.hpp"
#endif // ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP