[FFmpeg-devel] [PATCH 3/8] avcodec/frame_thread_encoder: Avoid allocations of AVPackets, fix deadlock

Andreas Rheinhardt andreas.rheinhardt at gmail.com
Mon Feb 15 16:34:49 EET 2021


Andreas Rheinhardt:
> Up until now, when doing frame thread encoding, each worker thread
> tried to allocate an AVPacket for every AVFrame to be encoded; said
> packet was then handed back to the main thread, where its content
> was copied into the packet actually destined for output; the
> temporary AVPacket was then freed.
> 
> Besides being wasteful, this also has another problem: There is a risk
> of deadlock, namely if no AVPacket can be allocated at all. The user
> doesn't get an error in this case; the worker threads simply try to
> allocate a packet again and again. If the user has supplied enough
> frames, the user's thread will block until a task has been completed,
> which never happens if no packet can ever be allocated.
> 
> This patch instead modifies the code to allocate the packets during
> init; they are then reused again and again.
> 
> Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt at gmail.com>
> ---
>  libavcodec/frame_thread_encoder.c | 61 +++++++++++++++++++------------
>  1 file changed, 37 insertions(+), 24 deletions(-)
> 
> diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c
> index 9ca34e7ffb..bcd3c94f8b 100644
> --- a/libavcodec/frame_thread_encoder.c
> +++ b/libavcodec/frame_thread_encoder.c
> @@ -32,13 +32,18 @@
>  #include "thread.h"
>  
>  #define MAX_THREADS 64
> -#define BUFFER_SIZE (2*MAX_THREADS)
> +/* There can be as many as MAX_THREADS + 1 outstanding tasks.
> + * An additional + 1 is needed so that one can distinguish
> + * the case of zero and MAX_THREADS + 1 outstanding tasks modulo
> + * the number of buffers. */
> +#define BUFFER_SIZE (MAX_THREADS + 2)
>  
>  typedef struct{
>      AVFrame  *indata;
>      AVPacket *outdata;
>      int64_t return_code;
>      unsigned index;
> +    int       finished;
>  } Task;
>  
>  typedef struct{
> @@ -49,8 +54,9 @@ typedef struct{
>      pthread_mutex_t task_fifo_mutex;
>      pthread_cond_t task_fifo_cond;
>  
> -    Task finished_tasks[BUFFER_SIZE];
> -    pthread_mutex_t finished_task_mutex;
> +    unsigned max_tasks;
> +    Task tasks[BUFFER_SIZE];
> +    pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */
>      pthread_cond_t finished_task_cond;
>  
>      unsigned task_index;
> @@ -63,17 +69,13 @@ typedef struct{
>  static void * attribute_align_arg worker(void *v){
>      AVCodecContext *avctx = v;
>      ThreadContext *c = avctx->internal->frame_thread_encoder;
> -    AVPacket *pkt = NULL;
>  
>      while (!atomic_load(&c->exit)) {
>          int got_packet = 0, ret;
> +        AVPacket *pkt;
>          AVFrame *frame;
>          Task task;
>  
> -        if(!pkt) pkt = av_packet_alloc();
> -        if(!pkt) continue;
> -        av_init_packet(pkt);
> -
>          pthread_mutex_lock(&c->task_fifo_mutex);
>          while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) {
>              if (atomic_load(&c->exit)) {
> @@ -84,7 +86,12 @@ static void * attribute_align_arg worker(void *v){
>          }
>          av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL);
>          pthread_mutex_unlock(&c->task_fifo_mutex);
> +        /* The main thread ensures that any two outstanding tasks have
> +         * different indices, ergo each worker thread owns its element
> +         * of c->tasks with the exception of finished, which is shared
> +         * with the main thread and guarded by finished_task_mutex. */
>          frame = task.indata;
> +        pkt   = c->tasks[task.index].outdata;
>  
>          ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet);
>          if(got_packet) {
> @@ -101,13 +108,12 @@ static void * attribute_align_arg worker(void *v){
>          pthread_mutex_unlock(&c->buffer_mutex);
>          av_frame_free(&frame);
>          pthread_mutex_lock(&c->finished_task_mutex);
> -        c->finished_tasks[task.index].outdata = pkt; pkt = NULL;
> -        c->finished_tasks[task.index].return_code = ret;
> +        c->tasks[task.index].return_code = ret;
> +        c->tasks[task.index].finished    = 1;
>          pthread_cond_signal(&c->finished_task_cond);
>          pthread_mutex_unlock(&c->finished_task_mutex);
>      }
>  end:
> -    av_free(pkt);
>      pthread_mutex_lock(&c->buffer_mutex);
>      avcodec_close(avctx);
>      pthread_mutex_unlock(&c->buffer_mutex);
> @@ -194,6 +200,12 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){
>      pthread_cond_init(&c->finished_task_cond, NULL);
>      atomic_init(&c->exit, 0);
>  
> +    c->max_tasks = avctx->thread_count + 2;
> +    for (unsigned i = 0; i < c->max_tasks; i++) {
> +        if (!(c->tasks[i].outdata = av_packet_alloc()))
> +            goto fail;
> +    }
> +
>      for(i=0; i<avctx->thread_count ; i++){
>          AVDictionary *tmp = NULL;
>          int ret;
> @@ -261,8 +273,8 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
>          av_frame_free(&task.indata);
>      }
>  
> -    for (i=0; i<BUFFER_SIZE; i++) {
> -        av_packet_free(&c->finished_tasks[i].outdata);
> +    for (unsigned i = 0; i < c->max_tasks; i++) {
> +        av_packet_free(&c->tasks[i].outdata);
>      }
>  
>      pthread_mutex_destroy(&c->task_fifo_mutex);
> @@ -276,7 +288,7 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){
>  
>  int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){
>      ThreadContext *c = avctx->internal->frame_thread_encoder;
> -    Task task;
> +    Task *outtask, task;
>      int ret;
>  
>      av_assert1(!*got_packet_ptr);
> @@ -298,27 +310,28 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF
>          pthread_cond_signal(&c->task_fifo_cond);
>          pthread_mutex_unlock(&c->task_fifo_mutex);
>  
> -        c->task_index = (c->task_index+1) % BUFFER_SIZE;
> +        c->task_index = (c->task_index + 1) % c->max_tasks;
>      }
>  
> +    outtask = &c->tasks[c->finished_task_index];
>      pthread_mutex_lock(&c->finished_task_mutex);
>      if (c->task_index == c->finished_task_index ||
> -        (frame && !c->finished_tasks[c->finished_task_index].outdata &&
> -         (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) {
> +        (frame && !outtask->finished &&
> +         (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) {
>              pthread_mutex_unlock(&c->finished_task_mutex);
>              return 0;
>          }
> -
> -    while (!c->finished_tasks[c->finished_task_index].outdata) {
> +    while (!outtask->finished) {
>          pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex);
>      }
> -    task = c->finished_tasks[c->finished_task_index];
> -    *pkt = *(AVPacket*)(task.outdata);
> +    /* We now own outtask completely: No worker thread touches it any more,
> +     * because there is no outstanding task with this index. */
> +    outtask->finished = 0;
> +    av_packet_move_ref(pkt, outtask->outdata);
>      if(pkt->data)
>          *got_packet_ptr = 1;
> -    av_freep(&c->finished_tasks[c->finished_task_index].outdata);
> -    c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE;
> +    c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks;
>      pthread_mutex_unlock(&c->finished_task_mutex);
>  
> -    return task.return_code;
> +    return outtask->return_code;
>  }
> 
I will apply this patchset tomorrow unless there are objections. Thanks
to Paul for looking at some of the patches.

- Andreas


More information about the ffmpeg-devel mailing list