| /* |
| * Copyright (C) 2023 Nuo Mi |
| * |
| * This file is part of FFmpeg. |
| * |
| * FFmpeg is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation; either |
| * version 2.1 of the License, or (at your option) any later version. |
| * |
| * FFmpeg is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with FFmpeg; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| #include "config.h" |
| |
| #include "mem.h" |
| #include "thread.h" |
| |
| #include "executor.h" |
| |
| #if !HAVE_THREADS |
| |
| #define ExecutorThread char |
| |
| #define executor_thread_create(t, a, s, ar) 0 |
| #define executor_thread_join(t, r) do {} while(0) |
| |
| #else |
| |
| #define ExecutorThread pthread_t |
| |
| #define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar) |
| #define executor_thread_join(t, r) pthread_join(t, r) |
| |
| #endif //!HAVE_THREADS |
| |
| typedef struct ThreadInfo { |
| AVExecutor *e; |
| ExecutorThread thread; |
| } ThreadInfo; |
| |
| struct AVExecutor { |
| AVTaskCallbacks cb; |
| int thread_count; |
| |
| ThreadInfo *threads; |
| uint8_t *local_contexts; |
| |
| AVMutex lock; |
| AVCond cond; |
| int die; |
| |
| AVTask *tasks; |
| }; |
| |
| static AVTask* remove_task(AVTask **prev, AVTask *t) |
| { |
| *prev = t->next; |
| t->next = NULL; |
| return t; |
| } |
| |
| static void add_task(AVTask **prev, AVTask *t) |
| { |
| t->next = *prev; |
| *prev = t; |
| } |
| |
| static int run_one_task(AVExecutor *e, void *lc) |
| { |
| AVTaskCallbacks *cb = &e->cb; |
| AVTask **prev; |
| |
| for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next) |
| /* nothing */; |
| if (*prev) { |
| AVTask *t = remove_task(prev, *prev); |
| ff_mutex_unlock(&e->lock); |
| cb->run(t, lc, cb->user_data); |
| ff_mutex_lock(&e->lock); |
| return 1; |
| } |
| return 0; |
| } |
| |
| #if HAVE_THREADS |
| static void *executor_worker_task(void *data) |
| { |
| ThreadInfo *ti = (ThreadInfo*)data; |
| AVExecutor *e = ti->e; |
| void *lc = e->local_contexts + (ti - e->threads) * e->cb.local_context_size; |
| |
| ff_mutex_lock(&e->lock); |
| while (1) { |
| if (e->die) break; |
| |
| if (!run_one_task(e, lc)) { |
| //no task in one loop |
| ff_cond_wait(&e->cond, &e->lock); |
| } |
| } |
| ff_mutex_unlock(&e->lock); |
| return NULL; |
| } |
| #endif |
| |
| static void executor_free(AVExecutor *e, const int has_lock, const int has_cond) |
| { |
| if (e->thread_count) { |
| //signal die |
| ff_mutex_lock(&e->lock); |
| e->die = 1; |
| ff_cond_broadcast(&e->cond); |
| ff_mutex_unlock(&e->lock); |
| |
| for (int i = 0; i < e->thread_count; i++) |
| executor_thread_join(e->threads[i].thread, NULL); |
| } |
| if (has_cond) |
| ff_cond_destroy(&e->cond); |
| if (has_lock) |
| ff_mutex_destroy(&e->lock); |
| |
| av_free(e->threads); |
| av_free(e->local_contexts); |
| |
| av_free(e); |
| } |
| |
| AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count) |
| { |
| AVExecutor *e; |
| int has_lock = 0, has_cond = 0; |
| if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher) |
| return NULL; |
| |
| e = av_mallocz(sizeof(*e)); |
| if (!e) |
| return NULL; |
| e->cb = *cb; |
| |
| e->local_contexts = av_calloc(thread_count, e->cb.local_context_size); |
| if (!e->local_contexts) |
| goto free_executor; |
| |
| e->threads = av_calloc(thread_count, sizeof(*e->threads)); |
| if (!e->threads) |
| goto free_executor; |
| |
| has_lock = !ff_mutex_init(&e->lock, NULL); |
| has_cond = !ff_cond_init(&e->cond, NULL); |
| |
| if (!has_lock || !has_cond) |
| goto free_executor; |
| |
| for (/* nothing */; e->thread_count < thread_count; e->thread_count++) { |
| ThreadInfo *ti = e->threads + e->thread_count; |
| ti->e = e; |
| if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti)) |
| goto free_executor; |
| } |
| return e; |
| |
| free_executor: |
| executor_free(e, has_lock, has_cond); |
| return NULL; |
| } |
| |
| void av_executor_free(AVExecutor **executor) |
| { |
| if (!executor || !*executor) |
| return; |
| executor_free(*executor, 1, 1); |
| *executor = NULL; |
| } |
| |
| void av_executor_execute(AVExecutor *e, AVTask *t) |
| { |
| AVTaskCallbacks *cb = &e->cb; |
| AVTask **prev; |
| |
| ff_mutex_lock(&e->lock); |
| if (t) { |
| for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next) |
| /* nothing */; |
| add_task(prev, t); |
| } |
| ff_cond_signal(&e->cond); |
| ff_mutex_unlock(&e->lock); |
| |
| #if !HAVE_THREADS |
| // We are running in a single-threaded environment, so we must handle all tasks ourselves |
| while (run_one_task(e, e->local_contexts)) |
| /* nothing */; |
| #endif |
| } |