[FFmpeg-devel] [PATCH 2/2] avformat/udp: Replace use of pthread_cancel.

Hendrik Leppkes h.leppkes at gmail.com
Sat Dec 3 16:22:36 EET 2016


On Sat, Dec 3, 2016 at 2:13 PM, Matt Oliver <protogonoi at gmail.com> wrote:
> On 3 December 2016 at 23:40, Hendrik Leppkes <h.leppkes at gmail.com> wrote:
>
>> On Sat, Dec 3, 2016 at 1:33 PM, Matt Oliver <protogonoi at gmail.com> wrote:
>> >
>> > I haven't fully tested Hendrik's idea, as the MSDN docs don't recommend
>> > calling multiple Winsock functions at the same time from different
>> > threads, so I wasn't sure how well a patch that relies on closesocket to
>> > unblock the recv function would be received (although I have seen it
>> > used extensively without issues). If Hendrik has tested this with
>> > his local patch without issues, then I would accept that as a solution
>> > for fixing my issue. Ping Hendrik!
>>
>> I don't really use UDP streaming on a regular basis, but I did test
>> this approach when I built it, and it works just fine.
>> What I did was basically just define pthread_cancel/setcancelstate to
>> empty in udp.c (and define HAVE_PTHREAD_CANCEL in that file to enable
>> the code) and re-shuffle the udp_close code to close the socket to
>> unblock the thread before waiting for it to exit.
>>
>> I don't have a clean and finished patch to go, because I had no real
>> interest in it working on other platforms, so it's rather ugly. But the
>> approach is rather simple.
>
>
> Would something like the following work for you:
>
> ---
>  libavformat/udp.c | 46 +++++++++++++++++++++++++++++-----------------
>  1 file changed, 29 insertions(+), 17 deletions(-)
>
> diff --git a/libavformat/udp.c b/libavformat/udp.c
> index 3835f98..a30918b 100644
> --- a/libavformat/udp.c
> +++ b/libavformat/udp.c
> @@ -60,14 +60,26 @@
>  #define IPPROTO_UDPLITE                                  136
>  #endif
>
> -#if HAVE_PTHREAD_CANCEL
> -#include <pthread.h>
> -#endif
> -
>  #ifndef HAVE_PTHREAD_CANCEL
>  #define HAVE_PTHREAD_CANCEL 0
>  #endif
>
> +#if HAVE_PTHREAD_CANCEL || (HAVE_THREADS && HAVE_WINSOCK2_H)
> +#define THREAD_RXRW 1
> +#else
> +#define THREAD_RXRW 0
> +#endif
> +
> +#if !HAVE_PTHREAD_CANCEL && (HAVE_THREADS && HAVE_WINSOCK2_H)
> +/* Winsock2 recv function can be unblocked by shutting down and closing
> the socket */
> +#define pthread_setcancelstate(x, y)
> +#define pthread_cancel {shutdown(s->udp_fd, SD_BOTH);
> closesocket(s->udp_fd);}
> +#endif
> +

Directly defining pthread_cancel to this seems risky, as someone might
introduce such a call in another place at some point without
realizing what the macro actually expands to.
I would feel better seeing that code directly in the place where it's meant to go.

AFAIK you should only call closesocket, without the shutdown, though.

> +#if THREAD_RXRW
> +#include "libavutil/thread.h"
> +#endif
> +
>  #ifndef IPV6_ADD_MEMBERSHIP
>  #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
>  #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
> @@ -100,7 +112,7 @@ typedef struct UDPContext {
>      int64_t bitrate; /* number of bits to send per second */
>      int64_t burst_bits;
>      int close_req;
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      pthread_t circular_buffer_thread;
>      pthread_mutex_t mutex;
>      pthread_cond_t cond;
> @@ -495,7 +507,7 @@ static int udp_get_file_handle(URLContext *h)
>      return s->udp_fd;
>  }
>
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>  static void *circular_buffer_task_rx( void *_URLContext)
>  {
>      URLContext *h = _URLContext;
> @@ -730,7 +742,7 @@ static int udp_open(URLContext *h, const char *uri, int
> flags)
>              /* assume if no digits were found it is a request to enable it
> */
>              if (buf == endptr)
>                  s->overrun_nonfatal = 1;
> -            if (!HAVE_PTHREAD_CANCEL)
> +            if (!THREAD_RXRW)
>                  av_log(h, AV_LOG_WARNING,
>                         "'overrun_nonfatal' option was set but it is not
> supported "
>                         "on this build (pthread support is required)\n");
> @@ -758,14 +770,14 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>          }
>          if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
>              s->circular_buffer_size = strtol(buf, NULL, 10);
> -            if (!HAVE_PTHREAD_CANCEL)
> +            if (!THREAD_RXRW)
>                  av_log(h, AV_LOG_WARNING,
>                         "'circular_buffer_size' option was set but it is
> not supported "
>                         "on this build (pthread support is required)\n");
>          }
>          if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
>              s->bitrate = strtoll(buf, NULL, 10);
> -            if (!HAVE_PTHREAD_CANCEL)
> +            if (!THREAD_RXRW)
>                  av_log(h, AV_LOG_WARNING,
>                         "'bitrate' option was set but it is not supported "
>                         "on this build (pthread support is required)\n");
> @@ -951,7 +963,7 @@ static int udp_open(URLContext *h, const char *uri, int
> flags)
>
>      s->udp_fd = udp_fd;
>
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      /*
>        Create thread in case of:
>        1. Input and circular_buffer_size is set
> @@ -988,7 +1000,7 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>  #endif
>
>      return 0;
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>   thread_fail:
>      pthread_cond_destroy(&s->cond);
>   cond_fail:
> @@ -1019,7 +1031,7 @@ static int udp_read(URLContext *h, uint8_t *buf, int
> size)
>  {
>      UDPContext *s = h->priv_data;
>      int ret;
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
>
>      if (s->fifo) {
> @@ -1054,9 +1066,9 @@ static int udp_read(URLContext *h, uint8_t *buf, int
> size)
>                  int64_t t = av_gettime() + 100000;
>                  struct timespec tv = { .tv_sec  =  t / 1000000,
>                                         .tv_nsec = (t % 1000000) * 1000 };
> -                if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
> +                if (ret = pthread_cond_timedwait(&s->cond, &s->mutex, &tv)
> < 0) {
>                      pthread_mutex_unlock(&s->mutex);
> -                    return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
> +                    return AVERROR(ret == ETIMEDOUT ? EAGAIN : ret);
>                  }
>                  nonblock = 1;
>              }
> @@ -1079,7 +1091,7 @@ static int udp_write(URLContext *h, const uint8_t
> *buf, int size)
>      UDPContext *s = h->priv_data;
>      int ret;
>
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      if (s->fifo) {
>          uint8_t tmp[4];
>
> @@ -1128,7 +1140,7 @@ static int udp_close(URLContext *h)
>  {
>      UDPContext *s = h->priv_data;
>
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      // Request close once writing is finished
>      if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
>          pthread_mutex_lock(&s->mutex);
> @@ -1140,7 +1152,7 @@ static int udp_close(URLContext *h)
>
>      if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
>          udp_leave_multicast_group(s->udp_fd, (struct sockaddr
> *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
> -#if HAVE_PTHREAD_CANCEL
> +#if THREAD_RXRW
>      if (s->thread_started) {
>          int ret;
>          // Cancel only read, as write has been signaled as success to the
> user
> --
>
> I'll split the patch so that the pthread_cond_timedwait change is separate,
> and name it properly, if everyone is happy with it first.

In general I agree with Nicolas: split the ifdef renames and the win32
compat into two patches. That way it's much clearer what's actually
going on in the patch.

- hendrik


More information about the ffmpeg-devel mailing list