[FFmpeg-cvslog] fftools/ffmpeg: rework -shortest implementation

Anton Khirnov git at videolan.org
Sat Jul 23 13:07:40 EEST 2022


ffmpeg | branch: master | Anton Khirnov <anton at khirnov.net> | Fri Jun 10 14:21:42 2022 +0200| [4740fea7ddf5f81577c9f5a0c096a8a16a54716e] | committer: Anton Khirnov

fftools/ffmpeg: rework -shortest implementation

The -shortest option (which finishes the output file at the time the
shortest stream ends) is currently implemented by faking the -t option
when an output stream ends. This approach is fragile, since it depends
on the frames/packets being processed in a specific order. E.g. there
are currently some situations in which the output file length will
depend unpredictably on unrelated factors like encoder delay. More
importantly, the present work aiming at splitting various ffmpeg
components into different threads will make this approach completely
unworkable, since the frames/packets will arrive in effectively random
order.

This commit introduces a "sync queue", which is essentially a collection
of FIFOs, one per stream. Frames/packets are submitted to these FIFOs
and are then released for further processing (encoding or muxing) when
it is ensured that the frame in question will not cause its stream to
get ahead of the other streams (the logic is similar to libavformat's
interleaving queue).

These sync queues are then used for encoding and/or muxing when the
-shortest option is specified.

A new option – -shortest_buf_duration – controls the maximum duration of
buffered frames, to avoid runaway memory usage.

This commit changes the results of the following tests:
- copy-shortest[12]: the last audio frame is now gone. This is
  correct, since it actually outlasts the last video frame.
- shortest-sub: the video packets following the last subtitle packet are
  now gone. This is also correct.

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=4740fea7ddf5f81577c9f5a0c096a8a16a54716e
---

 Changelog                     |   1 +
 doc/ffmpeg.texi               |  16 ++
 fftools/Makefile              |   2 +
 fftools/ffmpeg.c              | 109 ++++++++---
 fftools/ffmpeg.h              |   9 +
 fftools/ffmpeg_mux.c          |  67 ++++++-
 fftools/ffmpeg_opt.c          |  82 ++++++++
 fftools/sync_queue.c          | 425 ++++++++++++++++++++++++++++++++++++++++++
 fftools/sync_queue.h          | 100 ++++++++++
 tests/fate/ffmpeg.mak         |   2 +-
 tests/ref/fate/copy-shortest1 |   1 -
 tests/ref/fate/copy-shortest2 |   1 -
 tests/ref/fate/shortest-sub   |   4 +-
 13 files changed, 778 insertions(+), 41 deletions(-)

diff --git a/Changelog b/Changelog
index f9cd44f184..2e034ff15a 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,7 @@ releases are sorted from youngest to oldest.
 version <next>:
 - Radiance HDR image support
 - ddagrab (Desktop Duplication) video capture filter
+- ffmpeg -shortest_buf_duration option
 
 
 version 5.1:
diff --git a/doc/ffmpeg.texi b/doc/ffmpeg.texi
index 767df69b7f..b97496d315 100644
--- a/doc/ffmpeg.texi
+++ b/doc/ffmpeg.texi
@@ -1765,6 +1765,22 @@ Default value is 0.
 Enable bitexact mode for (de)muxer and (de/en)coder
 @item -shortest (@emph{output})
 Finish encoding when the shortest output stream ends.
+
+Note that this option may require buffering frames, which introduces extra
+latency. The maximum amount of this latency may be controlled with the
+@code{-shortest_buf_duration} option.
+
+@item -shortest_buf_duration @var{duration} (@emph{output})
+The @code{-shortest} option may require buffering potentially large amounts
+of data when at least one of the streams is "sparse" (i.e. has large gaps
+between frames – this is typically the case for subtitles).
+
+This option controls the maximum duration of buffered frames in seconds.
+Larger values may allow the @code{-shortest} option to produce more accurate
+results, but increase memory use and latency.
+
+The default value is 10 seconds.
+
 @item -dts_delta_threshold
 Timestamp discontinuity delta threshold.
 @item -dts_error_threshold @var{seconds}
diff --git a/fftools/Makefile b/fftools/Makefile
index 81ad6c4f4f..bc57ebe748 100644
--- a/fftools/Makefile
+++ b/fftools/Makefile
@@ -14,6 +14,8 @@ OBJS-ffmpeg +=                  \
     fftools/ffmpeg_hw.o         \
     fftools/ffmpeg_mux.o        \
     fftools/ffmpeg_opt.o        \
+    fftools/objpool.o           \
+    fftools/sync_queue.o        \
 
 define DOFFTOOL
 OBJS-$(1) += fftools/cmdutils.o fftools/opt_common.o fftools/$(1).o $(OBJS-$(1)-yes)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 50e17b1890..9b6bb3d759 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -104,6 +104,7 @@
 
 #include "ffmpeg.h"
 #include "cmdutils.h"
+#include "sync_queue.h"
 
 #include "libavutil/avassert.h"
 
@@ -569,6 +570,7 @@ static void ffmpeg_cleanup(int ret)
         av_bsf_free(&ost->bsf_ctx);
 
         av_frame_free(&ost->filtered_frame);
+        av_frame_free(&ost->sq_frame);
         av_frame_free(&ost->last_frame);
         av_packet_free(&ost->pkt);
         av_dict_free(&ost->encoder_opts);
@@ -691,13 +693,10 @@ static void update_benchmark(const char *fmt, ...)
 static void close_output_stream(OutputStream *ost)
 {
     OutputFile *of = output_files[ost->file_index];
-    AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
     ost->finished |= ENCODER_FINISHED;
-    if (of->shortest) {
-        int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
-        of->recording_time = FFMIN(of->recording_time, end);
-    }
+
+    if (ost->sq_idx_encode >= 0)
+        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
 }
 
 /*
@@ -726,10 +725,15 @@ static void output_packet(OutputFile *of, AVPacket *pkt,
             goto finish;
         while ((ret = av_bsf_receive_packet(ost->bsf_ctx, pkt)) >= 0)
             of_submit_packet(of, pkt, ost);
+        if (ret == AVERROR_EOF)
+            of_submit_packet(of, NULL, ost);
         if (ret == AVERROR(EAGAIN))
             ret = 0;
-    } else if (!eof)
-        of_submit_packet(of, pkt, ost);
+    } else
+        of_submit_packet(of, eof ? NULL : pkt, ost);
+
+    if (eof)
+        ost->finished |= MUXER_FINISHED;
 
 finish:
     if (ret < 0 && ret != AVERROR_EOF) {
@@ -899,6 +903,7 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
 
     if (frame) {
         ost->frames_encoded++;
+        ost->samples_encoded += frame->nb_samples;
 
         if (debug_ts) {
             av_log(NULL, AV_LOG_INFO, "encoder <- type:%s "
@@ -971,6 +976,52 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame)
     av_assert0(0);
 }
 
+static int submit_encode_frame(OutputFile *of, OutputStream *ost,
+                               AVFrame *frame)
+{
+    int ret;
+
+    if (ost->sq_idx_encode < 0)
+        return encode_frame(of, ost, frame);
+
+    if (frame) {
+        ret = av_frame_ref(ost->sq_frame, frame);
+        if (ret < 0)
+            return ret;
+        frame = ost->sq_frame;
+    }
+
+    ret = sq_send(of->sq_encode, ost->sq_idx_encode,
+                  SQFRAME(frame));
+    if (ret < 0) {
+        if (frame)
+            av_frame_unref(frame);
+        if (ret != AVERROR_EOF)
+            return ret;
+    }
+
+    while (1) {
+        AVFrame *enc_frame = ost->sq_frame;
+
+        ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
+                               SQFRAME(enc_frame));
+        if (ret == AVERROR_EOF) {
+            enc_frame = NULL;
+        } else if (ret < 0) {
+            return (ret == AVERROR(EAGAIN)) ? 0 : ret;
+        }
+
+        ret = encode_frame(of, ost, enc_frame);
+        if (enc_frame)
+            av_frame_unref(enc_frame);
+        if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                close_output_stream(ost);
+            return ret;
+        }
+    }
+}
+
 static void do_audio_out(OutputFile *of, OutputStream *ost,
                          AVFrame *frame)
 {
@@ -984,10 +1035,9 @@ static void do_audio_out(OutputFile *of, OutputStream *ost,
     if (frame->pts == AV_NOPTS_VALUE || audio_sync_method < 0)
         frame->pts = ost->sync_opts;
     ost->sync_opts = frame->pts + frame->nb_samples;
-    ost->samples_encoded += frame->nb_samples;
 
-    ret = encode_frame(of, ost, frame);
-    if (ret < 0)
+    ret = submit_encode_frame(of, ost, frame);
+    if (ret < 0 && ret != AVERROR_EOF)
         exit_program(1);
 }
 
@@ -1151,15 +1201,18 @@ static void do_video_out(OutputFile *of,
                 if (delta0 > 1.1)
                     nb0_frames = llrintf(delta0 - 0.6);
             }
+            next_picture->pkt_duration = 1;
             break;
         case VSYNC_VFR:
             if (delta <= -0.6)
                 nb_frames = 0;
             else if (delta > 0.6)
                 ost->sync_opts = llrint(sync_ipts);
+            next_picture->pkt_duration = duration;
             break;
         case VSYNC_DROP:
         case VSYNC_PASSTHROUGH:
+            next_picture->pkt_duration = duration;
             ost->sync_opts = llrint(sync_ipts);
             break;
         default:
@@ -1273,8 +1326,8 @@ static void do_video_out(OutputFile *of,
             av_log(NULL, AV_LOG_DEBUG, "Forced keyframe at time %f\n", pts_time);
         }
 
-        ret = encode_frame(of, ost, in_picture);
-        if (ret < 0)
+        ret = submit_encode_frame(of, ost, in_picture);
+        if (ret < 0 && ret != AVERROR_EOF)
             exit_program(1);
 
         ost->sync_opts++;
@@ -1286,19 +1339,6 @@ static void do_video_out(OutputFile *of,
         av_frame_move_ref(ost->last_frame, next_picture);
 }
 
-static void finish_output_stream(OutputStream *ost)
-{
-    OutputFile *of = output_files[ost->file_index];
-    AVRational time_base = ost->stream_copy ? ost->mux_timebase : ost->enc_ctx->time_base;
-
-    ost->finished = ENCODER_FINISHED | MUXER_FINISHED;
-
-    if (of->shortest) {
-        int64_t end = av_rescale_q(ost->sync_opts - ost->first_pts, time_base, AV_TIME_BASE_Q);
-        of->recording_time = FFMIN(of->recording_time, end);
-    }
-}
-
 /**
  * Get and encode new output from any of the filtergraphs, without causing
  * activity.
@@ -1766,7 +1806,7 @@ static void flush_encoders(void)
                     exit_program(1);
                 }
 
-                finish_output_stream(ost);
+                output_packet(of, ost->pkt, ost, 1);
             }
 
             init_output_stream_wrapper(ost, NULL, 1);
@@ -1775,7 +1815,7 @@ static void flush_encoders(void)
         if (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)
             continue;
 
-        ret = encode_frame(of, ost, NULL);
+        ret = submit_encode_frame(of, ost, NULL);
         if (ret != AVERROR_EOF)
             exit_program(1);
     }
@@ -3086,6 +3126,9 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
         break;
     }
 
+    if (ost->sq_idx_encode >= 0)
+        sq_set_tb(of->sq_encode, ost->sq_idx_encode, enc_ctx->time_base);
+
     ost->mux_timebase = enc_ctx->time_base;
 
     return 0;
@@ -3094,6 +3137,7 @@ static int init_output_stream_encode(OutputStream *ost, AVFrame *frame)
 static int init_output_stream(OutputStream *ost, AVFrame *frame,
                               char *error, int error_len)
 {
+    OutputFile *of = output_files[ost->file_index];
     int ret = 0;
 
     if (ost->encoding_needed) {
@@ -3226,6 +3270,9 @@ static int init_output_stream(OutputStream *ost, AVFrame *frame,
     if (ret < 0)
         return ret;
 
+    if (ost->sq_idx_mux >= 0)
+        sq_set_tb(of->sq_mux, ost->sq_idx_mux, ost->mux_timebase);
+
     ost->initialized = 1;
 
     ret = of_check_init(output_files[ost->file_index]);
@@ -3930,8 +3977,10 @@ static int process_input(int file_index)
                 OutputStream *ost = output_streams[j];
 
                 if (ost->source_index == ifile->ist_index + i &&
-                    (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE))
-                    finish_output_stream(ost);
+                    (ost->stream_copy || ost->enc->type == AVMEDIA_TYPE_SUBTITLE)) {
+                    OutputFile *of = output_files[ost->file_index];
+                    output_packet(of, ost->pkt, ost, 1);
+                }
             }
         }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 090bf67d2d..58e093b2cb 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -26,6 +26,7 @@
 #include <signal.h>
 
 #include "cmdutils.h"
+#include "sync_queue.h"
 
 #include "libavformat/avformat.h"
 #include "libavformat/avio.h"
@@ -151,6 +152,7 @@ typedef struct OptionsContext {
     int64_t limit_filesize;
     float mux_preload;
     float mux_max_delay;
+    float shortest_buf_duration;
     int shortest;
     int bitexact;
 
@@ -484,6 +486,7 @@ typedef struct OutputStream {
     int64_t max_frames;
     AVFrame *filtered_frame;
     AVFrame *last_frame;
+    AVFrame *sq_frame;
     AVPacket *pkt;
     int64_t last_dropped;
     int64_t last_nb0_frames[3];
@@ -575,6 +578,9 @@ typedef struct OutputStream {
 
     /* frame encode sum of squared error values */
     int64_t error[4];
+
+    int sq_idx_encode;
+    int sq_idx_mux;
 } OutputStream;
 
 typedef struct Muxer Muxer;
@@ -585,6 +591,9 @@ typedef struct OutputFile {
     Muxer                *mux;
     const AVOutputFormat *format;
 
+    SyncQueue *sq_encode;
+    SyncQueue *sq_mux;
+
     AVFormatContext *ctx;
     int ost_index;       /* index of the first stream in output_streams */
     int64_t recording_time;  ///< desired length of the resulting file in microseconds == AV_TIME_BASE units
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index a3350a73e9..453ccac912 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -20,6 +20,7 @@
 #include <string.h>
 
 #include "ffmpeg.h"
+#include "sync_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
@@ -56,6 +57,8 @@ struct Muxer {
     int64_t limit_filesize;
     int64_t final_filesize;
     int header_written;
+
+    AVPacket *sq_pkt;
 };
 
 static int want_sdp = 1;
@@ -72,13 +75,14 @@ static void close_all_output_streams(OutputStream *ost, OSTFinished this_stream,
 static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     MuxStream *ms = &of->mux->streams[ost->index];
-    AVPacket *tmp_pkt;
+    AVPacket *tmp_pkt = NULL;
     int ret;
 
     if (!av_fifo_can_write(ms->muxing_queue)) {
         size_t cur_size = av_fifo_can_read(ms->muxing_queue);
+        size_t pkt_size = pkt ? pkt->size : 0;
         unsigned int are_we_over_size =
-            (ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
+            (ms->muxing_queue_data_size + pkt_size) > ost->muxing_queue_data_threshold;
         size_t limit    = are_we_over_size ? ost->max_muxing_queue_size : SIZE_MAX;
         size_t new_size = FFMIN(2 * cur_size, limit);
 
@@ -93,6 +97,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
             return ret;
     }
 
+    if (pkt) {
     ret = av_packet_make_refcounted(pkt);
     if (ret < 0)
         return ret;
@@ -103,6 +108,7 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 
     av_packet_move_ref(tmp_pkt, pkt);
     ms->muxing_queue_data_size += tmp_pkt->size;
+    }
     av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
 
     return 0;
@@ -192,11 +198,44 @@ static void write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     }
 }
 
+static void submit_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
+{
+    if (ost->sq_idx_mux >= 0) {
+        int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
+        if (ret < 0) {
+            if (pkt)
+                av_packet_unref(pkt);
+            if (ret == AVERROR_EOF) {
+                ost->finished |= MUXER_FINISHED;
+                return;
+            } else
+                exit_program(1);
+        }
+
+        while (1) {
+            ret = sq_receive(of->sq_mux, -1, SQPKT(of->mux->sq_pkt));
+            if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
+                return;
+            else if (ret < 0)
+                exit_program(1);
+
+            write_packet(of, output_streams[of->ost_index + ret],
+                         of->mux->sq_pkt);
+        }
+    } else {
+        if (pkt)
+            write_packet(of, ost, pkt);
+        else
+            ost->finished |= MUXER_FINISHED;
+    }
+}
+
 void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
 {
     AVStream *st = ost->st;
     int ret;
 
+    if (pkt) {
     /*
      * Audio encoders may split the packets --  #frames in != #packets out.
      * But there is no reordering, so we can limit the number of output packets
@@ -211,9 +250,10 @@ void of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost)
         }
         ost->frame_number++;
     }
+    }
 
     if (of->mux->header_written) {
-        write_packet(of, ost, pkt);
+        submit_packet(of, ost, pkt);
     } else {
         /* the muxer is not initialized yet, buffer the packet */
         ret = queue_packet(of, ost, pkt);
@@ -321,9 +361,11 @@ int of_check_init(OutputFile *of)
             ost->mux_timebase = ost->st->time_base;
 
         while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ms->muxing_queue_data_size -= pkt->size;
-            write_packet(of, ost, pkt);
-            av_packet_free(&pkt);
+            submit_packet(of, ost, pkt);
+            if (pkt) {
+                ms->muxing_queue_data_size -= pkt->size;
+                av_packet_free(&pkt);
+            }
         }
     }
 
@@ -383,6 +425,8 @@ static void mux_free(Muxer **pmux, int nb_streams)
     av_freep(&mux->streams);
     av_dict_free(&mux->opts);
 
+    av_packet_free(&mux->sq_pkt);
+
     av_freep(pmux);
 }
 
@@ -394,6 +438,9 @@ void of_close(OutputFile **pof)
     if (!of)
         return;
 
+    sq_free(&of->sq_encode);
+    sq_free(&of->sq_mux);
+
     s = of->ctx;
 
     mux_free(&of->mux, s ? s->nb_streams : 0);
@@ -437,6 +484,14 @@ int of_muxer_init(OutputFile *of, AVDictionary *opts, int64_t limit_filesize)
     if (strcmp(of->format->name, "rtp"))
         want_sdp = 0;
 
+    if (of->sq_mux) {
+        mux->sq_pkt = av_packet_alloc();
+        if (!mux->sq_pkt) {
+            ret = AVERROR(ENOMEM);
+            goto fail;
+        }
+    }
+
     /* write the header for files with no streams */
     if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) {
         ret = of_check_init(of);
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index db8ec33cde..4281644cfc 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -31,6 +31,7 @@
 #include "fopen_utf8.h"
 #include "cmdutils.h"
 #include "opt_common.h"
+#include "sync_queue.h"
 
 #include "libavformat/avformat.h"
 
@@ -236,6 +237,7 @@ static void init_options(OptionsContext *o)
     o->accurate_seek  = 1;
     o->thread_queue_size = -1;
     o->input_sync_ref = -1;
+    o->shortest_buf_duration = 10.f;
 }
 
 static int show_hwaccels(void *optctx, const char *opt, const char *arg)
@@ -2385,6 +2387,78 @@ static int init_complex_filters(void)
     return 0;
 }
 
+static int setup_sync_queues(OutputFile *of, AVFormatContext *oc, int64_t buf_size_us)
+{
+    int nb_av_enc = 0, nb_interleaved = 0;
+
+#define IS_AV_ENC(ost, type)  \
+    (ost->encoding_needed && (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO))
+#define IS_INTERLEAVED(type) (type != AVMEDIA_TYPE_ATTACHMENT)
+
+    for (int i = 0; i < oc->nb_streams; i++) {
+        OutputStream *ost = output_streams[of->ost_index + i];
+        enum AVMediaType type = ost->st->codecpar->codec_type;
+
+        ost->sq_idx_encode = -1;
+        ost->sq_idx_mux    = -1;
+
+        nb_interleaved += IS_INTERLEAVED(type);
+        nb_av_enc      += IS_AV_ENC(ost, type);
+    }
+
+    if (!(nb_interleaved > 1 && of->shortest))
+        return 0;
+
+    /* if we have more than one encoded audio/video streams, then we
+     * synchronize them before encoding */
+    if (nb_av_enc > 1) {
+        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us);
+        if (!of->sq_encode)
+            return AVERROR(ENOMEM);
+
+        for (int i = 0; i < oc->nb_streams; i++) {
+            OutputStream *ost = output_streams[of->ost_index + i];
+            enum AVMediaType type = ost->st->codecpar->codec_type;
+
+            if (!IS_AV_ENC(ost, type))
+                continue;
+
+            ost->sq_idx_encode = sq_add_stream(of->sq_encode);
+            if (ost->sq_idx_encode < 0)
+                return ost->sq_idx_encode;
+
+            ost->sq_frame = av_frame_alloc();
+            if (!ost->sq_frame)
+                return AVERROR(ENOMEM);
+        }
+    }
+
+    /* if there are any additional interleaved streams, then ALL the streams
+     * are also synchronized before sending them to the muxer */
+    if (nb_interleaved > nb_av_enc) {
+        of->sq_mux = sq_alloc(SYNC_QUEUE_PACKETS, buf_size_us);
+        if (!of->sq_mux)
+            return AVERROR(ENOMEM);
+
+        for (int i = 0; i < oc->nb_streams; i++) {
+            OutputStream *ost = output_streams[of->ost_index + i];
+            enum AVMediaType type = ost->st->codecpar->codec_type;
+
+            if (!IS_INTERLEAVED(type))
+                continue;
+
+            ost->sq_idx_mux = sq_add_stream(of->sq_mux);
+            if (ost->sq_idx_mux < 0)
+                return ost->sq_idx_mux;
+        }
+    }
+
+#undef IS_AV_ENC
+#undef IS_INTERLEAVED
+
+    return 0;
+}
+
 static int open_output_file(OptionsContext *o, const char *filename)
 {
     AVFormatContext *oc;
@@ -3022,6 +3096,12 @@ loop_end:
         exit_program(1);
     }
 
+    err = setup_sync_queues(of, oc, o->shortest_buf_duration * AV_TIME_BASE);
+    if (err < 0) {
+        av_log(NULL, AV_LOG_FATAL, "Error setting up output sync queues\n");
+        exit_program(1);
+    }
+
     err = of_muxer_init(of, format_opts, o->limit_filesize);
     if (err < 0) {
         av_log(NULL, AV_LOG_FATAL, "Error initializing internal muxing state\n");
@@ -3739,6 +3819,8 @@ const OptionDef options[] = {
     { "shortest",       OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
                         OPT_OUTPUT,                                  { .off = OFFSET(shortest) },
         "finish encoding within shortest input" },
+    { "shortest_buf_duration", HAS_ARG | OPT_FLOAT | OPT_EXPERT | OPT_OFFSET | OPT_OUTPUT, { .off = OFFSET(shortest_buf_duration) },
+        "maximum buffering duration (in seconds) for the -shortest option" },
     { "bitexact",       OPT_BOOL | OPT_EXPERT | OPT_OFFSET |
                         OPT_OUTPUT | OPT_INPUT,                      { .off = OFFSET(bitexact) },
         "bitexact mode" },
diff --git a/fftools/sync_queue.c b/fftools/sync_queue.c
new file mode 100644
index 0000000000..ab654ca790
--- /dev/null
+++ b/fftools/sync_queue.c
@@ -0,0 +1,425 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdint.h>
+#include <string.h>
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/fifo.h"
+#include "libavutil/mathematics.h"
+#include "libavutil/mem.h"
+
+#include "objpool.h"
+#include "sync_queue.h"
+
+typedef struct SyncQueueStream {
+    AVFifo          *fifo;
+    AVRational       tb;
+
+    /* stream head: largest timestamp seen */
+    int64_t          head_ts;
+    /* no more frames will be sent for this stream */
+    int              finished;
+} SyncQueueStream;
+
+struct SyncQueue {
+    enum SyncQueueType type;
+
+    /* no more frames will be sent for any stream */
+    int finished;
+    /* sync head: the stream with the _smallest_ head timestamp
+     * this stream determines which frames can be output */
+    int head_stream;
+    /* the finished stream with the smallest finish timestamp or -1 */
+    int head_finished_stream;
+
+    // maximum buffering duration in microseconds
+    int64_t buf_size_us;
+
+    SyncQueueStream *streams;
+    unsigned int  nb_streams;
+
+    // pool of preallocated frames to avoid constant allocations
+    ObjPool *pool;
+};
+
+static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
+                       SyncQueueFrame src)
+{
+    if (sq->type == SYNC_QUEUE_PACKETS)
+        av_packet_move_ref(dst.p, src.p);
+    else
+        av_frame_move_ref(dst.f, src.f);
+}
+
+static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
+{
+    return (sq->type == SYNC_QUEUE_PACKETS) ?
+           frame.p->pts + frame.p->duration :
+           frame.f->pts + frame.f->pkt_duration;
+}
+
+static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
+{
+    return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
+}
+
+static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
+{
+    SyncQueueStream *st = &sq->streams[stream_idx];
+
+    st->finished = 1;
+
+    if (st->head_ts != AV_NOPTS_VALUE) {
+        /* check if this stream is the new finished head */
+        if (sq->head_finished_stream < 0 ||
+            av_compare_ts(st->head_ts, st->tb,
+                          sq->streams[sq->head_finished_stream].head_ts,
+                          sq->streams[sq->head_finished_stream].tb) < 0) {
+            sq->head_finished_stream = stream_idx;
+        }
+
+        /* mark as finished all streams that should no longer receive new frames,
+         * due to them being ahead of some finished stream */
+        st = &sq->streams[sq->head_finished_stream];
+        for (unsigned int i = 0; i < sq->nb_streams; i++) {
+            SyncQueueStream *st1 = &sq->streams[i];
+            if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
+                av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
+                st1->finished = 1;
+        }
+    }
+
+    /* mark the whole queue as finished if all streams are finished */
+    for (unsigned int i = 0; i < sq->nb_streams; i++) {
+        if (!sq->streams[i].finished)
+            return;
+    }
+    sq->finished = 1;
+}
+
+static void queue_head_update(SyncQueue *sq)
+{
+    if (sq->head_stream < 0) {
+        /* wait for one timestamp in each stream before determining
+         * the queue head */
+        for (unsigned int i = 0; i < sq->nb_streams; i++) {
+            SyncQueueStream *st = &sq->streams[i];
+            if (st->head_ts == AV_NOPTS_VALUE)
+                return;
+        }
+
+        // placeholder value, correct one will be found below
+        sq->head_stream = 0;
+    }
+
+    for (unsigned int i = 0; i < sq->nb_streams; i++) {
+        SyncQueueStream *st_head  = &sq->streams[sq->head_stream];
+        SyncQueueStream *st_other = &sq->streams[i];
+        if (st_other->head_ts != AV_NOPTS_VALUE &&
+            av_compare_ts(st_other->head_ts, st_other->tb,
+                          st_head->head_ts,  st_head->tb) < 0)
+            sq->head_stream = i;
+    }
+}
+
+/* update this stream's head timestamp */
+static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
+{
+    SyncQueueStream *st = &sq->streams[stream_idx];
+
+    if (ts == AV_NOPTS_VALUE ||
+        (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
+        return;
+
+    st->head_ts = ts;
+
+    /* if this stream is now ahead of some finished stream, then
+     * this stream is also finished */
+    if (sq->head_finished_stream >= 0 &&
+        av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
+                      sq->streams[sq->head_finished_stream].tb,
+                      ts, st->tb) <= 0)
+        finish_stream(sq, stream_idx);
+
+    /* update the overall head timestamp if it could have changed */
+    if (sq->head_stream < 0 || sq->head_stream == stream_idx)
+        queue_head_update(sq);
+}
+
+/* If the queue for the given stream (or all streams when stream_idx=-1)
+ * is overflowing, trigger a fake heartbeat on lagging streams.
+ *
+ * @return 1 if heartbeat triggered, 0 otherwise
+ */
+static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
+{
+    SyncQueueStream *st;
+    SyncQueueFrame frame;
+    int64_t tail_ts = AV_NOPTS_VALUE;
+
+    /* if no stream specified, pick the one that is most ahead */
+    if (stream_idx < 0) {
+        int64_t ts = AV_NOPTS_VALUE;
+
+        for (int i = 0; i < sq->nb_streams; i++) {
+            st = &sq->streams[i];
+            if (st->head_ts != AV_NOPTS_VALUE &&
+                (ts == AV_NOPTS_VALUE ||
+                 av_compare_ts(ts, sq->streams[stream_idx].tb,
+                               st->head_ts, st->tb) < 0)) {
+                ts = st->head_ts;
+                stream_idx = i;
+            }
+        }
+        /* no stream has a timestamp yet -> nothing to do */
+        if (stream_idx < 0)
+            return 0;
+    }
+
+    st = &sq->streams[stream_idx];
+
+    /* get the chosen stream's tail timestamp */
+    for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
+                       av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
+        tail_ts = frame_ts(sq, frame);
+
+    /* overflow triggers when the tail is over specified duration behind the head */
+    if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
+        av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
+        return 0;
+
+    /* signal a fake timestamp for all streams that prevent tail_ts from being output */
+    tail_ts++;
+    for (unsigned int i = 0; i < sq->nb_streams; i++) {
+        SyncQueueStream *st1 = &sq->streams[i];
+        int64_t ts;
+
+        if (st == st1 || st1->finished ||
+            (st1->head_ts != AV_NOPTS_VALUE &&
+             av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
+            continue;
+
+        ts = av_rescale_q(tail_ts, st->tb, st1->tb);
+        if (st1->head_ts != AV_NOPTS_VALUE)
+            ts = FFMAX(st1->head_ts + 1, ts);
+
+        stream_update_ts(sq, i, ts);
+    }
+
+    return 1;
+}
+
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
+{
+    SyncQueueStream *st;
+    SyncQueueFrame dst;
+    int64_t ts;
+    int ret;
+
+    av_assert0(stream_idx < sq->nb_streams);
+    st = &sq->streams[stream_idx];
+
+    av_assert0(st->tb.num > 0 && st->tb.den > 0);
+
+    if (frame_null(sq, frame)) {
+        finish_stream(sq, stream_idx);
+        return 0;
+    }
+    if (st->finished)
+        return AVERROR_EOF;
+
+    ret = objpool_get(sq->pool, (void**)&dst);
+    if (ret < 0)
+        return ret;
+
+    frame_move(sq, dst, frame);
+
+    ts = frame_ts(sq, dst);
+
+    ret = av_fifo_write(st->fifo, &dst, 1);
+    if (ret < 0) {
+        frame_move(sq, frame, dst);
+        objpool_release(sq->pool, (void**)&dst);
+        return ret;
+    }
+
+    stream_update_ts(sq, stream_idx, ts);
+
+    return 0;
+}
+
+static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
+                              SyncQueueFrame frame)
+{
+    SyncQueueStream *st_head = sq->head_stream >= 0 ?
+                               &sq->streams[sq->head_stream] : NULL;
+    SyncQueueStream *st;
+
+    av_assert0(stream_idx < sq->nb_streams);
+    st = &sq->streams[stream_idx];
+
+    if (av_fifo_can_read(st->fifo)) {
+        SyncQueueFrame peek;
+        int64_t ts;
+        int cmp = 1;
+
+        av_fifo_peek(st->fifo, &peek, 1, 0);
+        ts = frame_ts(sq, peek);
+
+        /* check if this stream's tail timestamp does not overtake
+         * the overall queue head */
+        if (ts != AV_NOPTS_VALUE && st_head)
+            cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
+
+        /* We can release frames that do not end after the queue head.
+         * Frames with no timestamps are just passed through with no conditions.
+         */
+        if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
+            frame_move(sq, frame, peek);
+            objpool_release(sq->pool, (void**)&peek);
+            av_fifo_drain2(st->fifo, 1);
+            return 0;
+        }
+    }
+
+    return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
+            AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+    int nb_eof = 0;
+    int ret;
+
+    /* read a frame for a specific stream */
+    if (stream_idx >= 0) {
+        ret = receive_for_stream(sq, stream_idx, frame);
+        return (ret < 0) ? ret : stream_idx;
+    }
+
+    /* read a frame for any stream with available output */
+    for (unsigned int i = 0; i < sq->nb_streams; i++) {
+        ret = receive_for_stream(sq, i, frame);
+        if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
+            nb_eof += (ret == AVERROR_EOF);
+            continue;
+        }
+        return (ret < 0) ? ret : i;
+    }
+
+    return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
+}
+
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
+{
+    int ret = receive_internal(sq, stream_idx, frame);
+
+    /* try again if the queue overflowed and triggered a fake heartbeat
+     * for lagging streams */
+    if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
+        ret = receive_internal(sq, stream_idx, frame);
+
+    return ret;
+}
+
+int sq_add_stream(SyncQueue *sq)
+{
+    SyncQueueStream *tmp, *st;
+
+    tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
+    if (!tmp)
+        return AVERROR(ENOMEM);
+    sq->streams = tmp;
+
+    st = &sq->streams[sq->nb_streams];
+    memset(st, 0, sizeof(*st));
+
+    st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
+    if (!st->fifo)
+        return AVERROR(ENOMEM);
+
+    /* we set a valid default, so that a pathological stream that never
+     * receives even a real timebase (and no frames) won't stall all other
+     * streams forever; cf. overflow_heartbeat() */
+    st->tb      = (AVRational){ 1, 1 };
+    st->head_ts = AV_NOPTS_VALUE;
+
+    return sq->nb_streams++;
+}
+
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
+{
+    SyncQueueStream *st;
+
+    av_assert0(stream_idx < sq->nb_streams);
+    st = &sq->streams[stream_idx];
+
+    av_assert0(!av_fifo_can_read(st->fifo));
+
+    if (st->head_ts != AV_NOPTS_VALUE)
+        st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
+
+    st->tb = tb;
+}
+
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
+{
+    SyncQueue *sq = av_mallocz(sizeof(*sq));
+
+    if (!sq)
+        return NULL;
+
+    sq->type                 = type;
+    sq->buf_size_us          = buf_size_us;
+
+    sq->head_stream          = -1;
+    sq->head_finished_stream = -1;
+
+    sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
+                                              objpool_alloc_frames();
+    if (!sq->pool) {
+        av_freep(&sq);
+        return NULL;
+    }
+
+    return sq;
+}
+
+void sq_free(SyncQueue **psq)
+{
+    SyncQueue *sq = *psq;
+
+    if (!sq)
+        return;
+
+    for (unsigned int i = 0; i < sq->nb_streams; i++) {
+        SyncQueueFrame frame;
+        while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
+            objpool_release(sq->pool, (void**)&frame);
+
+        av_fifo_freep2(&sq->streams[i].fifo);
+    }
+
+    av_freep(&sq->streams);
+
+    objpool_free(&sq->pool);
+
+    av_freep(psq);
+}
diff --git a/fftools/sync_queue.h b/fftools/sync_queue.h
new file mode 100644
index 0000000000..e08780b7bf
--- /dev/null
+++ b/fftools/sync_queue.h
@@ -0,0 +1,100 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_SYNC_QUEUE_H
+#define FFTOOLS_SYNC_QUEUE_H
+
+#include <stdint.h>
+
+#include "libavcodec/packet.h"
+
+#include "libavutil/frame.h"
+
+enum SyncQueueType {
+    SYNC_QUEUE_PACKETS,
+    SYNC_QUEUE_FRAMES,
+};
+
+typedef union SyncQueueFrame {
+    AVFrame  *f;
+    AVPacket *p;
+} SyncQueueFrame;
+
+#define SQFRAME(frame) ((SyncQueueFrame){ .f = (frame) })
+#define SQPKT(pkt)     ((SyncQueueFrame){ .p = (pkt) })
+
+typedef struct SyncQueue SyncQueue;
+
+/**
+ * Allocate a sync queue of the given type.
+ *
+ * @param buf_size_us maximum duration that will be buffered in microseconds
+ */
+SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us);
+void       sq_free(SyncQueue **sq);
+
+/**
+ * Add a new stream to the sync queue.
+ *
+ * @return
+ * - a non-negative stream index on success
+ * - a negative error code on error
+ */
+int sq_add_stream(SyncQueue *sq);
+
+/**
+ * Set the timebase for the stream with index stream_idx. Should be called
+ * before sending any frames for this stream.
+ */
+void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb);
+
+/**
+ * Submit a frame for the stream with index stream_idx.
+ *
+ * On success, the sync queue takes ownership of the frame and will reset the
+ * contents of the supplied frame. On failure, the frame remains owned by the
+ * caller.
+ *
+ * Sending a frame with NULL contents marks the stream as finished.
+ *
+ * @return
+ * - 0 on success
+ * - AVERROR_EOF when no more frames should be submitted for this stream
+ * - another negative error code on failure
+ */
+int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame);
+
+/**
+ * Read a frame from the queue.
+ *
+ * @param stream_idx index of the stream to read a frame for. May be -1, then
+ *                   try to read a frame from any stream that is ready for
+ *                   output.
+ * @param frame output frame will be written here on success. The frame is owned
+ *              by the caller.
+ *
+ * @return
+ * - a non-negative index of the stream to which the returned frame belongs
+ * - AVERROR(EAGAIN) when more frames need to be submitted to the queue
+ * - AVERROR_EOF when no more frames will be available for this stream (for any
+ *               stream if stream_idx is -1)
+ * - another negative error code on failure
+ */
+int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame);
+
+#endif // FFTOOLS_SYNC_QUEUE_H
diff --git a/tests/fate/ffmpeg.mak b/tests/fate/ffmpeg.mak
index 154af2fac8..38a1ae7ed5 100644
--- a/tests/fate/ffmpeg.mak
+++ b/tests/fate/ffmpeg.mak
@@ -105,7 +105,7 @@ FATE_SAMPLES_FFMPEG-$(call ALLYES, COLOR_FILTER, VOBSUB_DEMUXER, MATROSKA_DEMUXE
 fate-shortest-sub: CMD = enc_dec                                                                      \
         vobsub $(TARGET_SAMPLES)/sub/vobsub.idx matroska                                              \
         "-filter_complex 'color=s=1x1:rate=1:duration=400' -pix_fmt rgb24 -allow_raw_vfw 1 -c:s copy -c:v rawvideo"  \
-        framecrc "-map 0 -c copy -shortest"
+        framecrc "-map 0 -c copy -shortest -shortest_buf_duration 40"
 
 # Basic test for fix_sub_duration, which calculates duration based on the
 # following subtitle's pts.
diff --git a/tests/ref/fate/copy-shortest1 b/tests/ref/fate/copy-shortest1
index 5038973e4e..87bee4c41f 100644
--- a/tests/ref/fate/copy-shortest1
+++ b/tests/ref/fate/copy-shortest1
@@ -120,4 +120,3 @@
 0,      98304,      98304,     2048,    11182, e35a2ab846029effdbca0e43639717f2
 1,      85760,      85760,     1536,      418, cf52ea7fc69e4c5bc8f75b354dfe60af
 0,     100352,     100352,     2048,     1423, f480272c7d0b97834bc8ea36cceca61d
-1,      87296,      87296,     1536,      418, 78ab22657a1b6c8a0e5b8612ceb8081d
diff --git a/tests/ref/fate/copy-shortest2 b/tests/ref/fate/copy-shortest2
index 5038973e4e..87bee4c41f 100644
--- a/tests/ref/fate/copy-shortest2
+++ b/tests/ref/fate/copy-shortest2
@@ -120,4 +120,3 @@
 0,      98304,      98304,     2048,    11182, e35a2ab846029effdbca0e43639717f2
 1,      85760,      85760,     1536,      418, cf52ea7fc69e4c5bc8f75b354dfe60af
 0,     100352,     100352,     2048,     1423, f480272c7d0b97834bc8ea36cceca61d
-1,      87296,      87296,     1536,      418, 78ab22657a1b6c8a0e5b8612ceb8081d
diff --git a/tests/ref/fate/shortest-sub b/tests/ref/fate/shortest-sub
index be0922fd56..0da4ba2e95 100644
--- a/tests/ref/fate/shortest-sub
+++ b/tests/ref/fate/shortest-sub
@@ -1,4 +1,4 @@
 145b9b48d56f9c966bf41657f7569954 *tests/data/fate/shortest-sub.matroska
 139232 tests/data/fate/shortest-sub.matroska
-d71f5d359ef788ea689415bc1e4a90df *tests/data/fate/shortest-sub.out.framecrc
-stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes:     2591/    26055
+876ac3fa52e467050ab843969d4cf343 *tests/data/fate/shortest-sub.out.framecrc
+stddev:11541.12 PSNR: 15.08 MAXDIFF:22854 bytes:     2591/    23735




More information about the ffmpeg-cvslog mailing list