[FFmpeg-devel] [PATCH] avcodec/pthread_frame: Fix checks and cleanup during init

Andreas Rheinhardt andreas.rheinhardt at gmail.com
Tue Mar 23 14:50:23 EET 2021


Nuo Mi:
> On Fri, Feb 12, 2021 at 12:06 AM Andreas Rheinhardt <
> andreas.rheinhardt at gmail.com> wrote:
> 
>> Up until now, ff_frame_thread_init had several bugs:
>> 1. It did not check whether the condition and mutexes
>>    could be successfully created.
>> 2. In case an error happened when setting up the child threads,
>>    ff_frame_thread_free is called to clean up all threads set up so far,
>>    including the current one. But a half-allocated context needs
>>    special handling which ff_frame_thread_free doesn't provide.
>>    Notably, if allocating the AVCodecInternal, the codec's private data
>>    or setting the options fails, the codec's close function will be
>>    called (if there is one); it will also be called if the codec's init
>>    function fails, regardless of whether the FF_CODEC_CAP_INIT_CLEANUP
>>    is set. This is not supported by all codecs; in ticket #9099 (which
>>    this commit fixes) it led to a crash.
>>
>> This commit fixes both of these issues. Given that it is not documented
>> to be safe to destroy mutexes/conditions that have only been zeroed, but
>> not initialized (or whose initialization has failed), one either needs to
>> duplicate the code to destroy them in ff_frame_thread_init or record
>> which mutexes/condition variables need to be destroyed and check for this
>> in ff_frame_thread_free. This patch uses the former way for the
>> mutexes/condition variables, but lets ff_frame_thread_free take
>> over for all threads whose AVCodecContext could be allocated.
>>
>> Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt at gmail.com>
>> ---
>> After several unsuccessful attempts to make pthread_cond/mutex_init
>> fail, I looked at the source [1] and it seems that the glibc versions of
>> these functions can't fail at all (unless one sets nondefault attributes).
>> Therefore this part of the code is untested, unfortunately.
>>
>> (Removing all pthread_mutex/cond_destroy calls does not result in any
>> complaints from Valgrind/ASAN either; seems the glibc implementation
>> doesn't need allocations.)
>>
>> [1]: https://github.com/bminor/glibc/blob/master/nptl/pthread_cond_init.c
>> https://github.com/bminor/glibc/blob/master/nptl/pthread_mutex_init.c
>>
>>  libavcodec/pthread_frame.c | 175 ++++++++++++++++++++++---------------
>>  1 file changed, 106 insertions(+), 69 deletions(-)
>>
>> diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
>> index 4429a4d59c..a10d8c1266 100644
>> --- a/libavcodec/pthread_frame.c
>> +++ b/libavcodec/pthread_frame.c
>> @@ -64,6 +64,12 @@ enum {
>>      STATE_SETUP_FINISHED,
>>  };
>>
>> +enum {
>> +    UNINITIALIZED,  ///< Thread has not been created, AVCodec->close mustn't be called
>> +    NEEDS_CLOSE,    ///< AVCodec->close needs to be called
>> +    INITIALIZED,    ///< Thread has been properly set up
>> +};
>> +
>>  /**
>>   * Context used by codec threads and stored in their AVCodecInternal thread_ctx.
>>   */
>> @@ -698,27 +704,40 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
>>
>>      for (i = 0; i < thread_count; i++) {
>>          PerThreadContext *p = &fctx->threads[i];
>> +        AVCodecContext *ctx = p->avctx;
>>
>> -        pthread_mutex_lock(&p->mutex);
>> -        p->die = 1;
>> -        pthread_cond_signal(&p->input_cond);
>> -        pthread_mutex_unlock(&p->mutex);
>> -
>> -        if (p->thread_init)
>> -            pthread_join(p->thread, NULL);
>> -        p->thread_init=0;
>> +        if (ctx->internal) {
>> +            if (p->thread_init == INITIALIZED) {
>> +                pthread_mutex_lock(&p->mutex);
>> +                p->die = 1;
>> +                pthread_cond_signal(&p->input_cond);
>> +                pthread_mutex_unlock(&p->mutex);
>>
>> -        if (codec->close && p->avctx)
>> -            codec->close(p->avctx);
>> +                pthread_join(p->thread, NULL);
>>
> This signals one thread to exit and then waits for it to finish before
> moving on to the next one, so all operations are serialized.
> How about splitting it into two loops: one loop that signals all threads to
> exit and one loop that joins them?
> While we join one thread, the other threads can already work on exiting.
> 

This patch is not about speeding up freeing the worker threads. If you
want to take a look at this, go ahead. Here are some hints for this:

1. die is a PerThreadContext variable so that it can be protected by
PerThreadContext.mutex. A better approach is to make it atomic and
signal this at the beginning (or so) of ff_frame_thread_free() to all
threads at once.
2. I am very certain that the "final thread update failed" block in
ff_frame_thread_free() is unnecessary (it dates from a time when init was
only called once, with the first worker thread being a distinguished
worker thread; this is simply no longer true). But I have not found the
time to thoroughly investigate the sample that led to this code block in
the first place.
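
A minimal sketch of hint 1 combined with the two-loop suggestion, assuming a
simplified worker model. Pool, Worker, pool_start and pool_stop are
hypothetical names for illustration, not code from pthread_frame.c. The die
flag is a single atomic shared by all workers; it is set once, every worker is
then woken, and only afterwards are the threads joined. The wakeup is still
sent while holding the worker's mutex so that a worker which has just checked
the flag cannot miss the signal.

```c
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>

#define MAX_WORKERS 16

/* Hypothetical stand-in for PerThreadContext; only the fields needed here. */
typedef struct Worker {
    pthread_t       thread;
    pthread_mutex_t mutex;
    pthread_cond_t  input_cond;
    int             has_input; /* protected by mutex */
    int             exited;    /* set by the worker, read after pthread_join */
    atomic_int     *die;       /* points to the flag shared by all workers */
} Worker;

typedef struct Pool {
    atomic_int die;
    int        nb_workers;     /* number of fully started workers */
    Worker     workers[MAX_WORKERS];
} Pool;

static void *worker_main(void *arg)
{
    Worker *w = arg;

    pthread_mutex_lock(&w->mutex);
    for (;;) {
        while (!w->has_input && !atomic_load(w->die))
            pthread_cond_wait(&w->input_cond, &w->mutex);
        if (atomic_load(w->die))
            break;
        w->has_input = 0;      /* real work would happen here */
    }
    w->exited = 1;
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Stops all started workers; returns how many of them exited cleanly. */
static int pool_stop(Pool *pool)
{
    int exited = 0;

    /* Phase 1: tell every worker to exit before waiting for any of them. */
    atomic_store(&pool->die, 1);
    for (int i = 0; i < pool->nb_workers; i++) {
        Worker *w = &pool->workers[i];
        pthread_mutex_lock(&w->mutex);
        pthread_cond_signal(&w->input_cond);
        pthread_mutex_unlock(&w->mutex);
    }
    /* Phase 2: join; the remaining workers are shutting down in parallel. */
    for (int i = 0; i < pool->nb_workers; i++) {
        Worker *w = &pool->workers[i];
        pthread_join(w->thread, NULL);
        exited += w->exited;
        pthread_cond_destroy(&w->input_cond);
        pthread_mutex_destroy(&w->mutex);
    }
    pool->nb_workers = 0;
    return exited;
}

/* Initializes one worker; on failure nothing is left initialized. */
static int worker_start(Worker *w, atomic_int *die)
{
    int err;

    w->has_input = 0;
    w->exited    = 0;
    w->die       = die;
    if ((err = pthread_mutex_init(&w->mutex, NULL)))
        return err;
    if ((err = pthread_cond_init(&w->input_cond, NULL))) {
        pthread_mutex_destroy(&w->mutex);
        return err;
    }
    if ((err = pthread_create(&w->thread, NULL, worker_main, w))) {
        pthread_cond_destroy(&w->input_cond);
        pthread_mutex_destroy(&w->mutex);
    }
    return err;
}

/* Returns 0 on success or a positive errno value on failure. */
static int pool_start(Pool *pool, int nb_workers)
{
    if (nb_workers < 0 || nb_workers > MAX_WORKERS)
        return EINVAL;
    atomic_init(&pool->die, 0);
    pool->nb_workers = 0;
    for (int i = 0; i < nb_workers; i++) {
        int err = worker_start(&pool->workers[i], &pool->die);
        if (err) {
            pool_stop(pool);   /* only touches workers that fully started */
            return err;
        }
        pool->nb_workers++;
    }
    return 0;
}
```

A caller would pair pool_start(&pool, n) with pool_stop(&pool). Whether this
measurably speeds up ff_frame_thread_free() has not been tested here.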

>> +            }
>> +            if (codec->close && p->thread_init != UNINITIALIZED)
>> +                codec->close(ctx);
>>
>>  #if FF_API_THREAD_SAFE_CALLBACKS
>> -        release_delayed_buffers(p);
>> +            release_delayed_buffers(p);
>> +            for (int j = 0; j < p->released_buffers_allocated; j++)
>> +                av_frame_free(&p->released_buffers[j]);
>> +            av_freep(&p->released_buffers);
>>  #endif
>> -        av_frame_free(&p->frame);
>> -    }
>> +            if (ctx->priv_data) {
>> +                if (codec->priv_class)
>> +                    av_opt_free(ctx->priv_data);
>> +                av_freep(&ctx->priv_data);
>> +            }
>>
>> -    for (i = 0; i < thread_count; i++) {
>> -        PerThreadContext *p = &fctx->threads[i];
>> +            av_freep(&ctx->slice_offset);
>> +
>> +            av_buffer_unref(&ctx->internal->pool);
>> +            av_freep(&ctx->internal);
>> +            av_buffer_unref(&ctx->hw_frames_ctx);
>> +        }
>> +
>> +        av_frame_free(&p->frame);
>>
>>          pthread_mutex_destroy(&p->mutex);
>>          pthread_mutex_destroy(&p->progress_mutex);
>> @@ -727,26 +746,6 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
>>          pthread_cond_destroy(&p->output_cond);
>>          av_packet_unref(&p->avpkt);
>>
>> -#if FF_API_THREAD_SAFE_CALLBACKS
>> -        for (int j = 0; j < p->released_buffers_allocated; j++)
>> -            av_frame_free(&p->released_buffers[j]);
>> -        av_freep(&p->released_buffers);
>> -#endif
>> -
>> -        if (p->avctx) {
>> -            if (codec->priv_class)
>> -                av_opt_free(p->avctx->priv_data);
>> -            av_freep(&p->avctx->priv_data);
>> -
>> -            av_freep(&p->avctx->slice_offset);
>> -        }
>> -
>> -        if (p->avctx) {
>> -            av_buffer_unref(&p->avctx->internal->pool);
>> -            av_freep(&p->avctx->internal);
>> -            av_buffer_unref(&p->avctx->hw_frames_ctx);
>> -        }
>> -
>>          av_freep(&p->avctx);
>>      }
>>
>> @@ -791,14 +790,19 @@ int ff_frame_thread_init(AVCodecContext *avctx)
>>
>>      fctx->threads = av_mallocz_array(thread_count, sizeof(PerThreadContext));
>>      if (!fctx->threads) {
>> -        av_freep(&avctx->internal->thread_ctx);
>> -        return AVERROR(ENOMEM);
>> +        err = AVERROR(ENOMEM);
>> +        goto threads_alloc_fail;
>>      }
>>
>> -    pthread_mutex_init(&fctx->buffer_mutex, NULL);
>> -    pthread_mutex_init(&fctx->hwaccel_mutex, NULL);
>> -    pthread_mutex_init(&fctx->async_mutex, NULL);
>> -    pthread_cond_init(&fctx->async_cond, NULL);
>> +#define PTHREAD_INIT(type, ctx, field)                         \
>> +    if (err = pthread_ ## type ## _init(&ctx->field, NULL)) {  \
>> +        err = AVERROR(err);                                    \
>> +        goto field ## _fail;                                   \
>> +    }
>> +    PTHREAD_INIT(mutex, fctx, buffer_mutex)
>> +    PTHREAD_INIT(mutex, fctx, hwaccel_mutex)
>> +    PTHREAD_INIT(mutex, fctx, async_mutex)
>> +    PTHREAD_INIT(cond, fctx, async_cond)
>>
> Since we use a macro here, could we just record each initialized object in
> an on-stack array and then loop over the array to free them?
> That would help us get rid of most of the labels. Example:
> 
> pthread_mutex_t *mutex_list[3]; int mutex_count = 0;
> pthread_cond_t  *cond_list[1];  int cond_count  = 0;
> #define PTHREAD_INIT(type, ctx, field)                           \
>     if ((err = pthread_ ## type ## _init(&ctx->field, NULL))) {  \
>         err = AVERROR(err);                                      \
>         goto fail;                                               \
>     }                                                            \
>     type ## _list[type ## _count++] = &ctx->field;
> 
> fail:
>     free_mutexes(mutex_list, mutex_count);
>     free_conditions(cond_list, cond_count);
> 
> 

I don't share your dislike of labels, and when I tried your approach, it
increased code size. But in the end I implemented something even
better: While the pointers depend on the
FrameThreadContext/PerThreadContext, the offsets do not, so I used an
array of offsets both for initializing and destroying. This avoids most
code duplication for freeing. See
https://ffmpeg.org/pipermail/ffmpeg-devel/2021-March/278180.html
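
The following is a minimal sketch of the offset-array idea, not the code from
the linked patch. DemoContext, demo_init_sync and demo_destroy_sync are
hypothetical names. Because the offsets of the members are the same for every
instance of the struct, one table per object type can drive both
initialization and destruction, and a failed init only has to destroy the
entries that were actually initialized.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical context holding several synchronization objects. */
typedef struct DemoContext {
    int             unrelated;
    pthread_mutex_t buffer_mutex;
    pthread_mutex_t async_mutex;
    pthread_cond_t  async_cond;
} DemoContext;

/* Offsets are per-type constants, so the tables can be static const. */
static const size_t mutex_offsets[] = {
    offsetof(DemoContext, buffer_mutex),
    offsetof(DemoContext, async_mutex),
};
static const size_t cond_offsets[] = {
    offsetof(DemoContext, async_cond),
};
#define NB_MUTEXES (sizeof(mutex_offsets) / sizeof(mutex_offsets[0]))
#define NB_CONDS   (sizeof(cond_offsets)  / sizeof(cond_offsets[0]))

/* Destroys the first nb_conds condition variables and the first
 * nb_mutexes mutexes, in reverse order of initialization. */
static void demo_destroy_sync(DemoContext *ctx, size_t nb_mutexes,
                              size_t nb_conds)
{
    char *base = (char *)ctx;

    while (nb_conds > 0)
        pthread_cond_destroy((pthread_cond_t *)(base + cond_offsets[--nb_conds]));
    while (nb_mutexes > 0)
        pthread_mutex_destroy((pthread_mutex_t *)(base + mutex_offsets[--nb_mutexes]));
}

/* Returns 0 on success or a positive errno value; on failure, everything
 * that had been initialized so far is destroyed again. */
static int demo_init_sync(DemoContext *ctx)
{
    char *base = (char *)ctx;
    size_t nb_mutexes = 0, nb_conds = 0;
    int err;

    for (; nb_mutexes < NB_MUTEXES; nb_mutexes++) {
        err = pthread_mutex_init((pthread_mutex_t *)(base + mutex_offsets[nb_mutexes]), NULL);
        if (err)
            goto fail;
    }
    for (; nb_conds < NB_CONDS; nb_conds++) {
        err = pthread_cond_init((pthread_cond_t *)(base + cond_offsets[nb_conds]), NULL);
        if (err)
            goto fail;
    }
    return 0;

fail:
    /* The counters say exactly how many objects were initialized. */
    demo_destroy_sync(ctx, nb_mutexes, nb_conds);
    return err;
}
```

After a successful demo_init_sync(&ctx), the regular teardown is
demo_destroy_sync(&ctx, NB_MUTEXES, NB_CONDS). Only a single label remains,
and the destroy loop is shared between the error path and normal cleanup.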

> 
> 
>>
>>      fctx->async_lock = 1;
>>      fctx->delaying = 1;
>> @@ -806,40 +810,37 @@ int ff_frame_thread_init(AVCodecContext *avctx)
>>      if (codec->type == AVMEDIA_TYPE_VIDEO)
>>          avctx->delay = src->thread_count - 1;
>>
>> -    for (i = 0; i < thread_count; i++) {
>> -        AVCodecContext *copy = av_malloc(sizeof(AVCodecContext));
>> +    for (i = 0; i < thread_count; ) {
>>
> Is it possible to move this loop body to a function?  It will make this
> function more readable.

Did so here:
https://ffmpeg.org/pipermail/ffmpeg-devel/2021-March/278179.html

> and we can use a stack array to get rid of labels too.
> 
>>          PerThreadContext *p  = &fctx->threads[i];
>> +        AVCodecContext *copy;
>> +        int first = !i;
>>
>> -        pthread_mutex_init(&p->mutex, NULL);
>> -        pthread_mutex_init(&p->progress_mutex, NULL);
>> -        pthread_cond_init(&p->input_cond, NULL);
>> -        pthread_cond_init(&p->progress_cond, NULL);
>> -        pthread_cond_init(&p->output_cond, NULL);
>> -
>> -        p->frame = av_frame_alloc();
>> -        if (!p->frame) {
>> -            av_freep(&copy);
>> -            err = AVERROR(ENOMEM);
>> -            goto error;
>> -        }
>> +        PTHREAD_INIT(mutex, p, mutex)
>> +        PTHREAD_INIT(mutex, p, progress_mutex)
>> +        PTHREAD_INIT(cond,  p, input_cond)
>> +        PTHREAD_INIT(cond,  p, progress_cond)
>> +        PTHREAD_INIT(cond,  p, output_cond)
>>
>> -        p->parent = fctx;
>> -        p->avctx  = copy;
>> +        atomic_init(&p->state, STATE_INPUT_READY);
>>
>> +        copy = av_memdup(src, sizeof(*src));
>>          if (!copy) {
>>              err = AVERROR(ENOMEM);
>> -            goto error;
>> +            goto copy_fail;
>>          }
>> +        /* From now on, this PerThreadContext will be cleaned up by
>> +         * ff_frame_thread_free in case of errors. */
>> +        i++;
>>
>> -        *copy = *src;
>> +        p->parent = fctx;
>> +        p->avctx  = copy;
>>
>> -        copy->internal = av_malloc(sizeof(AVCodecInternal));
>> +        copy->internal = av_memdup(src->internal, sizeof(*src->internal));
>>          if (!copy->internal) {
>>              copy->priv_data = NULL;
>>              err = AVERROR(ENOMEM);
>>              goto error;
>>          }
>> -        *copy->internal = *src->internal;
>>          copy->internal->thread_ctx = p;
>>          copy->internal->last_pkt_props = &p->avpkt;
>>
>> @@ -860,30 +861,66 @@ int ff_frame_thread_init(AVCodecContext *avctx)
>>              }
>>          }
>>
>> -        if (i)
>> +        p->frame = av_frame_alloc();
>> +        if (!p->frame) {
>> +            err = AVERROR(ENOMEM);
>> +            goto error;
>> +        }
>> +
>> +        if (!first)
>>              copy->internal->is_copy = 1;
>>
>> -        if (codec->init)
>> +        if (codec->init) {
>>              err = codec->init(copy);
>> +            if (err) {
>> +                if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP)
>> +                    p->thread_init = NEEDS_CLOSE;
>> +                goto error;
>> +            }
>> +        }
>> +        p->thread_init = NEEDS_CLOSE;
>>
>> -        if (err) goto error;
>> -
>> -        if (!i)
>> +        if (first)
>>              update_context_from_thread(avctx, copy, 1);
>>
>>          atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0);
>>
>>          err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
>> -        p->thread_init= !err;
>> -        if(!p->thread_init)
>> +        if (err)
>>              goto error;
>> +        p->thread_init = INITIALIZED;
>> +        continue;
>> +
>> +    copy_fail:
>> +        pthread_cond_destroy(&p->output_cond);
>> +    output_cond_fail:
>> +        pthread_cond_destroy(&p->progress_cond);
>> +    progress_cond_fail:
>> +        pthread_cond_destroy(&p->input_cond);
>> +    input_cond_fail:
>> +        pthread_mutex_destroy(&p->progress_mutex);
>> +    progress_mutex_fail:
>> +        pthread_mutex_destroy(&p->mutex);
>> +    mutex_fail:
>> +        goto error;
>>      }
>>
>>      return 0;
>>
>>  error:
>> -    ff_frame_thread_free(avctx, i+1);
>> +    ff_frame_thread_free(avctx, i);
>> +    return err;
>>
>> +async_cond_fail:
>> +    pthread_mutex_destroy(&fctx->async_mutex);
>> +async_mutex_fail:
>> +    pthread_mutex_destroy(&fctx->hwaccel_mutex);
>> +hwaccel_mutex_fail:
>> +    pthread_mutex_destroy(&fctx->buffer_mutex);
>> +buffer_mutex_fail:
>> +    av_freep(&fctx->threads);
>> +threads_alloc_fail:
>> +    av_freep(&avctx->internal->thread_ctx);
>>      return err;
>>  }
>>

