[FFmpeg-cvslog] fftools/ffmpeg: optimize inter-thread queue sizes

Anton Khirnov git at videolan.org
Sun Jan 28 14:41:54 EET 2024


ffmpeg | branch: master | Anton Khirnov <anton at khirnov.net> | Wed Jan 24 20:21:37 2024 +0100| [e0da916b8f5b079a4865eef7f64863f50785463d] | committer: Anton Khirnov

fftools/ffmpeg: optimize inter-thread queue sizes

Use 8 packets/frames by default rather than 1, since the larger queue
seems to provide better throughput.

Allow -thread_queue_size to set the muxer queue size manually again.

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=e0da916b8f5b079a4865eef7f64863f50785463d
---

 fftools/ffmpeg_mux.h                             |  2 --
 fftools/ffmpeg_mux_init.c                        |  3 +--
 fftools/ffmpeg_opt.c                             |  2 +-
 fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
 fftools/ffmpeg_sched.h                           |  4 +++-
 tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
 6 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d0be8a51ea..e1b44142cf 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -94,8 +94,6 @@ typedef struct Muxer {
 
     AVDictionary *opts;
 
-    int thread_queue_size;
-
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
     atomic_int_least64_t last_filesize;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 6b5e4f8b3c..8ada837555 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     of->start_time     = o->start_time;
     of->shortest       = o->shortest;
 
-    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
     mux->limit_filesize    = o->limit_filesize;
     av_dict_copy(&mux->opts, o->g->format_opts, 0);
 
@@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     }
 
     err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
-                      !strcmp(oc->oformat->name, "rtp"));
+                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
     if (err < 0)
         return err;
     mux->sch     = sch;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304c493dcf..7505b0cf90 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
     o->limit_filesize = INT64_MAX;
     o->chapters_input_file = INT_MAX;
     o->accurate_seek  = 1;
-    o->thread_queue_size = -1;
+    o->thread_queue_size = 0;
     o->input_sync_ref = -1;
     o->find_stream_info = 1;
     o->shortest_buf_duration = 10.f;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index c6ed2e21ff..d91968822f 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -218,6 +218,7 @@ typedef struct SchMux {
      */
     atomic_int          mux_started;
     ThreadQueue        *queue;
+    unsigned            queue_size;
 
     AVPacket           *sub_heartbeat_pkt;
 } SchMux;
@@ -360,6 +361,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
     ThreadQueue *tq;
     ObjPool *op;
 
+    queue_size = queue_size > 0 ? queue_size : 8;
+
     op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
                                    objpool_alloc_frames();
     if (!op)
@@ -655,7 +658,7 @@ static const AVClass sch_mux_class = {
 };
 
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *arg, int sdp_auto)
+                void *arg, int sdp_auto, unsigned thread_queue_size)
 {
     const unsigned idx = sch->nb_mux;
 
@@ -669,6 +672,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
     mux             = &sch->mux[idx];
     mux->class      = &sch_mux_class;
     mux->init       = init;
+    mux->queue_size = thread_queue_size;
 
     task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
 
@@ -775,7 +779,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
     if (!dec->send_frame)
         return AVERROR(ENOMEM);
 
-    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
     if (ret < 0)
         return ret;
 
@@ -815,7 +819,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
 
     task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
 
-    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -863,7 +867,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
     if (ret < 0)
         return ret;
 
-    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -1315,7 +1319,8 @@ int sch_start(Scheduler *sch)
             }
         }
 
-        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
+                          QUEUE_PACKETS);
         if (ret < 0)
             return ret;
 
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index 811146f6ed..95f9c1d4db 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
  *             streams in the muxer.
  * @param ctx Muxer state; will be passed to func/init and used for logging.
  * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ * @param thread_queue_size number of packets that can be buffered before
+ *                          sending to the muxer blocks
  *
  * @retval ">=0" Index of the newly-created muxer.
  * @retval "<0"  Error code.
  */
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *ctx, int sdp_auto);
+                void *ctx, int sdp_auto, unsigned thread_queue_size);
 /**
  * Add a muxed stream for a previously added muxer.
  *
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index bc9b833799..3a3ec96637 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -33,8 +33,3 @@
 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our numb</font>
 
-9
-00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
->> Safety remains our number one</font>
-



More information about the ffmpeg-cvslog mailing list