[FFmpeg-devel] [PATCH] fix w32thread.c to allow job count > thread_count

Reimar Döffinger Reimar.Doeffinger
Sun Oct 11 15:10:03 CEST 2009


Hello,
This extends the code to allow more jobs than threads, as my benchmarks
have shown that this is important to reliably get good performance.
It also makes it easy to implement the proposed execute2 function.
It uses a bit more memory for the per-thread context (most of the
contents of which could be shared, but I'll not change that), but on
the other hand it needs only 3 semaphores in total instead of 2 per thread.
It was only tested via "make test" (i.e. DNxHD only).
It should be trivial to port to beosthread and os2thread, too, but I'm not
going to do that. (By the way, I very much dislike the code duplication
between these: it's basically the same code, only some function calls and
return codes are a bit different. If anyone is interested in
keeping the beos and os2 ports functional, they should make sure these
share the w32 code.)
-------------- next part --------------
Index: libavcodec/w32thread.c
===================================================================
--- libavcodec/w32thread.c	(revision 20202)
+++ libavcodec/w32thread.c	(working copy)
@@ -29,10 +29,13 @@
     AVCodecContext *avctx;
     HANDLE thread;
     HANDLE work_sem;
+    HANDLE job_sem;
     HANDLE done_sem;
     int (*func)(AVCodecContext *c, void *arg);
     void *arg;
-    int ret;
+    int argsize;
+    int *jobnr;
+    int *ret;
 }ThreadContext;
 
 
@@ -40,13 +43,19 @@
     ThreadContext *c= v;
 
     for(;;){
+        int ret, jobnr;
 //printf("thread_func %X enter wait\n", (int)v); fflush(stdout);
         WaitForSingleObject(c->work_sem, INFINITE);
+        WaitForSingleObject(c->job_sem, INFINITE);
+        jobnr = (*c->jobnr)++;
+        ReleaseSemaphore(c->job_sem, 1, 0);
 //printf("thread_func %X after wait (func=%X)\n", (int)v, (int)c->func); fflush(stdout);
         if(c->func)
-            c->ret= c->func(c->avctx, c->arg);
+            ret= c->func(c->avctx, (uint8_t *)c->arg + jobnr*c->argsize);
         else
             return 0;
+        if (c->ret)
+            c->ret[jobnr] = ret;
 //printf("thread_func %X signal complete\n", (int)v); fflush(stdout);
         ReleaseSemaphore(c->done_sem, 1, 0);
     }
@@ -65,12 +74,15 @@
     for(i=0; i<s->thread_count; i++){
 
         c[i].func= NULL;
-        ReleaseSemaphore(c[i].work_sem, 1, 0);
+    }
+    ReleaseSemaphore(c[0].work_sem, s->thread_count, 0);
+    for(i=0; i<s->thread_count; i++){
         WaitForSingleObject(c[i].thread, INFINITE);
-        if(c[i].work_sem) CloseHandle(c[i].work_sem);
-        if(c[i].done_sem) CloseHandle(c[i].done_sem);
         if(c[i].thread)   CloseHandle(c[i].thread);
     }
+    if(c[0].work_sem) CloseHandle(c[0].work_sem);
+    if(c[0].job_sem)  CloseHandle(c[0].job_sem);
+    if(c[0].done_sem) CloseHandle(c[0].done_sem);
 
     av_freep(&s->thread_opaque);
 }
@@ -78,25 +90,23 @@
 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void *arg, int *ret, int count, int size){
     ThreadContext *c= s->thread_opaque;
     int i;
+    int jobnr = 0;
 
     assert(s == c->avctx);
-    assert(count <= s->thread_count);
 
     /* note, we can be certain that this is not called with the same AVCodecContext by different threads at the same time */
 
-    for(i=0; i<count; i++){
-        c[i].arg= (char*)arg + i*size;
+    for(i=0; i<s->thread_count; i++){
+        c[i].arg= arg;
+        c[i].argsize= size;
         c[i].func= func;
-        c[i].ret= 12345;
-
-        ReleaseSemaphore(c[i].work_sem, 1, 0);
+        c[i].ret= ret;
+        c[i].jobnr = &jobnr;
     }
-    for(i=0; i<count; i++){
-        WaitForSingleObject(c[i].done_sem, INFINITE);
+    ReleaseSemaphore(c[0].work_sem, count, 0);
+    for(i=0; i<count; i++)
+        WaitForSingleObject(c[0].done_sem, INFINITE);
 
-        c[i].func= NULL;
-        if(ret) ret[i]= c[i].ret;
-    }
     return 0;
 }
 
@@ -110,16 +120,20 @@
     assert(!s->thread_opaque);
     c= av_mallocz(sizeof(ThreadContext)*thread_count);
     s->thread_opaque= c;
+    if(!(c[0].work_sem = CreateSemaphore(NULL, 0, INT_MAX, NULL)))
+        goto fail;
+    if(!(c[0].job_sem  = CreateSemaphore(NULL, 1, 1, NULL)))
+        goto fail;
+    if(!(c[0].done_sem = CreateSemaphore(NULL, 0, INT_MAX, NULL)))
+        goto fail;
 
     for(i=0; i<thread_count; i++){
 //printf("init semaphors %d\n", i); fflush(stdout);
         c[i].avctx= s;
+        c[i].work_sem = c[0].work_sem;
+        c[i].job_sem  = c[0].job_sem;
+        c[i].done_sem = c[0].done_sem;
 
-        if(!(c[i].work_sem = CreateSemaphore(NULL, 0, s->thread_count, NULL)))
-            goto fail;
-        if(!(c[i].done_sem = CreateSemaphore(NULL, 0, s->thread_count, NULL)))
-            goto fail;
-
 //printf("create thread %d\n", i); fflush(stdout);
         c[i].thread = (HANDLE)_beginthreadex(NULL, 0, thread_func, &c[i], 0, &threadid );
         if( !c[i].thread ) goto fail;



More information about the ffmpeg-devel mailing list