[FFmpeg-devel] [PATCH] avcodec/frame_thread_encoder: factorize frame threading

Marton Balint cus at passwd.hu
Sun Apr 7 21:47:05 EEST 2019


framethread.c is put into libavutil, but it has to be included directly to
avoid creating avpriv functions.

Functionality should be identical, with one slight difference: we close the
per-thread avcodec contexts in the main thread and not in the workers.

Signed-off-by: Marton Balint <cus at passwd.hu>
---
 libavcodec/frame_thread_encoder.c | 229 ++++++++++++++------------------------
 libavutil/framethread.c           | 207 ++++++++++++++++++++++++++++++++++
 2 files changed, 288 insertions(+), 148 deletions(-)
 create mode 100644 libavutil/framethread.c

diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
index 55756c4c54..72b5d72803 100644
--- a/libavcodec/frame_thread_encoder.c
+++ b/libavcodec/frame_thread_encoder.c
@@ -22,7 +22,6 @@
 
 #include "frame_thread_encoder.h"
 
-#include "libavutil/fifo.h"
 #include "libavutil/avassert.h"
 #include "libavutil/imgutils.h"
 #include "libavutil/opt.h"
@@ -31,93 +30,67 @@
 #include "internal.h"
 #include "thread.h"
 
-#define MAX_THREADS 64
-#define BUFFER_SIZE (2*MAX_THREADS)
+#include "libavutil/framethread.c"
 
-typedef struct{
-    void *indata;
-    void *outdata;
-    int64_t return_code;
-    unsigned index;
-} Task;
-
-typedef struct{
+typedef struct {
+    AVCodecContext *thread_avctx[FRAMETHREAD_MAX_THREADS];
     AVCodecContext *parent_avctx;
     pthread_mutex_t buffer_mutex;
+    FrameThread *framethread;
+} EncodeThreadContext;
+
+typedef struct {
+    EncodeThreadContext *ctx;
+    AVFrame *frame;
+    AVPacket *packet;
+} EncodeTaskContext;
+
+static void encode_task_free(void *task_priv)
+{
+    EncodeTaskContext *task = task_priv;
+    av_frame_free(&task->frame);
+    av_packet_free(&task->packet);
+}
 
-    AVFifoBuffer *task_fifo;
-    pthread_mutex_t task_fifo_mutex;
-    pthread_cond_t task_fifo_cond;
-
-    Task finished_tasks[BUFFER_SIZE];
-    pthread_mutex_t finished_task_mutex;
-    pthread_cond_t finished_task_cond;
-
-    unsigned task_index;
-    unsigned finished_task_index;
-
-    pthread_t worker[MAX_THREADS];
-    atomic_int exit;
-} ThreadContext;
-
-static void * attribute_align_arg worker(void *v){
-    AVCodecContext *avctx = v;
-    ThreadContext *c = avctx->internal->frame_thread_encoder;
-    AVPacket *pkt = NULL;
-
-    while (!atomic_load(&c->exit)) {
-        int got_packet, ret;
-        AVFrame *frame;
-        Task task;
-
-        if(!pkt) pkt = av_packet_alloc();
-        if(!pkt) continue;
-        av_init_packet(pkt);
-
-        pthread_mutex_lock(&c->task_fifo_mutex);
-        while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
-            if (atomic_load(&c->exit)) {
-                pthread_mutex_unlock(&c->task_fifo_mutex);
-                goto end;
-            }
-            pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
-        }
-        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
-        pthread_mutex_unlock(&c->task_fifo_mutex);
-        frame = task.indata;
+static int encode_task_func(void *task_priv, int thread_id)
+{
+    EncodeTaskContext *task = task_priv;
+    EncodeThreadContext *c = task->ctx;
+    AVPacket *pkt = av_packet_alloc();
+    int got_packet, ret;
 
-        ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet);
+    if (!pkt) {
         pthread_mutex_lock(&c->buffer_mutex);
-        av_frame_unref(frame);
+        av_frame_unref(task->frame);
         pthread_mutex_unlock(&c->buffer_mutex);
-        av_frame_free(&frame);
-        if(got_packet) {
-            int ret2 = av_packet_make_refcounted(pkt);
-            if (ret >= 0 && ret2 < 0)
-                ret = ret2;
-        } else {
-            pkt->data = NULL;
-            pkt->size = 0;
-        }
-        pthread_mutex_lock(&c->finished_task_mutex);
-        c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
-        c->finished_tasks[task.index].return_code = ret;
-        pthread_cond_signal(&c->finished_task_cond);
-        pthread_mutex_unlock(&c->finished_task_mutex);
+        av_frame_free(&task->frame);
+        return AVERROR(ENOMEM);
     }
-end:
-    av_free(pkt);
+
+    av_init_packet(pkt);
+
+    ret = avcodec_encode_video2(c->thread_avctx[thread_id], pkt, task->frame, &got_packet);
     pthread_mutex_lock(&c->buffer_mutex);
-    avcodec_close(avctx);
+    av_frame_unref(task->frame);
     pthread_mutex_unlock(&c->buffer_mutex);
-    av_freep(&avctx);
-    return NULL;
+    av_frame_free(&task->frame);
+    if(got_packet) {
+        int ret2 = av_packet_make_refcounted(pkt);
+        if (ret >= 0 && ret2 < 0)
+            ret = ret2;
+    } else {
+        pkt->data = NULL;
+        pkt->size = 0;
+    }
+    task->packet = pkt;
+
+    return ret;
 }
 
 int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
     int i=0;
-    ThreadContext *c;
-
+    int ret;
+    EncodeThreadContext *c;
 
     if(   !(avctx->thread_type & FF_THREAD_FRAME)
        || !(avctx->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY))
@@ -164,32 +137,29 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
 
     if(!avctx->thread_count) {
         avctx->thread_count = av_cpu_count();
-        avctx->thread_count = FFMIN(avctx->thread_count, MAX_THREADS);
+        avctx->thread_count = FFMIN(avctx->thread_count, FRAMETHREAD_MAX_THREADS);
     }
 
     if(avctx->thread_count <= 1)
         return 0;
 
-    if(avctx->thread_count > MAX_THREADS)
+    if(avctx->thread_count > FRAMETHREAD_MAX_THREADS)
         return AVERROR(EINVAL);
 
     av_assert0(!avctx->internal->frame_thread_encoder);
-    c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(ThreadContext));
+    c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(EncodeThreadContext));
     if(!c)
         return AVERROR(ENOMEM);
 
-    c->parent_avctx = avctx;
+    ret = framethread_create(&c->framethread, encode_task_func, encode_task_free, avctx->thread_count);
+    if (ret < 0) {
+        av_freep(&avctx->internal->frame_thread_encoder);
+        return ret;
+    }
 
-    c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
-    if(!c->task_fifo)
-        goto fail;
+    c->parent_avctx = avctx;
 
-    pthread_mutex_init(&c->task_fifo_mutex, NULL);
-    pthread_mutex_init(&c->finished_task_mutex, NULL);
     pthread_mutex_init(&c->buffer_mutex, NULL);
-    pthread_cond_init(&c->task_fifo_cond, NULL);
-    pthread_cond_init(&c->finished_task_cond, NULL);
-    atomic_init(&c->exit, 0);
 
     for(i=0; i<avctx->thread_count ; i++){
         AVDictionary *tmp = NULL;
@@ -223,9 +193,7 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
         av_dict_free(&tmp);
         av_assert0(!thread_avctx->internal->frame_thread_encoder);
         thread_avctx->internal->frame_thread_encoder = c;
-        if(pthread_create(&c->worker[i], NULL, worker, thread_avctx)) {
-            goto fail;
-        }
+        c->thread_avctx[i] = thread_avctx;
     }
 
     avctx->active_thread_type = FF_THREAD_FRAME;
@@ -239,53 +207,27 @@ fail:
 }
 
 void ff_frame_thread_encoder_free(AVCodecContext *avctx){
-    int i;
-    ThreadContext *c= avctx->internal->frame_thread_encoder;
+    EncodeThreadContext *c= avctx->internal->frame_thread_encoder;
 
-    pthread_mutex_lock(&c->task_fifo_mutex);
-    atomic_store(&c->exit, 1);
-    pthread_cond_broadcast(&c->task_fifo_cond);
-    pthread_mutex_unlock(&c->task_fifo_mutex);
-
-    for (i=0; i<avctx->thread_count; i++) {
-         pthread_join(c->worker[i], NULL);
-    }
-
-    while (av_fifo_size(c->task_fifo) > 0) {
-        Task task;
-        AVFrame *frame;
-        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
-        frame = task.indata;
-        av_frame_free(&frame);
-        task.indata = NULL;
-    }
-
-    for (i=0; i<BUFFER_SIZE; i++) {
-        if (c->finished_tasks[i].outdata != NULL) {
-            AVPacket *pkt = c->finished_tasks[i].outdata;
-            av_packet_free(&pkt);
-            c->finished_tasks[i].outdata = NULL;
-        }
-    }
-
-    pthread_mutex_destroy(&c->task_fifo_mutex);
-    pthread_mutex_destroy(&c->finished_task_mutex);
+    framethread_free(&c->framethread);
     pthread_mutex_destroy(&c->buffer_mutex);
-    pthread_cond_destroy(&c->task_fifo_cond);
-    pthread_cond_destroy(&c->finished_task_cond);
-    av_fifo_freep(&c->task_fifo);
+    for (int i = 0; i < avctx->thread_count; i++) {
+        avcodec_close(c->thread_avctx[i]);
+        av_freep(&c->thread_avctx[i]);
+    }
     av_freep(&avctx->internal->frame_thread_encoder);
 }
 
 int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
-    ThreadContext *c = avctx->internal->frame_thread_encoder;
-    Task task;
+    EncodeThreadContext *c = avctx->internal->frame_thread_encoder;
+    EncodeTaskContext *t;
     int ret;
 
     av_assert1(!*got_packet_ptr);
 
     if(frame){
         AVFrame *new = av_frame_alloc();
+        EncodeTaskContext *task;
         if(!new)
             return AVERROR(ENOMEM);
         ret = av_frame_ref(new, frame);
@@ -293,35 +235,26 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
             av_frame_free(&new);
             return ret;
         }
+        task = av_mallocz(sizeof(*task));
+        if (!task) {
+            av_frame_free(&new);
+            return AVERROR(ENOMEM);
+        }
+        task->ctx = c;
+        task->frame = new;
 
-        task.index = c->task_index;
-        task.indata = (void*)new;
-        pthread_mutex_lock(&c->task_fifo_mutex);
-        av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
-        pthread_cond_signal(&c->task_fifo_cond);
-        pthread_mutex_unlock(&c->task_fifo_mutex);
-
-        c->task_index = (c->task_index+1) % BUFFER_SIZE;
+        framethread_submit_task(c->framethread, task);
     }
 
-    pthread_mutex_lock(&c->finished_task_mutex);
-    if (c->task_index == c->finished_task_index ||
-        (frame && !c->finished_tasks[c->finished_task_index].outdata &&
-         (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
-            pthread_mutex_unlock(&c->finished_task_mutex);
-            return 0;
-        }
+    t = framethread_wait_task(c->framethread, !frame, &ret);
+    if (!t)
+        return 0;
 
-    while (!c->finished_tasks[c->finished_task_index].outdata) {
-        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
-    }
-    task = c->finished_tasks[c->finished_task_index];
-    *pkt = *(AVPacket*)(task.outdata);
+    *pkt = *t->packet;
     if(pkt->data)
         *got_packet_ptr = 1;
-    av_freep(&c->finished_tasks[c->finished_task_index].outdata);
-    c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
-    pthread_mutex_unlock(&c->finished_task_mutex);
+    av_freep(&t->packet);
+    av_freep(&t);
 
-    return task.return_code;
+    return ret;
 }
diff --git a/libavutil/framethread.c b/libavutil/framethread.c
new file mode 100644
index 0000000000..4cd94f4833
--- /dev/null
+++ b/libavutil/framethread.c
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2012 Michael Niedermayer <michaelni at gmx.at>
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdatomic.h>
+
+#include "libavutil/fifo.h"
+#include "libavutil/avassert.h"
+#include "libavutil/thread.h"
+
+#define FRAMETHREAD_MAX_THREADS 64
+#define BUFFER_SIZE (2*FRAMETHREAD_MAX_THREADS)
+
+typedef struct FrameThread FrameThread;
+
+typedef struct Task {
+    int64_t return_code;
+    unsigned index;
+    int finished;
+    void *priv;
+} Task;
+
+typedef struct Worker {
+    FrameThread *c;
+    int thread_id;
+    pthread_t worker;
+} Worker;
+
+typedef struct FrameThread {
+    AVFifoBuffer *task_fifo;
+    pthread_mutex_t task_fifo_mutex;
+    pthread_cond_t task_fifo_cond;
+
+    Task finished_tasks[BUFFER_SIZE];
+    pthread_mutex_t finished_task_mutex;
+    pthread_cond_t finished_task_cond;
+
+    Worker worker[FRAMETHREAD_MAX_THREADS];
+    unsigned task_index;
+    unsigned finished_task_index;
+
+    int (*task_func) (void *priv, int thread_id);
+    void (*task_free_func) (void *priv);
+    int thread_count;
+    atomic_int exit;
+} FrameThread;
+
+static void *attribute_align_arg thread_worker(void *w)
+{
+    Worker *worker = w;
+    FrameThread *c = worker->c;
+
+    while (!atomic_load(&c->exit)) {
+        Task task;
+
+        pthread_mutex_lock(&c->task_fifo_mutex);
+        while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
+            if (atomic_load(&c->exit)) {
+                pthread_mutex_unlock(&c->task_fifo_mutex);
+                goto end;
+            }
+            pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
+        }
+        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
+        pthread_mutex_unlock(&c->task_fifo_mutex);
+        task.return_code = c->task_func(task.priv, worker->thread_id);
+        task.finished = 1;
+        pthread_mutex_lock(&c->finished_task_mutex);
+        c->finished_tasks[task.index] = task;
+        pthread_cond_signal(&c->finished_task_cond);
+        pthread_mutex_unlock(&c->finished_task_mutex);
+    }
+  end:
+    return NULL;
+}
+
+static void framethread_free(FrameThread ** pc)
+{
+    FrameThread *c = *pc;
+
+    if (!c)
+        return;
+
+    pthread_mutex_lock(&c->task_fifo_mutex);
+    atomic_store(&c->exit, 1);
+    pthread_cond_broadcast(&c->task_fifo_cond);
+    pthread_mutex_unlock(&c->task_fifo_mutex);
+
+    for (int i = 0; i < c->thread_count; i++)
+        pthread_join(c->worker[i].worker, NULL);
+
+    while (av_fifo_size(c->task_fifo) > 0) {
+        Task task;
+        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
+        c->task_free_func(task.priv);
+    }
+
+    for (int i = 0; i < BUFFER_SIZE; i++) {
+        if (c->finished_tasks[i].finished)
+            c->task_free_func(c->finished_tasks[i].priv);
+    }
+
+    pthread_mutex_destroy(&c->task_fifo_mutex);
+    pthread_mutex_destroy(&c->finished_task_mutex);
+    pthread_cond_destroy(&c->task_fifo_cond);
+    pthread_cond_destroy(&c->finished_task_cond);
+    av_fifo_freep(&c->task_fifo);
+
+    av_freep(pc);
+}
+
+static int framethread_create(FrameThread ** pctx,
+                              int (*task_func) (void *priv, int thread_id),
+                              void (*task_free_func) (void *priv), int nb_threads)
+{
+    FrameThread *c;
+
+    av_assert0(nb_threads >= 1);
+    av_assert0(nb_threads <= FRAMETHREAD_MAX_THREADS);
+
+    *pctx = c = av_mallocz(sizeof(*c));
+    if (!c)
+        return AVERROR(ENOMEM);
+
+    c->task_fifo = av_fifo_alloc_array(BUFFER_SIZE, sizeof(Task));
+    if (!c->task_fifo) {
+        av_freep(pctx);
+        return AVERROR(ENOMEM);
+    }
+
+    c->task_func = task_func;
+    c->task_free_func = task_free_func;
+
+    pthread_mutex_init(&c->task_fifo_mutex, NULL);
+    pthread_mutex_init(&c->finished_task_mutex, NULL);
+
+    pthread_cond_init(&c->task_fifo_cond, NULL);
+    pthread_cond_init(&c->finished_task_cond, NULL);
+    atomic_init(&c->exit, 0);
+
+    for (int i = 0; i < nb_threads; i++) {
+        c->worker[i].thread_id = i;
+        c->worker[i].c = c;
+        if (pthread_create(&c->worker[i].worker, NULL, thread_worker, &c->worker[i]))
+            goto fail;
+        c->thread_count++;
+    }
+
+    return 0;
+
+  fail:
+    framethread_free(pctx);
+    return AVERROR(ENOMEM);
+}
+
+static void framethread_submit_task(FrameThread * c, void *priv)
+{
+    Task task = { 0 };
+    task.priv = priv;
+    task.index = c->task_index;
+    pthread_mutex_lock(&c->task_fifo_mutex);
+    av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
+    pthread_cond_signal(&c->task_fifo_cond);
+    pthread_mutex_unlock(&c->task_fifo_mutex);
+    c->task_index = (c->task_index + 1) % BUFFER_SIZE;
+}
+
+static void *framethread_wait_task(FrameThread * c, int flush, int *return_code)
+{
+    Task *task;
+    void *task_priv;
+
+    pthread_mutex_lock(&c->finished_task_mutex);
+    if (c->task_index == c->finished_task_index ||
+        (!flush && !c->finished_tasks[c->finished_task_index].finished &&
+         (c->task_index - c->finished_task_index) % BUFFER_SIZE <= c->thread_count)) {
+        pthread_mutex_unlock(&c->finished_task_mutex);
+        return NULL;
+    }
+    while (!c->finished_tasks[c->finished_task_index].finished) {
+        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
+    }
+    task = &c->finished_tasks[c->finished_task_index];
+    task->finished = 0;
+    *return_code = task->return_code;
+    task_priv = task->priv;
+
+    c->finished_task_index = (c->finished_task_index + 1) % BUFFER_SIZE;
+    pthread_mutex_unlock(&c->finished_task_mutex);
+    return task_priv;
+}
-- 
2.16.4



More information about the ffmpeg-devel mailing list