[FFmpeg-devel] [PATCH v2] avcodec/pthread_slice: rewrite implementation
Muhammad Faiz
mfcc64 at gmail.com
Mon Jul 10 13:14:52 EEST 2017
On Mon, Jul 10, 2017 at 4:25 PM, wm4 <nfxjfg at googlemail.com> wrote:
> On Sun, 9 Jul 2017 23:26:54 +0700
> Muhammad Faiz <mfcc64 at gmail.com> wrote:
>
>> Avoid pthread_cond_broadcast, which wakes up all workers. Make each of them
>> use a distinct mutex/cond. Also let the main thread help run jobs, but still
>> allocate thread_count workers. The last worker is currently unused; it is
>> emulated by the main thread.
>> Similar to 'avfilter/pthread: rewrite implementation'
>>
>> Benchmark on x86_64 with 4 cpus (2 cores x 2 hyperthread)
>> ./ffmpeg -threads $threads -thread_type slice -i 10slices.mp4 -f rawvideo -y /dev/null
>> threads=2:
>> old: 1m15.888s
>> new: 1m5.710s
>> threads=3:
>> old: 1m6.480s
>> new: 1m5.260s
>> threads=4:
>> old: 1m2.292s
>> new: 59.677s
>> threads=5:
>> old: 58.939s
>> new: 55.166s
>>
>> Signed-off-by: Muhammad Faiz <mfcc64 at gmail.com>
>> ---
>> libavcodec/pthread_slice.c | 219 +++++++++++++++++++++++++++++----------------
>> 1 file changed, 142 insertions(+), 77 deletions(-)
>>
>> diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
>> index 60f5b78..7223205 100644
>> --- a/libavcodec/pthread_slice.c
>> +++ b/libavcodec/pthread_slice.c
>> @@ -22,6 +22,7 @@
>> * @see doc/multithreading.txt
>> */
>>
>> +#include <stdatomic.h>
>> #include "config.h"
>>
>> #include "avcodec.h"
>> @@ -38,21 +39,26 @@
>> typedef int (action_func)(AVCodecContext *c, void *arg);
>> typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
>>
>> +typedef struct WorkerContext WorkerContext;
>> +
>> typedef struct SliceThreadContext {
>> - pthread_t *workers;
>> + AVCodecContext *avctx;
>> + WorkerContext *workers;
>> action_func *func;
>> action_func2 *func2;
>> void *args;
>> int *rets;
>> - int job_count;
>> - int job_size;
>> -
>> - pthread_cond_t last_job_cond;
>> - pthread_cond_t current_job_cond;
>> - pthread_mutex_t current_job_lock;
>> - unsigned current_execute;
>> - int current_job;
>> + unsigned job_count;
>> + unsigned job_size;
>> +
>> + pthread_mutex_t mutex_user;
>> + pthread_mutex_t mutex_done;
>> + pthread_cond_t cond_done;
>> + atomic_uint first_job;
>> + atomic_uint current_job;
>> + atomic_uint nb_finished_jobs;
>> int done;
>> + int worker_done;
>>
>> int *entries;
>> int entries_count;
>> @@ -61,42 +67,55 @@ typedef struct SliceThreadContext {
>> pthread_mutex_t *progress_mutex;
>> } SliceThreadContext;
>>
>> -static void* attribute_align_arg worker(void *v)
>> +struct WorkerContext {
>> + SliceThreadContext *ctx;
>> + pthread_t thread;
>> + pthread_mutex_t mutex;
>> + pthread_cond_t cond;
>> + int done;
>> +};
>> +
>> +static unsigned run_jobs(SliceThreadContext *c)
>> {
>> - AVCodecContext *avctx = v;
>> - SliceThreadContext *c = avctx->internal->thread_ctx;
>> - unsigned last_execute = 0;
>> - int our_job = c->job_count;
>> - int thread_count = avctx->thread_count;
>> - int self_id;
>> -
>> - pthread_mutex_lock(&c->current_job_lock);
>> - self_id = c->current_job++;
>> - for (;;){
>> - int ret;
>> - while (our_job >= c->job_count) {
>> - if (c->current_job == thread_count + c->job_count)
>> - pthread_cond_signal(&c->last_job_cond);
>> -
>> - while (last_execute == c->current_execute && !c->done)
>> - pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
>> - last_execute = c->current_execute;
>> - our_job = self_id;
>> -
>> - if (c->done) {
>> - pthread_mutex_unlock(&c->current_job_lock);
>> - return NULL;
>> - }
>> - }
>> - pthread_mutex_unlock(&c->current_job_lock);
>> + unsigned current_job, first_job, nb_finished_jobs;
>> +
>> + current_job = first_job = atomic_fetch_add_explicit(&c->first_job, 1, memory_order_acq_rel);
>>
>> - ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
>> - c->func2(avctx, c->args, our_job, self_id);
>> + do {
>> + int ret = c->func ? c->func(c->avctx, (char *)c->args + current_job * (size_t) c->job_size)
>> + : c->func2(c->avctx, c->args, current_job, first_job);
>> if (c->rets)
>> - c->rets[our_job%c->job_count] = ret;
>> + c->rets[current_job] = ret;
>> + nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_relaxed) + 1;
>> + } while ((current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->job_count);
>>
>> - pthread_mutex_lock(&c->current_job_lock);
>> - our_job = c->current_job++;
>> + return nb_finished_jobs;
>> +}
>> +
>> +static void* attribute_align_arg worker(void *v)
>> +{
>> + WorkerContext *w = v;
>> + SliceThreadContext *c = w->ctx;
>> +
>> + pthread_mutex_lock(&w->mutex);
>> + pthread_cond_signal(&w->cond);
>> +
>> + while (1) {
>> + w->done = 1;
>> + while (w->done)
>> + pthread_cond_wait(&w->cond, &w->mutex);
>> +
>> + if (c->done) {
>> + pthread_mutex_unlock(&w->mutex);
>> + return NULL;
>> + }
>> +
>> + if (run_jobs(c) == c->job_count) {
>> + pthread_mutex_lock(&c->mutex_done);
>> + c->worker_done = 1;
>> + pthread_cond_signal(&c->cond_done);
>> + pthread_mutex_unlock(&c->mutex_done);
>> + }
>> }
>> }
>>
>> @@ -105,24 +124,36 @@ void ff_slice_thread_free(AVCodecContext *avctx)
>> SliceThreadContext *c = avctx->internal->thread_ctx;
>> int i;
>>
>> - pthread_mutex_lock(&c->current_job_lock);
>> + for (i = 0; i < avctx->thread_count; i++)
>> + pthread_mutex_lock(&c->workers[i].mutex);
>> +
>> c->done = 1;
>> - pthread_cond_broadcast(&c->current_job_cond);
>> +
>> for (i = 0; i < c->thread_count; i++)
>> pthread_cond_broadcast(&c->progress_cond[i]);
>> - pthread_mutex_unlock(&c->current_job_lock);
>>
>> - for (i=0; i<avctx->thread_count; i++)
>> - pthread_join(c->workers[i], NULL);
>> + for (i = 0; i < avctx->thread_count; i++) {
>> + WorkerContext *w = &c->workers[i];
>> + w->done = 0;
>> + pthread_cond_signal(&w->cond);
>> + pthread_mutex_unlock(&w->mutex);
>> + }
>> +
>> + for (i = 0; i < avctx->thread_count; i++) {
>> + WorkerContext *w = &c->workers[i];
>> + pthread_join(w->thread, NULL);
>> + pthread_cond_destroy(&w->cond);
>> + pthread_mutex_destroy(&w->mutex);
>> + }
>>
>> for (i = 0; i < c->thread_count; i++) {
>> pthread_mutex_destroy(&c->progress_mutex[i]);
>> pthread_cond_destroy(&c->progress_cond[i]);
>> }
>>
>> - pthread_mutex_destroy(&c->current_job_lock);
>> - pthread_cond_destroy(&c->current_job_cond);
>> - pthread_cond_destroy(&c->last_job_cond);
>> + pthread_cond_destroy(&c->cond_done);
>> + pthread_mutex_destroy(&c->mutex_done);
>
>> + pthread_mutex_lock(&c->mutex_user);
>
> This looks suspicious. Does it really acquire the lock and keep it
> locked after leaving this deinit function?
Yes. That is wrong.
>
>>
>> av_freep(&c->entries);
>> av_freep(&c->progress_mutex);
>> @@ -132,16 +163,11 @@ void ff_slice_thread_free(AVCodecContext *avctx)
>> av_freep(&avctx->internal->thread_ctx);
>> }
>>
>> -static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
>> -{
>> - while (c->current_job != thread_count + c->job_count)
>> - pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
>> - pthread_mutex_unlock(&c->current_job_lock);
>> -}
>> -
>> -static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
>> +static int thread_execute_internal(AVCodecContext *avctx, action_func *func, action_func2 *func2,
>> + void *arg, int *ret, int job_count, int job_size)
>> {
>> SliceThreadContext *c = avctx->internal->thread_ctx;
>> + int i, nb_workers;
>>
>> if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
>> return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
>> @@ -149,27 +175,49 @@ static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, i
>> if (job_count <= 0)
>> return 0;
>>
>> - pthread_mutex_lock(&c->current_job_lock);
>> + // last worker is unused
>> + nb_workers = FFMIN(job_count - 1, avctx->thread_count - 1);
>> +
>> + for (i = 0; i < nb_workers; i++)
>> + pthread_mutex_lock(&c->workers[i].mutex);
>
> This looks suspicious... does it lock all workers in the "hot" path?
> Wouldn't this cause a lot of contention? And why mix this with atomic
> accesses?
This was arranged to fix a tsan warning (although it doesn't actually fix it).
However, I guess this is still faster than using one mutex shared by all
workers. The atomics are used only for the counters.
>
>>
>> - c->current_job = avctx->thread_count;
>> + atomic_store_explicit(&c->first_job, 0, memory_order_relaxed);
>> + atomic_store_explicit(&c->current_job, avctx->thread_count, memory_order_relaxed);
>> + atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
>> c->job_count = job_count;
>> c->job_size = job_size;
>> c->args = arg;
>> c->func = func;
>> + c->func2 = func2;
>> c->rets = ret;
>> - c->current_execute++;
>> - pthread_cond_broadcast(&c->current_job_cond);
>>
>> - thread_park_workers(c, avctx->thread_count);
>> + for (i = 0; i < nb_workers; i++) {
>> + WorkerContext *w = &c->workers[i];
>> + w->done = 0;
>> + pthread_cond_signal(&w->cond);
>> + pthread_mutex_unlock(&w->mutex);
>> + }
>> +
>> + // emulate the last worker, no need to wait if all jobs is complete
>> + if (run_jobs(c) != c->job_count) {
>> + pthread_mutex_lock(&c->mutex_done);
>> + while (!c->worker_done)
>> + pthread_cond_wait(&c->cond_done, &c->mutex_done);
>> + c->worker_done = 0;
>> + pthread_mutex_unlock(&c->mutex_done);
>> + }
>>
>> return 0;
>> }
>>
>> +static int thread_execute(AVCodecContext *avctx, action_func *func, void *arg, int *ret, int job_count, int job_size)
>> +{
>> + return thread_execute_internal(avctx, func, NULL, arg, ret, job_count, job_size);
>> +}
>> +
>> static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
>> {
>> - SliceThreadContext *c = avctx->internal->thread_ctx;
>> - c->func2 = func2;
>> - return thread_execute(avctx, NULL, arg, ret, job_count, 0);
>> + return thread_execute_internal(avctx, NULL, func2, arg, ret, job_count, 0);
>> }
>>
>> int ff_slice_thread_init(AVCodecContext *avctx)
>> @@ -208,31 +256,48 @@ int ff_slice_thread_init(AVCodecContext *avctx)
>> if (!c)
>> return -1;
>>
>> - c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
>> + // allocate thread_count workers, but currently last worker is unused, emulated by main thread
>> + // anticipate when main thread needs to do something
>> + c->workers = av_mallocz_array(thread_count, sizeof(*c->workers));
>> if (!c->workers) {
>> av_free(c);
>> return -1;
>> }
>>
>> avctx->internal->thread_ctx = c;
>> - c->current_job = 0;
>> + c->avctx = avctx;
>> + pthread_mutex_init(&c->mutex_user, NULL);
>> + pthread_mutex_init(&c->mutex_done, NULL);
>> + pthread_cond_init(&c->cond_done, NULL);
>> + atomic_init(&c->first_job, 0);
>> + atomic_init(&c->current_job, 0);
>> + atomic_init(&c->nb_finished_jobs, 0);
>> c->job_count = 0;
>> c->job_size = 0;
>> c->done = 0;
>> - pthread_cond_init(&c->current_job_cond, NULL);
>> - pthread_cond_init(&c->last_job_cond, NULL);
>> - pthread_mutex_init(&c->current_job_lock, NULL);
>> - pthread_mutex_lock(&c->current_job_lock);
>> - for (i=0; i<thread_count; i++) {
>> - if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
>> - avctx->thread_count = i;
>> - pthread_mutex_unlock(&c->current_job_lock);
>> + c->worker_done = 0;
>> +
>> + for (i = 0; i < thread_count; i++) {
>> + WorkerContext *w = &c->workers[i];
>> +
>> + w->ctx = c;
>> + pthread_mutex_init(&w->mutex, NULL);
>> + pthread_cond_init(&w->cond, NULL);
>> + pthread_mutex_lock(&w->mutex);
>> + w->done = 0;
>> + if (pthread_create(&w->thread, NULL, worker, w)) {
>> + avctx->thread_count = i + 1;
>> + pthread_mutex_unlock(&w->mutex);
>> + pthread_cond_destroy(&w->cond);
>> + pthread_mutex_destroy(&w->mutex);
>> ff_thread_free(avctx);
>> return -1;
>> }
>> - }
>>
>> - thread_park_workers(c, thread_count);
>> + while (!w->done)
>> + pthread_cond_wait(&w->cond, &w->mutex);
>> + pthread_mutex_unlock(&w->mutex);
>> + }
>>
>> avctx->execute = thread_execute;
>> avctx->execute2 = thread_execute2;
>
I'm currently reworking this.
Thanks.
More information about the ffmpeg-devel
mailing list