| // Copyright 2014 The Native Client Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <pthread.h> |
| #include <semaphore.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <stdint.h> |
| |
| #include "native_client/tests/benchmark/thread_pool.h" |
| |
| namespace sdk_util { |
| |
| // Initializes mutex, semaphores and a pool of threads. If 0 is passed for |
| // num_threads, all work will be performed on the dispatch thread. |
| ThreadPool::ThreadPool(int num_threads) |
| : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), |
| user_data_(NULL), user_work_function_(NULL) { |
| if (num_threads_ > 0) { |
| int status; |
| status = sem_init(&work_sem_, 0, 0); |
| if (-1 == status) { |
| fprintf(stderr, "Failed to initialize semaphore!\n"); |
| exit(-1); |
| } |
| status = sem_init(&done_sem_, 0, 0); |
| if (-1 == status) { |
| fprintf(stderr, "Failed to initialize semaphore!\n"); |
| exit(-1); |
| } |
| threads_ = new pthread_t[num_threads_]; |
| for (int i = 0; i < num_threads_; i++) { |
| status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); |
| if (0 != status) { |
| fprintf(stderr, "Failed to create thread!\n"); |
| exit(-1); |
| } |
| } |
| } |
| } |
| |
| // Post exit request, wait for all threads to join, and cleanup. |
| ThreadPool::~ThreadPool() { |
| if (num_threads_ > 0) { |
| PostExitAndJoinAll(); |
| delete[] threads_; |
| sem_destroy(&done_sem_); |
| sem_destroy(&work_sem_); |
| } |
| } |
| |
| // Setup work parameters. This function is called from the dispatch thread, |
| // when all worker threads are sleeping. |
| void ThreadPool::Setup(int counter, WorkFunction work, void *data) { |
| counter_ = counter; |
| user_work_function_ = work; |
| user_data_ = data; |
| } |
| |
| // Return decremented task counter. This function |
| // can be called from multiple threads at any given time. |
| int ThreadPool::DecCounter() { |
| return __sync_add_and_fetch(&counter_, -1); |
| } |
| |
| // Set exit flag, post and join all the threads in the pool. This function is |
| // called only from the dispatch thread, and only when all worker threads are |
| // sleeping. |
| void ThreadPool::PostExitAndJoinAll() { |
| exiting_ = true; |
| // Wake up all the sleeping worker threads. |
| for (int i = 0; i < num_threads_; ++i) |
| sem_post(&work_sem_); |
| void* retval; |
| for (int i = 0; i < num_threads_; ++i) |
| pthread_join(threads_[i], &retval); |
| } |
| |
| // Main work loop - one for each worker thread. |
| void ThreadPool::WorkLoop() { |
| while (true) { |
| // Wait for work. If no work is availble, this thread will sleep here. |
| sem_wait(&work_sem_); |
| if (exiting_) break; |
| while (true) { |
| // Grab a task index to work on from the counter. |
| int task_index = DecCounter(); |
| if (task_index < 0) |
| break; |
| user_work_function_(task_index, user_data_); |
| } |
| // Post to dispatch thread work is done. |
| sem_post(&done_sem_); |
| } |
| } |
| |
| // pthread entry point for a worker thread. |
| void* ThreadPool::WorkerThreadEntry(void* thiz) { |
| static_cast<ThreadPool*>(thiz)->WorkLoop(); |
| return NULL; |
| } |
| |
| // DispatchMany() will dispatch a set of tasks across worker threads. |
| // Note: This function will block until all work has completed. |
| void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) { |
| // On entry, all worker threads are sleeping. |
| Setup(num_tasks, work, data); |
| |
| // Wake up the worker threads & have them process tasks. |
| for (int i = 0; i < num_threads_; i++) |
| sem_post(&work_sem_); |
| |
| // Worker threads are now awake and busy. |
| |
| // This dispatch thread will now sleep-wait for the worker threads to finish. |
| for (int i = 0; i < num_threads_; i++) |
| sem_wait(&done_sem_); |
| // On exit, all tasks are done and all worker threads are sleeping again. |
| } |
| |
| // DispatchHere will dispatch all tasks on this thread. |
| void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) { |
| for (int i = 0; i < num_tasks; i++) |
| work(i, data); |
| } |
| |
| // Dispatch() will invoke the user supplied work function across |
| // one or more threads for each task. |
| // Note: This function will block until all work has completed. |
| void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { |
| if (num_threads_ > 0) |
| DispatchMany(num_tasks, work, data); |
| else |
| DispatchHere(num_tasks, work, data); |
| } |
| |
| } // namespace sdk_util |