| #ifndef _DEBLOCKBUFFER_HPP |
| #define _DEBLOCKBUFFER_HPP |
| /*------------------------------------------------------------------------- |
| * drawElements C++ Base Library |
| * ----------------------------- |
| * |
| * Copyright 2014 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| *//*! |
| * \file |
| * \brief Block-based thread-safe queue. |
| *//*--------------------------------------------------------------------*/ |
| |
| #include "deBlockBuffer.hpp" |
| #include "deMutex.hpp" |
| #include "deSemaphore.h" |
| |
| #include <exception> |
| |
| namespace de |
| { |
| |
| void BlockBuffer_selfTest (void); |
| |
| class BufferCanceledException : public std::exception |
| { |
| public: |
| inline BufferCanceledException (void) {} |
| inline ~BufferCanceledException (void) throw() {} |
| |
| const char* what (void) const throw() { return "BufferCanceledException"; } |
| }; |
| |
| template <typename T> |
| class BlockBuffer |
| { |
| public: |
| typedef BufferCanceledException CanceledException; |
| |
| BlockBuffer (int blockSize, int numBlocks); |
| ~BlockBuffer (void); |
| |
| void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed. |
| |
| void write (int numElements, const T* elements); |
| int tryWrite (int numElements, const T* elements); |
| void flush (void); |
| bool tryFlush (void); |
| |
| void read (int numElements, T* elements); |
| int tryRead (int numElements, T* elements); |
| |
| void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException. |
| bool isCanceled (void) const { return !!m_canceled; } |
| |
| private: |
| BlockBuffer (const BlockBuffer& other); |
| BlockBuffer& operator= (const BlockBuffer& other); |
| |
| int writeToCurrentBlock (int numElements, const T* elements, bool blocking); |
| int readFromCurrentBlock(int numElements, T* elements, bool blocking); |
| |
| void flushWriteBlock (void); |
| |
| deSemaphore m_fill; //!< Block fill count. |
| deSemaphore m_empty; //!< Block empty count. |
| |
| int m_writeBlock; //!< Current write block ndx. |
| int m_writePos; //!< Position in block. 0 if block is not yet acquired. |
| |
| int m_readBlock; //!< Current read block ndx. |
| int m_readPos; //!< Position in block. 0 if block is not yet acquired. |
| |
| int m_blockSize; |
| int m_numBlocks; |
| |
| T* m_elements; |
| int* m_numUsedInBlock; |
| |
| Mutex m_writeLock; |
| Mutex m_readLock; |
| |
| volatile deUint32 m_canceled; |
| } DE_WARN_UNUSED_TYPE; |
| |
| template <typename T> |
| BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks) |
| : m_fill (0) |
| , m_empty (0) |
| , m_writeBlock (0) |
| , m_writePos (0) |
| , m_readBlock (0) |
| , m_readPos (0) |
| , m_blockSize (blockSize) |
| , m_numBlocks (numBlocks) |
| , m_elements (DE_NULL) |
| , m_numUsedInBlock (DE_NULL) |
| , m_writeLock () |
| , m_readLock () |
| , m_canceled (DE_FALSE) |
| { |
| DE_ASSERT(blockSize > 0); |
| DE_ASSERT(numBlocks > 0); |
| |
| try |
| { |
| m_elements = new T[m_numBlocks*m_blockSize]; |
| m_numUsedInBlock = new int[m_numBlocks]; |
| } |
| catch (...) |
| { |
| delete[] m_elements; |
| delete[] m_numUsedInBlock; |
| throw; |
| } |
| |
| m_fill = deSemaphore_create(0, DE_NULL); |
| m_empty = deSemaphore_create(numBlocks, DE_NULL); |
| DE_ASSERT(m_fill && m_empty); |
| } |
| |
| template <typename T> |
| BlockBuffer<T>::~BlockBuffer (void) |
| { |
| delete[] m_elements; |
| delete[] m_numUsedInBlock; |
| |
| deSemaphore_destroy(m_fill); |
| deSemaphore_destroy(m_empty); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::clear (void) |
| { |
| ScopedLock readLock (m_readLock); |
| ScopedLock writeLock (m_writeLock); |
| |
| deSemaphore_destroy(m_fill); |
| deSemaphore_destroy(m_empty); |
| |
| m_fill = deSemaphore_create(0, DE_NULL); |
| m_empty = deSemaphore_create(m_numBlocks, DE_NULL); |
| m_writeBlock = 0; |
| m_writePos = 0; |
| m_readBlock = 0; |
| m_readPos = 0; |
| m_canceled = DE_FALSE; |
| |
| DE_ASSERT(m_fill && m_empty); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::cancel (void) |
| { |
| DE_ASSERT(!m_canceled); |
| m_canceled = DE_TRUE; |
| |
| deSemaphore_increment(m_empty); |
| deSemaphore_increment(m_fill); |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_writePos == 0) |
| { |
| /* Write thread doesn't own current block - need to acquire. */ |
| if (blocking) |
| deSemaphore_decrement(m_empty); |
| else |
| { |
| if (!deSemaphore_tryDecrement(m_empty)) |
| return 0; |
| } |
| |
| /* Check for canceled bit. */ |
| if (m_canceled) |
| { |
| // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here. |
| deSemaphore_increment(m_empty); |
| m_writeLock.unlock(); |
| throw CanceledException(); |
| } |
| } |
| |
| /* Write thread owns current block. */ |
| T* block = m_elements + m_writeBlock*m_blockSize; |
| int numToWrite = de::min(numElements, m_blockSize-m_writePos); |
| |
| DE_ASSERT(numToWrite > 0); |
| |
| for (int ndx = 0; ndx < numToWrite; ndx++) |
| block[m_writePos+ndx] = elements[ndx]; |
| |
| m_writePos += numToWrite; |
| |
| if (m_writePos == m_blockSize) |
| flushWriteBlock(); /* Flush current write block. */ |
| |
| return numToWrite; |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_readPos == 0) |
| { |
| /* Read thread doesn't own current block - need to acquire. */ |
| if (blocking) |
| deSemaphore_decrement(m_fill); |
| else |
| { |
| if (!deSemaphore_tryDecrement(m_fill)) |
| return 0; |
| } |
| |
| /* Check for canceled bit. */ |
| if (m_canceled) |
| { |
| // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here. |
| deSemaphore_increment(m_fill); |
| m_readLock.unlock(); |
| throw CanceledException(); |
| } |
| } |
| |
| /* Read thread now owns current block. */ |
| const T* block = m_elements + m_readBlock*m_blockSize; |
| int numUsedInBlock = m_numUsedInBlock[m_readBlock]; |
| int numToRead = de::min(numElements, numUsedInBlock-m_readPos); |
| |
| DE_ASSERT(numToRead > 0); |
| |
| for (int ndx = 0; ndx < numToRead; ndx++) |
| elements[ndx] = block[m_readPos+ndx]; |
| |
| m_readPos += numToRead; |
| |
| if (m_readPos == numUsedInBlock) |
| { |
| /* Free current read block and advance. */ |
| m_readBlock = (m_readBlock+1) % m_numBlocks; |
| m_readPos = 0; |
| deSemaphore_increment(m_empty); |
| } |
| |
| return numToRead; |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::tryWrite (int numElements, const T* elements) |
| { |
| int numWritten = 0; |
| |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| if (!m_writeLock.tryLock()) |
| return numWritten; |
| |
| while (numWritten < numElements) |
| { |
| int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */); |
| |
| if (ret == 0) |
| break; /* Write failed. */ |
| |
| numWritten += ret; |
| } |
| |
| m_writeLock.unlock(); |
| |
| return numWritten; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::write (int numElements, const T* elements) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| m_writeLock.lock(); |
| |
| int numWritten = 0; |
| while (numWritten < numElements) |
| numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */); |
| |
| m_writeLock.unlock(); |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::flush (void) |
| { |
| m_writeLock.lock(); |
| |
| if (m_writePos > 0) |
| flushWriteBlock(); |
| |
| m_writeLock.unlock(); |
| } |
| |
| template <typename T> |
| bool BlockBuffer<T>::tryFlush (void) |
| { |
| if (!m_writeLock.tryLock()) |
| return false; |
| |
| if (m_writePos > 0) |
| flushWriteBlock(); |
| |
| m_writeLock.unlock(); |
| |
| return true; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::flushWriteBlock (void) |
| { |
| DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize)); |
| |
| m_numUsedInBlock[m_writeBlock] = m_writePos; |
| m_writeBlock = (m_writeBlock+1) % m_numBlocks; |
| m_writePos = 0; |
| deSemaphore_increment(m_fill); |
| } |
| |
| template <typename T> |
| int BlockBuffer<T>::tryRead (int numElements, T* elements) |
| { |
| int numRead = 0; |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| if (!m_readLock.tryLock()) |
| return numRead; |
| |
| while (numRead < numElements) |
| { |
| int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */); |
| |
| if (ret == 0) |
| break; /* Failed. */ |
| |
| numRead += ret; |
| } |
| |
| m_readLock.unlock(); |
| |
| return numRead; |
| } |
| |
| template <typename T> |
| void BlockBuffer<T>::read (int numElements, T* elements) |
| { |
| DE_ASSERT(numElements > 0 && elements != DE_NULL); |
| |
| if (m_canceled) |
| throw CanceledException(); |
| |
| m_readLock.lock(); |
| |
| int numRead = 0; |
| while (numRead < numElements) |
| numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */); |
| |
| m_readLock.unlock(); |
| } |
| |
| } // de |
| |
| #endif // _DEBLOCKBUFFER_HPP |