[FFmpeg-cvslog] network: Add RFC 8305 style "Happy Eyeballs"/"Fast Fallback" helper function

Martin Storsjö git at videolan.org
Tue Sep 11 19:56:50 EEST 2018


ffmpeg | branch: master | Martin Storsjö <martin at martin.st> | Fri Aug 10 10:25:59 2018 +0300| [9b4c3f5aadf54ffd2a6e15746b1fd736379883c4] | committer: Martin Storsjö

network: Add RFC 8305 style "Happy Eyeballs"/"Fast Fallback" helper function

For cases with dual stack (IPv4 + IPv6) connectivity, but where one
stack potentially is less reliable, try to connect over both protocols
in parallel, using whichever address connects first.

In cases with a hostname resolving to multiple IPv4 and IPv6
addresses, the current connection mechanism would try all addresses
in the order returned by getaddrinfo (with all IPv6 addresses ordered
before the IPv4 addresses normally). If connection attempts to the
IPv6 addresses failed quickly with an error, this was no problem, but
if they only failed by timing out, the connection process had to wait
for the timeout on every IPv6 target address before attempting any
IPv4 address.

Similar to what RFC 8305 suggests, reorder the list of addresses to
try connecting to, interleaving address families. After starting one
connection attempt, start another one in parallel after a small delay
(200 ms as suggested by the RFC).

For cases with unreliable IPv6 but reliable IPv4, this should make
connection attempts work as reliably as with plain IPv4, with only an
extra 200 ms of connection delay.

Signed-off-by: Martin Storsjö <martin at martin.st>

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=9b4c3f5aadf54ffd2a6e15746b1fd736379883c4
---

 libavformat/network.c | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/network.h |  28 +++++++
 2 files changed, 254 insertions(+)

diff --git a/libavformat/network.c b/libavformat/network.c
index 24fcf20539..2d281539c6 100644
--- a/libavformat/network.c
+++ b/libavformat/network.c
@@ -23,7 +23,9 @@
 #include "tls.h"
 #include "url.h"
 #include "libavcodec/internal.h"
+#include "libavutil/avassert.h"
 #include "libavutil/mem.h"
+#include "libavutil/time.h"
 
 void ff_tls_init(void)
 {
@@ -240,6 +242,230 @@ int ff_listen_connect(int fd, const struct sockaddr *addr,
     return ret;
 }
 
+/**
+ * Reorder a linked list of addresses in place so that entries of different
+ * address families are interleaved, e.g. a list with the families
+ * 6,6,6,4,4 becomes 6,4,6,4,6.
+ *
+ * The first element never moves, so a caller holding the head pointer
+ * still reaches every element of the list afterwards.
+ *
+ * @param base head of the list; may be NULL, in which case nothing is done
+ */
+static void interleave_addrinfo(struct addrinfo *base)
+{
+    struct addrinfo **next;
+
+    // An empty list has nothing to reorder, and no ai_next to look at.
+    if (!base)
+        return;
+
+    next = &base->ai_next;
+    while (*next) {
+        struct addrinfo *cur = *next;
+        // Iterate forward until we find an entry of a different family.
+        if (cur->ai_family == base->ai_family) {
+            next = &cur->ai_next;
+            continue;
+        }
+        if (cur == base->ai_next) {
+            // If the first one following base is of a different family, just
+            // move base forward one step and continue.
+            base = cur;
+            next = &base->ai_next;
+            continue;
+        }
+        // Unchain cur from the rest of the list from its current spot.
+        *next = cur->ai_next;
+        // Hook in cur directly after base.
+        cur->ai_next = base->ai_next;
+        base->ai_next = cur;
+        // Restart with a new base. We know that before moving the cur element,
+        // everything between the previous base and cur had the same family,
+        // different from cur->ai_family. Therefore, we can keep next pointing
+        // where it was, and continue from there with base at the one after
+        // cur. That element is never NULL here: cur was not the direct
+        // successor of base, so base had at least one other successor.
+        base = cur->ai_next;
+    }
+}
+
+/**
+ * Log every address of a list in numeric form at AV_LOG_DEBUG level.
+ *
+ * @param ctx   logging context handed to av_log()
+ * @param addr  first entry of the list to print; may be NULL, in which
+ *              case only the title is logged
+ * @param title heading logged before the addresses
+ */
+static void print_address_list(void *ctx, const struct addrinfo *addr,
+                               const char *title)
+{
+    char hostbuf[100], portbuf[20];
+
+    av_log(ctx, AV_LOG_DEBUG, "%s:\n", title);
+    for (; addr; addr = addr->ai_next) {
+        if (getnameinfo(addr->ai_addr, addr->ai_addrlen,
+                        hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf),
+                        NI_NUMERICHOST | NI_NUMERICSERV)) {
+            // On failure the buffers are not guaranteed to hold strings;
+            // log a placeholder instead of uninitialized stack contents.
+            hostbuf[0] = portbuf[0] = '?';
+            hostbuf[1] = portbuf[1] = '\0';
+        }
+        av_log(ctx, AV_LOG_DEBUG, "Address %s port %s\n", hostbuf, portbuf);
+    }
+}
+
+struct ConnectionAttempt {  // State of one pending connection attempt.
+    int fd;                 // Socket of the attempt; start_connect_attempt() sets it to -1 when connect() fails.
+    int64_t deadline_us;    // av_gettime_relative() at start plus the timeout, in microseconds.
+    struct addrinfo *addr;  // List entry being connected to; read again when logging the outcome.
+};
+
+/**
+ * Begin a non-blocking connection attempt to the address at the head of a
+ * list, and advance the list to the following address.
+ *
+ * @param attempt       filled in with the socket, deadline and address of
+ *                      the attempt; fd is set to -1 if connect() fails
+ * @param ptr           in: the address to connect to; out: the next list
+ *                      entry (advanced even when the attempt fails)
+ * @param timeout_ms    time allowed for this attempt, in milliseconds
+ * @param h             context whose interrupt callback is consulted when
+ *                      connect() returns EINTR
+ * @param customize_fd  optional hook called with the new socket before
+ *                      connect() is called on it, may be NULL
+ * @param customize_ctx opaque pointer handed to customize_fd
+ * @return < 0 on error, 0 on successfully started connection attempt,
+ *         > 0 for a connection that succeeded already.
+ */
+static int start_connect_attempt(struct ConnectionAttempt *attempt,
+                                 struct addrinfo **ptr, int timeout_ms,
+                                 URLContext *h,
+                                 void (*customize_fd)(void *, int), void *customize_ctx)
+{
+    struct addrinfo *ai = *ptr;
+    int ret;
+
+    // Consume the address up front so the caller moves on to the next one
+    // regardless of how this attempt ends.
+    *ptr = ai->ai_next;
+
+    attempt->fd = ff_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+    if (attempt->fd < 0)
+        return ff_neterrno();
+    // Widen before multiplying: timeout_ms * 1000 evaluated as int would
+    // overflow for timeouts above roughly 2147 seconds.
+    attempt->deadline_us = av_gettime_relative() + (int64_t)timeout_ms * 1000;
+    attempt->addr = ai;
+
+    ff_socket_nonblock(attempt->fd, 1);
+
+    if (customize_fd)
+        customize_fd(customize_ctx, attempt->fd);
+
+    while ((ret = connect(attempt->fd, ai->ai_addr, ai->ai_addrlen))) {
+        ret = ff_neterrno();
+        switch (ret) {
+        case AVERROR(EINTR):
+            // Interrupted by a signal: give up if the user asked for it,
+            // otherwise simply retry the connect() call.
+            if (ff_check_interrupt(&h->interrupt_callback)) {
+                closesocket(attempt->fd);
+                attempt->fd = -1;
+                return AVERROR_EXIT;
+            }
+            continue;
+        case AVERROR(EINPROGRESS):
+        case AVERROR(EAGAIN):
+            // The connection is being established in the background; the
+            // caller has to wait for the socket to report the result.
+            return 0;
+        default:
+            closesocket(attempt->fd);
+            attempt->fd = -1;
+            return ret;
+        }
+    }
+    // connect() returned 0: the connection is already established.
+    return 1;
+}
+
+// Try a new connection to another address after 200 ms, as suggested in
+// RFC 8305 (or sooner if an earlier attempt fails).
+#define NEXT_ATTEMPT_DELAY_MS 200
+
+// Write the numeric host and port of ai into host/port for logging. Both
+// buffers must have room for at least two bytes; if the address cannot be
+// formatted they are set to "?" so that they always hold valid strings.
+static void describe_address(const struct addrinfo *ai,
+                             char *host, size_t host_size,
+                             char *port, size_t port_size)
+{
+    if (getnameinfo(ai->ai_addr, ai->ai_addrlen, host, host_size,
+                    port, port_size, NI_NUMERICHOST | NI_NUMERICSERV)) {
+        host[0] = port[0] = '?';
+        host[1] = port[1] = '\0';
+    }
+}
+
+/**
+ * Connect to the first address of a list that accepts a connection, keeping
+ * several non-blocking attempts pending at the same time.
+ *
+ * The list is first reordered with interleave_addrinfo(). A new attempt is
+ * started whenever fewer than `parallel` attempts are pending and addresses
+ * remain; between starts the function waits for socket activity, for the
+ * oldest attempt's deadline, or for NEXT_ATTEMPT_DELAY_MS to pass,
+ * whichever comes first.
+ *
+ * @param addrs    list of addresses to try; reordered in place, the head
+ *                 element stays the head
+ * @param timeout_ms_per_address time allowed for each single attempt, in
+ *                 milliseconds
+ * @param parallel maximum number of simultaneously pending attempts; values
+ *                 outside [1, capacity of the internal arrays] select that
+ *                 capacity
+ * @param h        context used for logging and for its interrupt callback
+ * @param fd       receives the connected socket on success
+ * @param customize_fd  optional hook called on each new socket before
+ *                 connect(), may be NULL
+ * @param customize_ctx opaque pointer handed to customize_fd
+ * @return 0 on success, a negative AVERROR code on failure. Sockets of all
+ *         attempts other than the returned one are closed before returning.
+ */
+int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address,
+                        int parallel, URLContext *h, int *fd,
+                        void (*customize_fd)(void *, int), void *customize_ctx)
+{
+    struct ConnectionAttempt attempts[3];
+    struct pollfd pfd[3];
+    const int max_parallel = FF_ARRAY_ELEMS(attempts);
+    int nb_attempts = 0, i, j;
+    int64_t next_attempt_us = av_gettime_relative(), next_deadline_us;
+    int last_err = AVERROR(EIO);
+    socklen_t optlen;
+    char errbuf[100], hostbuf[100], portbuf[20];
+
+    // Anything outside the usable range falls back to the capacity of the
+    // arrays above. In particular a value of 0 must not get through: no
+    // attempt would ever be started and the assertion below would fire.
+    if (parallel < 1 || parallel > max_parallel)
+        parallel = max_parallel;
+
+    print_address_list(h, addrs, "Original list of addresses");
+    // This mutates the list, but the head of the list is still the same
+    // element, so the caller, who owns the list, doesn't need to get
+    // an updated pointer.
+    interleave_addrinfo(addrs);
+    print_address_list(h, addrs, "Interleaved list of addresses");
+
+    while (nb_attempts > 0 || addrs) {
+        // Start a new connection attempt, if possible.
+        if (nb_attempts < parallel && addrs) {
+            describe_address(addrs, hostbuf, sizeof(hostbuf),
+                             portbuf, sizeof(portbuf));
+            av_log(h, AV_LOG_VERBOSE, "Starting connection attempt to %s port %s\n",
+                                      hostbuf, portbuf);
+            last_err = start_connect_attempt(&attempts[nb_attempts], &addrs,
+                                             timeout_ms_per_address, h,
+                                             customize_fd, customize_ctx);
+            if (last_err < 0) {
+                // The address has been consumed already; go straight on to
+                // the next one (or leave the loop if nothing is left).
+                av_strerror(last_err, errbuf, sizeof(errbuf));
+                av_log(h, AV_LOG_VERBOSE, "Connected attempt failed: %s\n",
+                                          errbuf);
+                continue;
+            }
+            if (last_err > 0) {
+                // Connected immediately: drop all older pending attempts.
+                for (i = 0; i < nb_attempts; i++)
+                    closesocket(attempts[i].fd);
+                *fd = attempts[nb_attempts].fd;
+                return 0;
+            }
+            pfd[nb_attempts].fd = attempts[nb_attempts].fd;
+            pfd[nb_attempts].events = POLLOUT;
+            next_attempt_us = av_gettime_relative() + NEXT_ATTEMPT_DELAY_MS * 1000;
+            nb_attempts++;
+        }
+
+        av_assert0(nb_attempts > 0);
+        // The connection attempts are sorted from oldest to newest, so the
+        // first one will have the earliest deadline.
+        next_deadline_us = attempts[0].deadline_us;
+        // If we can start another attempt in parallel, wait until that time.
+        if (nb_attempts < parallel && addrs)
+            next_deadline_us = FFMIN(next_deadline_us, next_attempt_us);
+        // NOTE(review): the timeout computed here is zero or negative once
+        // the deadline has passed (or is less than 1 ms away); confirm how
+        // ff_poll_interrupt() treats such values.
+        last_err = ff_poll_interrupt(pfd, nb_attempts,
+                                     (next_deadline_us - av_gettime_relative())/1000,
+                                     &h->interrupt_callback);
+        if (last_err < 0 && last_err != AVERROR(ETIMEDOUT))
+            break;
+
+        // Check the status from the poll output.
+        for (i = 0; i < nb_attempts; i++) {
+            last_err = 0;
+            if (pfd[i].revents) {
+                // Some sort of action for this socket, check its status (either
+                // a successful connection or an error).
+                optlen = sizeof(last_err);
+                if (getsockopt(attempts[i].fd, SOL_SOCKET, SO_ERROR, &last_err, &optlen))
+                    last_err = ff_neterrno();
+                else if (last_err != 0)
+                    last_err = AVERROR(last_err);
+                if (last_err == 0) {
+                    // Everything is ok, we seem to have a successful
+                    // connection. Close other sockets and return this one.
+                    for (j = 0; j < nb_attempts; j++)
+                        if (j != i)
+                            closesocket(attempts[j].fd);
+                    *fd = attempts[i].fd;
+                    describe_address(attempts[i].addr, hostbuf, sizeof(hostbuf),
+                                     portbuf, sizeof(portbuf));
+                    av_log(h, AV_LOG_VERBOSE, "Successfully connected to %s port %s\n",
+                                              hostbuf, portbuf);
+                    return 0;
+                }
+            }
+            if (attempts[i].deadline_us < av_gettime_relative() && !last_err)
+                last_err = AVERROR(ETIMEDOUT);
+            if (!last_err)
+                continue;
+            // Error (or timeout) for this socket; close the socket and remove
+            // it from the attempts/pfd arrays, to let a new attempt start
+            // directly.
+            describe_address(attempts[i].addr, hostbuf, sizeof(hostbuf),
+                             portbuf, sizeof(portbuf));
+            av_strerror(last_err, errbuf, sizeof(errbuf));
+            av_log(h, AV_LOG_VERBOSE, "Connection attempt to %s port %s "
+                                      "failed: %s\n", hostbuf, portbuf, errbuf);
+            closesocket(attempts[i].fd);
+            memmove(&attempts[i], &attempts[i + 1],
+                    (nb_attempts - i - 1) * sizeof(*attempts));
+            memmove(&pfd[i], &pfd[i + 1],
+                    (nb_attempts - i - 1) * sizeof(*pfd));
+            // Slot i now holds the next attempt; revisit the same index.
+            i--;
+            nb_attempts--;
+        }
+    }
+    // Reached when polling failed or every address has been exhausted;
+    // release whatever is still pending.
+    for (i = 0; i < nb_attempts; i++)
+        closesocket(attempts[i].fd);
+    if (last_err >= 0)
+        last_err = AVERROR(ECONNREFUSED);
+    if (last_err != AVERROR_EXIT) {
+        av_strerror(last_err, errbuf, sizeof(errbuf));
+        av_log(h, AV_LOG_ERROR, "Connection to %s failed: %s\n",
+               h->filename, errbuf);
+    }
+    return last_err;
+}
+
 static int match_host_pattern(const char *pattern, const char *hostname)
 {
     int len_p, len_h;
diff --git a/libavformat/network.h b/libavformat/network.h
index 09cee58a5c..f4cb59e5ff 100644
--- a/libavformat/network.h
+++ b/libavformat/network.h
@@ -256,4 +256,32 @@ int ff_http_match_no_proxy(const char *no_proxy, const char *hostname);
 
 int ff_socket(int domain, int type, int protocol);
 
+/**
+ * Connect to any of the given addrinfo addresses, with multiple attempts
+ * running in parallel.
+ *
+ * @param addrs    The list of addresses to try to connect to.
+ *                 This list will be mutated internally (its entries are
+ *                 reordered so that address families are interleaved),
+ *                 but the list head will remain as such, so this doesn't
+ *                 affect the caller freeing the list afterwards.
+ * @param timeout_ms_per_address The number of milliseconds to wait for each
+ *                 connection attempt. Since multiple addresses are tried,
+ *                 some of them in parallel, the total run time will be at
+ *                 most timeout_ms_per_address*ceil(nb_addrs/parallel) +
+ *                 (parallel - 1) * NEXT_ATTEMPT_DELAY_MS.
+ * @param parallel The maximum number of connections to attempt in parallel.
+ *                 This is limited to an internal maximum capacity.
+ * @param h        URLContext providing interrupt check
+ *                 callback and logging context.
+ * @param fd       If successful, the connected socket is returned here.
+ * @param customize_fd Function that will be called for each socket created,
+ *                 to allow the caller to set socket options before calling
+ *                 connect() on it, may be NULL.
+ * @param customize_ctx Context parameter passed to customize_fd.
+ * @return         0 on success, a negative AVERROR code on failure.
+ */
+int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address,
+                        int parallel, URLContext *h, int *fd,
+                        void (*customize_fd)(void *, int), void *customize_ctx);
+
 #endif /* AVFORMAT_NETWORK_H */



More information about the ffmpeg-cvslog mailing list