| #ifndef _DEBLOCKBUFFER_HPP |
| #define _DEBLOCKBUFFER_HPP |
| /*------------------------------------------------------------------------- |
| * drawElements C++ Base Library |
| * ----------------------------- |
| * |
| * Copyright 2014 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| *//*! |
| * \file |
| * \brief Block-based thread-safe queue. |
| *//*--------------------------------------------------------------------*/ |
| |
| #include "deBlockBuffer.hpp" |
| #include "deMutex.hpp" |
| #include "deSemaphore.h" |
| |
| #include <exception> |
| |
| namespace de |
| { |
| |
| void BlockBuffer_selfTest(void); |
| |
| class BufferCanceledException : public std::exception |
| { |
| public: |
| inline BufferCanceledException(void) |
| { |
| } |
| inline ~BufferCanceledException(void) throw() |
| { |
| } |
| |
| const char *what(void) const throw() |
| { |
| return "BufferCanceledException"; |
| } |
| }; |
| |
| template <typename T> |
| class BlockBuffer |
| { |
| public: |
| typedef BufferCanceledException CanceledException; |
| |
| BlockBuffer(int blockSize, int numBlocks); |
| ~BlockBuffer(void); |
| |
| void clear(void); //!< Resets buffer. Will block until pending writes and reads have completed. |
| |
| void write(int numElements, const T *elements); |
| int tryWrite(int numElements, const T *elements); |
| void flush(void); |
| bool tryFlush(void); |
| |
| void read(int numElements, T *elements); |
| int tryRead(int numElements, T *elements); |
| |
| void cancel( |
| void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException. |
| bool isCanceled(void) const |
| { |
| return !!m_canceled; |
| } |
| |
| private: |
| BlockBuffer(const BlockBuffer &other); |
| BlockBuffer &operator=(const BlockBuffer &other); |
| |
| int writeToCurrentBlock(int numElements, const T *elements, bool blocking); |
| int readFromCurrentBlock(int numElements, T *elements, bool blocking); |
| |
| void flushWriteBlock(void); |
| |
| deSemaphore m_fill; //!< Block fill count. |
| deSemaphore m_empty; //!< Block empty count. |
| |
| int m_writeBlock; //!< Current write block ndx. |
| int m_writePos; //!< Position in block. 0 if block is not yet acquired. |
| |
| int m_readBlock; //!< Current read block ndx. |
| int m_readPos; //!< Position in block. 0 if block is not yet acquired. |
| |
| int m_blockSize; |
| int m_numBlocks; |
| |
| T *m_elements; |
| int *m_numUsedInBlock; |
| |
| Mutex m_writeLock; |
| Mutex m_readLock; |
| |
| volatile uint32_t m_canceled; |
| } DE_WARN_UNUSED_TYPE; |
| |
| template <typename T> |
| BlockBuffer<T>::BlockBuffer(int blockSize, int numBlocks) |
| : m_fill(0) |
| , m_empty(0) |
| , m_writeBlock(0) |
| , m_writePos(0) |
| , m_readBlock(0) |
| , m_readPos(0) |
| , m_blockSize(blockSize) |
| , m_numBlocks(numBlocks) |
| , m_elements(DE_NULL) |
| , m_numUsedInBlock(DE_NULL) |
| , m_writeLock() |
| , m_readLock() |
| , m_canceled(false) |
| { |
| DE_ASSERT(blockSize > 0); |
| DE_ASSERT(numBlocks > 0); |
| |
| try |
| { |
| m_elements = new T[m_numBlocks * m_blockSize]; |
| m_numUsedInBlock = new int[m_numBlocks]; |
| } |
| catch (...) |
| { |
| delete[] m_elements; |
| delete[] m_numUsedInBlock; |
| throw; |
| } |
| |
| m_fill = deSemaphore_create(0, DE_NULL); |
| m_empty = deSemaphore_create(numBlocks, DE_NULL); |
| DE_ASSERT(m_fill && m_empty); |
| } |
| |
| template <typename T> |
| BlockBuffer<T>::~BlockBuffer(void) |
| { |
| delete[] m_elements; |
| delete[] m_numUsedInBlock; |
| |
| deSemaphore_destroy(m_fill); |
| deSemaphore_destroy(m_empty); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::clear(void) |
| { |
| ScopedLock readLock(m_readLock); |
| ScopedLock writeLock(m_writeLock); |
| |
| deSemaphore_destroy(m_fill); |
| deSemaphore_destroy(m_empty); |
| |
| m_fill = deSemaphore_create(0, DE_NULL); |
| m_empty = deSemaphore_create(m_numBlocks, DE_NULL); |
| m_writeBlock = 0; |
| m_writePos = 0; |
| m_readBlock = 0; |
| m_readPos = 0; |
| m_canceled = false; |
| |
| DE_ASSERT(m_fill && m_empty); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::cancel(void) |
| { |
| DE_ASSERT(!m_canceled); |
| m_canceled = true; |
| |
| deSemaphore_increment(m_empty); |
| deSemaphore_increment(m_fill); |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::writeToCurrentBlock(int numElements, const T *elements, bool blocking) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_writePos == 0) |
| { |
| /* Write thread doesn't own current block - need to acquire. */ |
| if (blocking) |
| deSemaphore_decrement(m_empty); |
| else |
| { |
| if (!deSemaphore_tryDecrement(m_empty)) |
| return 0; |
| } |
| |
| /* Check for canceled bit. */ |
| if (m_canceled) |
| { |
| // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here. |
| deSemaphore_increment(m_empty); |
| m_writeLock.unlock(); |
| throw CanceledException(); |
| } |
| } |
| |
| /* Write thread owns current block. */ |
| T *block = m_elements + m_writeBlock * m_blockSize; |
| int numToWrite = de::min(numElements, m_blockSize - m_writePos); |
| |
| DE_ASSERT(numToWrite > 0); |
| |
| for (int ndx = 0; ndx < numToWrite; ndx++) |
| block[m_writePos + ndx] = elements[ndx]; |
| |
| m_writePos += numToWrite; |
| |
| if (m_writePos == m_blockSize) |
| flushWriteBlock(); /* Flush current write block. */ |
| |
| return numToWrite; |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::readFromCurrentBlock(int numElements, T *elements, bool blocking) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_readPos == 0) |
| { |
| /* Read thread doesn't own current block - need to acquire. */ |
| if (blocking) |
| deSemaphore_decrement(m_fill); |
| else |
| { |
| if (!deSemaphore_tryDecrement(m_fill)) |
| return 0; |
| } |
| |
| /* Check for canceled bit. */ |
| if (m_canceled) |
| { |
| // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here. |
| deSemaphore_increment(m_fill); |
| m_readLock.unlock(); |
| throw CanceledException(); |
| } |
| } |
| |
| /* Read thread now owns current block. */ |
| const T *block = m_elements + m_readBlock * m_blockSize; |
| int numUsedInBlock = m_numUsedInBlock[m_readBlock]; |
| int numToRead = de::min(numElements, numUsedInBlock - m_readPos); |
| |
| DE_ASSERT(numToRead > 0); |
| |
| for (int ndx = 0; ndx < numToRead; ndx++) |
| elements[ndx] = block[m_readPos + ndx]; |
| |
| m_readPos += numToRead; |
| |
| if (m_readPos == numUsedInBlock) |
| { |
| /* Free current read block and advance. */ |
| m_readBlock = (m_readBlock + 1) % m_numBlocks; |
| m_readPos = 0; |
| deSemaphore_increment(m_empty); |
| } |
| |
| return numToRead; |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::tryWrite(int numElements, const T *elements) |
| { |
| int numWritten = 0; |
| |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| if (!m_writeLock.tryLock()) |
| return numWritten; |
| |
| while (numWritten < numElements) |
| { |
| int ret = writeToCurrentBlock(numElements - numWritten, elements + numWritten, false /* non-blocking */); |
| |
| if (ret == 0) |
| break; /* Write failed. */ |
| |
| numWritten += ret; |
| } |
| |
| m_writeLock.unlock(); |
| |
| return numWritten; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::write(int numElements, const T *elements) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| m_writeLock.lock(); |
| |
| int numWritten = 0; |
| while (numWritten < numElements) |
| numWritten += writeToCurrentBlock(numElements - numWritten, elements + numWritten, true /* blocking */); |
| |
| m_writeLock.unlock(); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::flush(void) |
| { |
| m_writeLock.lock(); |
| |
| if (m_writePos > 0) |
| flushWriteBlock(); |
| |
| m_writeLock.unlock(); |
| } |
| |
| template <typename T> |
| bool BlockBuffer<T>::tryFlush(void) |
| { |
| if (!m_writeLock.tryLock()) |
| return false; |
| |
| if (m_writePos > 0) |
| flushWriteBlock(); |
| |
| m_writeLock.unlock(); |
| |
| return true; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::flushWriteBlock(void) |
| { |
| DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize)); |
| |
| m_numUsedInBlock[m_writeBlock] = m_writePos; |
| m_writeBlock = (m_writeBlock + 1) % m_numBlocks; |
| m_writePos = 0; |
| deSemaphore_increment(m_fill); |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::tryRead(int numElements, T *elements) |
| { |
| int numRead = 0; |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| if (!m_readLock.tryLock()) |
| return numRead; |
| |
| while (numRead < numElements) |
| { |
| int ret = readFromCurrentBlock(numElements - numRead, &elements[numRead], false /* non-blocking */); |
| |
| if (ret == 0) |
| break; /* Failed. */ |
| |
| numRead += ret; |
| } |
| |
| m_readLock.unlock(); |
| |
| return numRead; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::read(int numElements, T *elements) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| m_readLock.lock(); |
| |
| int numRead = 0; |
| while (numRead < numElements) |
| numRead += readFromCurrentBlock(numElements - numRead, &elements[numRead], true /* blocking */); |
| |
| m_readLock.unlock(); |
| } |
| |
| } // namespace de |
| |
| #endif // _DEBLOCKBUFFER_HPP |