[FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving.

Gijs Peskens gijs at peskens.net
Tue Sep 28 11:22:40 EEST 2021


libRIST internally stores packets in a FIFO of 1024 packets and overwrites
old packets when they are not read quickly enough. Unfortunately, this results
in many FIFO overflow errors when ffmpeg consumes a libRIST stream.
This patch adds a receiver thread, based on the UDP circular buffer code,
that moves received packets into a circular buffer of configurable size.

Signed-off-by: Gijs Peskens <gijs at peskens.net>
---
 libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 196 insertions(+), 5 deletions(-)

diff --git a/libavformat/librist.c b/libavformat/librist.c
index b120346f48..47c01a8432 100644
--- a/libavformat/librist.c
+++ b/libavformat/librist.c
@@ -26,6 +26,8 @@
 #include "libavutil/opt.h"
 #include "libavutil/parseutils.h"
 #include "libavutil/time.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
 
 #include "avformat.h"
 #include "internal.h"
@@ -33,6 +35,15 @@
 #include "os_support.h"
 #include "url.h"
 
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
 #include <librist/librist.h>
 #include <librist/version.h>
 // RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
@@ -67,6 +78,19 @@ typedef struct RISTContext {
 
     struct rist_peer *peer;
     struct rist_ctx *ctx;
+
+    int circular_buffer_size;
+    AVFifoBuffer *fifo;
+    int circular_buffer_error;
+    int overrun_nonfatal;
+
+#if HAVE_PTHREAD_CANCEL
+    pthread_t receiver_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+    int thread_stop;
+#endif
 } RISTContext;
 
 #define D AV_OPT_FLAG_DECODING_PARAM
@@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
     { "log_level",   "set loglevel",    OFFSET(log_level),   AV_OPT_TYPE_INT,   {.i64=RIST_LOG_INFO},        -1, INT_MAX, .flags = D|E },
     { "secret", "set encryption secret",OFFSET(secret),      AV_OPT_TYPE_STRING,{.str=NULL},                  0, 0,       .flags = D|E },
     { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT   ,{.i64=0},                     0, INT_MAX, .flags = D|E },
+    { "fifo_size",      "set the receiving circular buffer size, expressed as a number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+    { "overrun_nonfatal", "survive in case of receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
     { NULL }
 };
 
@@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
     RISTContext *s = h->priv_data;
     int ret = 0;
 
+#if HAVE_PTHREAD_CANCEL
+    if (s->thread_started) {
+        pthread_mutex_lock(&s->mutex);
+        s->thread_stop = 1;
+        pthread_mutex_unlock(&s->mutex);
+        pthread_join(s->receiver_thread, NULL);
+    }
+#endif
+    av_fifo_freep(&s->fifo);
     s->peer = NULL;
 
     if (s->ctx)
@@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
     return risterr2ret(ret);
 }
 
+/* Receiver thread: queues each libRIST data block in s->fifo as a 4-byte
+ * little-endian length followed by the payload, until stopped or on error. */
+static void *receiver_thread(void *_url_context)
+{
+    URLContext *h = _url_context;
+    RISTContext *s = h->priv_data;
+    int ret, stop, fatal, queued;
+    uint8_t tmp[4];
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+    const struct rist_data_block *data_block;
+#else
+    struct rist_data_block *data_block;
+#endif
+
+    while (1) {
+        pthread_mutex_lock(&s->mutex);
+        stop = s->thread_stop;
+        pthread_mutex_unlock(&s->mutex);
+        if (stop)
+            break;
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+        ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
+#else
+        ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
+#endif
+        if (ret == 0)
+            continue;
+        fatal = queued = 0;
+        pthread_mutex_lock(&s->mutex);
+        if (ret < 0) {
+            /* No data block was handed out, so there is nothing to free. */
+            s->circular_buffer_error = risterr2ret(ret);
+            pthread_mutex_unlock(&s->mutex);
+            break;
+        }
+        if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
+            s->circular_buffer_error = AVERROR_EXTERNAL;
+            fatal = 1;
+        } else if (av_fifo_space(s->fifo) < (data_block->payload_len + 4)) {
+            /* No space left: drop the packet, or fail if overruns are fatal */
+            if (s->overrun_nonfatal) {
+                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+                        "Surviving due to overrun_nonfatal option\n");
+            } else {
+                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+                        "To avoid, increase fifo_size URL option. "
+                        "To survive in such case, use overrun_nonfatal option\n");
+                s->circular_buffer_error = AVERROR(EIO);
+                fatal = 1;
+            }
+        } else {
+            AV_WL32(tmp, data_block->payload_len);
+            av_fifo_generic_write(s->fifo, tmp, 4, NULL);
+            av_fifo_generic_write(s->fifo, (void*)data_block->payload, data_block->payload_len, NULL);
+            queued = 1;
+        }
+        pthread_mutex_unlock(&s->mutex);
+        if (queued)
+            pthread_cond_signal(&s->cond);
+        /* Single release point, reached with the mutex dropped: also frees dropped/rejected blocks */
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+        rist_receiver_data_block_free((struct rist_data_block**)&data_block);
+#else
+        rist_receiver_data_block_free2(&data_block);
+#endif
+        if (fatal)
+            break;
+    }
+    pthread_cond_signal(&s->cond); /* wake a blocked reader on error or shutdown */
+    return NULL;
+}
+
 static int librist_open(URLContext *h, const char *uri, int flags)
 {
     RISTContext *s = h->priv_data;
@@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, int flags)
     if (ret < 0)
         goto err;
 
+    s->circular_buffer_size *= 188;
+
+#if HAVE_PTHREAD_CANCEL
+    //Create receiver thread if circular buffer size is set and we are receiving
+    if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) {
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        if (!s->fifo) {
+            ret = AVERROR(ENOMEM);
+            goto err;
+        }
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto err;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h);
+        if (ret != 0) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            ret = AVERROR(ret);
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
     return 0;
-
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
 err:
     librist_close(h);
-
+    av_fifo_freep(&s->fifo);
     return risterr2ret(ret);
 }
 
 static int librist_read(URLContext *h, uint8_t *buf, int size)
 {
     RISTContext *s = h->priv_data;
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+    const struct rist_data_block *data_block;
+#else
+    struct rist_data_block *data_block;
+#endif
     int ret;
 
+#if HAVE_PTHREAD_CANCEL
+    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
+
+    if (s->fifo) {
+        pthread_mutex_lock(&s->mutex);
+        do {
+            avail = av_fifo_size(s->fifo);
+            if (avail) { // >=size) {
+                uint8_t tmp[4];
+
+                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+                avail = AV_RL32(tmp);
+                if(avail > size){
+                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+                    avail = size;
+                }
+
+                av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+                pthread_mutex_unlock(&s->mutex);
+                return avail;
+            } else if(s->circular_buffer_error){
+                int err = s->circular_buffer_error;
+                pthread_mutex_unlock(&s->mutex);
+                return err;
+            } else if(nonblock) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
+            } else {
+                /* FIXME: using the monotonic clock would be better,
+                   but it does not exist on all supported platforms. */
+                int64_t t = av_gettime() + 100000;
+                struct timespec tv = { .tv_sec  =  t / 1000000,
+                                       .tv_nsec = (t % 1000000) * 1000 };
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+                if (err) {
+                    pthread_mutex_unlock(&s->mutex);
+                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
+                }
+                nonblock = 1;
+            }
+        } while(1);
+    }
+#endif
+
 #if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
-    const struct rist_data_block *data_block;
     ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
 #else
-    struct rist_data_block *data_block;
     ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
 #endif
-
     if (ret < 0)
         return risterr2ret(ret);
 
-- 
2.30.2



More information about the ffmpeg-devel mailing list