[FFmpeg-devel] [PATCH] Tee muxer improvement (handling slave failure)

Jan Sebechlebsky sebechlebskyjan at gmail.com
Mon Mar 21 19:31:32 CET 2016


Adds per slave option 'onfail' to the tee muxer allowing an output to
fail, allowing other slaves output to continue.

Certain situations, when it does not make sense to continue are still
fatal (like allocation errors, or invalid options).

Update of tee muxer documentation.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
 Solution to GSOC 2016 qualification task.

 doc/muxers.texi   |  11 +++-
 libavformat/tee.c | 150 ++++++++++++++++++++++++++++++++++++++++++------------
 2 files changed, 127 insertions(+), 34 deletions(-)

diff --git a/doc/muxers.texi b/doc/muxers.texi
index c36c72c..70f2c48 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -1354,6 +1354,12 @@ output name suffix.
 Specify a list of bitstream filters to apply to the specified
 output.
 
+ at item onfail
+Specify behaviour on output failure. This can be set to either 'abort' (which is
+default) or 'ignore'. 'abort' will cause whole process to fail in case of failure
+on this slave output. 'ignore' will ignore failure on this output, so other outputs
+will continue without being affected.
+
 It is possible to specify to which streams a given bitstream filter
 applies, by appending a stream specifier to the option separated by
 @code{/}. @var{spec} must be a stream specifier (see @ref{Format
@@ -1374,10 +1380,11 @@ separated by commas (@code{,}) e.g.: @code{a:0,v}
 @itemize
 @item
 Encode something and both archive it in a WebM file and stream it
-as MPEG-TS over UDP (the streams need to be explicitly mapped):
+as MPEG-TS over UDP (the streams need to be explicitly mapped).
+Continue streaming even if output to file fails:
 @example
 ffmpeg -i ... -c:v libx264 -c:a mp2 -f tee -map 0:v -map 0:a
-  "archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
+  "[onfail=ignore]archive-20121107.mkv|[f=mpegts]udp://10.0.1.255:1234/"
 @end example
 
 @item
diff --git a/libavformat/tee.c b/libavformat/tee.c
index 1390705..c72bc7d 100644
--- a/libavformat/tee.c
+++ b/libavformat/tee.c
@@ -29,10 +29,20 @@
 
 #define MAX_SLAVES 16
 
+typedef enum {
+    ON_SLAVE_FAILURE_ABORT  = 1,
+    ON_SLAVE_FAILURE_IGNORE = 2
+} SlaveFailurePolicy;
+
+#define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
+
 typedef struct {
     AVFormatContext *avf;
     AVBitStreamFilterContext **bsfs; ///< bitstream filters per stream
 
+    SlaveFailurePolicy on_fail;
+    unsigned char is_alive;
+
     /** map from input to output streams indexes,
      * disabled output streams are set to -1 */
     int *stream_map;
@@ -41,6 +51,7 @@ typedef struct {
 typedef struct TeeContext {
     const AVClass *class;
     unsigned nb_slaves;
+    unsigned nb_alive;
     TeeSlave slaves[MAX_SLAVES];
 } TeeContext;
 
@@ -135,13 +146,25 @@ end:
     return ret;
 }
 
+static inline int parse_slave_failure_policy_option(const char * opt)
+{
+    if (!opt) {
+        return DEFAULT_SLAVE_FAILURE_POLICY;
+    } else if (!av_strcasecmp("abort",opt)) {
+        return ON_SLAVE_FAILURE_ABORT;
+    } else if (!av_strcasecmp("ignore",opt)) {
+        return ON_SLAVE_FAILURE_IGNORE;
+    }
+    return 0;
+}
+
 static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
 {
     int i, ret;
     AVDictionary *options = NULL;
     AVDictionaryEntry *entry;
     char *filename;
-    char *format = NULL, *select = NULL;
+    char *format = NULL, *select = NULL, *on_fail = NULL;
     AVFormatContext *avf2 = NULL;
     AVStream *st, *st2;
     int stream_count;
@@ -161,6 +184,17 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
 
     STEAL_OPTION("f", format);
     STEAL_OPTION("select", select);
+    STEAL_OPTION("onfail", on_fail);
+
+    tee_slave->on_fail = (SlaveFailurePolicy) parse_slave_failure_policy_option(on_fail);
+    if (!tee_slave->on_fail) {
+        av_log(avf, AV_LOG_ERROR,
+                "Invalid onfail option value, valid options are 'abort' and 'ignore'\n");
+        ret = AVERROR(EINVAL);
+        /// Set failure behaviour on abort, so invalid option error will not be ignored
+        tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
+        goto end;
+    }
 
     ret = avformat_alloc_output_context2(&avf2, NULL, format, filename);
     if (ret < 0)
@@ -234,14 +268,14 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
         if ((ret = avf2->io_open(avf2, &avf2->pb, filename, AVIO_FLAG_WRITE, NULL)) < 0) {
             av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n",
                    slave, av_err2str(ret));
-            goto end;
+            goto open_failure;
         }
     }
 
     if ((ret = avformat_write_header(avf2, &options)) < 0) {
         av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
                slave, av_err2str(ret));
-        goto end;
+        goto open_failure;
     }
 
     tee_slave->avf = avf2;
@@ -306,34 +340,49 @@ static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
 end:
     av_free(format);
     av_free(select);
+    av_free(on_fail);
     av_dict_free(&options);
     av_freep(&tmp_select);
     return ret;
+
+open_failure:
+    /// Handle situation when slave initialization fails in a way that
+    /// others may proceed successfully
+    av_free(tee_slave->stream_map);
+    avformat_free_context(avf2);
+    goto end;
+}
+
+static void close_slave(TeeSlave* tee_slave)
+{
+    AVFormatContext * avf;
+    unsigned i;
+
+    avf = tee_slave->avf;
+    for (i=0; i < avf->nb_streams; ++i) {
+        AVBitStreamFilterContext *bsf_next, *bsf = tee_slave->bsfs[i];
+        while (bsf) {
+            bsf_next = bsf->next;
+            av_bitstream_filter_close(bsf);
+            bsf = bsf_next;
+        }
+    }
+    av_freep(&tee_slave->stream_map);
+    av_freep(&tee_slave->bsfs);
+
+    ff_format_io_close(avf,&avf->pb);
+    avformat_free_context(avf);
+    tee_slave->avf = NULL;
 }
 
 static void close_slaves(AVFormatContext *avf)
 {
     TeeContext *tee = avf->priv_data;
-    AVFormatContext *avf2;
-    unsigned i, j;
+    unsigned i;
 
     for (i = 0; i < tee->nb_slaves; i++) {
-        avf2 = tee->slaves[i].avf;
-
-        for (j = 0; j < avf2->nb_streams; j++) {
-            AVBitStreamFilterContext *bsf_next, *bsf = tee->slaves[i].bsfs[j];
-            while (bsf) {
-                bsf_next = bsf->next;
-                av_bitstream_filter_close(bsf);
-                bsf = bsf_next;
-            }
-        }
-        av_freep(&tee->slaves[i].stream_map);
-        av_freep(&tee->slaves[i].bsfs);
-
-        ff_format_io_close(avf2, &avf2->pb);
-        avformat_free_context(avf2);
-        tee->slaves[i].avf = NULL;
+        if (tee->slaves[i].is_alive)
+            close_slave(&tee->slaves[i]);
     }
 }
 
@@ -361,6 +410,28 @@ static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
     }
 }
 
+static int tee_process_slave_failure(AVFormatContext * avf,
+        TeeContext * tee,TeeSlave * slave,int err_n,unsigned char needs_closing)
+{
+    slave->is_alive = 0;
+    tee->nb_alive--;
+
+    if (needs_closing)
+        close_slave(slave);
+
+    if ( !tee->nb_alive ) {
+        av_log(avf, AV_LOG_ERROR, "All tee slaves failed.\n");
+        return err_n;
+    } else if (slave->on_fail == ON_SLAVE_FAILURE_ABORT ) {
+        av_log(avf, AV_LOG_ERROR, "Slave failed,aborting.\n");
+        return err_n;
+    } else {
+        av_log(avf, AV_LOG_ERROR, "Slave failed, continuing with %u/%u slaves.\n",
+                tee->nb_alive,tee->nb_slaves);
+        return 0;
+    }
+}
+
 static int tee_write_header(AVFormatContext *avf)
 {
     TeeContext *tee = avf->priv_data;
@@ -384,19 +455,25 @@ static int tee_write_header(AVFormatContext *avf)
             filename++;
     }
 
+    tee->nb_slaves = tee->nb_alive = nb_slaves;
+
     for (i = 0; i < nb_slaves; i++) {
-        if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0)
-            goto fail;
-        log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
-        av_freep(&slaves[i]);
+        if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
+            ret = tee_process_slave_failure(avf,tee,&tee->slaves[i],ret,0);
+            if ( ret < 0 )
+                goto fail;
+        } else {
+            log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
+            tee->slaves[i].is_alive = 1;
+        }
+            av_freep(&slaves[i]);
     }
 
-    tee->nb_slaves = nb_slaves;
-
     for (i = 0; i < avf->nb_streams; i++) {
         int j, mapped = 0;
         for (j = 0; j < tee->nb_slaves; j++)
-            mapped += tee->slaves[j].stream_map[i] >= 0;
+            if (tee->slaves[j].is_alive)
+                mapped += tee->slaves[j].stream_map[i] >= 0;
         if (!mapped)
             av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
                    "to any slave.\n", i);
@@ -405,7 +482,8 @@ static int tee_write_header(AVFormatContext *avf)
 
 fail:
     for (i = 0; i < nb_slaves; i++)
-        av_freep(&slaves[i]);
+        if (&slaves[i])
+            av_freep(&slaves[i]);
     close_slaves(avf);
     return ret;
 }
@@ -418,6 +496,8 @@ static int tee_write_trailer(AVFormatContext *avf)
     unsigned i;
 
     for (i = 0; i < tee->nb_slaves; i++) {
+        if (!tee->slaves[i].is_alive)
+            continue;
         avf2 = tee->slaves[i].avf;
         if ((ret = av_write_trailer(avf2)) < 0)
             if (!ret_all)
@@ -440,6 +520,9 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
     AVRational tb, tb2;
 
     for (i = 0; i < tee->nb_slaves; i++) {
+        if (!tee->slaves[i].is_alive)
+            continue;
+
         avf2 = tee->slaves[i].avf;
         s = pkt->stream_index;
         s2 = tee->slaves[i].stream_map[s];
@@ -447,11 +530,12 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
             continue;
 
         if ((ret = av_copy_packet(&pkt2, pkt)) < 0 ||
-            (ret = av_dup_packet(&pkt2))< 0)
+            (ret = av_dup_packet(&pkt2)) < 0)
             if (!ret_all) {
                 ret_all = ret;
                 continue;
             }
+
         tb  = avf ->streams[s ]->time_base;
         tb2 = avf2->streams[s2]->time_base;
         pkt2.pts      = av_rescale_q(pkt->pts,      tb, tb2);
@@ -461,9 +545,11 @@ static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
 
         if ((ret = av_apply_bitstream_filters(avf2->streams[s2]->codec, &pkt2,
                                               tee->slaves[i].bsfs[s2])) < 0 ||
-            (ret = av_interleaved_write_frame(avf2, &pkt2)) < 0)
-            if (!ret_all)
+            (ret = av_interleaved_write_frame(avf2, &pkt2)) < 0) {
+            ret = tee_process_slave_failure(avf,tee,&tee->slaves[i],ret,1);
+            if (!ret_all && ret < 0)
                 ret_all = ret;
+        }
     }
     return ret_all;
 }
-- 
1.9.1



More information about the ffmpeg-devel mailing list