[FFmpeg-devel] [PATCH 17/28] ffmpeg: move the mux queue into muxer private data

Andreas Rheinhardt andreas.rheinhardt at outlook.com
Thu Jan 13 12:50:48 EET 2022


Anton Khirnov:
> The muxing queue currently lives in OutputStream, which is a very large
> struct storing the state for both encoding and muxing. The muxing queue
> is only used by the code in ffmpeg_mux, so it makes sense to restrict it
> to that file.
> 
> This makes the first step towards reducing the scope of OutputStream.
> ---
>  fftools/ffmpeg.c     |  9 -----
>  fftools/ffmpeg.h     |  9 -----
>  fftools/ffmpeg_mux.c | 91 ++++++++++++++++++++++++++++++++++++--------
>  fftools/ffmpeg_opt.c |  6 ---
>  4 files changed, 76 insertions(+), 39 deletions(-)
> 
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index 6c774e9615..c1bb3926c4 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -595,15 +595,6 @@ static void ffmpeg_cleanup(int ret)
>          avcodec_free_context(&ost->enc_ctx);
>          avcodec_parameters_free(&ost->ref_par);
>  
> -        if (ost->muxing_queue) {
> -            while (av_fifo_size(ost->muxing_queue)) {
> -                AVPacket *pkt;
> -                av_fifo_generic_read(ost->muxing_queue, &pkt, sizeof(pkt), NULL);
> -                av_packet_free(&pkt);
> -            }
> -            av_fifo_freep(&ost->muxing_queue);
> -        }
> -
>          av_freep(&output_streams[i]);
>      }
>  #if HAVE_THREADS
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index e828f71dc0..28df1b179f 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -555,15 +555,6 @@ typedef struct OutputStream {
>  
>      int max_muxing_queue_size;
>  
> -    /* the packets are buffered here until the muxer is ready to be initialized */
> -    AVFifoBuffer *muxing_queue;
> -
> -    /*
> -     * The size of the AVPackets' buffers in queue.
> -     * Updated when a packet is either pushed or pulled from the queue.
> -     */
> -    size_t muxing_queue_data_size;
> -
>      /* Threshold after which max_muxing_queue_size will be in effect */
>      size_t muxing_queue_data_threshold;
>  
> diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
> index f4d76e1533..f03202bbb7 100644
> --- a/fftools/ffmpeg_mux.c
> +++ b/fftools/ffmpeg_mux.c
> @@ -32,7 +32,20 @@
>  #include "libavformat/avformat.h"
>  #include "libavformat/avio.h"
>  
> +typedef struct MuxStream {
> +    /* the packets are buffered here until the muxer is ready to be initialized */
> +    AVFifoBuffer *muxing_queue;
> +
> +    /*
> +     * The size of the AVPackets' buffers in queue.
> +     * Updated when a packet is either pushed or pulled from the queue.
> +     */
> +    size_t muxing_queue_data_size;
> +} MuxStream;
> +
>  struct Muxer {
> +    MuxStream *streams;
> +
>      /* filesize limit expressed in bytes */
>      int64_t limit_filesize;
>      int64_t final_filesize;
> @@ -55,6 +68,7 @@ void of_write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost,
>  {
>      AVFormatContext *s = of->ctx;
>      AVStream *st = ost->st;
> +    MuxStream *ms = &of->mux->streams[st->index];
>      int ret;
>  
>      /*
> @@ -76,10 +90,10 @@ void of_write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost,
>      if (!of->mux->header_written) {
>          AVPacket *tmp_pkt;
>          /* the muxer is not initialized yet, buffer the packet */
> -        if (!av_fifo_space(ost->muxing_queue)) {
> -            size_t cur_size = av_fifo_size(ost->muxing_queue);
> +        if (!av_fifo_space(ms->muxing_queue)) {
> +            size_t cur_size = av_fifo_size(ms->muxing_queue);
>              unsigned int are_we_over_size =
> -                (ost->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
> +                (ms->muxing_queue_data_size + pkt->size) > ost->muxing_queue_data_threshold;
>              size_t limit    = are_we_over_size ? ost->max_muxing_queue_size : INT_MAX;
>              size_t new_size = FFMIN(2 * cur_size, limit);
>  
> @@ -89,7 +103,7 @@ void of_write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost,
>                         ost->file_index, ost->st->index);
>                  exit_program(1);
>              }
> -            ret = av_fifo_realloc2(ost->muxing_queue, new_size);
> +            ret = av_fifo_realloc2(ms->muxing_queue, new_size);
>              if (ret < 0)
>                  exit_program(1);
>          }
> @@ -100,8 +114,8 @@ void of_write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost,
>          if (!tmp_pkt)
>              exit_program(1);
>          av_packet_move_ref(tmp_pkt, pkt);
> -        ost->muxing_queue_data_size += tmp_pkt->size;
> -        av_fifo_generic_write(ost->muxing_queue, &tmp_pkt, sizeof(tmp_pkt), NULL);
> +        ms->muxing_queue_data_size += tmp_pkt->size;
> +        av_fifo_generic_write(ms->muxing_queue, &tmp_pkt, sizeof(tmp_pkt), NULL);
>          return;
>      }
>  
> @@ -283,16 +297,17 @@ int of_check_init(OutputFile *of)
>  
>      /* flush the muxing queues */
>      for (i = 0; i < of->ctx->nb_streams; i++) {
> +        MuxStream     *ms = &of->mux->streams[i];
>          OutputStream *ost = output_streams[of->ost_index + i];
>  
>          /* try to improve muxing time_base (only possible if nothing has been written yet) */
> -        if (!av_fifo_size(ost->muxing_queue))
> +        if (!av_fifo_size(ms->muxing_queue))
>              ost->mux_timebase = ost->st->time_base;
>  
> -        while (av_fifo_size(ost->muxing_queue)) {
> +        while (av_fifo_size(ms->muxing_queue)) {
>              AVPacket *pkt;
> -            av_fifo_generic_read(ost->muxing_queue, &pkt, sizeof(pkt), NULL);
> -            ost->muxing_queue_data_size -= pkt->size;
> +            av_fifo_generic_read(ms->muxing_queue, &pkt, sizeof(pkt), NULL);
> +            ms->muxing_queue_data_size -= pkt->size;
>              of_write_packet(of, pkt, ost, 1);
>              av_packet_free(&pkt);
>          }
> @@ -333,6 +348,31 @@ int of_write_trailer(OutputFile *of)
>      return 0;
>  }
>  
> +static void mux_free(Muxer **pmux, int nb_streams)
> +{
> +    Muxer *mux = *pmux;
> +
> +    if (!mux)
> +        return;
> +
> +    for (int i = 0; i < nb_streams; i++) {
> +        MuxStream *ms = &mux->streams[i];
> +
> +        if (!ms->muxing_queue)
> +            continue;
> +
> +        while (av_fifo_size(ms->muxing_queue)) {
> +            AVPacket *pkt;
> +            av_fifo_generic_read(ms->muxing_queue, &pkt, sizeof(pkt), NULL);
> +            av_packet_free(&pkt);
> +        }
> +        av_fifo_freep(&ms->muxing_queue);
> +    }
> +    av_freep(&mux->streams);
> +
> +    av_freep(pmux);
> +}
> +
>  void of_close(OutputFile **pof)
>  {
>      OutputFile *of = *pof;
> @@ -342,25 +382,42 @@ void of_close(OutputFile **pof)
>          return;
>  
>      s = of->ctx;
> +
> +    mux_free(&of->mux, s ? s->nb_streams : 0);
> +
>      if (s && s->oformat && !(s->oformat->flags & AVFMT_NOFILE))
>          avio_closep(&s->pb);
>      avformat_free_context(s);
>      av_dict_free(&of->opts);
>  
> -    av_freep(&of->mux);
> -
>      av_freep(pof);
>  }
>  
>  int of_muxer_init(OutputFile *of, int64_t limit_filesize)
>  {
>      Muxer *mux = av_mallocz(sizeof(*mux));
> +    int ret = 0;
>  
>      if (!mux)
>          return AVERROR(ENOMEM);
>  
> +    mux->streams = av_calloc(of->ctx->nb_streams, sizeof(*mux->streams));
> +    if (!mux->streams) {
> +        av_freep(&mux);
> +        return AVERROR(ENOMEM);
> +    }
> +
>      of->mux  = mux;
>  
> +    for (int i = 0; i < of->ctx->nb_streams; i++) {
> +        MuxStream *ms = &mux->streams[i];
> +        ms->muxing_queue = av_fifo_alloc(8 * sizeof(AVPacket));
> +        if (!ms->muxing_queue) {
> +            ret = AVERROR(ENOMEM);
> +            goto fail;
> +        }
> +    }
> +
>      mux->limit_filesize = limit_filesize;
>  
>      if (strcmp(of->format->name, "rtp"))
> @@ -368,12 +425,16 @@ int of_muxer_init(OutputFile *of, int64_t limit_filesize)
>  
>      /* write the header for files with no streams */
>      if (of->format->flags & AVFMT_NOSTREAMS && of->ctx->nb_streams == 0) {
> -        int ret = of_check_init(of);
> +        ret = of_check_init(of);
>          if (ret < 0)
> -            return ret;
> +            goto fail;
>      }
>  
> -    return 0;
> +fail:
> +    if (ret < 0)
> +        mux_free(&of->mux, of->ctx->nb_streams);
> +
> +    return ret;
>  }
>  
>  int of_finished(OutputFile *of)
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index ed3fd818d0..c7d1d21a37 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -1613,8 +1613,6 @@ static OutputStream *new_output_stream(OptionsContext *o, AVFormatContext *oc, e
>      ost->max_muxing_queue_size = FFMIN(ost->max_muxing_queue_size, INT_MAX / sizeof(ost->pkt));
>      ost->max_muxing_queue_size *= sizeof(ost->pkt);
>  
> -    ost->muxing_queue_data_size = 0;
> -
>      ost->muxing_queue_data_threshold = 50*1024*1024;
>      MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, ost->muxing_queue_data_threshold, oc, st);
>  
> @@ -1638,10 +1636,6 @@ static OutputStream *new_output_stream(OptionsContext *o, AVFormatContext *oc, e
>      }
>      ost->last_mux_dts = AV_NOPTS_VALUE;
>  
> -    ost->muxing_queue = av_fifo_alloc(8 * sizeof(AVPacket));
> -    if (!ost->muxing_queue)
> -        exit_program(1);
> -
>      return ost;
>  }
>  
> 

My objections to adding a separately allocated muxing context and to
this MuxStream have not changed. Both incur unnecessary allocations and
indirections and (in the case of the latter) loops; the latter is also
very unnatural. The patch itself shows this: you only use the muxer
context to get the MuxStream context corresponding to the OutputStream
you are interested in:

>      for (i = 0; i < of->ctx->nb_streams; i++) {
> +        MuxStream     *ms = &of->mux->streams[i];
>          OutputStream *ost = output_streams[of->ost_index + i];

>      AVStream *st = ost->st;
> +    MuxStream *ms = &of->mux->streams[st->index];
>      int ret;

Your aim of making clear which code may use/modify which parts can also
be achieved with comments.

- Andreas


More information about the ffmpeg-devel mailing list