[FFmpeg-devel] [PATCH 35/35] fftools/ffmpeg: move each muxer to a separate thread

Anton Khirnov anton at khirnov.net
Thu Jun 16 22:55:34 EEST 2022


---
 doc/ffmpeg.texi      |  11 +-
 fftools/ffmpeg.c     |  18 ++-
 fftools/ffmpeg.h     |   7 +-
 fftools/ffmpeg_mux.c | 299 +++++++++++++++++++++++++++++++------------
 fftools/ffmpeg_opt.c |   4 +-
 5 files changed, 236 insertions(+), 103 deletions(-)

diff --git a/doc/ffmpeg.texi b/doc/ffmpeg.texi
index 7542832eb3..e4e2d6ddac 100644
--- a/doc/ffmpeg.texi
+++ b/doc/ffmpeg.texi
@@ -1900,13 +1900,16 @@ to the @option{-ss} option is considered an actual timestamp, and is not
 offset by the start time of the file. This matters only for files which do
 not start from timestamp 0, such as transport streams.
 
-@item -thread_queue_size @var{size} (@emph{input})
-This option sets the maximum number of queued packets when reading from the
-file or device. With low latency / high rate live streams, packets may be
-discarded if they are not read in a timely manner; setting this value can
+@item -thread_queue_size @var{size} (@emph{input/output})
+For input, this option sets the maximum number of queued packets when reading
+from the file or device. With low latency / high rate live streams, packets may
+be discarded if they are not read in a timely manner; setting this value can
 force ffmpeg to use a separate input thread and read packets as soon as they
 arrive. By default ffmpeg only does this if multiple inputs are specified.
 
+For output, this option specifies the maximum number of packets that may be
+queued to each muxing thread.
+
 @item -sdp_file @var{file} (@emph{global})
 Print sdp information for an output stream to @var{file}.
 This allows dumping sdp information when at least one output isn't an
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index aea7335c8f..2a7ff16b74 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -746,9 +746,6 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
             goto mux_fail;
     }
 
-    if (eof)
-        ost->finished |= MUXER_FINISHED;
-
     return;
 
 mux_fail:
@@ -1521,7 +1518,7 @@ static void print_final_stats(int64_t total_size)
             enum AVMediaType type = ost->enc_ctx->codec_type;
 
             total_size    += ost->data_size;
-            total_packets += ost->packets_written;
+            total_packets += atomic_load(&ost->packets_written);
 
             av_log(NULL, AV_LOG_VERBOSE, "  Output stream #%d:%d (%s): ",
                    i, j, av_get_media_type_string(type));
@@ -1534,7 +1531,7 @@ static void print_final_stats(int64_t total_size)
             }
 
             av_log(NULL, AV_LOG_VERBOSE, "%"PRIu64" packets muxed (%"PRIu64" bytes); ",
-                   ost->packets_written, ost->data_size);
+                   atomic_load(&ost->packets_written), ost->data_size);
 
             av_log(NULL, AV_LOG_VERBOSE, "\n");
         }
@@ -1603,7 +1600,7 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
         if (!vid && enc->codec_type == AVMEDIA_TYPE_VIDEO) {
             float fps;
 
-            frame_number = ost->packets_written;
+            frame_number = atomic_load(&ost->packets_written);
             fps = t > 1 ? frame_number / t : 0;
             av_bprintf(&buf, "frame=%5d fps=%3.*f q=%3.1f ",
                      frame_number, fps < 9.95, fps, q);
@@ -3480,9 +3477,8 @@ static int need_output(void)
 
     for (i = 0; i < nb_output_streams; i++) {
         OutputStream *ost    = output_streams[i];
-        OutputFile *of       = output_files[ost->file_index];
 
-        if (ost->finished || of_finished(of))
+        if (ost->finished)
             continue;
 
         return 1;
@@ -4402,9 +4398,11 @@ static int transcode(void)
 
     /* close each encoder */
     for (i = 0; i < nb_output_streams; i++) {
+        uint64_t packets_written;
         ost = output_streams[i];
-        total_packets_written += ost->packets_written;
-        if (!ost->packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) {
+        packets_written = atomic_load(&ost->packets_written);
+        total_packets_written += packets_written;
+        if (!packets_written && (abort_on_flags & ABORT_ON_FLAG_EMPTY_OUTPUT_STREAM)) {
             av_log(NULL, AV_LOG_FATAL, "Empty output on stream %d.\n", i);
             exit_program(1);
         }
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 9baa701c67..8940f719b0 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -21,6 +21,7 @@
 
 #include "config.h"
 
+#include <stdatomic.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <signal.h>
@@ -555,7 +556,7 @@ typedef struct OutputStream {
     // combined size of all the packets written
     uint64_t data_size;
     // number of packets send to the muxer
-    uint64_t packets_written;
+    atomic_uint_least64_t packets_written;
     // number of frames/samples sent to the encoder
     uint64_t frames_encoded;
     uint64_t samples_encoded;
@@ -697,14 +698,14 @@ int hw_device_setup_for_filter(FilterGraph *fg);
 int hwaccel_decode_init(AVCodecContext *avctx);
 
 int of_muxer_init(OutputFile *of, AVFormatContext *fc,
-                  AVDictionary *opts, int64_t limit_filesize);
+                  AVDictionary *opts, int64_t limit_filesize,
+                  int thread_queue_size);
 /* open the muxer when all the streams are initialized */
 int of_check_init(OutputFile *of);
 int of_write_trailer(OutputFile *of);
 void of_close(OutputFile **pof);
 
 int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost);
-int of_finished(OutputFile *of);
 int64_t of_filesize(OutputFile *of);
 AVChapter * const *
 of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 2abadd3f9b..67b875d41d 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -16,17 +16,21 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include <stdatomic.h>
 #include <stdio.h>
 #include <string.h>
 
 #include "ffmpeg.h"
+#include "objpool.h"
 #include "sync_queue.h"
+#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -51,13 +55,18 @@ typedef struct MuxStream {
 struct Muxer {
     AVFormatContext *fc;
 
+    pthread_t    thread;
+    ThreadQueue *tq;
+
     MuxStream *streams;
 
     AVDictionary *opts;
 
+    int thread_queue_size;
+
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
-    int64_t final_filesize;
+    atomic_int_least64_t last_filesize;
     int header_written;
 
     AVPacket *sq_pkt;
@@ -65,15 +74,6 @@ struct Muxer {
 
 static int want_sdp = 1;
 
-static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream, OSTFinished others)
-{
-    int i;
-    for (i = 0; i < nb_output_streams; i++) {
-        OutputStream *ost2 = output_streams[i];
-        ost2->finished |= ost == ost2 ? this_stream : others;
-    }
-}
-
 static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     MuxStream *ms = &of->mux->streams[ost->index];
@@ -116,13 +116,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     return 0;
 }
 
+static int64_t filesize(AVIOContext *pb)
+{
+    int64_t ret = -1;
+
+    if (pb) {
+        ret = avio_size(pb);
+        if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
+            ret = avio_tell(pb);
+    }
+
+    return ret;
+}
+
 static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     MuxStream *ms = &of->mux->streams[ost->index];
     AVFormatContext *s = of->mux->fc;
     AVStream *st = ost->st;
+    int64_t fs;
     int ret;
 
+    fs = filesize(s->pb);
+    atomic_store(&of->mux->last_filesize, fs);
+    if (fs >= of->mux->limit_filesize)
+        return AVERROR_EOF;
+
     if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && ost->vsync_method == VSYNC_DROP) ||
         (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
         pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -175,7 +194,7 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     ms->last_mux_dts = pkt->dts;
 
     ost->data_size += pkt->size;
-    ost->packets_written++;
+    atomic_fetch_add(&ost->packets_written, 1);
 
     pkt->stream_index = ost->index;
 
@@ -193,66 +212,81 @@ static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     ret = av_interleaved_write_frame(s, pkt);
     if (ret < 0) {
         print_error("av_interleaved_write_frame()", ret);
-        main_return_code = 1;
-        close_all_output_streams(ost, MUXER_FINISHED | ENCODER_FINISHED, ENCODER_FINISHED);
         return ret;
     }
 
     return 0;
 }
 
-static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     if (ost->sq_idx_mux >= 0) {
         int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
-        if (ret < 0) {
-            if (pkt)
-                av_packet_unref(pkt);
-            if (ret == AVERROR_EOF) {
-                ost->finished |= MUXER_FINISHED;
-                return 0;
-            } else
-                return ret;
-        }
+        if (ret < 0)
+            return ret;
 
         while (1) {
             ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
-            if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
-                return 0;
-            else if (ret < 0)
-                return ret;
+            if (ret < 0)
+                return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
 
             ret = write_packet(of, output_streams[of->ost_index + ret],
                                of->mux->sq_pkt);
             if (ret < 0)
                 return ret;
         }
-    } else {
-        if (pkt)
-            return write_packet(of, ost, pkt);
-
-        ost->finished |= MUXER_FINISHED;
-    }
+    } else if (pkt)
+        return write_packet(of, ost, pkt);
 
     return 0;
 }
 
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+static void *muxer_thread(void *arg)
 {
-    int ret;
+    OutputFile *of = arg;
+    Muxer     *mux = of->mux;
+    AVPacket  *pkt = NULL;
+    int        ret = 0;
+
+    pkt = av_packet_alloc();
+    if (!pkt) {
+        ret = AVERROR(ENOMEM);
+        goto finish;
+    }
 
-    if (of->mux->header_written) {
-        return submit_packet(of, ost, pkt);
-    } else {
-        /* the muxer is not initialized yet, buffer the packet */
-        ret = queue_packet(of, ost, pkt);
-        if (ret < 0) {
-            av_packet_unref(pkt);
-            return ret;
+    while (1) {
+        OutputStream *ost;
+        int stream_idx;
+
+        ret = tq_receive(mux->tq, &stream_idx, pkt);
+        if (stream_idx < 0) {
+            av_log(NULL, AV_LOG_DEBUG,
+                   "All streams finished for output file #%d\n", of->index);
+            ret = 0;
+            break;
+        }
+
+        ost = output_streams[of->ost_index + stream_idx];
+        ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+        av_packet_unref(pkt);
+        if (ret == AVERROR_EOF)
+            tq_receive_finish(mux->tq, stream_idx);
+        else if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Error muxing a packet for output file #%d\n", of->index);
+            break;
         }
     }
 
-    return 0;
+finish:
+    av_packet_free(&pkt);
+
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+        tq_receive_finish(mux->tq, i);
+
+    av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index);
+
+    return (void*)(intptr_t)ret;
 }
 
 static int print_sdp(void)
@@ -303,11 +337,125 @@ static int print_sdp(void)
         av_freep(&sdp_filename);
     }
 
+    // SDP successfully written, allow muxer threads to start
+    ret = 1;
+
 fail:
     av_freep(&avc);
     return ret;
 }
 
+static int submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+    Muxer *mux = of->mux;
+    int ret = 0;
+
+    if (!pkt || ost->finished & MUXER_FINISHED)
+        goto finish;
+
+    ret = tq_send(mux->tq, ost->index, pkt);
+    if (ret < 0)
+        goto finish;
+
+    return 0;
+
+finish:
+    if (pkt)
+        av_packet_unref(pkt);
+
+    ost->finished |= MUXER_FINISHED;
+    tq_send_finish(mux->tq, ost->index);
+    return ret == AVERROR_EOF ? 0 : ret;
+}
+
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
+{
+    int ret;
+
+    if (of->mux->tq) {
+        return submit_packet(of, ost, pkt);
+    } else {
+        /* the muxer is not initialized yet, buffer the packet */
+        ret = queue_packet(of, ost, pkt);
+        if (ret < 0) {
+            av_packet_unref(pkt);
+            return ret;
+        }
+    }
+
+    return 0;
+}
+
+static int thread_stop(OutputFile *of)
+{
+    Muxer *mux = of->mux;
+    void *ret;
+
+    if (!mux || !mux->tq)
+        return 0;
+
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+        tq_send_finish(mux->tq, i);
+
+    pthread_join(mux->thread, &ret);
+
+    tq_free(&mux->tq);
+
+    return (int)(intptr_t)ret;
+}
+
+static void pkt_move(void *dst, void *src)
+{
+    av_packet_move_ref(dst, src);
+}
+
+static int thread_start(OutputFile *of)
+{
+    Muxer          *mux = of->mux;
+    AVFormatContext *fc = mux->fc;
+    ObjPool *op;
+    int ret;
+
+    op = objpool_alloc_packets();
+    if (!op)
+        return AVERROR(ENOMEM);
+
+    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
+    if (!mux->tq) {
+        objpool_free(&op);
+        return AVERROR(ENOMEM);
+    }
+
+    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+    if (ret) {
+        tq_free(&mux->tq);
+        return AVERROR(ret);
+    }
+
+    /* flush the muxing queues */
+    for (int i = 0; i < fc->nb_streams; i++) {
+        MuxStream     *ms = &of->mux->streams[i];
+        OutputStream *ost = output_streams[of->ost_index + i];
+        AVPacket *pkt;
+
+        /* try to improve muxing time_base (only possible if nothing has been written yet) */
+        if (!av_fifo_can_read(ms->muxing_queue))
+            ost->mux_timebase = ost->st->time_base;
+
+        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
+            ret = submit_packet(of, ost, pkt);
+            if (pkt) {
+                ms->muxing_queue_data_size -= pkt->size;
+                av_packet_free(&pkt);
+            }
+            if (ret < 0)
+                return ret;
+        }
+    }
+
+    return 0;
+}
+
 /* open the muxer when all the streams are initialized */
 int of_check_init(OutputFile *of)
 {
@@ -339,28 +487,19 @@ int of_check_init(OutputFile *of)
         if (ret < 0) {
             av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
             return ret;
-        }
-    }
-
-    /* flush the muxing queues */
-    for (i = 0; i < fc->nb_streams; i++) {
-        MuxStream     *ms = &of->mux->streams[i];
-        OutputStream *ost = output_streams[of->ost_index + i];
-        AVPacket *pkt;
-
-        /* try to improve muxing time_base (only possible if nothing has been written yet) */
-        if (!av_fifo_can_read(ms->muxing_queue))
-            ost->mux_timebase = ost->st->time_base;
-
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ret = submit_packet(of, ost, pkt);
-            if (pkt) {
-                ms->muxing_queue_data_size -= pkt->size;
-                av_packet_free(&pkt);
+        } else if (ret == 1) {
+            /* SDP is written only after all the muxers are ready, so now we
+             * start ALL the threads */
+            for (i = 0; i < nb_output_files; i++) {
+                ret = thread_start(output_files[i]);
+                if (ret < 0)
+                    return ret;
             }
-            if (ret < 0)
-                return ret;
         }
+    } else {
+        ret = thread_start(of);
+        if (ret < 0)
+            return ret;
     }
 
     return 0;
@@ -371,7 +510,7 @@ int of_write_trailer(OutputFile *of)
     AVFormatContext *fc = of->mux->fc;
     int ret;
 
-    if (!of->mux->header_written) {
+    if (!of->mux->tq) {
         av_log(NULL, AV_LOG_ERROR,
                "Nothing was written into output file %d (%s), because "
                "at least one of its streams received no packets.\n",
@@ -379,13 +518,17 @@ int of_write_trailer(OutputFile *of)
         return AVERROR(EINVAL);
     }
 
+    ret = thread_stop(of);
+    if (ret < 0)
+        main_return_code = ret;
+
     ret = av_write_trailer(fc);
     if (ret < 0) {
         av_log(NULL, AV_LOG_ERROR, "Error writing trailer of %s: %s\n", fc->url, av_err2str(ret));
         return ret;
     }
 
-    of->mux->final_filesize = of_filesize(of);
+    of->mux->last_filesize = filesize(fc->pb);
 
     if (!(of->format->flags & AVFMT_NOFILE)) {
         ret = avio_closep(&fc->pb);
@@ -448,6 +591,8 @@ void of_close(OutputFile **pof)
     if (!of)
         return;
 
+    thread_stop(of);
+
     sq_free(&of->sq_encode);
     sq_free(&of->sq_mux);
 
@@ -457,7 +602,8 @@ void of_close(OutputFile **pof)
 }
 
 int of_muxer_init(OutputFile *of, AVFormatContext *fc,
-                  AVDictionary *opts, int64_t limit_filesize)
+                  AVDictionary *opts, int64_t limit_filesize,
+                  int thread_queue_size)
 {
     Muxer *mux = av_mallocz(sizeof(*mux));
     int ret = 0;
@@ -487,6 +633,7 @@ int of_muxer_init(OutputFile *of, AVFormatContext *fc,
         ms->last_mux_dts = AV_NOPTS_VALUE;
     }
 
+    mux->thread_queue_size = thread_queue_size > 0 ? thread_queue_size : 8;
     mux->limit_filesize = limit_filesize;
     mux->opts           = opts;
 
@@ -515,25 +662,9 @@ fail:
     return ret;
 }
 
-int of_finished(OutputFile *of)
-{
-    return of_filesize(of) >= of->mux->limit_filesize;
-}
-
 int64_t of_filesize(OutputFile *of)
 {
-    AVIOContext *pb = of->mux->fc->pb;
-    int64_t ret = -1;
-
-    if (of->mux->final_filesize)
-        ret = of->mux->final_filesize;
-    else if (pb) {
-        ret = avio_size(pb);
-        if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
-            ret = avio_tell(pb);
-    }
-
-    return ret;
+    return atomic_load(&of->mux->last_filesize);
 }
 
 AVChapter * const *
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 7f46238534..807c0263b9 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -3055,7 +3055,7 @@ loop_end:
     of->nb_streams = oc->nb_streams;
     of->url        = filename;
 
-    err = of_muxer_init(of, oc, format_opts, o->limit_filesize);
+    err = of_muxer_init(of, oc, format_opts, o->limit_filesize, o->thread_queue_size);
     if (err < 0) {
         av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
         exit_program(1);
@@ -3841,7 +3841,7 @@ const OptionDef options[] = {
     { "disposition",    OPT_STRING | HAS_ARG | OPT_SPEC |
                         OPT_OUTPUT,                                  { .off = OFFSET(disposition) },
         "disposition", "" },
-    { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT,
+    { "thread_queue_size", HAS_ARG | OPT_INT | OPT_OFFSET | OPT_EXPERT | OPT_INPUT | OPT_OUTPUT,
                                                                      { .off = OFFSET(thread_queue_size) },
         "set the maximum number of queued packets from the demuxer" },
     { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { &find_stream_info },
-- 
2.34.1



More information about the ffmpeg-devel mailing list