| /* |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL |
| * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| /** |
| * Thread message API test |
| */ |
| |
| #include "libavutil/avassert.h" |
| #include "libavutil/avstring.h" |
| #include "libavutil/frame.h" |
| #include "libavutil/threadmessage.h" |
| #include "libavutil/thread.h" // not public |
| |
| struct sender_data { |
| int id; |
| pthread_t tid; |
| int workload; |
| AVThreadMessageQueue *queue; |
| }; |
| |
| /* same as sender_data but shuffled for testing purpose */ |
| struct receiver_data { |
| pthread_t tid; |
| int workload; |
| int id; |
| AVThreadMessageQueue *queue; |
| }; |
| |
| struct message { |
| AVFrame *frame; |
| // we add some junk in the message to make sure the message size is > |
| // sizeof(void*) |
| int magic; |
| }; |
| |
| #define MAGIC 0xdeadc0de |
| |
| static void free_frame(void *arg) |
| { |
| struct message *msg = arg; |
| av_assert0(msg->magic == MAGIC); |
| av_frame_free(&msg->frame); |
| } |
| |
| static void *sender_thread(void *arg) |
| { |
| int i, ret = 0; |
| struct sender_data *wd = arg; |
| |
| av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); |
| for (i = 0; i < wd->workload; i++) { |
| if (rand() % wd->workload < wd->workload / 10) { |
| av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); |
| av_thread_message_flush(wd->queue); |
| } else { |
| char *val; |
| AVDictionary *meta = NULL; |
| struct message msg = { |
| .magic = MAGIC, |
| .frame = av_frame_alloc(), |
| }; |
| |
| if (!msg.frame) { |
| ret = AVERROR(ENOMEM); |
| break; |
| } |
| |
| /* we add some metadata to identify the frames */ |
| val = av_asprintf("frame %d/%d from sender %d", |
| i + 1, wd->workload, wd->id); |
| if (!val) { |
| av_frame_free(&msg.frame); |
| ret = AVERROR(ENOMEM); |
| break; |
| } |
| ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); |
| if (ret < 0) { |
| av_frame_free(&msg.frame); |
| break; |
| } |
| av_frame_set_metadata(msg.frame, meta); |
| |
| /* allocate a real frame in order to simulate "real" work */ |
| msg.frame->format = AV_PIX_FMT_RGBA; |
| msg.frame->width = 320; |
| msg.frame->height = 240; |
| ret = av_frame_get_buffer(msg.frame, 32); |
| if (ret < 0) { |
| av_frame_free(&msg.frame); |
| break; |
| } |
| |
| /* push the frame in the common queue */ |
| av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", |
| wd->id, i + 1, wd->workload, msg.frame); |
| ret = av_thread_message_queue_send(wd->queue, &msg, 0); |
| if (ret < 0) { |
| av_frame_free(&msg.frame); |
| break; |
| } |
| } |
| } |
| av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", |
| wd->id, av_err2str(ret)); |
| av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); |
| return NULL; |
| } |
| |
| static void *receiver_thread(void *arg) |
| { |
| int i, ret = 0; |
| struct receiver_data *rd = arg; |
| |
| for (i = 0; i < rd->workload; i++) { |
| if (rand() % rd->workload < rd->workload / 10) { |
| av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue\n", rd->id); |
| av_thread_message_flush(rd->queue); |
| } else { |
| struct message msg; |
| AVDictionary *meta; |
| AVDictionaryEntry *e; |
| |
| ret = av_thread_message_queue_recv(rd->queue, &msg, 0); |
| if (ret < 0) |
| break; |
| av_assert0(msg.magic == MAGIC); |
| meta = av_frame_get_metadata(msg.frame); |
| e = av_dict_get(meta, "sig", NULL, 0); |
| av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); |
| av_frame_free(&msg.frame); |
| } |
| } |
| |
| av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); |
| av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); |
| |
| return NULL; |
| } |
| |
| static int get_workload(int minv, int maxv) |
| { |
| return maxv == minv ? maxv : rand() % (maxv - minv) + minv; |
| } |
| |
| int main(int ac, char **av) |
| { |
| int i, ret = 0; |
| int max_queue_size; |
| int nb_senders, sender_min_load, sender_max_load; |
| int nb_receivers, receiver_min_load, receiver_max_load; |
| struct sender_data *senders; |
| struct receiver_data *receivers; |
| AVThreadMessageQueue *queue = NULL; |
| |
| if (ac != 8) { |
| av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " |
| "<nb_senders> <sender_min_send> <sender_max_send> " |
| "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); |
| return 1; |
| } |
| |
| max_queue_size = atoi(av[1]); |
| nb_senders = atoi(av[2]); |
| sender_min_load = atoi(av[3]); |
| sender_max_load = atoi(av[4]); |
| nb_receivers = atoi(av[5]); |
| receiver_min_load = atoi(av[6]); |
| receiver_max_load = atoi(av[7]); |
| |
| if (max_queue_size <= 0 || |
| nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || |
| nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { |
| av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); |
| return 1; |
| } |
| |
| av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " |
| "%d receivers receiving [%d-%d]\n", max_queue_size, |
| nb_senders, sender_min_load, sender_max_load, |
| nb_receivers, receiver_min_load, receiver_max_load); |
| |
| senders = av_mallocz_array(nb_senders, sizeof(*senders)); |
| receivers = av_mallocz_array(nb_receivers, sizeof(*receivers)); |
| if (!senders || !receivers) { |
| ret = AVERROR(ENOMEM); |
| goto end; |
| } |
| |
| ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); |
| if (ret < 0) |
| goto end; |
| |
| av_thread_message_queue_set_free_func(queue, free_frame); |
| |
| #define SPAWN_THREADS(type) do { \ |
| for (i = 0; i < nb_##type##s; i++) { \ |
| struct type##_data *td = &type##s[i]; \ |
| \ |
| td->id = i; \ |
| td->queue = queue; \ |
| td->workload = get_workload(type##_min_load, type##_max_load); \ |
| \ |
| ret = pthread_create(&td->tid, NULL, type##_thread, td); \ |
| if (ret) { \ |
| const int err = AVERROR(ret); \ |
| av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ |
| " thread: %s\n", av_err2str(err)); \ |
| goto end; \ |
| } \ |
| } \ |
| } while (0) |
| |
| #define WAIT_THREADS(type) do { \ |
| for (i = 0; i < nb_##type##s; i++) { \ |
| struct type##_data *td = &type##s[i]; \ |
| \ |
| ret = pthread_join(td->tid, NULL); \ |
| if (ret) { \ |
| const int err = AVERROR(ret); \ |
| av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ |
| " thread: %s\n", av_err2str(err)); \ |
| goto end; \ |
| } \ |
| } \ |
| } while (0) |
| |
| SPAWN_THREADS(receiver); |
| SPAWN_THREADS(sender); |
| |
| WAIT_THREADS(sender); |
| WAIT_THREADS(receiver); |
| |
| end: |
| av_thread_message_queue_free(&queue); |
| av_freep(&senders); |
| av_freep(&receivers); |
| |
| if (ret < 0 && ret != AVERROR_EOF) { |
| av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); |
| return 1; |
| } |
| return 0; |
| } |