[FFmpeg-devel] [PATCH] avformat/udp: Add a delay between packets for streaming to clients with short buffer

Nicolas George george at nsup.org
Thu Mar 3 20:51:44 CET 2016


Le primidi 11 ventôse, an CCXXIV, Pavel Nikiforov a écrit :
> Hello !
> 
> This patch enables background sending of UDP packets with specified delay.
> When sending packets without a delay some devices with small RX buffer
> ( MAG200 STB, for example) will drop tail packets in burst causing
> decoding errors.
> 
> It needs to specify "fifo_size" with "packet_gap" .
> 
> The output url will looks like udp://xxx:yyy?fifo_size=<output fifo
> size>&packet_gap=<delay in usecs>
> 
> Patch attached.

Thanks for the patch. Comments below.

>  libavformat/udp.c | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 129 insertions(+), 4 deletions(-)

Missing documentation update.

> 
> diff --git a/libavformat/udp.c b/libavformat/udp.c
> index ea80e52..ff676f4 100644
> --- a/libavformat/udp.c
> +++ b/libavformat/udp.c
> @@ -92,6 +92,7 @@ typedef struct UDPContext {
>      int circular_buffer_size;
>      AVFifoBuffer *fifo;
>      int circular_buffer_error;
> +    int packet_gap; /* delay between transmitted packets */
>  #if HAVE_PTHREAD_CANCEL
>      pthread_t circular_buffer_thread;
>      pthread_mutex_t mutex;
> @@ -112,6 +113,7 @@ typedef struct UDPContext {
>  #define E AV_OPT_FLAG_ENCODING_PARAM
>  static const AVOption options[] = {
>      { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },

> +    { "packet_gap",     "Delay between packets, in usec",                  OFFSET(packet_gap),     AV_OPT_TYPE_INT,    { .i64 = 0  },     0, INT_MAX, .flags = E },

As pointed by Michael, this should use AV_OPT_TYPE_DURATION.

>      { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
>      { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
>      { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
> @@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h)
>  }
>  
>  #if HAVE_PTHREAD_CANCEL
> -static void *circular_buffer_task( void *_URLContext)
> +static void *circular_buffer_task_rx( void *_URLContext)
>  {
>      URLContext *h = _URLContext;
>      UDPContext *s = h->priv_data;
> @@ -499,7 +501,7 @@ static void *circular_buffer_task( void *_URLContext)
>          s->circular_buffer_error = AVERROR(EIO);
>          goto end;
>      }

> -    while(1) {
> +    for(;;) {

Stray change.

>          int len;
>  
>          pthread_mutex_unlock(&s->mutex);
> @@ -542,6 +544,81 @@ end:
>      pthread_mutex_unlock(&s->mutex);
>      return NULL;
>  }
> +
> +static void do_udp_write(void *arg, void *buf, int size) {
> +    URLContext *h = arg;
> +    UDPContext *s = h->priv_data;
> +
> +    int ret;
> +

> +    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {

This looks suspicious: blocking/nonblocking is a property of the protocol as
seen by the calling application, it should not apply in a thread used
internally. See below.

> +        ret = ff_network_wait_fd(s->udp_fd, 1);
> +        if (ret < 0) {

> +            s->circular_buffer_error=ret;

Please keep in line with surrounding coding style. In this instance: spaces
around operators.

> +            return;
> +        }
> +    }
> +
> +    if (!s->is_connected) {

> +        ret = sendto (s->udp_fd, buf, size, 0,
> +                      (struct sockaddr *) &s->dest_addr,

Style: no spaces after function names and casts.

> +                      s->dest_addr_len);
> +    } else
> +        ret = send(s->udp_fd, buf, size, 0);
> +
> +    s->circular_buffer_error=ret;
> +}
> +
> +static void *circular_buffer_task_tx( void *_URLContext)
> +{
> +    URLContext *h = _URLContext;
> +    UDPContext *s = h->priv_data;
> +    int old_cancelstate;
> +
> +    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +
> +    for(;;) {
> +        int len;
> +        uint8_t tmp[4];
> +
> +        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
> +

> +        av_usleep(s->packet_gap);

Not very reliable: if the task gets unscheduled for some time, it will be
added to the gap and never catch, possibly causing playback hiccups. A
system with a kind of rate control would probably be better.

> +
> +        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +
> +        pthread_mutex_lock(&s->mutex);
> +
> +        len=av_fifo_size(s->fifo);
> +
> +        while (len<4) {
> +            if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
> +                goto end;
> +            }
> +            len=av_fifo_size(s->fifo);
> +        }
> +
> +        av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
> +        len=AV_RL32(tmp);
> +

> +        if (len>0 && av_fifo_size(s->fifo)>=len) {

len + 4

> +            av_fifo_drain(s->fifo, 4); /* skip packet length */

> +            av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */

The comment looks useless.

> +            if (s->circular_buffer_error == len) {
> +              /* all ok - reset error */
> +              s->circular_buffer_error=0;
> +            }
> +        }
> +
> +        pthread_mutex_unlock(&s->mutex);
> +    }
> +
> +end:
> +    pthread_mutex_unlock(&s->mutex);
> +    return NULL;
> +}
> +
> +
>  #endif
>  
>  static int parse_source_list(char *buf, char **sources, int *num_sources,
> @@ -650,6 +727,13 @@ static int udp_open(URLContext *h, const char *uri, int flags)
>                         "'circular_buffer_size' option was set but it is not supported "
>                         "on this build (pthread support is required)\n");
>          }
> +        if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) {
> +            s->packet_gap = strtol(buf, NULL, 10);
> +            if (!HAVE_PTHREAD_CANCEL)
> +                av_log(h, AV_LOG_WARNING,
> +                       "'packet_gap' option was set but it is not supported "
> +                       "on this build (pthread support is required)\n");
> +        }
>          if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
>              av_strlcpy(localaddr, buf, sizeof(localaddr));
>          }
> @@ -829,7 +913,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
>      s->udp_fd = udp_fd;
>  
>  #if HAVE_PTHREAD_CANCEL
> -    if (!is_output && s->circular_buffer_size) {

> +    /*
> +      Create thread in case of:
> +      1. Input and circular_buffer_size is set
> +      2. Output and packet_gap and circular_buffer_size is set
> +    */

UDP sockets can be read+write; in this case, two threads are needed.

> +
> +    if (is_output && s->packet_gap && !s->circular_buffer_size) {
> +      /* Warn user in case of 'circular_buffer_size' is not set */
> +      av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n");
> +    }
> +
> +    if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) {
>          int ret;
>  
>          /* start the task going */
> @@ -844,7 +939,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
>              av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
>              goto cond_fail;
>          }
> -        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
> +        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
>          if (ret != 0) {
>              av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
>              goto thread_fail;
> @@ -945,6 +1040,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
>      UDPContext *s = h->priv_data;
>      int ret;
>  
> +#if HAVE_PTHREAD_CANCEL
> +    if (s->fifo) {
> +        uint8_t tmp[4];
> +
> +        pthread_mutex_lock(&s->mutex);
> +
> +        /*
> +          Return error if last tx failed.
> +          Here we can't know on which packet error was, but it needs to know that error exists.
> +        */
> +        if (s->circular_buffer_error<0) {
> +            int err=s->circular_buffer_error;
> +            s->circular_buffer_error=0;
> +            pthread_mutex_unlock(&s->mutex);
> +            return err;
> +        }
> +

> +        if(av_fifo_space(s->fifo) < size + 4) {
> +            /* What about a partial packet tx ? */
> +            pthread_mutex_unlock(&s->mutex);
> +            return AVERROR(ENOMEM);
> +        }

ENOMEM means that memory allocation failed, it is not true here. This is the
point where blocking or non-blocking matters: in non-blocking mode, the
return value must be AVERROR(EAGAIN). In blocking mode, it should block
until the thread catches up.

> +        AV_WL32(tmp, size);
> +        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
> +        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
> +        pthread_cond_signal(&s->cond);
> +        pthread_mutex_unlock(&s->mutex);
> +        return size;
> +    }
> +#endif
>      if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
>          ret = ff_network_wait_fd(s->udp_fd, 1);
>          if (ret < 0)

Regards,

-- 
  Nicolas George
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20160303/79342cce/attachment.sig>


More information about the ffmpeg-devel mailing list