[FFmpeg-devel] [PATCH] libavformat: Add FIFO pseudo-muxer

Nicolas George george at nsup.org
Tue Jun 28 15:42:23 CEST 2016


Le primidi 11 messidor, an CCXXIV, sebechlebskyjan at gmail.com a écrit :
> From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
> 
> FIFO pseudo-muxer allows to separate decoder from the
> actual output by using first-in-first-out queue and
> running actual muxer asynchronously in separate thread.
> 
> It can be configured to attempt transparent recovery
> of output on failure.
> 
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
> ---
>  configure                |   1 +
>  doc/muxers.texi          |  77 ++++++
>  libavformat/Makefile     |   1 +
>  libavformat/allformats.c |   1 +
>  libavformat/fifo.c       | 657 +++++++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 737 insertions(+)
>  create mode 100644 libavformat/fifo.c
> 
> diff --git a/configure b/configure
> index 007c953..eacda09 100755
> --- a/configure
> +++ b/configure
> @@ -2826,6 +2826,7 @@ dv_muxer_select="dvprofile"
>  dxa_demuxer_select="riffdec"
>  eac3_demuxer_select="ac3_parser"
>  f4v_muxer_select="mov_muxer"
> +fifo_muxer_deps="pthreads"
>  flac_demuxer_select="flac_parser"
>  hds_muxer_select="flv_muxer"
>  hls_muxer_select="mpegts_muxer"
> diff --git a/doc/muxers.texi b/doc/muxers.texi
> index c2ca0ba..e545bc7 100644
> --- a/doc/muxers.texi
> +++ b/doc/muxers.texi
> @@ -1408,6 +1408,83 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
>  
>  @end table
>  
> + at section fifo
> +
> +The fifo pseudo-muxer allows to separate encoding from any other muxer
> +by using first-in-first-out queue and running the actual muxer in separate
> +thread. This is especially useful in combination with tee muxer
> +and output to several destinations with different reliability/writing speed/latency.
> +
> +The fifo muxer can operate in regular or fully non-blocking mode. This determines

Not true: the current code is non-blocking for packets, but not for the
trailer. It can not be really non-blocking but as is it can block
indefinitely.

> +the behaviour in case the queue fills up. In regular mode the encoding is blocked
> +until muxer processes some of the packets from queue, in non-blocking mode the packets
> +are thrown away rather than blocking the encoder (this might be useful in real-time
> +streaming scenario).

Nit: lines <80 chars.

> +
> + at table @option
> +

> + at item fifo_format
> +Specify the format name. Useful if it cannot be guessed from the
> +output name suffix.

This option is defined but never used.

> +
> + at item queue_size
> +Specify size of the queue (number of packets)
> +

> + at item format_opts
> +Specify format options for the underlying muxer. Muxer options can be specified
> +as a list of @var{key}=@var{value} pairs separated by ':'.

Yay, another trip to escaping hell!

> +

> + at item block_on_overflow 0|1

Nit: "0|1" is unusual.

> +If set to 0, fully non-blocking mode will be used and in case the queue
> +fills up, packets will be dropped. By default this option is set to 1,
> +so in case of queue overflow the encoder will be block until muxer
> +processes some of the packets.

IMHO, this should use AVFMT_FLAG_NONBLOCK.

> +
> + at item attempt_recovery 0|1
> +If failure happens, attempt to recover the output. This is especially useful
> +when used with network output, allows to restart streaming transparently.
> +By default this option is turned off.
> +
> + at item max_recovery_attempts
> +Sets maximal number of successive unsucessfull recovery attempts after which
> +the output fails permanently. Unlimited if set to zero.
> +
> + at item recovery_wait_time
> +Waiting time before the next recovery attempt after previous unsuccessfull
> +recovery attempt.
> +
> + at item recovery_wait_streamtime 0|1
> +If set to 0 (default), the real time is used when waiting for the recovery attempt
> +(i.e. the recovery will be attempted after at least recovery_wait_time seconds).
> +If set to 1, the time of the processed stream is taken into account instead
> +(i.e. the recovery will be attempted after at least recovery_wait_time seconds
> +of the stream is ommited).
> +
> + at item recover_any_error 0|1
> +If set to 1, recovery will be attempted regardless of type of the error causing
> +the failure (by default, in case of certain errors the recovery is not attempted
> +even when attempt_recovery is on).
> +
> + at item restart_with_keyframe 0|1
> +Specify whether to wait for the keyframe after recovering from
> +queue overflow or failure.
> +
> + at end table
> +
> + at subsection Examples
> +
> + at itemize
> +
> + at item
> +Stream something to rtmp server using non-blocking mode and recover automatically
> +in case failure happens (for example the network becomes unavailable for a moment).
> + at example
> +ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
> +  -block_on_overflow 0 -attempt_recovery 1 rtmp://example.com/live/stream_name
> + at end example
> +
> + at end itemize
> +
>  @section tee
>  
>  The tee muxer can be used to write the same data to several files or any
> diff --git a/libavformat/Makefile b/libavformat/Makefile
> index c49f9de..42fb9be 100644
> --- a/libavformat/Makefile
> +++ b/libavformat/Makefile
> @@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
>  OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
>  OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
>  OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
> +OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
>  OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
>  OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
>  OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
> diff --git a/libavformat/allformats.c b/libavformat/allformats.c
> index d490cc4..b21a3de 100644
> --- a/libavformat/allformats.c
> +++ b/libavformat/allformats.c
> @@ -123,6 +123,7 @@ void av_register_all(void)
>      REGISTER_MUXER   (F4V,              f4v);
>      REGISTER_MUXDEMUX(FFM,              ffm);
>      REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
> +    REGISTER_MUXER   (FIFO,             fifo);
>      REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
>      REGISTER_MUXDEMUX(FLAC,             flac);
>      REGISTER_DEMUXER (FLIC,             flic);
> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
> new file mode 100644
> index 0000000..37e9f82
> --- /dev/null
> +++ b/libavformat/fifo.c
> @@ -0,0 +1,657 @@
> +/*
> + * FIFO pseudo-muxer
> + * Copyright (c) 2016 Jan Sebechlebsky
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public License
> + * as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
> + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "libavutil/opt.h"
> +#include "libavutil/time.h"
> +#include "libavutil/threadmessage.h"
> +#include "avformat.h"
> +#include "internal.h"
> +#include "pthread.h"
> +
> +#define FIFO_DEFAULT_QUEUE_SIZE            60
> +#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 16
> +#define FIFO_DEFAULT_RECOVERY_WAIT_TIME    1000000 // 1 second
> +
> +typedef struct FifoContext {
> +    const AVClass *class;
> +    AVFormatContext *avf;
> +
> +    char *format;
> +    AVOutputFormat *oformat;
> +
> +    char *format_options_str;
> +    AVDictionary *format_options;
> +

> +    /* Whether to block in case the queue is full. */
> +    uint8_t block_on_overflow;

Nit: to save memory, if convenient, put smaller fields after larger ones.

> +
> +    /* Whether to wait for keyframe when recovering
> +     * from failure or queue overflow */
> +    uint8_t restart_with_keyframe;
> +
> +    /* Whether to attempt recovery from failure */
> +    uint8_t attempt_recovery;
> +
> +    /* Maximal number of unsuccessful successive recovery attempts */
> +    int max_recovery_attempts;
> +
> +    /* Number of current recovery process
> +     * Value > 0 means we are in recovery process */
> +    int recovery_nr;
> +
> +    /* Time to wait before next recovery attempt
> +     * This can refer to the time in processed stream,

> +     * or real time.*/

Nit: space.

> +    int64_t recovery_wait_time;
> +
> +    /* If >0 stream time will be used when waiting
> +     * for the recovery attempt instead of real time */
> +    uint8_t recovery_wait_streamtime;
> +
> +    /* If >0 recovery will be attempted regardless of error code
> +     * (except AVERROR_EXIT, so exit request is never ignored)*/
> +    uint8_t recover_any_error;
> +
> +    /* Timestamp of last failure.
> +     * This is either pts in case stream time is used,
> +     * or microseconds as returned by av_getttime_relative()*/
> +    int64_t last_recovery_ts;
> +
> +    int queue_size;
> +    AVThreadMessageQueue *queue;
> +    pthread_t writer_thread;
> +

> +    /* Value > 0 signalizes the previous write_header call was successful

Nit: "signalize" looks wrong.

> +     * so finalization by calling write_trailer and ff_io_close must be done
> +     * before exiting / reinitialization of underlying muxer */
> +    uint8_t header_written;

I had to look twice to be sure it was only used in the thread.

Suggestions:

- make sure function names describe exactly where they can be called:
  fifo_thread_description() for functions called in the thread,
  fifo_description() for functions called from the main thread,
  description() for utility functions that are used in both but do nothing
  dangerous.

- move fields that are only useful in the thread to a different struct,
  allocate it on the stack in the main function of the thread;

- try to make the FifoContext pointer const in the thread.

> +
> +    /* If > 0 all frames will be dropped until keyframe is received */
> +    uint8_t drop_until_keyframe;
> +
> +    /* Return value of last write_trailer_call */
> +    int write_trailer_ret;
> +
> +    pthread_mutex_t overflow_flag_lock;
> +    /* Value > 0 signalizes queue overflow */
> +    uint8_t overflow_flag;
> +} FifoContext;
> +
> +typedef enum FifoMessageType {
> +    FIFO_WRITE_HEADER,
> +    FIFO_WRITE_PACKET,
> +    FIFO_FLUSH_OUTPUT
> +} FifoMessageType;
> +
> +typedef struct FifoMessage {
> +    FifoMessageType type;
> +    AVPacket pkt;
> +} FifoMessage;
> +
> +static int fifo_thread_write_header(AVFormatContext *avf)
> +{

> +    int ret, i;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;

Nit: the private context is usually the first variable.

> +    AVDictionary *format_options = NULL;
> +
> +    ret = av_dict_copy(&format_options, fifo->format_options, 0);
> +    if (ret < 0)
> +        return ret;
> +

> +    if (!(avf2->oformat->flags & AVFMT_NOFILE)) {
> +        ret = avf2->io_open(avf2, &avf2->pb, avf->filename, AVIO_FLAG_WRITE, &format_options);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n",
> +                   avf->filename, av_err2str(ret));
> +            goto end;
> +        }
> +    }

Why do we not have a utility function for that?

> +
> +    for (i = 0;i < avf2->nb_streams; i++)
> +        avf2->streams[i]->cur_dts = 0;
> +
> +    ret = avformat_write_header(avf2, &fifo->format_options);
> +    if (!ret)
> +        fifo->header_written = 1;
> +
> +    // Check for options unrecognized by underlying muxer
> +    if (format_options) {
> +        AVDictionaryEntry *entry = NULL;
> +        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
> +            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
> +        ret = AVERROR(EINVAL);
> +    }
> +
> +end:
> +    av_dict_free(&format_options);
> +    return ret;
> +}
> +
> +static int fifo_thread_flush_output(AVFormatContext *avf)
> +{
> +    int ret;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +
> +    ret = av_write_frame(avf2, NULL);
> +    return ret;
> +}
> +
> +static int fifo_thread_write_packet(AVFormatContext *avf, AVPacket *pkt)
> +{
> +    int ret, s_idx;
> +    AVRational src_tb, dst_tb;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +
> +    if (fifo->drop_until_keyframe) {
> +        if (pkt->flags & AV_PKT_FLAG_KEY) {
> +            fifo->drop_until_keyframe = 0;
> +            av_log(avf, AV_LOG_INFO, "Keyframe received, recovering...\n");
> +        } else {

> +            av_log(avf, AV_LOG_INFO, "Dropping non-keyframe packet\n");

Nit: VERBOSE?

> +            av_packet_unref(pkt);
> +            return 0;
> +        }
> +    }
> +

> +    s_idx = pkt->stream_index;
> +    src_tb = avf->streams[s_idx]->time_base;
> +    dst_tb = avf2->streams[s_idx]->time_base;
> +
> +    pkt->pts = av_rescale_q(pkt->pts, src_tb, dst_tb);
> +    pkt->dts = av_rescale_q(pkt->dts, src_tb, dst_tb);
> +    pkt->duration = av_rescale_q(pkt->duration, src_tb, dst_tb);

This looks suspicious.

For starters, it does not handle the NOPTS value, but that is an easy fix.

More worryingly, it looks that the application does not see the time base
selected by the real muxer. It could be a problem.

> +
> +    ret = av_write_frame(avf2, pkt);
> +    if (ret >= 0)
> +        av_packet_unref(pkt);
> +    return ret;
> +}
> +
> +static int fifo_thread_write_trailer(AVFormatContext *avf)
> +{
> +    int ret;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2 = fifo->avf;
> +
> +    if (!fifo->header_written)
> +        return 0;
> +
> +    ret = av_write_trailer(avf2);
> +    ff_format_io_close(avf2, &avf2->pb);
> +
> +    return ret;
> +}
> +
> +static int fifo_dispatch_message(AVFormatContext *avf, FifoMessage *msg)
> +{
> +    int ret;
> +
> +    switch (msg->type) {
> +        case FIFO_WRITE_HEADER:
> +            ret = fifo_thread_write_header(avf);
> +            break;
> +        case FIFO_WRITE_PACKET:
> +            ret = fifo_thread_write_packet(avf, &msg->pkt);
> +            break;
> +        case FIFO_FLUSH_OUTPUT:
> +            ret = fifo_thread_flush_output(avf);
> +            break;
> +    }
> +
> +    return ret;
> +}
> +
> +static int is_recoverable(FifoContext *fifo, int err_no) {
> +    if (!fifo->attempt_recovery)
> +        return 0;
> +
> +    if (fifo->recover_any_error)
> +        return err_no != AVERROR_EXIT;
> +
> +    switch (err_no) {
> +        case AVERROR(EINVAL):
> +        case AVERROR(ENOSYS):
> +        case AVERROR_EOF:
> +        case AVERROR_EXIT:
> +        case AVERROR_PATCHWELCOME:
> +            return 0;
> +        default:
> +            return 1;
> +    }
> +}
> +
> +static int fifo_attempt_recovery(AVFormatContext *avf, FifoMessage *msg, int err_no)
> +{
> +    int ret;
> +    FifoContext *fifo = avf->priv_data;
> +    AVPacket *pkt = &msg->pkt;
> +    int64_t time_since_recovery;
> +
> +    if (!is_recoverable(fifo, err_no)) {
> +        ret = err_no;
> +        goto fail;
> +    }
> +
> +    if (fifo->header_written) {
> +        fifo->write_trailer_ret = fifo_thread_write_trailer(avf);
> +        fifo->header_written = 0;
> +    }
> +
> +    if (!fifo->recovery_nr)
> +        fifo->last_recovery_ts = 0;
> +    else {
> +        if (fifo->recovery_wait_streamtime) {
> +            AVRational tb = avf->streams[pkt->stream_index]->time_base;

> +            time_since_recovery = av_rescale_q(pkt->pts - fifo->last_recovery_ts,
> +                                           tb, AV_TIME_BASE_Q);

Nit: indentation.

> +        } else {
> +            time_since_recovery = av_gettime_relative() - fifo->last_recovery_ts;
> +        }
> +
> +        if (time_since_recovery < fifo->recovery_wait_time)
> +            return AVERROR(EAGAIN);
> +    }
> +
> +    fifo->recovery_nr++;
> +
> +    if (fifo->max_recovery_attempts)

> +        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d/%d\n",
> +            fifo->recovery_nr, fifo->max_recovery_attempts);

Nit: indentation.

> +    else
> +        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d\n",
> +               fifo->recovery_nr);
> +
> +
> +    if (msg->type != FIFO_WRITE_HEADER) {
> +        ret = fifo_thread_write_header(avf);
> +        if (ret < 0)
> +            goto recovery_fail;
> +    }
> +
> +    if (fifo->restart_with_keyframe && !fifo->block_on_overflow)
> +        fifo->drop_until_keyframe = 1;
> +
> +    ret = fifo_dispatch_message(avf, msg);
> +    if (ret < 0) {
> +        if (is_recoverable(fifo, ret))
> +            goto recovery_fail;
> +        else
> +            goto fail;
> +    } else {
> +        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
> +        fifo->recovery_nr = 0;
> +    }
> +
> +    return 0;
> +

> +recovery_fail:
> +    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
> +           av_err2str(ret));
> +
> +    fifo->last_recovery_ts = fifo->recovery_wait_streamtime ?
> +                             pkt->pts : av_gettime_relative();
> +
> +    if (fifo->max_recovery_attempts &&
> +        fifo->recovery_nr >= fifo->max_recovery_attempts) {
> +        av_log(avf, AV_LOG_ERROR,
> +               "Maximal number of %d recovery attempts reached.\n",
> +               fifo->max_recovery_attempts);
> +    } else
> +        return AVERROR(EAGAIN);
> +fail:
> +    if (msg->type == FIFO_WRITE_PACKET)
> +        av_packet_unref(&msg->pkt);
> +    return ret;

The use of goto in this function seems to be going away from the
simple-cleanup case towards the this-is-why-goto-is-evil area.

> +}
> +
> +static int fifo_recover(AVFormatContext *avf, FifoMessage *msg, int err_no)
> +{
> +    int ret;
> +    FifoContext *fifo = avf->priv_data;
> +
> +    do {
> +        if (!fifo->recovery_wait_streamtime && fifo->recovery_nr > 0) {
> +            int64_t time_since_recovery = av_gettime_relative() - fifo->last_recovery_ts;
> +            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
> +            if (time_to_wait)
> +                av_usleep(FFMIN(10000, time_to_wait));
> +        }
> +
> +        ret = fifo_attempt_recovery(avf, msg, err_no);
> +    } while(ret == AVERROR(EAGAIN) && fifo->block_on_overflow);
> +
> +    if (ret == AVERROR(EAGAIN) && !fifo->block_on_overflow) {
> +        if (msg->type == FIFO_WRITE_PACKET)
> +            av_packet_unref(&msg->pkt);
> +        ret = 0;
> +    }
> +
> +    return ret;
> +}
> +
> +static void *fifo_writer_thread(void *data)
> +{
> +    int ret;
> +    FifoMessage msg;
> +    AVFormatContext *avf = data;
> +    FifoContext *fifo = avf->priv_data;
> +    AVThreadMessageQueue *queue = fifo->queue;
> +
> +    while (1) {
> +        uint8_t just_flushed = 0;
> +

> +        /* Check and solve overflow */
> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (fifo->overflow_flag) {
> +            av_thread_message_flush(queue);
> +            if (fifo->restart_with_keyframe)
> +                fifo->drop_until_keyframe = 1;
> +            fifo->overflow_flag = 0;
> +            just_flushed = 1;
> +        }
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);

I think the communication logic here needs an extensive comment here.

And I am a bit worried about the extra lock: mixing several communication
mechanisms between threads is a recipe for headache.

> +
> +        if (just_flushed)
> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
> +
> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
> +        if (ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, ret);
> +            break;
> +        }
> +
> +        if (!fifo->recovery_nr)
> +            ret = fifo_dispatch_message(avf, &msg);
> +
> +        if (ret < 0 || fifo->recovery_nr > 0) {
> +            int rec_ret = fifo_recover(avf, &msg, ret);
> +            if (rec_ret < 0) {
> +                av_thread_message_queue_set_err_send(queue, rec_ret);
> +                break;
> +            }
> +        }
> +    }
> +
> +    fifo->write_trailer_ret = fifo_thread_write_trailer(avf);
> +
> +    return NULL;
> +}
> +
> +static int fifo_mux_init(AVFormatContext *avf)
> +{
> +    int ret = 0, i;
> +    FifoContext *fifo = avf->priv_data;
> +    AVFormatContext *avf2;
> +
> +    ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    fifo->avf = avf2;
> +

> +    avf2->interrupt_callback = avf->interrupt_callback;

This is wrong. First, we do not know that the application-provided callback
is thread-safe. Second, the thread needs an interrupt_callback of its own to
interrupt the actual I/O when asked to abort.

> +    avf2->max_delay = avf->max_delay;
> +    ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
> +    if (ret < 0)
> +        return ret;
> +    avf2->opaque = avf->opaque;

> +    avf2->io_close = avf->io_close;
> +    avf2->io_open = avf->io_open;

This could be dangerous too, I am not sure these functions are guaranteed to
be thread-safe. It needs at least documentation.

> +    avf2->flags = avf->flags;
> +
> +    for (i = 0; i < avf->nb_streams; ++i) {
> +        AVStream *st2, *st;
> +        AVCodecParameters *ipar, *opar;
> +
> +        st = avf->streams[i];
> +        st2 = avformat_new_stream(avf2, NULL);
> +        if (!st)
> +            return AVERROR(ENOMEM);
> +
> +        ipar = avf->streams[i]->codecpar;
> +        opar = st2->codecpar;
> +
> +        ret = avcodec_parameters_copy(opar, ipar);
> +        if (ret < 0)
> +            return ret;
> +        st2->id = st->id;

> +        st2->r_frame_rate        = st->r_frame_rate;
> +        st2->time_base           = st->time_base;
> +        st2->start_time          = st->start_time;
> +        st2->duration            = st->duration;
> +        st2->nb_frames           = st->nb_frames;
> +        st2->disposition         = st->disposition;
> +        st2->sample_aspect_ratio = st->sample_aspect_ratio;
> +        st2->avg_frame_rate      = st->avg_frame_rate;

Why do we not have an utility function for that (bis)?

> +
> +        ret = av_dict_copy(&st2->metadata, st->metadata, 0);
> +        if (ret < 0)
> +            return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +static void fifo_message_free(void *msg)
> +{
> +    FifoMessage *fifo_msg = msg;
> +

> +    if(fifo_msg->type == FIFO_WRITE_PACKET)

Nit: space.

> +        av_packet_unref(&fifo_msg->pkt);
> +}
> +
> +static int fifo_init(AVFormatContext *avf)
> +{
> +    int ret = 0;
> +    FifoContext *fifo = avf->priv_data;
> +
> +    if (fifo->recovery_wait_streamtime && fifo->block_on_overflow) {
> +        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be used only when "

> +               "--block_on_overflow is turned off\n");

Double-dashes are not used in FFmpeg. And single dashes are an artifact of
command-line tools and should not be used in the libraries.

> +        return AVERROR(EINVAL);
> +    }
> +
> +    if (fifo->format_options_str) {
> +        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
> +                                   "=", ":", 0);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
> +                   fifo->format_options_str);
> +            return ret;
> +        }
> +    }
> +
> +    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
> +    if (!fifo->oformat) {
> +        ret = AVERROR_MUXER_NOT_FOUND;
> +        return ret;
> +    }
> +
> +    ret = fifo_mux_init(avf);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
> +                                        sizeof(FifoMessage));
> +    if (!ret)
> +        av_thread_message_queue_set_free_func(fifo->queue, fifo_message_free);
> +
> +    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
> +    if (ret < 0)
> +        return AVERROR(ret);
> +
> +    return 0;
> +}
> +
> +static int fifo_write_header(AVFormatContext *avf)
> +{
> +    int ret;
> +    FifoContext * fifo = avf->priv_data;

> +    FifoMessage message = {.type = FIFO_WRITE_HEADER};
> +
> +    ret = av_thread_message_queue_send(fifo->queue, &message, 0);
> +    if (ret < 0)
> +        return ret;

This message is sent only once. Maybe take it out of the loop to make things
simpler.

> +
> +    ret = pthread_create(&fifo->writer_thread, NULL, fifo_writer_thread, avf);
> +    if (ret) {
> +        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
> +               av_err2str(AVERROR(ret)));
> +        ret = AVERROR(ret);
> +    }
> +
> +    return 0;
> +}
> +
> +static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
> +{
> +    int ret = 0;
> +    FifoContext *fifo = avf->priv_data;
> +    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
> +
> +    if (pkt) {

> +        memset(&msg.pkt, 0,sizeof(AVPacket));

av_init_packet()?

> +        ret = av_packet_ref(&msg.pkt,pkt);
> +        if (ret < 0)
> +            return ret;
> +    }
> +

> +    ret = av_thread_message_queue_send(fifo->queue, &msg,
> +                                       fifo->block_on_overflow ?
> +                                       0 : AV_THREAD_MESSAGE_NONBLOCK);
> +    if (ret == AVERROR(EAGAIN)) {
> +        uint8_t overflow_set = 0;
> +
> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (!fifo->overflow_flag)
> +            fifo->overflow_flag = overflow_set = 1;
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> +
> +        if (overflow_set)
> +            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
> +        ret = 0;
> +        goto fail;
> +    }else if (ret < 0) {
> +        goto fail;
> +    }

I think the logic here needs another extensive comment.

> +
> +    return ret;
> +fail:
> +    if (pkt)
> +        av_packet_unref(&msg.pkt);
> +    return ret;
> +}
> +
> +static int fifo_write_trailer(AVFormatContext *avf)
> +{
> +    int ret = 0;
> +    FifoContext *fifo= avf->priv_data;
> +
> +    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
> +
> +    ret = pthread_join( fifo->writer_thread, NULL);
> +    if (ret < 0) {
> +        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
> +               av_err2str(AVERROR(ret)));
> +        return AVERROR(ret);
> +    }
> +
> +    ret = fifo->write_trailer_ret;
> +    return ret;
> +}
> +
> +static void fifo_deinit(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +
> +    if (fifo->format_options)
> +        av_dict_free(&fifo->format_options);
> +
> +    if (avf)
> +        avformat_free_context(fifo->avf);
> +
> +    if (fifo->queue) {
> +        av_thread_message_flush(fifo->queue);
> +        av_thread_message_queue_free(&fifo->queue);
> +    }
> +
> +    pthread_mutex_destroy(&fifo->overflow_flag_lock);
> +}
> +
> +#define OFFSET(x) offsetof(FifoContext, x)
> +static const AVOption options[] = {
> +        {"fifo_format", "Target muxer", OFFSET(format),
> +         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"queue_size", "Size of fifo queue", OFFSET(queue_size),
> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
> +         AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"block_on_overflow", "Block output on FIFO queue overflow until queue frees up.", OFFSET(block_on_overflow),
> +         AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
> +         AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
> +        AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
> +         AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recovery_wait_time", "Waiting time between recovery attempts (seconds)", OFFSET(recovery_wait_time),
> +         AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
> +         OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
> +         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
> +
> +        {NULL},
> +};
> +

> +static const AVClass fifo_muxer_class = {
> +    .class_name = "Fifo muxer",
> +    .item_name  = av_default_item_name,
> +    .option = options,
> +    .version    = LIBAVUTIL_VERSION_INT,
> +};
> +
> +AVOutputFormat ff_fifo_muxer = {
> +    .name = "fifo",
> +    .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
> +    .priv_data_size = sizeof(FifoContext),
> +    .init = fifo_init,
> +    .write_header = fifo_write_header,
> +    .write_packet = fifo_write_packet,
> +    .write_trailer = fifo_write_trailer,
> +    .deinit = fifo_deinit,
> +    .priv_class = &fifo_muxer_class,
> +    .flags = AVFMT_NOFILE,
> +};

Nit: some alignment would be nice.

> +
> -- 
> 1.9.1
> 
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> http://ffmpeg.org/mailman/listinfo/ffmpeg-devel
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20160628/5288e5b8/attachment.sig>


More information about the ffmpeg-devel mailing list