[FFmpeg-devel] [PATCH v2 10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support

sebechlebskyjan at gmail.com sebechlebskyjan at gmail.com
Thu Aug 4 16:04:15 EEST 2016


From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>

Add support for nonblocking calls.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
 Changes from the last version:
 - Boolean flags accessed from both threads are now ints
   and are accessed with atomic operations.
 - pthread_tryjoin_np is replaced by a flag that is set just
   before fifo_consumer_thread returns.

 libavformat/fifo.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 55 insertions(+), 6 deletions(-)

diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index bd9d934..9b92eab 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -19,12 +19,14 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include "libavutil/atomic.h"
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/threadmessage.h"
 #include "avformat.h"
 #include "internal.h"
 #include "pthread.h"
+#include "url.h"
 
 #define FIFO_DEFAULT_QUEUE_SIZE              60
 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16
@@ -77,6 +79,17 @@ typedef struct FifoContext {
     /* Value > 0 signalizes queue overflow */
     volatile uint8_t overflow_flag;
 
+    /* Whether termination was requested by invoking deinit
+     * before the thread was finished. Used only in non-blocking
+     * mode - when AVFMT_FLAG_NONBLOCK is set. */
+    volatile int termination_requested;
+
+    /* Initially 0, set to 1 immediately before the thread
+     * function returns. */
+    volatile int thread_finished_flag;
+
+    /* Original interrupt callback of the underlying muxer. */
+    AVIOInterruptCB orig_interrupt_callback;
 } FifoContext;
 
 typedef struct FifoThreadContext {
@@ -110,6 +123,16 @@ typedef struct FifoMessage {
     AVPacket pkt;
 } FifoMessage;
 
+static int fifo_interrupt_callback_wrapper(void *arg)
+{
+    FifoContext *ctx = arg;
+
+    if (avpriv_atomic_int_get(&ctx->termination_requested))
+        return 1;
+
+    return ff_check_interrupt(&ctx->orig_interrupt_callback);
+}
+
 static int fifo_thread_write_header(FifoThreadContext *ctx)
 {
     AVFormatContext *avf = ctx->avf;
@@ -442,12 +465,17 @@ static void *fifo_consumer_thread(void *data)
 
     fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
 
+    /* This must be the only return path from the fifo_consumer_thread
+     * function, so that thread_finished_flag is always set. */
+    avpriv_atomic_int_set(&fifo->thread_finished_flag, 1);
     return NULL;
 }
 
 static int fifo_mux_init(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
+    AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
+                                    .opaque = fifo};
     AVFormatContext *avf2;
     int ret = 0, i;
 
@@ -458,7 +486,8 @@ static int fifo_mux_init(AVFormatContext *avf)
 
     fifo->avf = avf2;
 
-    avf2->interrupt_callback = avf->interrupt_callback;
+    fifo->orig_interrupt_callback = avf->interrupt_callback;
+    avf2->interrupt_callback = interrupt_cb;
     avf2->max_delay = avf->max_delay;
     ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
     if (ret < 0)
@@ -543,7 +572,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
 {
     FifoContext *fifo = avf->priv_data;
     FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
-    int ret;
+    int ret, queue_flags = 0;
 
     if (pkt) {
         av_init_packet(&msg.pkt);
@@ -552,15 +581,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
             return ret;
     }
 
-    ret = av_thread_message_queue_send(fifo->queue, &msg,
-                                       fifo->drop_pkts_on_overflow ?
-                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
+        queue_flags |= AVFMT_FLAG_NONBLOCK;
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
+
     if (ret == AVERROR(EAGAIN)) {
-        uint8_t overflow_set = 0;
+        uint8_t overflow_set;
+
+        if (avf->flags & AVFMT_FLAG_NONBLOCK)
+            return ret;
 
         /* Queue is full, set fifo->overflow_flag to 1
          * to let consumer thread know the queue should
          * be flushed. */
+        overflow_set = 0;
         pthread_mutex_lock(&fifo->overflow_flag_lock);
         if (!fifo->overflow_flag)
             fifo->overflow_flag = overflow_set = 1;
@@ -588,6 +623,10 @@ static int fifo_write_trailer(AVFormatContext *avf)
 
     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
 
+    if ((avf->flags & AVFMT_FLAG_NONBLOCK) &&
+        !avpriv_atomic_int_get(&fifo->thread_finished_flag))
+       return AVERROR(EAGAIN);
+
     ret = pthread_join( fifo->writer_thread, NULL);
     if (ret < 0) {
         av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
@@ -603,6 +642,16 @@ static void fifo_deinit(AVFormatContext *avf)
 {
     FifoContext *fifo = avf->priv_data;
 
+    if (avf->flags & AVFMT_FLAG_NONBLOCK) {
+        int ret;
+        avpriv_atomic_int_set(&fifo->termination_requested, 1);
+        ret = pthread_join( fifo->writer_thread, NULL);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+                   av_err2str(AVERROR(ret)));
+        }
+    }
+
     if (fifo->format_options)
         av_dict_free(&fifo->format_options);
 
-- 
1.9.1



More information about the ffmpeg-devel mailing list