[FFmpeg-devel] [PATCH v4 10/11] avformat/fifo: Add AVFMT_FLAG_NONBLOCK support
sebechlebskyjan at gmail.com
sebechlebskyjan at gmail.com
Thu Aug 11 15:48:23 EEST 2016
From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
Add support for nonblocking calls.
Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
Changes since the last version:
- fixed wrong flag passed to av_thread_message_queue_recv()
- fixed memleak when queue is full in nonblocking mode
libavformat/fifo.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 55 insertions(+), 6 deletions(-)
diff --git a/libavformat/fifo.c b/libavformat/fifo.c
index 8896b9e..cdf9f49 100644
--- a/libavformat/fifo.c
+++ b/libavformat/fifo.c
@@ -19,12 +19,14 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#include "libavutil/atomic.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/thread.h"
#include "libavutil/threadmessage.h"
#include "avformat.h"
#include "internal.h"
+#include "url.h"
#define FIFO_DEFAULT_QUEUE_SIZE 60
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 16
@@ -77,6 +79,17 @@ typedef struct FifoContext {
/* Value > 0 signals queue overflow */
volatile uint8_t overflow_flag;
+ /* Whether termination was requested by invoking deinit
+ * before the thread was finished. Used only in non-blocking
+ * mode - when AVFMT_FLAG_NONBLOCK is set. */
+ volatile int termination_requested;
+
+ /* Initially 0; set to 1 immediately before the thread function
+ * returns. */
+ volatile int thread_finished_flag;
+
+ /* Original interrupt callback of the underlying muxer. */
+ AVIOInterruptCB orig_interrupt_callback;
} FifoContext;
typedef struct FifoThreadContext {
@@ -111,6 +124,16 @@ typedef struct FifoMessage {
AVPacket pkt;
} FifoMessage;
+static int fifo_interrupt_callback_wrapper(void *arg)
+{
+ FifoContext *ctx = arg;
+
+ if (avpriv_atomic_int_get(&ctx->termination_requested))
+ return 1;
+
+ return ff_check_interrupt(&ctx->orig_interrupt_callback);
+}
+
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
AVFormatContext *avf = ctx->avf;
@@ -433,12 +456,17 @@ static void *fifo_consumer_thread(void *data)
fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
+ /* This must be the only return path from the fifo_consumer_thread function,
+ * so that thread_finished_flag is always set. */
+ avpriv_atomic_int_set(&fifo->thread_finished_flag, 1);
return NULL;
}
static int fifo_mux_init(AVFormatContext *avf)
{
FifoContext *fifo = avf->priv_data;
+ AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper,
+ .opaque = fifo};
AVFormatContext *avf2;
int ret = 0, i;
@@ -449,7 +477,8 @@ static int fifo_mux_init(AVFormatContext *avf)
fifo->avf = avf2;
- avf2->interrupt_callback = avf->interrupt_callback;
+ fifo->orig_interrupt_callback = avf->interrupt_callback;
+ avf2->interrupt_callback = interrupt_cb;
avf2->max_delay = avf->max_delay;
ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
if (ret < 0)
@@ -536,7 +565,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
FifoContext *fifo = avf->priv_data;
FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
- int ret;
+ int ret, queue_flags = 0;
if (pkt) {
av_init_packet(&msg.pkt);
@@ -545,15 +574,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
return ret;
}
- ret = av_thread_message_queue_send(fifo->queue, &msg,
- fifo->drop_pkts_on_overflow ?
- AV_THREAD_MESSAGE_NONBLOCK : 0);
+ if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK))
+ queue_flags |= AV_THREAD_MESSAGE_NONBLOCK;
+
+ ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags);
+
if (ret == AVERROR(EAGAIN)) {
- uint8_t overflow_set = 0;
+ uint8_t overflow_set;
+
+ if (avf->flags & AVFMT_FLAG_NONBLOCK)
+ goto fail;
/* Queue is full, set fifo->overflow_flag to 1
* to let consumer thread know the queue should
* be flushed. */
+ overflow_set = 0;
pthread_mutex_lock(&fifo->overflow_flag_lock);
if (!fifo->overflow_flag)
fifo->overflow_flag = overflow_set = 1;
@@ -581,6 +616,10 @@ static int fifo_write_trailer(AVFormatContext *avf)
av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+ if ((avf->flags & AVFMT_FLAG_NONBLOCK) &&
+ !avpriv_atomic_int_get(&fifo->thread_finished_flag))
+ return AVERROR(EAGAIN);
+
ret = pthread_join( fifo->writer_thread, NULL);
if (ret < 0) {
av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
@@ -596,6 +635,16 @@ static void fifo_deinit(AVFormatContext *avf)
{
FifoContext *fifo = avf->priv_data;
+ if (avf->flags & AVFMT_FLAG_NONBLOCK) {
+ int ret;
+ avpriv_atomic_int_set(&fifo->termination_requested, 1);
+ ret = pthread_join( fifo->writer_thread, NULL);
+ if (ret < 0) {
+ av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+ av_err2str(AVERROR(ret)));
+ }
+ }
+
if (fifo->format_options)
av_dict_free(&fifo->format_options);
--
1.9.1
More information about the ffmpeg-devel
mailing list