[FFmpeg-devel] [PATCH] Tee muxer improvement (handling slave failure)

Marton Balint cus at passwd.hu
Mon Mar 21 23:37:02 CET 2016


On Mon, 21 Mar 2016, Jan Sebechlebsky wrote:

> Adds per slave option 'onfail' to the tee muxer allowing an output to
> fail, allowing other slaves output to continue.
>
> Certain situations, when it does not make sense to continue are still
> fatal (like allocation errors, or invalid options).
>
> Update of tee muxer documentation.
>
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
> ---
> Solution to GSOC 2016 qualification task.
>
> doc/muxers.texi   |  11 +++-
> libavformat/tee.c | 150 ++++++++++++++++++++++++++++++++++++++++++------------
> 2 files changed, 127 insertions(+), 34 deletions(-)
>
> diff --git a/doc/muxers.texi b/doc/muxers.texi
> index c36c72c..70f2c48 100644
> --- a/doc/muxers.texi
> +++ b/doc/muxers.texi
> @@ -1354,6 +1354,12 @@ output name suffix.
> Specify a list of bitstream filters to apply to the specified
> output.
> 
> +@item onfail
> +Specify behaviour on output failure. This can be set to either 'abort' (which is
> +default) or 'ignore'. 'abort' will cause whole process to fail in case of failure
> +on this slave output. 'ignore' will ignore failure on this output, so other outputs
> +will continue without being affected.
> +

This is misplaced, you put this in the middle of the bsfs documentation.

> It is possible to specify to which streams a given bitstream filter
> applies, by appending a stream specifier to the option separated by
> @code{/}. @var{spec} must be a stream specifier (see @ref{Format
> @@ -1374,10 +1380,11 @@ separated by commas (@code{,}) e.g.: @code{a:0,v}
> @itemize
> @item
> Encode something and both archive it in a WebM file and stream it
> -as MPEG-TS over UDP (the streams need to be explicitly mapped):
> +as MPEG-TS over UDP (the streams need to be explicitly mapped).
> +Continue streaming even if output to file fails:
> @example
> ffmpeg -i ... -c:v libx264 -c:a mp2 -f tee -map 0:v -map 0:a
> -  "archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
> +  "[onfail=ignore]archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
> @end example

You might add a separate example, so the existing one can remain as it is.
Also, it is worth explaining a use case, e.g. the following
example keeps on streaming to the network even if the local disk fills up.

> @item
> diff --git a/libavformat/tee.c b/libavformat/tee.c
> index 1390705..c72bc7d 100644
> --- a/libavformat/tee.c
> +++ b/libavformat/tee.c
> @@ -29,10 +29,20 @@
> 
> #define MAX_SLAVES 16
> 
> +typedef enum {
> +    ON_SLAVE_FAILURE_ABORT  = 1,
> +    ON_SLAVE_FAILURE_IGNORE = 2
> +} SlaveFailurePolicy;
> +
> +#define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
> +
> typedef struct {
>     AVFormatContext *avf;
>     AVBitStreamFilterContext **bsfs; ///< bitstream filters per stream
> 
> +    SlaveFailurePolicy on_fail;
> +    unsigned char is_alive;
> +
>     /** map from input to output streams indexes,
>      * disabled output streams are set to -1 */
>     int *stream_map;
> @@ -41,6 +51,7 @@ typedef struct {
> typedef struct TeeContext {
>     const AVClass *class;
>     unsigned nb_slaves;
> +    unsigned nb_alive;
>     TeeSlave slaves[MAX_SLAVES];
> } TeeContext;
> 
> @@ -135,13 +146,25 @@ end:
>     return ret;
> }
> 
> +static inline int parse_slave_failure_policy_option(const char * opt)

*opt

> +{
> +    if (!opt) {
> +        return DEFAULT_SLAVE_FAILURE_POLICY;
> +    } else if (!av_strcasecmp("abort",opt)) {
> +        return ON_SLAVE_FAILURE_ABORT;
> +    } else if (!av_strcasecmp("ignore",opt)) {
> +        return ON_SLAVE_FAILURE_IGNORE;
> +    }
> +    return 0;
> +}
> +
> static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
> {
>     int i, ret;
>     AVDictionary *options = NULL;
>     AVDictionaryEntry *entry;
>     char *filename;
> -    char *format = NULL, *select = NULL;
> +    char *format = NULL, *select = NULL, *on_fail = NULL;
>     AVFormatContext *avf2 = NULL;
>     AVStream *st, *st2;
>     int stream_count;
> @@ -161,6 +184,17 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
>
>     STEAL_OPTION("f", format);
>     STEAL_OPTION("select", select);
> +    STEAL_OPTION("onfail", on_fail);
> +
> +    tee_slave->on_fail = (SlaveFailurePolicy) parse_slave_failure_policy_option(on_fail);
> +    if (!tee_slave->on_fail) {
> +        av_log(avf, AV_LOG_ERROR,
> +                "Invalid onfail option value, valid options are 'abort' and 'ignore'\n");
> +        ret = AVERROR(EINVAL);
> +        /// Set failure behaviour on abort, so invalid option error will not be ignored
> +        tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
> +        goto end;
> +    }
>
>     ret = avformat_alloc_output_context2(&avf2, NULL, format, filename);
>     if (ret < 0)
> @@ -234,14 +268,14 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
>         if ((ret = avf2->io_open(avf2, &avf2->pb, filename, AVIO_FLAG_WRITE, NULL)) < 0) {
>             av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n",
>                    slave, av_err2str(ret));
> -            goto end;
> +            goto open_failure;
>         }
>     }
>
>     if ((ret = avformat_write_header(avf2, &options)) < 0) {
>         av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
>                slave, av_err2str(ret));
> -        goto end;
> +        goto open_failure;
>     }
>
>     tee_slave->avf = avf2;
> @@ -306,34 +340,49 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
> end:
>     av_free(format);
>     av_free(select);
> +    av_free(on_fail);
>     av_dict_free(&options);
>     av_freep(&tmp_select);
>     return ret;
> +
> +open_failure:
> +    /// Handle situation when slave initialization fails in a way that
> +    /// others may proceed successfully
> +    av_free(tee_slave->stream_map);
> +    avformat_free_context(avf2);
> +    goto end;
> +}

I read through this function, and it seems to me that even without your
patch there are cases where open_slave leaks avf2. I guess the intention
was that close_slave will free it, but obviously this can only happen
after avf2 has been assigned to tee_slave->avf.

So you might consider moving the assignment further up the code, this 
may also allow you to use close_slave to free existing resources, so you 
don't have to create a separate exit label (open_failure).

As this fix (avf2 leakage) is separate from your feature patch, you should 
make a patch for that only, and rebase your feature patch on top of that. 
(so you will have to send a patch series next time).

> +
> +static void close_slave(TeeSlave* tee_slave)
> +{
> +    AVFormatContext * avf;
> +    unsigned i;
> +
> +    avf = tee_slave->avf;
> +    for (i=0; i < avf->nb_streams; ++i) {
> +        AVBitStreamFilterContext *bsf_next, *bsf = tee_slave->bsfs[i];
> +        while (bsf) {
> +            bsf_next = bsf->next;
> +            av_bitstream_filter_close(bsf);
> +            bsf = bsf_next;
> +        }
> +    }
> +    av_freep(&tee_slave->stream_map);
> +    av_freep(&tee_slave->bsfs);
> +
> +    ff_format_io_close(avf,&avf->pb);
> +    avformat_free_context(avf);
> +    tee_slave->avf = NULL;
> }

Factoring code out into a separate function is also independent of your
feature patch, so you should create a separate patch for this as well.
This kind of separation makes the work of reviewers much simpler. So I
guess your next patch series will contain 3 patches: the avf2
leak fix, the close_slave factorization, and your feature patch on top
of them.

> 
> static void close_slaves(AVFormatContext *avf)
> {
>     TeeContext *tee = avf->priv_data;
> -    AVFormatContext *avf2;
> -    unsigned i, j;
> +    unsigned i;
>
>     for (i = 0; i < tee->nb_slaves; i++) {
> -        avf2 = tee->slaves[i].avf;
> -
> -        for (j = 0; j < avf2->nb_streams; j++) {
> -            AVBitStreamFilterContext *bsf_next, *bsf = tee->slaves[i].bsfs[j];
> -            while (bsf) {
> -                bsf_next = bsf->next;
> -                av_bitstream_filter_close(bsf);
> -                bsf = bsf_next;
> -            }
> -        }
> -        av_freep(&tee->slaves[i].stream_map);
> -        av_freep(&tee->slaves[i].bsfs);
> -
> -        ff_format_io_close(avf2, &avf2->pb);
> -        avformat_free_context(avf2);
> -        tee->slaves[i].avf = NULL;
> +        if (tee->slaves[i].is_alive)
> +            close_slave(&tee->slaves[i]);
>     }
> }
> 
> @@ -361,6 +410,28 @@ static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
>     }
> }
> 
> +static int tee_process_slave_failure(AVFormatContext * avf,
> +        TeeContext * tee,TeeSlave * slave,int err_n,unsigned char needs_closing)

You can remove the TeeContext from the parameters, it can be retrieved
from avf->priv_data.

> +{
> +    slave->is_alive = 0;
> +    tee->nb_alive--;
> +
> +    if (needs_closing)
> +        close_slave(slave);
> +
> +    if ( !tee->nb_alive ) {
> +        av_log(avf, AV_LOG_ERROR, "All tee slaves failed.\n");
> +        return err_n;
> +    } else if (slave->on_fail == ON_SLAVE_FAILURE_ABORT ) {
> +        av_log(avf, AV_LOG_ERROR, "Slave failed,aborting.\n");

Missing space after the comma. Also, you can be more informative here, e.g.
report the index of the failed slave.

> +        return err_n;
> +    } else {
> +        av_log(avf, AV_LOG_ERROR, "Slave failed, continuing with %u/%u slaves.\n",
> +                tee->nb_alive,tee->nb_slaves);
> +        return 0;
> +    }
> +}
> +
> static int tee_write_header(AVFormatContext *avf)
> {
>     TeeContext *tee = avf->priv_data;
> @@ -384,19 +455,25 @@ static int tee_write_header(AVFormatContext *avf)
>             filename++;
>     }
> 
> +    tee->nb_slaves = tee->nb_alive = nb_slaves;
> +
>     for (i = 0; i < nb_slaves; i++) {
> -        if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0)
> -            goto fail;
> -        log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
> -        av_freep(&slaves[i]);
> +        if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {

As far as I see you can't distinguish here between ignorable open_slave 
failures (which are caused by the underlying muxer) and hard failures - 
e.g. ENOMEM, etc. This may be fine, but you mentioned in the commit 
message that allocation failures are handled as fatal. As I said, this 
probably can stay as it is if distinguishing between the two would add too 
much complexity to the code.

> +            ret = tee_process_slave_failure(avf,tee,&tee->slaves[i],ret,0);
> +            if ( ret < 0 )
> +                goto fail;
> +        } else {
> +            log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
> +            tee->slaves[i].is_alive = 1;
> +        }
> +            av_freep(&slaves[i]);

Strange indentation.

>     }
> 
> -    tee->nb_slaves = nb_slaves;
> -
>     for (i = 0; i < avf->nb_streams; i++) {
>         int j, mapped = 0;
>         for (j = 0; j < tee->nb_slaves; j++)
> -            mapped += tee->slaves[j].stream_map[i] >= 0;
> +            if (tee->slaves[j].is_alive)
> +                mapped += tee->slaves[j].stream_map[i] >= 0;
>         if (!mapped)
>             av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
>                    "to any slave.\n", i);
> @@ -405,7 +482,8 @@ static int tee_write_header(AVFormatContext *avf)
> 
> fail:
>     for (i = 0; i < nb_slaves; i++)
> -        av_freep(&slaves[i]);
> +        if (&slaves[i])

Isn't this true all the time? Why this change? av_freep(&p) is fine 
even if p == NULL.

> +            av_freep(&slaves[i]);
>     close_slaves(avf);
>     return ret;
> }
> @@ -418,6 +496,8 @@ static int tee_write_trailer(AVFormatContext *avf)
>     unsigned i;
>
>     for (i = 0; i < tee->nb_slaves; i++) {
> +        if (!tee->slaves[i].is_alive)
> +            continue;
>         avf2 = tee->slaves[i].avf;
>         if ((ret = av_write_trailer(avf2)) < 0)
>             if (!ret_all)
> @@ -440,6 +520,9 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
>     AVRational tb, tb2;
>
>     for (i = 0; i < tee->nb_slaves; i++) {
> +        if (!tee->slaves[i].is_alive)
> +            continue;
> +
>         avf2 = tee->slaves[i].avf;
>         s = pkt->stream_index;
>         s2 = tee->slaves[i].stream_map[s];
> @@ -447,11 +530,12 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
>             continue;
>
>         if ((ret = av_copy_packet(&pkt2, pkt)) < 0 ||
> -            (ret = av_dup_packet(&pkt2))< 0)
> +            (ret = av_dup_packet(&pkt2)) < 0)

This whitespace change is unrelated to your patch, so you should not 
change this in your feature patch.

>             if (!ret_all) {
>                 ret_all = ret;
>                 continue;
>             }
> +

Same for this.

>         tb  = avf ->streams[s ]->time_base;
>         tb2 = avf2->streams[s2]->time_base;
>         pkt2.pts      = av_rescale_q(pkt->pts,      tb, tb2);
> @@ -461,9 +545,11 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
>
>         if ((ret = av_apply_bitstream_filters(avf2->streams[s2]->codec, &pkt2,
>                                               tee->slaves[i].bsfs[s2])) < 0 ||
> -            (ret = av_interleaved_write_frame(avf2, &pkt2)) < 0)
> -            if (!ret_all)
> +            (ret = av_interleaved_write_frame(avf2, &pkt2)) < 0) {
> +            ret = tee_process_slave_failure(avf,tee,&tee->slaves[i],ret,1);
> +            if (!ret_all && ret < 0)
>                 ret_all = ret;
> +        }
>     }
>     return ret_all;
> }
> --

I have made some tests with your code as well, and it seems to work, so
you should really start working on your proposal now, the deadline is coming.
You can also upload a draft proposal (share it via Google Docs), so you
can get feedback from mentors before you submit your final application.

Let me know if you have any further questions.

Regards,
Marton


More information about the ffmpeg-devel mailing list