| // |
| // experimental/detail/channel_service.hpp |
| // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| // |
| // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com) |
| // |
| // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| // |
| |
| #ifndef ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP |
| #define ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP |
| |
| #if defined(_MSC_VER) && (_MSC_VER >= 1200) |
| # pragma once |
| #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) |
| |
| #include "asio/detail/config.hpp" |
| #include "asio/associated_cancellation_slot.hpp" |
| #include "asio/cancellation_type.hpp" |
| #include "asio/detail/mutex.hpp" |
| #include "asio/detail/op_queue.hpp" |
| #include "asio/execution_context.hpp" |
| #include "asio/experimental/detail/channel_message.hpp" |
| #include "asio/experimental/detail/channel_receive_op.hpp" |
| #include "asio/experimental/detail/channel_send_op.hpp" |
| #include "asio/experimental/detail/has_signature.hpp" |
| |
| #include "asio/detail/push_options.hpp" |
| |
| namespace asio { |
| namespace experimental { |
| namespace detail { |
| |
| template <typename Mutex> |
| class channel_service |
| : public asio::detail::execution_context_service_base< |
| channel_service<Mutex> > |
| { |
| public: |
| // Possible states for a channel end. |
| enum state |
| { |
| buffer = 0, |
| waiter = 1, |
| block = 2, |
| closed = 3 |
| }; |
| |
| // The base implementation type of all channels. |
| struct base_implementation_type |
| { |
| // Default constructor. |
| base_implementation_type() |
| : receive_state_(block), |
| send_state_(block), |
| max_buffer_size_(0), |
| next_(0), |
| prev_(0) |
| { |
| } |
| |
| // The current state of the channel. |
| state receive_state_ : 16; |
| state send_state_ : 16; |
| |
| // The maximum number of elements that may be buffered in the channel. |
| std::size_t max_buffer_size_; |
| |
| // The operations that are waiting on the channel. |
| asio::detail::op_queue<channel_operation> waiters_; |
| |
| // Pointers to adjacent channel implementations in linked list. |
| base_implementation_type* next_; |
| base_implementation_type* prev_; |
| |
| // The mutex type to protect the internal implementation. |
| mutable Mutex mutex_; |
| }; |
| |
| // The implementation for a specific value type. |
| template <typename Traits, typename... Signatures> |
| struct implementation_type; |
| |
| // Constructor. |
| channel_service(execution_context& ctx); |
| |
| // Destroy all user-defined handler objects owned by the service. |
| void shutdown(); |
| |
| // Construct a new channel implementation. |
| void construct(base_implementation_type& impl, std::size_t max_buffer_size); |
| |
| // Destroy a channel implementation. |
| template <typename Traits, typename... Signatures> |
| void destroy(implementation_type<Traits, Signatures...>& impl); |
| |
| // Move-construct a new channel implementation. |
| template <typename Traits, typename... Signatures> |
| void move_construct(implementation_type<Traits, Signatures...>& impl, |
| implementation_type<Traits, Signatures...>& other_impl); |
| |
| // Move-assign from another channel implementation. |
| template <typename Traits, typename... Signatures> |
| void move_assign(implementation_type<Traits, Signatures...>& impl, |
| channel_service& other_service, |
| implementation_type<Traits, Signatures...>& other_impl); |
| |
| // Get the capacity of the channel. |
| std::size_t capacity( |
| const base_implementation_type& impl) const ASIO_NOEXCEPT; |
| |
| // Determine whether the channel is open. |
| bool is_open(const base_implementation_type& impl) const ASIO_NOEXCEPT; |
| |
| // Reset the channel to its initial state. |
| template <typename Traits, typename... Signatures> |
| void reset(implementation_type<Traits, Signatures...>& impl); |
| |
| // Close the channel. |
| template <typename Traits, typename... Signatures> |
| void close(implementation_type<Traits, Signatures...>& impl); |
| |
| // Cancel all operations associated with the channel. |
| template <typename Traits, typename... Signatures> |
| void cancel(implementation_type<Traits, Signatures...>& impl); |
| |
| // Cancel the operation associated with the channel that has the given key. |
| template <typename Traits, typename... Signatures> |
| void cancel_by_key(implementation_type<Traits, Signatures...>& impl, |
| void* cancellation_key); |
| |
| // Determine whether a value can be read from the channel without blocking. |
| bool ready(const base_implementation_type& impl) const ASIO_NOEXCEPT; |
| |
| // Synchronously send a new value into the channel. |
| template <typename Message, typename Traits, |
| typename... Signatures, typename... Args> |
| bool try_send(implementation_type<Traits, Signatures...>& impl, |
| ASIO_MOVE_ARG(Args)... args); |
| |
| // Synchronously send a number of new values into the channel. |
| template <typename Message, typename Traits, |
| typename... Signatures, typename... Args> |
| std::size_t try_send_n(implementation_type<Traits, Signatures...>& impl, |
| std::size_t count, ASIO_MOVE_ARG(Args)... args); |
| |
| // Asynchronously send a new value into the channel. |
| template <typename Traits, typename... Signatures, |
| typename Handler, typename IoExecutor> |
| void async_send(implementation_type<Traits, Signatures...>& impl, |
| ASIO_MOVE_ARG2(typename implementation_type< |
| Traits, Signatures...>::payload_type) payload, |
| Handler& handler, const IoExecutor& io_ex) |
| { |
| typename associated_cancellation_slot<Handler>::type slot |
| = asio::get_associated_cancellation_slot(handler); |
| |
| // Allocate and construct an operation to wrap the handler. |
| typedef channel_send_op< |
| typename implementation_type<Traits, Signatures...>::payload_type, |
| Handler, IoExecutor> op; |
| typename op::ptr p = { asio::detail::addressof(handler), |
| op::ptr::allocate(handler), 0 }; |
| p.p = new (p.v) op(ASIO_MOVE_CAST2(typename implementation_type< |
| Traits, Signatures...>::payload_type)(payload), handler, io_ex); |
| |
| // Optionally register for per-operation cancellation. |
| if (slot.is_connected()) |
| { |
| p.p->cancellation_key_ = |
| &slot.template emplace<op_cancellation<Traits, Signatures...> >( |
| this, &impl); |
| } |
| |
| ASIO_HANDLER_CREATION((this->context(), *p.p, |
| "channel", &impl, 0, "async_send")); |
| |
| start_send_op(impl, p.p); |
| p.v = p.p = 0; |
| } |
| |
| // Synchronously receive a value from the channel. |
| template <typename Traits, typename... Signatures, typename Handler> |
| bool try_receive(implementation_type<Traits, Signatures...>& impl, |
| ASIO_MOVE_ARG(Handler) handler); |
| |
| // Asynchronously send a new value into the channel. |
| // Asynchronously receive a value from the channel. |
| template <typename Traits, typename... Signatures, |
| typename Handler, typename IoExecutor> |
| void async_receive(implementation_type<Traits, Signatures...>& impl, |
| Handler& handler, const IoExecutor& io_ex) |
| { |
| typename associated_cancellation_slot<Handler>::type slot |
| = asio::get_associated_cancellation_slot(handler); |
| |
| // Allocate and construct an operation to wrap the handler. |
| typedef channel_receive_op< |
| typename implementation_type<Traits, Signatures...>::payload_type, |
| Handler, IoExecutor> op; |
| typename op::ptr p = { asio::detail::addressof(handler), |
| op::ptr::allocate(handler), 0 }; |
| p.p = new (p.v) op(handler, io_ex); |
| |
| // Optionally register for per-operation cancellation. |
| if (slot.is_connected()) |
| { |
| p.p->cancellation_key_ = |
| &slot.template emplace<op_cancellation<Traits, Signatures...> >( |
| this, &impl); |
| } |
| |
| ASIO_HANDLER_CREATION((this->context(), *p.p, |
| "channel", &impl, 0, "async_receive")); |
| |
| start_receive_op(impl, p.p); |
| p.v = p.p = 0; |
| } |
| |
| private: |
| // Helper function object to handle a closed notification. |
| template <typename Payload, typename Signature> |
| struct complete_receive |
| { |
| explicit complete_receive(channel_receive<Payload>* op) |
| : op_(op) |
| { |
| } |
| |
| template <typename... Args> |
| void operator()(ASIO_MOVE_ARG(Args)... args) |
| { |
| op_->complete( |
| channel_message<Signature>(0, |
| ASIO_MOVE_CAST(Args)(args)...)); |
| } |
| |
| channel_receive<Payload>* op_; |
| }; |
| |
| // Destroy a base channel implementation. |
| void base_destroy(base_implementation_type& impl); |
| |
| // Helper function to start an asynchronous put operation. |
| template <typename Traits, typename... Signatures> |
| void start_send_op(implementation_type<Traits, Signatures...>& impl, |
| channel_send<typename implementation_type< |
| Traits, Signatures...>::payload_type>* send_op); |
| |
| // Helper function to start an asynchronous get operation. |
| template <typename Traits, typename... Signatures> |
| void start_receive_op(implementation_type<Traits, Signatures...>& impl, |
| channel_receive<typename implementation_type< |
| Traits, Signatures...>::payload_type>* receive_op); |
| |
| // Helper class used to implement per-operation cancellation. |
| template <typename Traits, typename... Signatures> |
| class op_cancellation |
| { |
| public: |
| op_cancellation(channel_service* s, |
| implementation_type<Traits, Signatures...>* impl) |
| : service_(s), |
| impl_(impl) |
| { |
| } |
| |
| void operator()(cancellation_type_t type) |
| { |
| if (!!(type & |
| (cancellation_type::terminal |
| | cancellation_type::partial |
| | cancellation_type::total))) |
| { |
| service_->cancel_by_key(*impl_, this); |
| } |
| } |
| |
| private: |
| channel_service* service_; |
| implementation_type<Traits, Signatures...>* impl_; |
| }; |
| |
| // Mutex to protect access to the linked list of implementations. |
| asio::detail::mutex mutex_; |
| |
| // The head of a linked list of all implementations. |
| base_implementation_type* impl_list_; |
| }; |
| |
| // The implementation for a specific value type. |
| template <typename Mutex> |
| template <typename Traits, typename... Signatures> |
| struct channel_service<Mutex>::implementation_type : base_implementation_type |
| { |
| // The traits type associated with the channel. |
| typedef typename Traits::template rebind<Signatures...>::other traits_type; |
| |
| // Type of an element stored in the buffer. |
| typedef typename conditional< |
| has_signature< |
| typename traits_type::receive_cancelled_signature, |
| Signatures... |
| >::value, |
| typename conditional< |
| has_signature< |
| typename traits_type::receive_closed_signature, |
| Signatures... |
| >::value, |
| channel_payload<Signatures...>, |
| channel_payload< |
| Signatures..., |
| typename traits_type::receive_closed_signature |
| > |
| >::type, |
| typename conditional< |
| has_signature< |
| typename traits_type::receive_closed_signature, |
| Signatures..., |
| typename traits_type::receive_cancelled_signature |
| >::value, |
| channel_payload< |
| Signatures..., |
| typename traits_type::receive_cancelled_signature |
| >, |
| channel_payload< |
| Signatures..., |
| typename traits_type::receive_cancelled_signature, |
| typename traits_type::receive_closed_signature |
| > |
| >::type |
| >::type payload_type; |
| |
| // Move from another buffer. |
| void buffer_move_from(implementation_type& other) |
| { |
| buffer_ = ASIO_MOVE_CAST( |
| typename traits_type::template container< |
| payload_type>::type)(other.buffer_); |
| other.buffer_clear(); |
| } |
| |
| // Get number of buffered elements. |
| std::size_t buffer_size() const |
| { |
| return buffer_.size(); |
| } |
| |
| // Push a new value to the back of the buffer. |
| void buffer_push(payload_type payload) |
| { |
| buffer_.push_back(ASIO_MOVE_CAST(payload_type)(payload)); |
| } |
| |
| // Push new values to the back of the buffer. |
| std::size_t buffer_push_n(std::size_t count, payload_type payload) |
| { |
| std::size_t i = 0; |
| for (; i < count && buffer_.size() < this->max_buffer_size_; ++i) |
| buffer_.push_back(payload); |
| return i; |
| } |
| |
| // Get the element at the front of the buffer. |
| payload_type buffer_front() |
| { |
| return ASIO_MOVE_CAST(payload_type)(buffer_.front()); |
| } |
| |
| // Pop a value from the front of the buffer. |
| void buffer_pop() |
| { |
| buffer_.pop_front(); |
| } |
| |
| // Clear all buffered values. |
| void buffer_clear() |
| { |
| buffer_.clear(); |
| } |
| |
| private: |
| // Buffered values. |
| typename traits_type::template container<payload_type>::type buffer_; |
| }; |
| |
| // The implementation for a void value type. |
| template <typename Mutex> |
| template <typename Traits, typename R> |
| struct channel_service<Mutex>::implementation_type<Traits, R()> |
| : channel_service::base_implementation_type |
| { |
| // The traits type associated with the channel. |
| typedef typename Traits::template rebind<R()>::other traits_type; |
| |
| // Type of an element stored in the buffer. |
| typedef typename conditional< |
| has_signature< |
| typename traits_type::receive_cancelled_signature, |
| R() |
| >::value, |
| typename conditional< |
| has_signature< |
| typename traits_type::receive_closed_signature, |
| R() |
| >::value, |
| channel_payload<R()>, |
| channel_payload< |
| R(), |
| typename traits_type::receive_closed_signature |
| > |
| >::type, |
| typename conditional< |
| has_signature< |
| typename traits_type::receive_closed_signature, |
| R(), |
| typename traits_type::receive_cancelled_signature |
| >::value, |
| channel_payload< |
| R(), |
| typename traits_type::receive_cancelled_signature |
| >, |
| channel_payload< |
| R(), |
| typename traits_type::receive_cancelled_signature, |
| typename traits_type::receive_closed_signature |
| > |
| >::type |
| >::type payload_type; |
| |
| // Construct with empty buffer. |
| implementation_type() |
| : buffer_(0) |
| { |
| } |
| |
| // Move from another buffer. |
| void buffer_move_from(implementation_type& other) |
| { |
| buffer_ = other.buffer_; |
| other.buffer_ = 0; |
| } |
| |
| // Get number of buffered elements. |
| std::size_t buffer_size() const |
| { |
| return buffer_; |
| } |
| |
| // Push a new value to the back of the buffer. |
| void buffer_push(payload_type) |
| { |
| ++buffer_; |
| } |
| |
| // Push new values to the back of the buffer. |
| std::size_t buffer_push_n(std::size_t count, payload_type) |
| { |
| std::size_t available = this->max_buffer_size_ - buffer_; |
| count = (count < available) ? count : available; |
| buffer_ += count; |
| return count; |
| } |
| |
| // Get the element at the front of the buffer. |
| payload_type buffer_front() |
| { |
| return payload_type(channel_message<R()>(0)); |
| } |
| |
| // Pop a value from the front of the buffer. |
| void buffer_pop() |
| { |
| --buffer_; |
| } |
| |
| // Clear all values from the buffer. |
| void buffer_clear() |
| { |
| buffer_ = 0; |
| } |
| |
| private: |
| // Number of buffered "values". |
| std::size_t buffer_; |
| }; |
| |
| } // namespace detail |
| } // namespace experimental |
| } // namespace asio |
| |
| #include "asio/detail/pop_options.hpp" |
| |
| #include "asio/experimental/detail/impl/channel_service.hpp" |
| |
| #endif // ASIO_EXPERIMENTAL_DETAIL_CHANNEL_SERVICE_HPP |