[FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving.

Marton Balint cus at passwd.hu
Tue Sep 28 20:02:16 EEST 2021



On Tue, 28 Sep 2021, Gijs Peskens wrote:

> libRIST internally stores packets in a fifo of 1024 packets, overwriting
> old packets when not read in a sufficient pace. Unfortunately this results
> in many fifo overflow errors when ffmpeg consumes a libRIST stream.
> This patch creates a receiver thread based on the UDP circular buffer code.

The user is better off adjusting the libRIST fifo size, so this patch adds 
extra complexity for no good reason.

Regards,
Marton

>
> Signed-off-by: Gijs Peskens <gijs at peskens.net>
> ---
> libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++--
> 1 file changed, 196 insertions(+), 5 deletions(-)
>
> diff --git a/libavformat/librist.c b/libavformat/librist.c
> index b120346f48..47c01a8432 100644
> --- a/libavformat/librist.c
> +++ b/libavformat/librist.c
> @@ -26,6 +26,8 @@
> #include "libavutil/opt.h"
> #include "libavutil/parseutils.h"
> #include "libavutil/time.h"
> +#include "libavutil/fifo.h"
> +#include "libavutil/intreadwrite.h"
>
> #include "avformat.h"
> #include "internal.h"
> @@ -33,6 +35,15 @@
> #include "os_support.h"
> #include "url.h"
>
> +#if HAVE_W32THREADS
> +#undef HAVE_PTHREAD_CANCEL
> +#define HAVE_PTHREAD_CANCEL 1
> +#endif
> +
> +#if HAVE_PTHREAD_CANCEL
> +#include "libavutil/thread.h"
> +#endif
> +
> #include <librist/librist.h>
> #include <librist/version.h>
> // RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
> @@ -67,6 +78,19 @@ typedef struct RISTContext {
>
>     struct rist_peer *peer;
>     struct rist_ctx *ctx;
> +
> +    int circular_buffer_size;
> +    AVFifoBuffer *fifo;
> +    int circular_buffer_error;
> +    int overrun_nonfatal;
> +
> +#if HAVE_PTHREAD_CANCEL
> +    pthread_t receiver_thread;
> +    pthread_mutex_t mutex;
> +    pthread_cond_t cond;
> +    int thread_started;
> +    int thread_stop;
> +#endif
> } RISTContext;
>
> #define D AV_OPT_FLAG_DECODING_PARAM
> @@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
>     { "log_level",   "set loglevel",    OFFSET(log_level),   AV_OPT_TYPE_INT,   {.i64=RIST_LOG_INFO},        -1, INT_MAX, .flags = D|E },
>     { "secret", "set encryption secret",OFFSET(secret),      AV_OPT_TYPE_STRING,{.str=NULL},                  0, 0,       .flags = D|E },
>     { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT   ,{.i64=0},                     0, INT_MAX, .flags = D|E },
> +    { "fifo_size",      "set the receiving circular buffer size, expressed as a number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
> +    { "overrun_nonfatal", "survive in case of receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
>     { NULL }
> };
>
> @@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
>     RISTContext *s = h->priv_data;
>     int ret = 0;
>
> +#if HAVE_PTHREAD_CANCEL
> +    if (s->thread_started) {
> +        pthread_mutex_lock(&s->mutex);
> +        s->thread_stop = 1;
> +        pthread_mutex_unlock(&s->mutex);
> +        pthread_join(s->receiver_thread, NULL);
> +    }
> +#endif
> +    av_fifo_freep(&s->fifo);
>     s->peer = NULL;
>
>     if (s->ctx)
> @@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
>     return risterr2ret(ret);
> }
>
> +static void *receiver_thread(void *_url_context)
> +{
> +    URLContext *h = _url_context;
> +    RISTContext *s = h->priv_data;
> +    int ret;
> +    uint8_t tmp[4];
> +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> +    const struct rist_data_block *data_block;
> +#else
> +    struct rist_data_block *data_block;
> +#endif
> +
> +    while (1)
> +    {
> +        pthread_mutex_lock(&s->mutex);
> +        if (s->thread_stop)
> +            break;
> +        pthread_mutex_unlock(&s->mutex);
> +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> +        ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
> +#else
> +        ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
> +#endif
> +        if (ret == 0)
> +            continue;
> +
> +        pthread_mutex_lock(&s->mutex);
> +        if (ret < 0) {
> +            s->circular_buffer_error = ret;
> +            break;
> +        }
> +
> +        if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
> +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> +            rist_receiver_data_block_free((struct rist_data_block**)&data_block);
> +#else
> +            rist_receiver_data_block_free2(&data_block);
> +#endif
> +            s->circular_buffer_error = AVERROR_EXTERNAL;
> +            break;
> +        }
> +        AV_WL32(tmp, data_block->payload_len);
> +        if (av_fifo_space(s->fifo) < (data_block->payload_len +4))
> +        {
> +            /* No Space left */
> +            if (s->overrun_nonfatal) {
> +                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
> +                        "Surviving due to overrun_nonfatal option\n");
> +                continue;
> +            } else {
> +                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
> +                        "To avoid, increase fifo_size URL option. "
> +                        "To survive in such case, use overrun_nonfatal option\n");
> +                s->circular_buffer_error = AVERROR(EIO);
> +                break;
> +            }
> +        }
> +        av_fifo_generic_write(s->fifo, tmp, 4, NULL);
> +        av_fifo_generic_write(s->fifo, (void*)data_block->payload, data_block->payload_len, NULL);
> +        pthread_mutex_unlock(&s->mutex);
> +        pthread_cond_signal(&s->cond);
> +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> +        rist_receiver_data_block_free((struct rist_data_block**)&data_block);
> +#else
> +        rist_receiver_data_block_free2(&data_block);
> +#endif
> +    }
> +    pthread_mutex_unlock(&s->mutex);
> +    pthread_cond_signal(&s->cond);
> +    return NULL;
> +}
> +
> static int librist_open(URLContext *h, const char *uri, int flags)
> {
>     RISTContext *s = h->priv_data;
> @@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, int flags)
>     if (ret < 0)
>         goto err;
>
> +    s->circular_buffer_size *= 188;
> +
> +#if HAVE_PTHREAD_CANCEL
> +    //Create receiver thread if circular buffer size is set and we are receiving
> +    if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) {
> +        /* start the task going */
> +        s->fifo = av_fifo_alloc(s->circular_buffer_size);
> +        if (!s->fifo) {
> +            ret = AVERROR(ENOMEM);
> +            goto err;
> +        }
> +        ret = pthread_mutex_init(&s->mutex, NULL);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto err;
> +        }
> +        ret = pthread_cond_init(&s->cond, NULL);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto cond_fail;
> +        }
> +        ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h);
> +        if (ret != 0) {
> +            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
> +            ret = AVERROR(ret);
> +            goto thread_fail;
> +        }
> +        s->thread_started = 1;
> +    }
> +#endif
>     return 0;
> -
> +#if HAVE_PTHREAD_CANCEL
> + thread_fail:
> +    pthread_cond_destroy(&s->cond);
> + cond_fail:
> +    pthread_mutex_destroy(&s->mutex);
> +#endif
> err:
>     librist_close(h);
> -
> +    av_fifo_freep(&s->fifo);
>     return risterr2ret(ret);
> }
>
> static int librist_read(URLContext *h, uint8_t *buf, int size)
> {
>     RISTContext *s = h->priv_data;
> +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> +    const struct rist_data_block *data_block;
> +#else
> +    struct rist_data_block *data_block;
> +#endif
>     int ret;
>
> +#if HAVE_PTHREAD_CANCEL
> +    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
> +
> +    if (s->fifo) {
> +        pthread_mutex_lock(&s->mutex);
> +        do {
> +            avail = av_fifo_size(s->fifo);
> +            if (avail) { // >=size) {
> +                uint8_t tmp[4];
> +
> +                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
> +                avail = AV_RL32(tmp);
> +                if(avail > size){
> +                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
> +                    avail = size;
> +                }
> +
> +                av_fifo_generic_read(s->fifo, buf, avail, NULL);
> +                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
> +                pthread_mutex_unlock(&s->mutex);
> +                return avail;
> +            } else if(s->circular_buffer_error){
> +                int err = s->circular_buffer_error;
> +                pthread_mutex_unlock(&s->mutex);
> +                return err;
> +            } else if(nonblock) {
> +                pthread_mutex_unlock(&s->mutex);
> +                return AVERROR(EAGAIN);
> +            } else {
> +                /* FIXME: using the monotonic clock would be better,
> +                   but it does not exist on all supported platforms. */
> +                int64_t t = av_gettime() + 100000;
> +                struct timespec tv = { .tv_sec  =  t / 1000000,
> +                                       .tv_nsec = (t % 1000000) * 1000 };
> +                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
> +                if (err) {
> +                    pthread_mutex_unlock(&s->mutex);
> +                    return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
> +                }
> +                nonblock = 1;
> +            }
> +        } while(1);
> +    }
> +#endif
> +
> #if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
> -    const struct rist_data_block *data_block;
>     ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
> #else
> -    struct rist_data_block *data_block;
>     ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
> #endif
> -
>     if (ret < 0)
>         return risterr2ret(ret);
>
> -- 
> 2.30.2
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
>


More information about the ffmpeg-devel mailing list