[FFmpeg-devel] [PATCH] avcodec/frame_thread_encoder: factorize frame threading
Marton Balint
cus at passwd.hu
Sun Apr 7 21:47:05 EEST 2019
framethread.c is put into libavutil, but it has to be included directly to
avoid creating avpriv functions.
Functionality should be identical, there is one slight difference: we close the
per-thread avcodec contexts in the main thread and not in the workers.
Signed-off-by: Marton Balint <cus at passwd.hu>
---
libavcodec/frame_thread_encoder.c | 229 ++++++++++++++------------------------
libavutil/framethread.c | 207 ++++++++++++++++++++++++++++++++++
2 files changed, 288 insertions(+), 148 deletions(-)
create mode 100644 libavutil/framethread.c
diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
index 55756c4c54..72b5d72803 100644
--- a/libavcodec/frame_thread_encoder.c
+++ b/libavcodec/frame_thread_encoder.c
@@ -22,7 +22,6 @@
#include "frame_thread_encoder.h"
-#include "libavutil/fifo.h"
#include "libavutil/avassert.h"
#include "libavutil/imgutils.h"
#include "libavutil/opt.h"
@@ -31,93 +30,67 @@
#include "internal.h"
#include "thread.h"
-#define MAX_THREADS 64
-#define BUFFER_SIZE (2*MAX_THREADS)
+#include "libavutil/framethread.c"
-typedef struct{
- void *indata;
- void *outdata;
- int64_t return_code;
- unsigned index;
-} Task;
-
-typedef struct{
+/* Shared state of the frame-threaded encoder; stored in
+ * avctx->internal->frame_thread_encoder by ff_frame_thread_encoder_init(). */
+typedef struct {
+ /* one codec context per worker; encode_task_func() uses the entry
+ * selected by its thread_id argument */
+ AVCodecContext *thread_avctx[FRAMETHREAD_MAX_THREADS];
AVCodecContext *parent_avctx;
pthread_mutex_t buffer_mutex;
+ /* generic worker pool / task queue obtained from framethread_create() */
+ FrameThread *framethread;
+} EncodeThreadContext;
+
+/* Per-frame job submitted to the worker pool; allocated with av_mallocz()
+ * in ff_thread_video_encode_frame(). */
+typedef struct {
+ /* owning encoder state, used by encode_task_func() to reach the
+ * per-thread codec contexts and buffer_mutex */
+ EncodeThreadContext *ctx;
+ /* reference to the input frame; freed by encode_task_func() */
+ AVFrame *frame;
+ /* encoded output, assigned at the end of encode_task_func().
+ * NOTE(review): stays NULL when av_packet_alloc() fails there, while
+ * ff_thread_video_encode_frame() dereferences it unconditionally. */
+ AVPacket *packet;
+} EncodeTaskContext;
+
+/**
+ * Destructor for an EncodeTaskContext, registered as the task_free_func of
+ * the worker pool.
+ *
+ * framethread_free() invokes it for every task that is still queued or
+ * whose result was never collected. It releases the frame, the packet and
+ * the task container itself; the container is allocated per frame in
+ * ff_thread_video_encode_frame() and would otherwise be leaked.
+ *
+ * @param task_priv EncodeTaskContext to destroy, may be NULL
+ */
+static void encode_task_free(void *task_priv)
+{
+    EncodeTaskContext *task = task_priv;
+
+    if (!task)
+        return;
+
+    /* both helpers accept members that are already NULL */
+    av_frame_free(&task->frame);
+    av_packet_free(&task->packet);
+    av_free(task);
+}
- AVFifoBuffer *task_fifo;
- pthread_mutex_t task_fifo_mutex;
- pthread_cond_t task_fifo_cond;
-
- Task finished_tasks[BUFFER_SIZE];
- pthread_mutex_t finished_task_mutex;
- pthread_cond_t finished_task_cond;
-
- unsigned task_index;
- unsigned finished_task_index;
-
- pthread_t worker[MAX_THREADS];
- atomic_int exit;
-} ThreadContext;
-
-static void * attribute_align_arg worker(void *v){
- AVCodecContext *avctx = v;
- ThreadContext *c = avctx->internal->frame_thread_encoder;
- AVPacket *pkt = NULL;
-
- while (!atomic_load(&c->exit)) {
- int got_packet, ret;
- AVFrame *frame;
- Task task;
-
- if(!pkt) pkt = av_packet_alloc();
- if(!pkt) continue;
- av_init_packet(pkt);
-
- pthread_mutex_lock(&c->task_fifo_mutex);
- while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
- if (atomic_load(&c->exit)) {
- pthread_mutex_unlock(&c->task_fifo_mutex);
- goto end;
- }
- pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
- }
- av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
- pthread_mutex_unlock(&c->task_fifo_mutex);
- frame = task.indata;
+static int encode_task_func(void *task_priv, int thread_id)
+{
+ EncodeTaskContext *task = task_priv;
+ EncodeThreadContext *c = task->ctx;
+ AVPacket *pkt = av_packet_alloc();
+ int got_packet, ret;
- ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet);
+ if (!pkt) {
pthread_mutex_lock(&c->buffer_mutex);
- av_frame_unref(frame);
+ av_frame_unref(task->frame);
pthread_mutex_unlock(&c->buffer_mutex);
- av_frame_free(&frame);
- if(got_packet) {
- int ret2 = av_packet_make_refcounted(pkt);
- if (ret >= 0 && ret2 < 0)
- ret = ret2;
- } else {
- pkt->data = NULL;
- pkt->size = 0;
- }
- pthread_mutex_lock(&c->finished_task_mutex);
- c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
- c->finished_tasks[task.index].return_code = ret;
- pthread_cond_signal(&c->finished_task_cond);
- pthread_mutex_unlock(&c->finished_task_mutex);
+ av_frame_free(&task->frame);
+ return AVERROR(ENOMEM);
}
-end:
- av_free(pkt);
+
+ av_init_packet(pkt);
+
+ ret = avcodec_encode_video2(c->thread_avctx[thread_id], pkt, task->frame, &got_packet);
pthread_mutex_lock(&c->buffer_mutex);
- avcodec_close(avctx);
+ av_frame_unref(task->frame);
pthread_mutex_unlock(&c->buffer_mutex);
- av_freep(&avctx);
- return NULL;
+ av_frame_free(&task->frame);
+ if(got_packet) {
+ int ret2 = av_packet_make_refcounted(pkt);
+ if (ret >= 0 && ret2 < 0)
+ ret = ret2;
+ } else {
+ pkt->data = NULL;
+ pkt->size = 0;
+ }
+ task->packet = pkt;
+
+ return ret;
}
int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
int i=0;
- ThreadContext *c;
-
+ int ret;
+ EncodeThreadContext *c;
if( !(avctx->thread_type & FF_THREAD_FRAME)
|| !(avctx->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY))
@@ -164,32 +137,29 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
if(!avctx->thread_count) {
avctx->thread_count = av_cpu_count();
- avctx->thread_count = FFMIN(avctx->thread_count, MAX_THREADS);
+ avctx->thread_count = FFMIN(avctx->thread_count, FRAMETHREAD_MAX_THREADS);
}
if(avctx->thread_count <= 1)
return 0;
- if(avctx->thread_count > MAX_THREADS)
+ if(avctx->thread_count > FRAMETHREAD_MAX_THREADS)
return AVERROR(EINVAL);
av_assert0(!avctx->internal->frame_thread_encoder);
- c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(ThreadContext));
+ c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(EncodeThreadContext));
if(!c)
return AVERROR(ENOMEM);
- c->parent_avctx = avctx;
+ ret = framethread_create(&c->framethread, encode_task_func, encode_task_free, avctx->thread_count);
+ if (ret < 0) {
+ av_freep(&avctx->internal->frame_thread_encoder);
+ return ret;
+ }
- c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
- if(!c->task_fifo)
- goto fail;
+ c->parent_avctx = avctx;
- pthread_mutex_init(&c->task_fifo_mutex, NULL);
- pthread_mutex_init(&c->finished_task_mutex, NULL);
pthread_mutex_init(&c->buffer_mutex, NULL);
- pthread_cond_init(&c->task_fifo_cond, NULL);
- pthread_cond_init(&c->finished_task_cond, NULL);
- atomic_init(&c->exit, 0);
for(i=0; i<avctx->thread_count ; i++){
AVDictionary *tmp = NULL;
@@ -223,9 +193,7 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
av_dict_free(&tmp);
av_assert0(!thread_avctx->internal->frame_thread_encoder);
thread_avctx->internal->frame_thread_encoder = c;
- if(pthread_create(&c->worker[i], NULL, worker, thread_avctx)) {
- goto fail;
- }
+ c->thread_avctx[i] = thread_avctx;
}
avctx->active_thread_type = FF_THREAD_FRAME;
@@ -239,53 +207,27 @@ fail:
}
void ff_frame_thread_encoder_free(AVCodecContext *avctx){
- int i;
- ThreadContext *c= avctx->internal->frame_thread_encoder;
+ EncodeThreadContext *c= avctx->internal->frame_thread_encoder;
- pthread_mutex_lock(&c->task_fifo_mutex);
- atomic_store(&c->exit, 1);
- pthread_cond_broadcast(&c->task_fifo_cond);
- pthread_mutex_unlock(&c->task_fifo_mutex);
-
- for (i=0; i<avctx->thread_count; i++) {
- pthread_join(c->worker[i], NULL);
- }
-
- while (av_fifo_size(c->task_fifo) > 0) {
- Task task;
- AVFrame *frame;
- av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
- frame = task.indata;
- av_frame_free(&frame);
- task.indata = NULL;
- }
-
- for (i=0; i<BUFFER_SIZE; i++) {
- if (c->finished_tasks[i].outdata != NULL) {
- AVPacket *pkt = c->finished_tasks[i].outdata;
- av_packet_free(&pkt);
- c->finished_tasks[i].outdata = NULL;
- }
- }
-
- pthread_mutex_destroy(&c->task_fifo_mutex);
- pthread_mutex_destroy(&c->finished_task_mutex);
+ framethread_free(&c->framethread);
pthread_mutex_destroy(&c->buffer_mutex);
- pthread_cond_destroy(&c->task_fifo_cond);
- pthread_cond_destroy(&c->finished_task_cond);
- av_fifo_freep(&c->task_fifo);
+ for (int i = 0; i < avctx->thread_count; i++) {
+ avcodec_close(c->thread_avctx[i]);
+ av_freep(&c->thread_avctx[i]);
+ }
av_freep(&avctx->internal->frame_thread_encoder);
}
int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
- ThreadContext *c = avctx->internal->frame_thread_encoder;
- Task task;
+ EncodeThreadContext *c = avctx->internal->frame_thread_encoder;
+ EncodeTaskContext *t;
int ret;
av_assert1(!*got_packet_ptr);
if(frame){
AVFrame *new = av_frame_alloc();
+ EncodeTaskContext *task;
if(!new)
return AVERROR(ENOMEM);
ret = av_frame_ref(new, frame);
@@ -293,35 +235,26 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
av_frame_free(&new);
return ret;
}
+ task = av_mallocz(sizeof(*task));
+ if (!task) {
+ av_frame_free(&new);
+ return AVERROR(ENOMEM);
+ }
+ task->ctx = c;
+ task->frame = new;
- task.index = c->task_index;
- task.indata = (void*)new;
- pthread_mutex_lock(&c->task_fifo_mutex);
- av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
- pthread_cond_signal(&c->task_fifo_cond);
- pthread_mutex_unlock(&c->task_fifo_mutex);
-
- c->task_index = (c->task_index+1) % BUFFER_SIZE;
+ framethread_submit_task(c->framethread, task);
}
- pthread_mutex_lock(&c->finished_task_mutex);
- if (c->task_index == c->finished_task_index ||
- (frame && !c->finished_tasks[c->finished_task_index].outdata &&
- (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
- pthread_mutex_unlock(&c->finished_task_mutex);
- return 0;
- }
+ t = framethread_wait_task(c->framethread, !frame, &ret);
+ if (!t)
+ return 0;
- while (!c->finished_tasks[c->finished_task_index].outdata) {
- pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
- }
- task = c->finished_tasks[c->finished_task_index];
- *pkt = *(AVPacket*)(task.outdata);
+ *pkt = *t->packet;
if(pkt->data)
*got_packet_ptr = 1;
- av_freep(&c->finished_tasks[c->finished_task_index].outdata);
- c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
- pthread_mutex_unlock(&c->finished_task_mutex);
+ av_freep(&t->packet);
+ av_freep(&t);
- return task.return_code;
+ return ret;
}
diff --git a/libavutil/framethread.c b/libavutil/framethread.c
new file mode 100644
index 0000000000..4cd94f4833
--- /dev/null
+++ b/libavutil/framethread.c
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2012 Michael Niedermayer <michaelni at gmx.at>
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdatomic.h>
+
+#include "libavutil/fifo.h"
+#include "libavutil/avassert.h"
+#include "libavutil/thread.h"
+
+/* Upper bound on the number of worker threads; sizes FrameThread.worker[]. */
+#define FRAMETHREAD_MAX_THREADS 64
+/* Number of Task entries in the pending FIFO and in the finished_tasks[] ring. */
+#define BUFFER_SIZE (2*FRAMETHREAD_MAX_THREADS)
+
+typedef struct FrameThread FrameThread;
+
+/* One unit of work as it travels from the pending FIFO to finished_tasks[]. */
+typedef struct Task {
+ /* value returned by task_func(), stored by the worker */
+ int64_t return_code;
+ /* slot of FrameThread.finished_tasks[] the completed task is written to */
+ unsigned index;
+ /* set to 1 by the worker, reset to 0 by framethread_wait_task() */
+ int finished;
+ /* opaque payload handed to task_func() and task_free_func() */
+ void *priv;
+} Task;
+
+/* Per-thread bookkeeping; a pointer to this is the pthread start argument. */
+typedef struct Worker {
+ /* pool this worker belongs to */
+ FrameThread *c;
+ /* index of the worker, forwarded to task_func() as thread_id */
+ int thread_id;
+ /* thread handle, joined in framethread_free() */
+ pthread_t worker;
+} Worker;
+
+/**
+ * Worker pool that runs submitted tasks on several threads and hands the
+ * results back in submission order.
+ *
+ * The FrameThread typedef name is already introduced by the forward
+ * declaration above, so the struct is defined by tag only: repeating a
+ * typedef in the same scope is only permitted from C11 onwards.
+ */
+struct FrameThread {
+    /* tasks waiting for a worker, guarded by task_fifo_mutex */
+    AVFifoBuffer *task_fifo;
+    pthread_mutex_t task_fifo_mutex;
+    /* signalled on submission, broadcast on shutdown */
+    pthread_cond_t task_fifo_cond;
+
+    /* completed tasks, indexed by Task.index, guarded by finished_task_mutex */
+    Task finished_tasks[BUFFER_SIZE];
+    pthread_mutex_t finished_task_mutex;
+    /* signalled by a worker each time it stores a finished task */
+    pthread_cond_t finished_task_cond;
+
+    Worker worker[FRAMETHREAD_MAX_THREADS];
+    /* slot the next submitted task will use (modulo BUFFER_SIZE) */
+    unsigned task_index;
+    /* slot of the oldest task not yet returned by framethread_wait_task() */
+    unsigned finished_task_index;
+
+    /* user callback executing one task on worker thread_id */
+    int (*task_func) (void *priv, int thread_id);
+    /* user callback destroying a task that was never collected */
+    void (*task_free_func) (void *priv);
+    /* number of successfully started workers */
+    int thread_count;
+    /* set to 1 by framethread_free() to make the workers return */
+    atomic_int exit;
+};
+
+/**
+ * Thread entry point of one pool worker.
+ *
+ * Repeatedly takes the next pending task from the FIFO, runs task_func()
+ * on it and publishes the result in finished_tasks[] at the task's slot.
+ * Returns as soon as the pool's exit flag is observed.
+ *
+ * @param w the Worker describing this thread
+ * @return always NULL
+ */
+static void *attribute_align_arg thread_worker(void *w)
+{
+    Worker *self = w;
+    FrameThread *pool = self->c;
+
+    for (;;) {
+        Task job;
+
+        if (atomic_load(&pool->exit))
+            break;
+
+        /* sleep until there is work, or until shutdown is requested */
+        pthread_mutex_lock(&pool->task_fifo_mutex);
+        for (;;) {
+            if (atomic_load(&pool->exit)) {
+                pthread_mutex_unlock(&pool->task_fifo_mutex);
+                return NULL;
+            }
+            if (av_fifo_size(pool->task_fifo) > 0)
+                break;
+            pthread_cond_wait(&pool->task_fifo_cond, &pool->task_fifo_mutex);
+        }
+        av_fifo_generic_read(pool->task_fifo, &job, sizeof(job), NULL);
+        pthread_mutex_unlock(&pool->task_fifo_mutex);
+
+        /* run the user callback outside of any lock */
+        job.return_code = pool->task_func(job.priv, self->thread_id);
+        job.finished = 1;
+
+        pthread_mutex_lock(&pool->finished_task_mutex);
+        pool->finished_tasks[job.index] = job;
+        pthread_cond_signal(&pool->finished_task_cond);
+        pthread_mutex_unlock(&pool->finished_task_mutex);
+    }
+
+    return NULL;
+}
+
+/**
+ * Stop all workers and destroy the pool.
+ *
+ * Tasks still waiting in the FIFO and finished tasks that were never
+ * collected are passed to task_free_func(). *pc is set to NULL.
+ *
+ * @param pc pointer to the pool pointer; *pc may be NULL, in which case
+ *           nothing happens
+ */
+static void framethread_free(FrameThread ** pc)
+{
+    FrameThread *pool = *pc;
+    int n;
+
+    if (!pool)
+        return;
+
+    /* raise the exit flag while holding the FIFO lock and wake every waiter */
+    pthread_mutex_lock(&pool->task_fifo_mutex);
+    atomic_store(&pool->exit, 1);
+    pthread_cond_broadcast(&pool->task_fifo_cond);
+    pthread_mutex_unlock(&pool->task_fifo_mutex);
+
+    for (n = 0; n < pool->thread_count; n++)
+        pthread_join(pool->worker[n].worker, NULL);
+
+    /* tasks that no worker picked up */
+    while (av_fifo_size(pool->task_fifo) > 0) {
+        Task pending;
+        av_fifo_generic_read(pool->task_fifo, &pending, sizeof(pending), NULL);
+        pool->task_free_func(pending.priv);
+    }
+
+    /* results that were produced but never fetched */
+    for (n = 0; n < BUFFER_SIZE; n++) {
+        Task *done = &pool->finished_tasks[n];
+        if (done->finished)
+            pool->task_free_func(done->priv);
+    }
+
+    pthread_mutex_destroy(&pool->task_fifo_mutex);
+    pthread_mutex_destroy(&pool->finished_task_mutex);
+    pthread_cond_destroy(&pool->task_fifo_cond);
+    pthread_cond_destroy(&pool->finished_task_cond);
+    av_fifo_freep(&pool->task_fifo);
+
+    av_freep(pc);
+}
+
+/**
+ * Allocate a worker pool and start its threads.
+ *
+ * @param pctx           receives the new pool; set to NULL on failure
+ * @param task_func      callback executing one task; its return value is
+ *                       handed back by framethread_wait_task()
+ * @param task_free_func callback destroying a task that is still queued or
+ *                       uncollected when the pool is freed
+ * @param nb_threads     number of workers, 1..FRAMETHREAD_MAX_THREADS
+ *                       (asserted)
+ * @return 0 on success, AVERROR(ENOMEM) if an allocation fails, or the
+ *         error reported by pthread_create() converted with AVERROR()
+ */
+static int framethread_create(FrameThread ** pctx,
+                              int (*task_func) (void *priv, int thread_id),
+                              void (*task_free_func) (void *priv), int nb_threads)
+{
+    FrameThread *c;
+
+    av_assert0(nb_threads >= 1);
+    av_assert0(nb_threads <= FRAMETHREAD_MAX_THREADS);
+
+    *pctx = c = av_mallocz(sizeof(*c));
+    if (!c)
+        return AVERROR(ENOMEM);
+
+    c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
+    if (!c->task_fifo) {
+        av_freep(pctx);
+        return AVERROR(ENOMEM);
+    }
+
+    c->task_func = task_func;
+    c->task_free_func = task_free_func;
+
+    pthread_mutex_init(&c->task_fifo_mutex, NULL);
+    pthread_mutex_init(&c->finished_task_mutex, NULL);
+
+    pthread_cond_init(&c->task_fifo_cond, NULL);
+    pthread_cond_init(&c->finished_task_cond, NULL);
+    atomic_init(&c->exit, 0);
+
+    for (int i = 0; i < nb_threads; i++) {
+        int err;
+
+        c->worker[i].thread_id = i;
+        c->worker[i].c = c;
+        err = pthread_create(&c->worker[i].worker, NULL, thread_worker, &c->worker[i]);
+        if (err) {
+            /* thread_count only covers the threads that really started, so
+             * framethread_free() joins exactly those */
+            framethread_free(pctx);
+            /* report the actual cause instead of a blanket ENOMEM */
+            return AVERROR(err);
+        }
+        c->thread_count++;
+    }
+
+    return 0;
+}
+
+/**
+ * Queue one task for execution and wake a worker.
+ *
+ * The task is tagged with the current task_index so that its result lands
+ * in the matching finished_tasks[] slot; task_index then advances modulo
+ * BUFFER_SIZE. No check for free FIFO space is made here.
+ *
+ * @param c    pool to submit to
+ * @param priv opaque payload later passed to task_func()
+ */
+static void framethread_submit_task(FrameThread * c, void *priv)
+{
+    const unsigned slot = c->task_index;
+    Task job = { .index = slot, .priv = priv };
+
+    pthread_mutex_lock(&c->task_fifo_mutex);
+    av_fifo_generic_write(c->task_fifo, &job, sizeof(job), NULL);
+    pthread_cond_signal(&c->task_fifo_cond);
+    pthread_mutex_unlock(&c->task_fifo_mutex);
+
+    c->task_index = (slot + 1) % BUFFER_SIZE;
+}
+
+/**
+ * Fetch the oldest submitted task once it has finished.
+ *
+ * Returns NULL without blocking when no task is outstanding, or when
+ * flush is 0, the oldest task is not finished yet and no more than
+ * thread_count tasks are outstanding. Otherwise blocks until the oldest
+ * task has finished.
+ *
+ * @param c           pool to query
+ * @param flush       non-zero to wait for the oldest task whenever one is
+ *                    outstanding
+ * @param return_code receives the task's return code (converted from
+ *                    int64_t to int); written only when a task is returned
+ * @return the priv pointer of the finished task, or NULL
+ */
+static void *framethread_wait_task(FrameThread * c, int flush, int *return_code)
+{
+ Task *task;
+ void *task_priv;
+
+ pthread_mutex_lock(&c->finished_task_mutex);
+ /* equal indices: every submitted task has already been handed back */
+ if (c->task_index == c->finished_task_index ||
+ (!flush && !c->finished_tasks[c->finished_task_index].finished &&
+ (c->task_index - c->finished_task_index) % BUFFER_SIZE <= c->thread_count)) {
+ pthread_mutex_unlock(&c->finished_task_mutex);
+ return NULL;
+ }
+ /* workers signal finished_task_cond after storing each result */
+ while (!c->finished_tasks[c->finished_task_index].finished) {
+ pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
+ }
+ task = &c->finished_tasks[c->finished_task_index];
+ /* clear the flag so framethread_free() does not free this task again */
+ task->finished = 0;
+ *return_code = task->return_code;
+ task_priv = task->priv;
+
+ c->finished_task_index = (c->finished_task_index + 1) % BUFFER_SIZE;
+ pthread_mutex_unlock(&c->finished_task_mutex);
+ return task_priv;
+}
--
2.16.4
More information about the ffmpeg-devel
mailing list