[FFmpeg-devel] [PATCH 4/5] lavc: add frame multithreading capability (currently intra only)

Paul B Mahol onemda at gmail.com
Thu Jun 14 23:41:58 CEST 2012


On 6/14/12, Michael Niedermayer <michaelni at gmx.at> wrote:
> Compared to the decoder side, this code is able to change both the
> delay and the number of threads seamlessly during encoding. Also,
> any idle thread can pick up tasks; the strict in-order round-robin
> limitation is gone too.
>
> Signed-off-by: Michael Niedermayer <michaelni at gmx.at>
> ---
>  libavcodec/Makefile               |    6 +-
>  libavcodec/frame_thread_encoder.c |  248 +++++++++++++++++++++++++++++++++++++
>  libavcodec/frame_thread_encoder.h |   26 ++++
>  libavcodec/internal.h             |    2 +
>  libavcodec/utils.c                |   20 ++-
>  5 files changed, 297 insertions(+), 5 deletions(-)
>  create mode 100644 libavcodec/frame_thread_encoder.c
>  create mode 100644 libavcodec/frame_thread_encoder.h
>
> diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> index 851fe1d..27fa556 100644
> --- a/libavcodec/Makefile
> +++ b/libavcodec/Makefile
> @@ -755,9 +755,9 @@ OBJS-$(CONFIG_REMOVE_EXTRADATA_BSF)       += remove_extradata_bsf.o
>  OBJS-$(CONFIG_TEXT2MOVSUB_BSF)            += movsub_bsf.o
>
>  # thread libraries
> -OBJS-$(HAVE_PTHREADS)                  += pthread.o
> -OBJS-$(HAVE_W32THREADS)                += pthread.o
> -OBJS-$(HAVE_OS2THREADS)                += pthread.o
> +OBJS-$(HAVE_PTHREADS)                  += pthread.o frame_thread_encoder.o
> +OBJS-$(HAVE_W32THREADS)                += pthread.o frame_thread_encoder.o
> +OBJS-$(HAVE_OS2THREADS)                += pthread.o frame_thread_encoder.o
>
>  # inverse.o contains the ff_inverse table definition, which is used by
>  # the FASTDIV macro (from libavutil); since referencing the external
> diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
> new file mode 100644
> index 0000000..fd2f379
> --- /dev/null
> +++ b/libavcodec/frame_thread_encoder.c
> @@ -0,0 +1,248 @@
> +/*
> + * Copyright (c) 2012 Michael Niedermayer <michaelni at gmx.at>
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "frame_thread_encoder.h"
> +
> +#include "libavutil/fifo.h"
> +#include "libavutil/avassert.h"
> +#include "libavutil/imgutils.h"
> +#include "avcodec.h"
> +#include "internal.h"
> +#include "thread.h"
> +
> +#if HAVE_PTHREADS
> +#include <pthread.h>
> +#elif HAVE_W32THREADS
> +#include "w32pthreads.h"
> +#elif HAVE_OS2THREADS
> +#include "os2threads.h"
> +#endif
> +
> +#define MAX_THREADS 64
> +#define BUFFER_SIZE (2*MAX_THREADS)
> +
> +typedef struct{
> +    void *indata;
> +    void *outdata;
> +    int64_t return_code;
> +    unsigned index;
> +} Task;
> +
> +typedef struct{
> +    AVCodecContext *parent_avctx;
> +    pthread_mutex_t buffer_mutex;
> +
> +    AVFifoBuffer *task_fifo;
> +    pthread_mutex_t task_fifo_mutex;
> +    pthread_cond_t task_fifo_cond;
> +
> +    Task finished_tasks[BUFFER_SIZE];
> +    pthread_mutex_t finished_task_mutex;
> +    pthread_cond_t finished_task_cond;
> +
> +    unsigned task_index;
> +    unsigned finished_task_index;
> +
> +    pthread_t worker[MAX_THREADS];
> +    int exit;
> +} ThreadContext;
> +
> +static void * attribute_align_arg worker(void *v){
> +    AVCodecContext *avctx = v;
> +    ThreadContext *c = avctx->internal->frame_thread_encoder;
> +    AVPacket *pkt = NULL;
> +
> +    while(!c->exit){
> +        int got_packet, ret;
> +        AVFrame *frame;
> +        Task task;
> +
> +        if(!pkt) pkt= av_mallocz(sizeof(*pkt));

Please check the return value of av_mallocz() (and make sure nothing is leaked on the failure path).

> +        av_init_packet(pkt);
> +
> +        pthread_mutex_lock(&c->task_fifo_mutex);
> +        while (av_fifo_size(c->task_fifo) <= 0) {
> +            pthread_cond_wait(&c->task_fifo_cond, &c->task_fifo_mutex);
> +            if(c->exit){
> +                pthread_mutex_unlock(&c->task_fifo_mutex);
> +                goto end;
> +            }
> +        }
> +        av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
> +        pthread_mutex_unlock(&c->task_fifo_mutex);
> +        frame = task.indata;
> +
> +        ret = avcodec_encode_video2(avctx, pkt, frame, &got_packet);
> +        pthread_mutex_lock(&c->buffer_mutex);
> +        c->parent_avctx->release_buffer(c->parent_avctx, frame);
> +        pthread_mutex_unlock(&c->buffer_mutex);
> +        av_freep(&frame);
> +        if(!got_packet)
> +            continue;
> +        av_dup_packet(pkt);
> +        pthread_mutex_lock(&c->finished_task_mutex);
> +        c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
> +        c->finished_tasks[task.index].return_code = ret;
> +        pthread_cond_signal(&c->finished_task_cond);
> +        pthread_mutex_unlock(&c->finished_task_mutex);
> +    }
> +end:
> +    av_free(pkt);
> +    pthread_mutex_lock(&c->buffer_mutex);
> +    avcodec_close(avctx);
> +    pthread_mutex_unlock(&c->buffer_mutex);
> +    av_freep(&avctx);
> +    return NULL;
> +}
> +
> +int ff_frame_thread_encoder_init(AVCodecContext *avctx){
> +    int i;
> +    ThreadContext *c;
> +
> +
> +    if(   !(avctx->thread_type & FF_THREAD_FRAME)
> +       || !(avctx->codec->capabilities & CODEC_CAP_INTRA_ONLY))
> +        return 0;
> +
> +    if(!avctx->thread_count) {
> +        avctx->thread_count = ff_get_logical_cpus(avctx);
> +        avctx->thread_count = FFMIN(avctx->thread_count, MAX_THREADS);
> +    }
> +
> +    if(avctx->thread_count <= 1)
> +        return 0;
> +
> +    if(avctx->thread_count > MAX_THREADS)
> +        return AVERROR(EINVAL);
> +
> +    av_assert0(!avctx->internal->frame_thread_encoder);
> +    c = avctx->internal->frame_thread_encoder = av_mallocz(sizeof(ThreadContext));

Ditto: please check the return value of av_mallocz().

> +    c->parent_avctx = avctx;
> +
> +    c->task_fifo = av_fifo_alloc(sizeof(Task) * BUFFER_SIZE);

Ditto: please check the return value of av_fifo_alloc().

> +    pthread_mutex_init(&c->task_fifo_mutex, NULL);
> +    pthread_mutex_init(&c->finished_task_mutex, NULL);
> +    pthread_mutex_init(&c->buffer_mutex, NULL);
> +    pthread_cond_init(&c->task_fifo_cond, NULL);
> +    pthread_cond_init(&c->finished_task_cond, NULL);
> +
> +    for(i=0; i<avctx->thread_count ; i++){
> +        AVCodecContext *thread_avctx = avcodec_alloc_context3(avctx->codec);

Ditto: please check the return value of avcodec_alloc_context3().

> +        *thread_avctx = *avctx;
> +        thread_avctx->internal = NULL;
> +        thread_avctx->priv_data = av_malloc(avctx->codec->priv_data_size);

Ditto: please check the return value of av_malloc().

> +        memcpy(thread_avctx->priv_data, avctx->priv_data, avctx->codec->priv_data_size);
> +        thread_avctx->thread_count = 1;
> +        thread_avctx->active_thread_type &= ~FF_THREAD_FRAME;
> +
> +        //FIXME pass private options to encoder
IMHO, passing the private options to the per-thread encoders should be implemented rather than left as a FIXME.

> +        if(avcodec_open2(thread_avctx, avctx->codec, NULL) < 0) {
> +            goto fail;
> +        }
> +        av_assert0(!thread_avctx->internal->frame_thread_encoder);
> +        thread_avctx->internal->frame_thread_encoder = c;
> +        if(pthread_create(&c->worker[i], NULL, worker, thread_avctx)) {
> +           goto fail;
> +        }
> +    }
> +
> +    avctx->active_thread_type = FF_THREAD_FRAME;
> +
> +    return 0;
> +fail:
> +    av_log(avctx, AV_LOG_ERROR, "ff_frame_thread_encoder_init failed\n");
> +    avctx->thread_count = i;
> +    ff_thread_free(avctx);
> +    return -1;
> +}
> +
> +void ff_frame_thread_encoder_free(AVCodecContext *avctx){
> +    int i;
> +    ThreadContext *c= avctx->internal->frame_thread_encoder;
> +
> +    pthread_mutex_lock(&c->task_fifo_mutex);
> +    c->exit = 1;
> +    pthread_cond_broadcast(&c->task_fifo_cond);
> +    pthread_mutex_unlock(&c->task_fifo_mutex);
> +
> +    for (i=0; i<avctx->thread_count; i++) {
> +         pthread_join(c->worker[i], NULL);
> +    }
> +
> +    pthread_mutex_destroy(&c->task_fifo_mutex);
> +    pthread_mutex_destroy(&c->finished_task_mutex);
> +    pthread_mutex_destroy(&c->buffer_mutex);
> +    pthread_cond_destroy(&c->task_fifo_cond);
> +    pthread_cond_destroy(&c->finished_task_cond);
> +    av_fifo_free(c->task_fifo); c->task_fifo = NULL;
> +    av_freep(&avctx->internal->frame_thread_encoder);
> +}
> +
> +int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
> +    ThreadContext *c = avctx->internal->frame_thread_encoder;
> +    Task task;
> +    int ret;
> +
> +    av_assert1(!*got_packet_ptr);
> +
> +    if(frame){
> +        if(!(avctx->flags & CODEC_FLAG_INPUT_PRESERVED)){
> +            AVFrame *new = avcodec_alloc_frame();

Ditto: please check the return value of avcodec_alloc_frame().
> +            pthread_mutex_lock(&c->buffer_mutex);
> +            ret = c->parent_avctx->get_buffer(c->parent_avctx, new);
> +            pthread_mutex_unlock(&c->buffer_mutex);
> +            if(ret<0)
> +                return ret;
> +            new->pts = frame->pts;
> +            av_image_copy(new->data, new->linesize, (const uint8_t **)frame->data, frame->linesize,
> +                          avctx->pix_fmt, avctx->width, avctx->height);
> +            frame = new;
> +        }
> +
> +        task.index = c->task_index;
> +        task.indata = (void*)frame;
> +        pthread_mutex_lock(&c->task_fifo_mutex);
> +        av_fifo_generic_write(c->task_fifo, &task, sizeof(task), NULL);
> +        pthread_cond_signal(&c->task_fifo_cond);
> +        pthread_mutex_unlock(&c->task_fifo_mutex);
> +
> +        c->task_index = (c->task_index+1) % BUFFER_SIZE;
> +
> +        if(!c->finished_tasks[c->finished_task_index].outdata && (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)
> +            return 0;
> +    }
> +
> +    if(c->task_index == c->finished_task_index)
> +        return 0;
> +
> +    pthread_mutex_lock(&c->finished_task_mutex);
> +    while (!c->finished_tasks[c->finished_task_index].outdata) {
> +        pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
> +    }
> +    task = c->finished_tasks[c->finished_task_index];
> +    *pkt = *(AVPacket*)(task.outdata);
> +    c->finished_tasks[c->finished_task_index].outdata= NULL;
> +    c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
> +    pthread_mutex_unlock(&c->finished_task_mutex);
> +
> +    *got_packet_ptr = 1;
> +
> +    return task.return_code;
> +}
> \ No newline at end of file

Please fix this: add a newline at the end of the file.
> diff --git a/libavcodec/frame_thread_encoder.h b/libavcodec/frame_thread_encoder.h
> new file mode 100644
> index 0000000..398c7f5
> --- /dev/null
> +++ b/libavcodec/frame_thread_encoder.h
> @@ -0,0 +1,26 @@
> +/*
> + * Copyright (c) 2012 Michael Niedermayer <michaelni at gmx.at>
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "avcodec.h"
> +
> +int ff_frame_thread_encoder_init(AVCodecContext *avctx);
> +void ff_frame_thread_encoder_free(AVCodecContext *avctx);
> +int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr);
> +
> diff --git a/libavcodec/internal.h b/libavcodec/internal.h
> index 198483a..50f00ef 100644
> --- a/libavcodec/internal.h
> +++ b/libavcodec/internal.h
> @@ -82,6 +82,8 @@ typedef struct AVCodecInternal {
>       */
>      uint8_t *byte_buffer;
>      unsigned int byte_buffer_size;
> +
> +    void *frame_thread_encoder;
>  } AVCodecInternal;
>
>  struct AVCodecDefault {
> diff --git a/libavcodec/utils.c b/libavcodec/utils.c
> index 8ed78d6..49c9188 100644
> --- a/libavcodec/utils.c
> +++ b/libavcodec/utils.c
> @@ -40,6 +40,7 @@
>  #include "libavutil/opt.h"
>  #include "imgconvert.h"
>  #include "thread.h"
> +#include "frame_thread_encoder.h"
>  #include "audioconvert.h"
>  #include "internal.h"
>  #include "bytestream.h"
> @@ -851,7 +852,14 @@ int attribute_align_arg avcodec_open2(AVCodecContext *avctx, AVCodec *codec, AVD
>      if (!HAVE_THREADS)
>          av_log(avctx, AV_LOG_WARNING, "Warning: not compiled with thread
> support, using thread emulation\n");
>
> -    if (HAVE_THREADS && !avctx->thread_opaque) {
> +    entangled_thread_counter--; //we will instanciate a few encoders thus kick the counter to prevent false detection of a problem
> +    ret = ff_frame_thread_encoder_init(avctx);
> +    entangled_thread_counter++;
> +    if (ret < 0)
> +        goto free_and_end;
> +
> +    if (HAVE_THREADS && !avctx->thread_opaque
> +        && !(avctx->internal->frame_thread_encoder && (avctx->active_thread_type&FF_THREAD_FRAME)))
>          ret = ff_thread_init(avctx);
>          if (ret < 0) {
>              goto free_and_end;
> @@ -931,7 +939,7 @@ int attribute_align_arg avcodec_open2(AVCodecContext *avctx, AVCodec *codec, AVD
>      avctx->pts_correction_last_pts =
>      avctx->pts_correction_last_dts = INT64_MIN;
>
> -    if(avctx->codec->init &&
> !(avctx->active_thread_type&FF_THREAD_FRAME)){
> +    if(avctx->codec->init && (!(avctx->active_thread_type&FF_THREAD_FRAME) || avctx->internal->frame_thread_encoder)){
>          ret = avctx->codec->init(avctx);
>          if (ret < 0) {
>              goto free_and_end;
> @@ -1301,6 +1309,9 @@ int attribute_align_arg avcodec_encode_video2(AVCodecContext *avctx,
>
>      *got_packet_ptr = 0;
>
> +    if(avctx->internal->frame_thread_encoder &&
> (avctx->active_thread_type&FF_THREAD_FRAME))
> +        return ff_thread_video_encode_frame(avctx, avpkt, frame, got_packet_ptr);
> +
>      if (!(avctx->codec->capabilities & CODEC_CAP_DELAY) && !frame) {
>          av_free_packet(avpkt);
>          av_init_packet(avpkt);
> @@ -1661,6 +1672,11 @@ av_cold int avcodec_close(AVCodecContext *avctx)
>      }
>
>      if (avcodec_is_open(avctx)) {
> +        if (avctx->internal->frame_thread_encoder && avctx->thread_count >
> 1) {
> +            entangled_thread_counter --;
> +            ff_frame_thread_encoder_free(avctx);
> +            entangled_thread_counter ++;
> +        }
>          if (HAVE_THREADS && avctx->thread_opaque)
>              ff_thread_free(avctx);
>          if (avctx->codec && avctx->codec->close)
> --
> 1.7.9.5
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>


More information about the ffmpeg-devel mailing list