[FFmpeg-devel] [PATCH] avformat/srt: add Haivision Open SRT protocol

Derek Buitenhuis derek.buitenhuis at gmail.com
Tue Oct 10 18:47:45 EEST 2017


On 10/10/2017 7:29 AM, Nablet Developer wrote:
> @@ -293,6 +293,7 @@ External library support:
>    --enable-opengl          enable OpenGL rendering [no]
>    --enable-openssl         enable openssl, needed for https support
>                             if gnutls is not used [no]
> +  --enable-opensrt         enable Haivision Open SRT protoco [no]

Usually we have this in the form of --enable-libXXX for external libs. OpenSSL
and stuff doesn't follow this for reasons I do not know... maybe someone knows
if we have a real rule set or (more likely) it's just random.

>  enabled omx               && require_header OMX_Core.h
> +enabled opensrt           && require_pkg_config libsrt srt srt/srt.h srt_socket

Is there a >= version required?

> +#define _DEFAULT_SOURCE
> +#define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */

These should be passed as -D_THING to the compiler, normally, no?
> +#ifdef __APPLE__
> +#include "TargetConditionals.h"
> +#endif

What is this and why is it with "" instead of <>? Google says it is a system header,
so "" is wrong.

> +#ifndef HAVE_PTHREAD_CANCEL
> +#define HAVE_PTHREAD_CANCEL 0
> +#endif

This looks terribly wrong. Why is this needed?

> +#ifndef IPV6_ADD_MEMBERSHIP
> +#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
> +#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
> +#endif

At the very list this needs a comment to explain why.

> +#define UDP_TX_BUF_SIZE 32768
> +#define UDP_MAX_PKT_SIZE 65536
> +#define UDP_HEADER_SIZE 8

Ditto.

> +#if HAVE_PTHREAD_CANCEL
> +    pthread_t circular_buffer_thread;
> +    pthread_mutex_t mutex;
> +    pthread_cond_t cond;
> +    int thread_started;
> +#endif

Am I mistaken in thinking this code is pretty iffy when pthread_cancel is not available?

> +    uint8_t tmp[UDP_MAX_PKT_SIZE+4];

+4 ?

> +    /* SRT socket options (srt/srt.h) */
> +    int64_t maxbw;
> +    int pbkeylen;
> +    char passphrase[65];

65?

> +    int mss;
> +    int fc;
> +    int ipttl;
> +    int iptos;
> +    int64_t inputbw;
> +    int64_t oheadbw;
> +    int tsbpddelay;
> +    int tlpktdrop;
> +    int nakreport;
> +    int conntimeo;

Why do variable names suddenly change from like_this jammedtogetherlikethis?

> +#define SRT_MODE_CALLER 0
> +#define SRT_MODE_LISTENER 1
> +#define SRT_MODE_RENDEZVOUS 2

enum?

> +    { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },

What is system data size, and can it be detected?

> +    { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
> +    { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 

... what?

> +    { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
> +    { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },

These duplicates are not acceptable.

> +    /* SRT socket options (srt/srt.h), see srt/common/socketoptions.hpp */

Is this even a public header? Does libsrt not have real documentation?

> +    { "passphrase",     "Crypto PBKDF2 Passphrase size[0,10..64] 0:disable crypto",                    OFFSET(passphrase), AV_OPT_TYPE_STRING, { .str = NULL },              .flags = D|E },
> +    { "mss",            "the Maximum Transfer Unit",                                                   OFFSET(mss),        AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },

Doesn't tell me about what unit the option is in. Same for most of the other
options. Bytes? Seconds? Great British Pounds?

> +static int srt_neterrno(void)

I know technically (void) is needed in C, but we don't do this in the codebase, AFAIK.
> +    int err = 0;
> +    err = srt_getlasterror(NULL);

err = 0 is a dead store.

> +    if (err == SRT_EASYNCRCV)
> +        return AVERROR(EAGAIN);
> +    return err;

Does err even mean anything meaningful now? The return code
for this is mixing AVERROR codes and SRT_ error codes...

> +static int srt_network_wait_fd(int fd, int write)

Does this kind of fd passing work on Winows properly? I seem to
recall passing int fds around directly in Windows could be iffy.
Ignore this if I'm mis-remembering. 

> +{
> +    SRTSOCKET ready[2];
> +    int len = 2;
> +    int ret = -1;
> +    int eid = 0;

Dead stores.

> +static void log_net_error(void *ctx, int level, const char* prefix)
> +{
> +    av_log(ctx, level, "%s %s\n", prefix, srt_getlasterror_str());
> +}

Pointless wrapper.

> +static struct addrinfo *srt_resolve_host(URLContext *h,
> +                                         const char *hostname, int port,
> +                                         int type, int family, int flags)
> +{
> +    struct addrinfo hints = { 0 }, *res = 0;
> +    int error;
> +    char sport[16];
> +    const char *node = 0, *service = "0";

Dead stores.

> +
> +    if (port > 0) {
> +        snprintf(sport, sizeof(sport), "%d", port)
Unchecked snprintf call. Port is a user option without a limit.

> +    if ((error = getaddrinfo(node, service, &hints, &res))) {
> +        res = NULL;
> +        av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
> +               node ? node : "unknown",
> +               service,
> +               gai_strerror(error));
> +    }
> +
> +    return res;

We're OK with not hard failing here?

> +    struct addrinfo *res0;
> +    int addr_len;
> +
> +    res0 = srt_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
> +    if (!res0) return AVERROR(EIO);

New line.

> +    memcpy(addr, res0->ai_addr, res0->ai_addrlen);

Does the SRT API guarantee it'll fit in this struct?

> +    if (((struct sockaddr *) &s->dest_addr)->sa_family)
> +        family = ((struct sockaddr *) &s->dest_addr)->sa_family;
> +    res0 = srt_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
> +                            s->local_port,
> +                            SOCK_DGRAM, family, AI_PASSIVE);

Shouldn't all these ternary checks for user options be in a single place,
instead of all over the file? It's pretty messy.

> +    for (res = res0; res; res=res->ai_next) {
> +        srt_fd = srt_socket(res->ai_family, SOCK_DGRAM, 0);
> +        if (srt_fd != SRT_INVALID_SOCK) break;
> +        log_net_error(NULL, AV_LOG_ERROR, "socket");
> +    }

Again no hard failing?

> + fail:
> +    if (srt_fd >= 0)
> +        srt_close(srt_fd);

0 doesn't seem like it's a valid fd to close.

> +static int srt_port(struct sockaddr_storage *addr, int addr_len)
> +{
> +    char sbuf[sizeof(int)*3+1];

Erm, what?

> +    if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV))) {
> +        av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
> +        return -1;

Mixing the use of that silly wrapper and av_log. Just use av_log everywhere.

> +/**
> + * If no filename is given to av_open_input_file because you want to
> + * get the local port first, then you must call this function to set
> + * the remote server address.
> + *
> + * url syntax: srt://host:port[?option=val...]
> + * option  'localport=n' : set the local port
> + *         'pkt_size=n'  : set max packet size
> + *         'reuse=1'     : enable reusing the socket
> + *         'overrun_nonfatal=1': survive in case of circular buffer overrun
> + *
> + * @param h media file context
> + * @param uri of the remote server
> + * @return zero if no error.
> + */

Why are internal functions getting doxygen style comments? Same for other
parts.

> +int ff_srt_set_remote_url(URLContext *h, const char *uri)
> +{
> +    SRTContext *s = h->priv_data;
> +    char hostname[256], buf[10];

256 is super arbitrary, no?

> +    int port;
> +    const char *p;
> +
> +    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);

Should check if port is negative or not, as per av_url_split doc.

> +/**
> + * Return the local port used by the UDP connection
> + * @param h media file context
> + * @return the local port number
> + */
> +int ff_srt_get_local_port(URLContext *h)
> +{
> +    SRTContext *s = h->priv_data;
> +    return s->local_port;
> +}

Pointless getter function.

> +
> +/**
> + * Return the srt file handle for select() usage to wait for several RTP
> + * streams at the same time.
> + * @param h media file context
> + */
> +static int srt_get_file_handle(URLContext *h)
> +{
> +    SRTContext *s = h->priv_data;
> +    return s->srt_fd;
> +}

Ditto.

> +        int len;
> +
> +        pthread_mutex_unlock(&s->mutex);
> +        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
> +        len = srt_recvmsg(s->srt_fd, s->tmp+4, sizeof(s->tmp)-4);
> +        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
> +        pthread_mutex_lock(&s->mutex);
> +        if (len < 0) {
> +
> +            if (srt_neterrno() != AVERROR(EAGAIN) && srt_neterrno() != AVERROR(EINTR)) {

Is it OK to call srt_neterrno() multiple times like this?

> +static void *circular_buffer_task_tx(void * ctx)
> +{
> +    URLContext *h = ctx;
> +    SRTContext *s = h->priv_data;
> +    int old_cancelstate;
> +    int64_t target_timestamp = av_gettime_relative();
> +    int64_t start_timestamp = av_gettime_relative();

These can and will differ due to subsequent calls. Is that intended, or do
you want them to be the same?
> +        p = s->tmp;
> +        while (len) {
> +            int ret;
> +            av_assert0(len > 0);

Should these really be asserts?

> +{
> +    const char *source_start;
> +
> +    source_start = buf;

Can be one statement.

> +    while (1) {

while (next) ?

> +        char *next = strchr(source_start, ',');
> +        if (next)
> +            *next = '\0';
> +        sources[*num_sources] = av_strdup(source_start);
> +        if (!sources[*num_sources])
> +            return AVERROR(ENOMEM);
> +        source_start = next + 1;
> +        (*num_sources)++;
> +        if (*num_sources >= max_sources || !next)
> +            break;
> +    }
> +    return 0;
> +}

[...]

> +
> +/* - The "PRE" options must be set prior to connecting and can't be altered
> +/    on a connected socket, however if set on a listening socket, they are
> +/    derived by accept-ed socket. */
> +static int srt_set_options_pre(URLContext *h, int srt_fd)
> +{
> +    SRTContext *s = h->priv_data;

You should not be defining functions using the srt_ namespaces, considering
this is the namespace of libsrt, which you are using!

> +    int yes = 1;
> +
> +    if (s->mode == SRT_MODE_RENDEZVOUS && srt_setsockopt(srt_fd, 0, SRTO_RENDEZVOUS, &yes, sizeof(yes)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_RENDEZVOUS)");
> +        goto fail;
> +    }
> +
> +    if (s->maxbw >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_MAXBW, &s->maxbw, sizeof(s->maxbw)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_MAXBW)");
> +        goto fail;
> +    }
> +    if (s->pbkeylen >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_PBKEYLEN, &s->pbkeylen, sizeof(s->pbkeylen)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_PBKEYLEN)");
> +        goto fail;
> +    }
> +    if (s->passphrase[0] && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_PASSPHRASE, &s->passphrase, sizeof(s->passphrase)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_PASSPHRASE)");
> +        goto fail;
> +    }
> +    if (s->mss >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_MSS, &s->mss, sizeof(s->mss)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_MSS)");
> +        goto fail;
> +    }
> +    if (s->fc >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_FC, &s->fc, sizeof(s->fc)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_FC)");
> +        goto fail;
> +    }
> +    if (s->ipttl >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_IPTTL, &s->ipttl, sizeof(s->ipttl)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_IPTTL)");
> +        goto fail;
> +    }
> +    if (s->iptos >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_IPTOS, &s->iptos, sizeof(s->iptos)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_IPTOS)");
> +        goto fail;
> +    }
> +    if (s->tsbpddelay >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_TSBPDDELAY, &s->tsbpddelay, sizeof(s->tsbpddelay)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_TSBPDDELAY)");
> +        goto fail;
> +    }
> +    if (s->tlpktdrop >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_TLPKTDROP, &s->tlpktdrop, sizeof(s->tlpktdrop)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_TLPKTDROP)");
> +        goto fail;
> +    }
> +    if (s->nakreport >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_NAKREPORT, &s->nakreport, sizeof(s->nakreport)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_NAKREPORT)");
> +        goto fail;
> +    }
> +    if (s->conntimeo >= 0 && srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_CONNTIMEO, &s->conntimeo, sizeof(s->conntimeo)) < 0) {
> +        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_CONNTIMEO)");
> +        goto fail;
> +    }

Like I said before, all user options should be validated in one place. This is gross.

> +    return 0;
> +fail:
> +    return AVERROR(EIO);

Why even have a goto fail? Just return.


> +static int srt_open(URLContext *h, const char *uri, int flags)
> +{

srt_ namespace abuse. Same of others.

> +    char hostname[1024], localaddr[1024] = "";

Arbitrary limit?

> +    int port, srt_fd = SRT_INVALID_SOCK, tmp, bind_ret = -1;
> +    int listen_fd = SRT_INVALID_SOCK;
> +    SRTContext *s = h->priv_data;
> +    int is_output;
> +    const char *p;
> +    char buf[256];

buf for what? Why is it 256?

> +    struct sockaddr_storage my_addr;
> +    socklen_t len, tmplen;
> +    int i, num_include_sources = 0, num_exclude_sources = 0;
> +    char *include_sources[32], *exclude_sources[32];

Is 32 guaranteed to be the max?


> +    p = strchr(uri, '?');
> +    if (p) {
> +        if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
> +            char *endptr = NULL;
> +            s->reuse_socket = strtol(buf, &endptr, 10);
> +            /* assume if no digits were found it is a request to enable it */
> +            if (buf == endptr)
> +                s->reuse_socket = 1;

If I'm reading this right, you're setting what's supposed to be a user option?

That seems... bad. Same for otehr instances.

> +            if (!HAVE_PTHREAD_CANCEL)
> +                av_log(h, AV_LOG_WARNING,
> +                       "'overrun_nonfatal' option was set but it is not supported "
> +                       "on this build (pthread support is required)\n");

Why is this runtime checked?

> +    /* fill the dest addr */
> +    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);

Missing port < 0 check.

> +
> +    /* XXX: fix av_url_split */

Should be done before pushing, I suppose?


> +        /* set srt recv buffer size to the requested value (default 64K) */
> +        tmp = s->buffer_size;
> +        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
> +            log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
> +        }

These logs mean absolutely nothing to the user. Return an error, or log a real
user-understandable sentence, not a function call name and nothing else.

Same for all other places this occurs.


> +    len = sizeof(my_addr);
> +    srt_getsockname(srt_fd, (struct sockaddr *)&my_addr, &len);

Missing error check?



> +#if HAVE_PTHREAD_CANCEL
> +    /*
> +      Create thread in case of:
> +      1. Input and circular_buffer_size is set
> +      2. Output and bitrate and circular_buffer_size is set
> +    */
> +
> +    if (is_output && s->bitrate && !s->circular_buffer_size) {
> +        /* Warn user in case of 'circular_buffer_size' is not set */
> +        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
> +    }

Fail early, fail fast, fail hard. Don't do this kind of stuff.

> +
> +    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
> +        int ret;
> +


> +    return 0;
> +#if HAVE_PTHREAD_CANCEL
> + thread_fail:
> +    pthread_cond_destroy(&s->cond);
> + cond_fail:
> +    pthread_mutex_destroy(&s->mutex);
> +#endif

I assume the fallthrough is intended here.

> +            } else {
> +                /* FIXME: using the monotonic clock would be better,
> +                   but it does not exist on all supported platforms. */

Can we provide an internal or external API to enable its use when available?


> +#if HAVE_PTHREAD_CANCEL
> +    /* Request close once writing is finished */
> +    if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
> +        pthread_mutex_lock(&s->mutex);
> +        s->close_req = 1;
> +        pthread_cond_signal(&s->cond);
> +        pthread_mutex_unlock(&s->mutex);
> +    }
> +#endif
> +
> +#if HAVE_PTHREAD_CANCEL

Pointless endif.

[...]

Has this patch been run under valgrind, on Windows, and with stuff like ASAN, etc.?
Some stuff here would even trigger compiler warnings normally.

Also, how much of this stuff is duplicated from the udp.c and pals?

- Derek


More information about the ffmpeg-devel mailing list