[FFmpeg-devel] [PATCH] avformat/srt: add Haivision Open SRT protocol
Nablet Developer
sdk at nablet.com
Tue Oct 10 09:29:12 EEST 2017
protocol requires libsrt (https://github.com/Haivision/srt) to be
installed
Signed-off-by: Nablet Developer <sdk at nablet.com>
---
configure | 5 +
libavformat/Makefile | 1 +
libavformat/opensrt.c | 1105 +++++++++++++++++++++++++++++++++++++++++++++++
libavformat/protocols.c | 1 +
libavformat/url.h | 4 +
5 files changed, 1116 insertions(+)
create mode 100644 libavformat/opensrt.c
diff --git a/configure b/configure
index 391c141..312c632 100755
--- a/configure
+++ b/configure
@@ -293,6 +293,7 @@ External library support:
--enable-opengl enable OpenGL rendering [no]
--enable-openssl enable openssl, needed for https support
if gnutls is not used [no]
+ --enable-opensrt enable Haivision Open SRT protocol [no]
--disable-sndio disable sndio support [autodetect]
--disable-schannel disable SChannel SSP, needed for TLS support on
Windows if openssl and gnutls are not used [autodetect]
@@ -1638,6 +1639,7 @@ EXTERNAL_LIBRARY_LIST="
openal
opencl
opengl
+ opensrt
"
HWACCEL_AUTODETECT_LIBRARY_LIST="
@@ -3144,6 +3146,8 @@ libsmbclient_protocol_deps="libsmbclient gplv3"
libssh_protocol_deps="libssh"
mmsh_protocol_select="http_protocol"
mmst_protocol_select="network"
+opensrt_protocol_select="network"
+opensrt_protocol_deps="opensrt"
rtmp_protocol_conflict="librtmp_protocol"
rtmp_protocol_select="tcp_protocol"
rtmpe_protocol_select="ffrtmpcrypt_protocol"
@@ -6063,6 +6067,7 @@ enabled omx_rpi && { check_header OMX_Core.h ||
{ ! enabled cross_compile && add_cflags -isystem/opt/vc/include/IL && check_header OMX_Core.h ; } ||
die "ERROR: OpenMAX IL headers not found"; }
enabled omx && require_header OMX_Core.h
+enabled opensrt && require_pkg_config libsrt srt srt/srt.h srt_socket
enabled openssl && { use_pkg_config openssl openssl openssl/ssl.h OPENSSL_init_ssl ||
use_pkg_config openssl openssl openssl/ssl.h SSL_library_init ||
check_lib openssl openssl/ssl.h SSL_library_init -lssl -lcrypto ||
diff --git a/libavformat/Makefile b/libavformat/Makefile
index df709c29..a3cbb4e 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -593,6 +593,7 @@ OBJS-$(CONFIG_TLS_SCHANNEL_PROTOCOL) += tls_schannel.o tls.o
OBJS-$(CONFIG_TLS_SECURETRANSPORT_PROTOCOL) += tls_securetransport.o tls.o
OBJS-$(CONFIG_UDP_PROTOCOL) += udp.o
OBJS-$(CONFIG_UDPLITE_PROTOCOL) += udp.o
+OBJS-$(CONFIG_OPENSRT_PROTOCOL) += opensrt.o
OBJS-$(CONFIG_UNIX_PROTOCOL) += unix.o
# libavdevice dependencies
diff --git a/libavformat/opensrt.c b/libavformat/opensrt.c
new file mode 100644
index 0000000..6483942
--- /dev/null
+++ b/libavformat/opensrt.c
@@ -0,0 +1,1105 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file
+ * Haivision Open SRT (Secure Reliable Transport) protocol
+ */
+
+#define _DEFAULT_SOURCE
+#define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
+
+#include "avformat.h"
+#include "avio_internal.h"
+#include "libavutil/avassert.h"
+#include "libavutil/parseutils.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/log.h"
+#include "libavutil/time.h"
+#include "internal.h"
+#include "network.h"
+#include "os_support.h"
+#include "url.h"
+
+#ifdef __APPLE__
+#include "TargetConditionals.h"
+#endif
+
+#include <srt/srt.h>
+#include <srt/logging_api.h>
+
+
+#if HAVE_PTHREAD_CANCEL
+#include <pthread.h>
+#endif
+
+#ifndef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 0
+#endif
+
+#ifndef IPV6_ADD_MEMBERSHIP /* alias the *_MEMBERSHIP names to the *_GROUP names when the former are missing */
+#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
+#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
+#endif
+
+#define UDP_TX_BUF_SIZE 32768 /* default buffer_size when the protocol is opened for output */
+#define UDP_MAX_PKT_SIZE 65536 /* default buffer_size and max_packet_size when opened for reading; also sizes SRTContext.tmp */
+#define UDP_HEADER_SIZE 8 /* NOTE(review): not referenced in the visible part of this file */
+
+typedef struct SRTContext { /* Private data of the srt protocol (URLContext.priv_data);
+ * most fields are filled from options[] and/or from the URL query in srt_open(). */
+ const AVClass *class;
+ SRTSOCKET srt_fd; /* SRT socket used for I/O; assigned at the end of srt_open() */
+ int buffer_size; /* value passed to SRTO_SNDBUF / SRTO_RCVBUF; a negative value selects a default in srt_open() */
+ int pkt_size; /* becomes h->max_packet_size when opened for writing */
+ int local_port; /* port to bind to; overwritten with the port reported by srt_getsockname() */
+ int reuse_socket; /* > 0 makes srt_open() set SRTO_REUSEADDR */
+ int overrun_nonfatal; /* non-zero: the receive thread keeps running (skipping the packet) when the FIFO is full */
+ struct sockaddr_storage dest_addr; /* resolved remote address, see ff_srt_set_remote_url() */
+ int dest_addr_len;
+ int is_connected;
+
+ /* Circular Buffer variables for use in UDP receive code */
+ int circular_buffer_size; /* option value is in 188-byte units; srt_open() multiplies it by 188 */
+ AVFifoBuffer *fifo; /* records are a 4-byte little-endian length followed by the payload */
+ int circular_buffer_error; /* error left behind by the worker thread */
+ int64_t bitrate; /* number of bits to send per second */
+ int64_t burst_bits;
+ int close_req; /* checked by the transmit thread while it waits for data */
+#if HAVE_PTHREAD_CANCEL
+ pthread_t circular_buffer_thread;
+ pthread_mutex_t mutex; /* held by the worker threads and srt_read() while they touch the FIFO */
+ pthread_cond_t cond;
+ int thread_started;
+#endif
+ uint8_t tmp[UDP_MAX_PKT_SIZE+4]; /* staging buffer; the receive thread stores the length in the first 4 bytes */
+ int remaining_in_dg;
+ char *localaddr; /* local address to bind to, unless the URL carries its own localaddr */
+ int timeout; /* copied to h->rw_timeout by srt_open() */
+ const char *sources; /* comma-separated list handed to parse_source_list() */
+ char *block; /* comma-separated list handed to parse_source_list() */
+
+ /* SRT socket options (srt/srt.h) */
+ int64_t maxbw;
+ int pbkeylen;
+ char passphrase[65]; /* NOTE(review): options[] registers this array as AV_OPT_TYPE_STRING,
+ * which presumably expects a char * field - confirm against the AVOption documentation */
+ int mss;
+ int fc;
+ int ipttl;
+ int iptos;
+ int64_t inputbw;
+ int64_t oheadbw;
+ int tsbpddelay;
+ int tlpktdrop;
+ int nakreport;
+ int conntimeo;
+ int mode; /* one of the SRT_MODE_* values */
+} SRTContext;
+
+/* Connection modes selectable through the "mode" option / URL tag. */
+#define SRT_MODE_CALLER 0
+#define SRT_MODE_LISTENER 1
+#define SRT_MODE_RENDEZVOUS 2
+
+#define OFFSET(x) offsetof(SRTContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+/*
+ * AVOptions of the srt protocol. Most of them can also be given as URL query
+ * tags, which srt_open() parses after the AVOption defaults were applied.
+ *
+ * NOTE(review): "passphrase" is declared AV_OPT_TYPE_STRING but its target,
+ * SRTContext.passphrase, is a char array rather than a char pointer - confirm
+ * that setting it through the AVOption API behaves as intended.
+ */
+static const AVOption options[] = {
+    { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
+    { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
+    { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
+    { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
+    { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags = D|E },
+    { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, D|E },
+    { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
+    { "fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D },
+    { "timeout", "set raise error timeout (only in read mode)", OFFSET(timeout), AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, D },
+    { "sources", "Source list", OFFSET(sources), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
+    { "block", "Block list", OFFSET(block), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
+
+    /* SRT socket options (srt/srt.h), see srt/common/socketoptions.hpp */
+    { "maxbw", "maximum bandwidth (bytes per second) that the connection can use", OFFSET(maxbw), AV_OPT_TYPE_INT64, { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "pbkeylen", "Crypto key len in bytes {16,24,32} Default: 16 (128-bit)", OFFSET(pbkeylen), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, 32, .flags = D|E },
+    { "passphrase", "Crypto PBKDF2 Passphrase size[0,10..64] 0:disable crypto", OFFSET(passphrase), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
+    { "mss", "the Maximum Transfer Unit", OFFSET(mss), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "fc", "Flight flag size (window size)", OFFSET(fc), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "ipttl", "IP Time To Live", OFFSET(ipttl), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "iptos", "IP Type of Service", OFFSET(iptos), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "inputbw", "Estimated input stream rate", OFFSET(inputbw), AV_OPT_TYPE_INT64, { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "oheadbw", "MaxBW ceiling based on % over input stream rate", OFFSET(oheadbw), AV_OPT_TYPE_INT64, { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "tsbpddelay", "TsbPd receiver delay (mSec) to absorb burst of missed packet retransmission", OFFSET(tsbpddelay), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    { "tlpktdrop", "Enable receiver pkt drop", OFFSET(tlpktdrop), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, 1, .flags = D|E },
+    { "nakreport", "Enable receiver to send periodic NAK reports", OFFSET(nakreport), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, 1, .flags = D|E },
+    /* SRTContext.conntimeo is an int: the option type must not be wider than the field */
+    { "conntimeo", "Connect timeout in msec. Ccaller default: 3000, rendezvous (x 10)", OFFSET(conntimeo), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
+    /* "mode" and its named constants share a unit so the names can be resolved */
+    { "mode", "connection mode (caller, listener, rendezvous)", OFFSET(mode), AV_OPT_TYPE_INT, { .i64 = SRT_MODE_CALLER }, SRT_MODE_CALLER, SRT_MODE_RENDEZVOUS, .flags = D|E, .unit = "mode" },
+    { "caller", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRT_MODE_CALLER }, INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+    { "listener", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRT_MODE_LISTENER }, INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+    { "rendezvous", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRT_MODE_RENDEZVOUS }, INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+
+    { NULL }
+};
+
+static const AVClass srt_class = { /* AVClass that exposes options[] under the class name "srt" */
+ .class_name = "srt",
+ .item_name = av_default_item_name,
+ .option = options,
+ .version = LIBAVUTIL_VERSION_INT,
+};
+
+/**
+ * Translate the last error reported by libsrt into an AVERROR code.
+ *
+ * Callers compare the result against AVERROR(EAGAIN)/AVERROR(EINTR) and store
+ * it in SRTContext.circular_buffer_error, which srt_read() hands back to the
+ * I/O layer; a raw libsrt code is not an AVERROR value and a positive one
+ * would be indistinguishable from a byte count there.
+ *
+ * @return 0 if libsrt reports no error, AVERROR(EAGAIN) if the operation
+ *         would have blocked, AVERROR_UNKNOWN for any other failure
+ */
+static int srt_neterrno(void)
+{
+    int err = srt_getlasterror(NULL);
+
+    if (!err)
+        return 0;
+    if (err == SRT_EASYNCRCV || err == SRT_EASYNCSND)
+        return AVERROR(EAGAIN);
+    return AVERROR_UNKNOWN;
+}
+
+/**
+ * Wait on a temporary SRT epoll set until fd is reported for reading or
+ * writing.
+ *
+ * @param fd    SRT socket to wait on
+ * @param write non-zero to wait for SRT_EPOLL_OUT, zero for SRT_EPOLL_IN
+ * @return 0 when the wait succeeded, AVERROR(EAGAIN) when srt_epoll_wait()
+ *         failed, AVERROR(EINVAL) when the socket could not be added, or the
+ *         negative srt_epoll_create() result
+ */
+static int srt_network_wait_fd(int fd, int write)
+{
+    SRTSOCKET signalled[2];
+    int count = 2;
+    int events = write ? SRT_EPOLL_OUT : SRT_EPOLL_IN;
+    int rc;
+    int epoll_id = srt_epoll_create();
+
+    if (epoll_id < 0)
+        return epoll_id;
+
+    if (srt_epoll_add_usock(epoll_id, fd, &events) < 0) {
+        srt_epoll_release(epoll_id);
+        return AVERROR(EINVAL);
+    }
+
+    /* The timeout argument is -1 (presumably "no timeout" - confirm against
+     * the libsrt documentation); only the matching ready-set is requested. */
+    rc = write ? srt_epoll_wait(epoll_id, 0, 0, signalled, &count, -1, 0, 0, 0, 0)
+               : srt_epoll_wait(epoll_id, signalled, &count, 0, 0, -1, 0, 0, 0, 0);
+    srt_epoll_release(epoll_id);
+
+    return rc == SRT_ERROR ? AVERROR(EAGAIN) : 0;
+}
+
+/**
+ * Write the given flag to both SRTO_SNDSYN and SRTO_RCVSYN of a socket.
+ *
+ * NOTE(review): libsrt documents the *SYN options as "synchronous" flags, so
+ * passing enable unchanged looks inverted with respect to this function's
+ * name. Confirm against the libsrt option reference before changing it:
+ * srt_open() performs its accept/connect after calling this with enable=1.
+ *
+ * @param socket SRT socket to configure
+ * @param enable value stored in both options
+ * @return the (negative) result of the first failing srt_setsockopt() call,
+ *         otherwise the result of the second call
+ */
+static int srt_socket_nonblock(int socket, int enable)
+{
+    int rc = srt_setsockopt(socket, 0, SRTO_SNDSYN, &enable, sizeof(enable));
+
+    if (rc >= 0)
+        rc = srt_setsockopt(socket, 0, SRTO_RCVSYN, &enable, sizeof(enable));
+    return rc;
+}
+
+/**
+ * Log "<prefix> <libsrt error text>" at the given level.
+ *
+ * @param ctx    logging context handed to av_log(), may be NULL
+ * @param level  av_log() level
+ * @param prefix text printed in front of the libsrt error description
+ */
+static void log_net_error(void *ctx, int level, const char* prefix)
+{
+    const char *reason = srt_getlasterror_str();
+
+    av_log(ctx, level, "%s %s\n", prefix, reason);
+}
+
+/**
+ * Resolve a host/port pair with getaddrinfo().
+ *
+ * @param h        context used for logging
+ * @param hostname host to resolve; NULL, "" or a name starting with '?' is
+ *                 passed to getaddrinfo() as a NULL node
+ * @param port     port number; values <= 0 are sent as service "0"
+ * @param type     value for ai_socktype
+ * @param family   value for ai_family
+ * @param flags    value for ai_flags
+ * @return the address list (release with freeaddrinfo()), or NULL on failure
+ */
+static struct addrinfo *srt_resolve_host(URLContext *h,
+                                         const char *hostname, int port,
+                                         int type, int family, int flags)
+{
+    struct addrinfo hints = { 0 };
+    struct addrinfo *result = 0;
+    char portbuf[16];
+    const char *service = "0";
+    const char *node = 0;
+    int gai_err;
+
+    hints.ai_flags    = flags;
+    hints.ai_family   = family;
+    hints.ai_socktype = type;
+
+    if (hostname && hostname[0] != '\0' && hostname[0] != '?')
+        node = hostname;
+
+    if (port > 0) {
+        snprintf(portbuf, sizeof(portbuf), "%d", port);
+        service = portbuf;
+    }
+
+    gai_err = getaddrinfo(node, service, &hints, &result);
+    if (!gai_err)
+        return result;
+
+    av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
+           node ? node : "unknown",
+           service,
+           gai_strerror(gai_err));
+    return NULL;
+}
+
+/**
+ * Resolve hostname:port and copy the first result into addr.
+ *
+ * @param h        context used for logging
+ * @param addr     receives the resolved socket address
+ * @param hostname host to resolve
+ * @param port     port to resolve
+ * @return length of the stored address, or AVERROR(EIO) if resolution failed
+ */
+static int srt_set_url(URLContext *h,
+                       struct sockaddr_storage *addr,
+                       const char *hostname, int port)
+{
+    struct addrinfo *ai = srt_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
+    int stored_len;
+
+    if (!ai)
+        return AVERROR(EIO);
+
+    stored_len = ai->ai_addrlen;
+    memcpy(addr, ai->ai_addr, ai->ai_addrlen);
+    freeaddrinfo(ai);
+
+    return stored_len;
+}
+
+/**
+ * Create an SRT socket suitable for binding to the local address/port.
+ *
+ * The local address is resolved with AI_PASSIVE, using the family of the
+ * destination address when one is already set. The first candidate for which
+ * srt_socket() succeeds is used.
+ *
+ * @param h         media file context
+ * @param addr      receives the local address to bind to
+ * @param addr_len  receives the length of that address
+ * @param localaddr local host name/address, may be NULL or empty
+ * @return the new SRT socket, or -1 on failure
+ */
+static int srt_socket_create(URLContext *h, struct sockaddr_storage *addr,
+                             socklen_t *addr_len, const char *localaddr)
+{
+    SRTContext *s = h->priv_data;
+    const char *bind_host = (localaddr && localaddr[0]) ? localaddr : NULL;
+    struct addrinfo *list, *cur;
+    int family = AF_UNSPEC;
+    int sock = SRT_INVALID_SOCK;
+
+    if (((struct sockaddr *) &s->dest_addr)->sa_family)
+        family = ((struct sockaddr *) &s->dest_addr)->sa_family;
+
+    list = srt_resolve_host(h, bind_host, s->local_port,
+                            SOCK_DGRAM, family, AI_PASSIVE);
+    if (!list)
+        return -1;
+
+    for (cur = list; cur; cur = cur->ai_next) {
+        sock = srt_socket(cur->ai_family, SOCK_DGRAM, 0);
+        if (sock != SRT_INVALID_SOCK)
+            break;
+        log_net_error(NULL, AV_LOG_ERROR, "socket");
+    }
+
+    if (sock < 0) {
+        /* no candidate produced a socket */
+        freeaddrinfo(list);
+        return -1;
+    }
+
+    memcpy(addr, cur->ai_addr, cur->ai_addrlen);
+    *addr_len = cur->ai_addrlen;
+    freeaddrinfo(list);
+
+    return sock;
+}
+
+/**
+ * Extract the numeric port from a socket address.
+ *
+ * @param addr     socket address
+ * @param addr_len length of addr
+ * @return the port number, or -1 if getnameinfo() failed
+ */
+static int srt_port(struct sockaddr_storage *addr, int addr_len)
+{
+    char service[sizeof(int)*3+1];
+    int rc = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,
+                         service, sizeof(service), NI_NUMERICSERV);
+
+    if (rc) {
+        av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(rc));
+        return -1;
+    }
+
+    return strtol(service, NULL, 10);
+}
+
+
+/**
+ * Set (or change) the remote address of an SRT context from a URL.
+ *
+ * url syntax: srt://host:port[?option=val...]
+ * The only query tag read here is 'connect=n': when it switches the context
+ * from "not connected" to "connected", srt_connect() is called on the
+ * context's socket. All other tags are handled by srt_open().
+ *
+ * @param h media file context
+ * @param uri of the remote server
+ * @return zero if no error, AVERROR(EIO) if the host cannot be resolved or
+ *         the connect fails.
+ */
+int ff_srt_set_remote_url(URLContext *h, const char *uri)
+{
+    SRTContext *s = h->priv_data;
+    char hostname[256], buf[10];
+    const char *query;
+    int port;
+    int previously_connected;
+
+    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    /* set the destination address */
+    s->dest_addr_len = srt_set_url(h, &s->dest_addr, hostname, port);
+    if (s->dest_addr_len < 0)
+        return AVERROR(EIO);
+
+    query = strchr(uri, '?');
+    if (!query || !av_find_info_tag(buf, sizeof(buf), "connect", query))
+        return 0;
+
+    previously_connected = s->is_connected;
+    s->is_connected = strtol(buf, NULL, 10);
+    if (!s->is_connected || previously_connected)
+        return 0;
+
+    if (srt_connect(s->srt_fd, (struct sockaddr *) &s->dest_addr,
+                    s->dest_addr_len)) {
+        s->is_connected = 0;
+        log_net_error(h, AV_LOG_ERROR, "connect");
+        return AVERROR(EIO);
+    }
+
+    return 0;
+}
+
+/**
+ * Report the local port stored in the SRT context.
+ * @param h media file context
+ * @return the local port number
+ */
+int ff_srt_get_local_port(URLContext *h)
+{
+    const SRTContext *ctx = h->priv_data;
+
+    return ctx->local_port;
+}
+
+/**
+ * Report the SRT socket handle stored in the context.
+ * @param h media file context
+ * @return the value of SRTContext.srt_fd
+ */
+static int srt_get_file_handle(URLContext *h)
+{
+    const SRTContext *ctx = h->priv_data;
+
+    return ctx->srt_fd;
+}
+
+#if HAVE_PTHREAD_CANCEL
+/**
+ * Receive thread: reads messages from the SRT socket and appends them to
+ * s->fifo, each one prefixed with its length as a 32-bit little-endian value.
+ *
+ * s->mutex is held at all times except around srt_recvmsg(), which is also
+ * the only stretch where thread cancellation is enabled. On a fatal error the
+ * code is stored in s->circular_buffer_error, s->cond is signalled and the
+ * thread exits.
+ *
+ * @param ctx the URLContext whose priv_data is the SRTContext
+ * @return always NULL
+ */
+static void *circular_buffer_task_rx(void * ctx)
+{
+    URLContext *h = ctx;
+    SRTContext *s = h->priv_data;
+    int old_cancelstate;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+    if (srt_socket_nonblock(s->srt_fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+    while (1) {
+        int len;
+
+        pthread_mutex_unlock(&s->mutex);
+        /* Blocking operations are always cancellation points;
+           see "General Information" / "Thread Cancelation Overview"
+           in Single Unix. */
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+        len = srt_recvmsg(s->srt_fd, s->tmp+4, sizeof(s->tmp)-4);
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+        if (len < 0) {
+            /* query the error once so the test and the stored value agree */
+            int err = srt_neterrno();
+
+            if (err != AVERROR(EAGAIN) && err != AVERROR(EINTR)) {
+                log_net_error(h, AV_LOG_ERROR, "srt_recvmsg");
+                /* srt_read() returns this value as-is: it must be negative
+                 * so it cannot be mistaken for a byte count */
+                s->circular_buffer_error = err < 0 ? err : AVERROR(EIO);
+                goto end;
+            }
+            continue;
+        }
+        AV_WL32(s->tmp, len);
+
+        if (av_fifo_space(s->fifo) < len + 4) {
+            /* No Space left */
+            if (s->overrun_nonfatal) {
+                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+                       "Surviving due to overrun_nonfatal option\n");
+                continue;
+            } else {
+                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+                       "To avoid, increase fifo_size URL option. "
+                       "To survive in such case, use overrun_nonfatal option\n");
+                s->circular_buffer_error = AVERROR(EIO);
+                goto end;
+            }
+        }
+        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+    }
+
+end:
+    /* wake up a reader that may be waiting so it can see the error */
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+/**
+ * Transmit thread: takes length-prefixed packets out of s->fifo and sends
+ * them with srt_sendmsg(), pacing them according to s->bitrate and
+ * s->burst_bits.
+ *
+ * s->mutex is held while the FIFO is accessed and released (with thread
+ * cancellation enabled) while pacing and sending. The thread ends when
+ * s->close_req is set while it waits for data, or on a fatal send error,
+ * whose code is stored in s->circular_buffer_error.
+ *
+ * @param ctx the URLContext whose priv_data is the SRTContext
+ * @return always NULL
+ */
+static void *circular_buffer_task_tx(void * ctx)
+{
+    URLContext *h = ctx;
+    SRTContext *s = h->priv_data;
+    int old_cancelstate;
+    int64_t target_timestamp = av_gettime_relative();
+    int64_t start_timestamp = av_gettime_relative();
+    int64_t sent_bits = 0;
+    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+    int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+
+    if (srt_socket_nonblock(s->srt_fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+
+    while (1) {
+        int len;
+        const uint8_t *p;
+        uint8_t tmp[4];
+        int64_t timestamp;
+
+        len = av_fifo_size(s->fifo);
+
+        /* wait until at least a length prefix is queued */
+        while (len < 4) {
+            if (s->close_req)
+                goto end;
+            /* pthread_cond_wait() reports failure with a non-zero error
+             * number, never with a negative value */
+            if (pthread_cond_wait(&s->cond, &s->mutex) != 0)
+                goto end;
+            len = av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+        len = AV_RL32(tmp);
+
+        av_assert0(len >= 0);
+        av_assert0(len <= sizeof(s->tmp));
+
+        av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+        pthread_mutex_unlock(&s->mutex);
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        if (s->bitrate) {
+            timestamp = av_gettime_relative();
+            if (timestamp < target_timestamp) {
+                /* ahead of schedule: sleep, but never longer than max_delay */
+                int64_t delay = target_timestamp - timestamp;
+                if (delay > max_delay) {
+                    delay = max_delay;
+                    start_timestamp = timestamp + delay;
+                    sent_bits = 0;
+                }
+                av_usleep(delay);
+            } else {
+                /* behind by more than one burst: restart the schedule */
+                if (timestamp - burst_interval > target_timestamp) {
+                    start_timestamp = timestamp - burst_interval;
+                    sent_bits = 0;
+                }
+            }
+            sent_bits += len * 8;
+            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+        }
+
+        p = s->tmp;
+        while (len) {
+            int ret;
+            av_assert0(len > 0);
+            ret = srt_sendmsg(s->srt_fd, p, len, -1, 0);
+            if (ret >= 0) {
+                len -= ret;
+                p   += ret;
+            } else {
+                ret = srt_neterrno();
+                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+                    pthread_mutex_lock(&s->mutex);
+                    /* publish a negative code only, so the stored value is
+                     * always recognisable as an error */
+                    s->circular_buffer_error = ret < 0 ? ret : AVERROR(EIO);
+                    pthread_mutex_unlock(&s->mutex);
+                    return NULL;
+                }
+            }
+        }
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+
+#endif
+
+/**
+ * Split a comma-separated list in place and append a copy of every entry to
+ * sources[].
+ *
+ * Entries are appended starting at index *num_sources; nothing is written
+ * once the array holds max_sources entries, so the function can be called
+ * more than once on the same array.
+ *
+ * @param buf         list to split; every ',' is overwritten with '\0'
+ * @param sources     array receiving av_strdup()ed entries (caller frees)
+ * @param num_sources number of entries already stored, updated on return
+ * @param max_sources capacity of sources[]
+ * @return 0 on success, AVERROR(ENOMEM) if an entry could not be duplicated
+ */
+static int parse_source_list(char *buf, const char **sources, int *num_sources,
+                             int max_sources)
+{
+    char *source_start = buf;
+
+    /* test the bound before every store, including the first one */
+    while (*num_sources < max_sources) {
+        char *next = strchr(source_start, ',');
+
+        if (next)
+            *next = '\0';
+        sources[*num_sources] = av_strdup(source_start);
+        if (!sources[*num_sources])
+            return AVERROR(ENOMEM);
+        (*num_sources)++;
+        if (!next)
+            break;
+        source_start = next + 1;
+    }
+    return 0;
+}
+
+/**
+ * Apply the "POST" socket options (SRTO_INPUTBW, SRTO_OHEADBW).
+ *
+ * srt_open() calls this after the connection has been set up. An option is
+ * only sent when its value in the context is non-negative.
+ *
+ * @param h      media file context
+ * @param srt_fd SRT socket to configure
+ * @return 0 on success, AVERROR(EIO) if an option could not be set
+ */
+static int srt_set_options_post(URLContext *h, int srt_fd)
+{
+    SRTContext *s = h->priv_data;
+
+    if (s->inputbw >= 0) {
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_INPUTBW, &s->inputbw, sizeof(s->inputbw)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_INPUTBW)");
+            return AVERROR(EIO);
+        }
+    }
+    if (s->oheadbw >= 0) {
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_OHEADBW, &s->oheadbw, sizeof(s->oheadbw)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_OHEADBW)");
+            return AVERROR(EIO);
+        }
+    }
+    return 0;
+}
+
+/**
+ * Issue one srt_setsockopt() call on behalf of srt_set_options_pre() and log
+ * the libsrt error under the given label when it fails.
+ *
+ * @return 0 on success, AVERROR(EIO) on failure
+ */
+static int srt_set_pre_option(URLContext *h, int srt_fd, int optname,
+                              const char *label, const void *optval, int optlen)
+{
+    if (srt_setsockopt(srt_fd, SOL_SOCKET, optname, optval, optlen) < 0) {
+        log_net_error(h, AV_LOG_ERROR, label);
+        return AVERROR(EIO);
+    }
+    return 0;
+}
+
+/**
+ * Apply the "PRE" socket options.
+ *
+ * The "PRE" options must be set prior to connecting and can't be altered
+ * on a connected socket, however if set on a listening socket, they are
+ * derived by accept-ed socket.
+ *
+ * Numeric options are only sent when their value in the context is
+ * non-negative; the passphrase only when it is non-empty.
+ *
+ * @param h      media file context
+ * @param srt_fd SRT socket to configure
+ * @return 0 on success, AVERROR(EIO) if an option could not be set
+ */
+static int srt_set_options_pre(URLContext *h, int srt_fd)
+{
+    SRTContext *s = h->priv_data;
+    int yes = 1;
+
+    if (s->mode == SRT_MODE_RENDEZVOUS &&
+        srt_setsockopt(srt_fd, 0, SRTO_RENDEZVOUS, &yes, sizeof(yes)) < 0) {
+        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_RENDEZVOUS)");
+        return AVERROR(EIO);
+    }
+
+    if (s->maxbw >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_MAXBW, "setsockopt(SRTO_MAXBW)",
+                           &s->maxbw, sizeof(s->maxbw)) < 0)
+        return AVERROR(EIO);
+    if (s->pbkeylen >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_PBKEYLEN, "setsockopt(SRTO_PBKEYLEN)",
+                           &s->pbkeylen, sizeof(s->pbkeylen)) < 0)
+        return AVERROR(EIO);
+    /* Pass the length of the passphrase itself: the size of the whole array
+     * would make its NUL padding part of the secret. */
+    if (s->passphrase[0] &&
+        srt_set_pre_option(h, srt_fd, SRTO_PASSPHRASE, "setsockopt(SRTO_PASSPHRASE)",
+                           s->passphrase, strlen(s->passphrase)) < 0)
+        return AVERROR(EIO);
+    if (s->mss >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_MSS, "setsockopt(SRTO_MSS)",
+                           &s->mss, sizeof(s->mss)) < 0)
+        return AVERROR(EIO);
+    if (s->fc >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_FC, "setsockopt(SRTO_FC)",
+                           &s->fc, sizeof(s->fc)) < 0)
+        return AVERROR(EIO);
+    if (s->ipttl >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_IPTTL, "setsockopt(SRTO_IPTTL)",
+                           &s->ipttl, sizeof(s->ipttl)) < 0)
+        return AVERROR(EIO);
+    if (s->iptos >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_IPTOS, "setsockopt(SRTO_IPTOS)",
+                           &s->iptos, sizeof(s->iptos)) < 0)
+        return AVERROR(EIO);
+    if (s->tsbpddelay >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_TSBPDDELAY, "setsockopt(SRTO_TSBPDDELAY)",
+                           &s->tsbpddelay, sizeof(s->tsbpddelay)) < 0)
+        return AVERROR(EIO);
+    if (s->tlpktdrop >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_TLPKTDROP, "setsockopt(SRTO_TLPKTDROP)",
+                           &s->tlpktdrop, sizeof(s->tlpktdrop)) < 0)
+        return AVERROR(EIO);
+    if (s->nakreport >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_NAKREPORT, "setsockopt(SRTO_NAKREPORT)",
+                           &s->nakreport, sizeof(s->nakreport)) < 0)
+        return AVERROR(EIO);
+    if (s->conntimeo >= 0 &&
+        srt_set_pre_option(h, srt_fd, SRTO_CONNTIMEO, "setsockopt(SRTO_CONNTIMEO)",
+                           &s->conntimeo, sizeof(s->conntimeo)) < 0)
+        return AVERROR(EIO);
+    return 0;
+}
+
+/**
+ * Open an SRT connection described by uri.
+ *
+ * Options are taken from the AVOption values already stored in the context
+ * and then overridden by the tags found in the URL query string. The socket
+ * is created, configured and bound; in listener mode one peer is accepted
+ * (and the listening socket is closed again), otherwise the socket connects
+ * to the remote address. When a circular buffer is requested and pthread
+ * cancellation is available, a receive or transmit thread is started.
+ *
+ * @param h     media file context; h->priv_data is the SRTContext
+ * @param uri   srt://host:port[?option=val...]
+ * @param flags AVIO_FLAG_* open flags
+ * @return 0 on success, AVERROR(EIO) on any failure
+ */
+static int srt_open(URLContext *h, const char *uri, int flags)
+{
+    char hostname[1024], localaddr[1024] = "";
+    int port, srt_fd = SRT_INVALID_SOCK, tmp;
+    int listen_fd = SRT_INVALID_SOCK;
+    SRTContext *s = h->priv_data;
+    int is_output;
+    const char *p;
+    char buf[256];
+    struct sockaddr_storage my_addr;
+    socklen_t len, tmplen;
+    int i, num_include_sources = 0, num_exclude_sources = 0;
+    /* element type matches the parse_source_list() prototype */
+    const char *include_sources[32], *exclude_sources[32];
+
+    h->is_streamed = 1;
+
+    is_output = !(flags & AVIO_FLAG_READ);
+    if (s->buffer_size < 0)
+        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
+
+    if (s->sources) {
+        if (parse_source_list(s->sources, include_sources,
+                              &num_include_sources,
+                              FF_ARRAY_ELEMS(include_sources)))
+            goto fail;
+    }
+
+    if (s->block) {
+        if (parse_source_list(s->block, exclude_sources, &num_exclude_sources,
+                              FF_ARRAY_ELEMS(exclude_sources)))
+            goto fail;
+    }
+
+    if (s->pkt_size > 0)
+        h->max_packet_size = s->pkt_size;
+
+    /* URL query tags override the AVOption values */
+    p = strchr(uri, '?');
+    if (p) {
+        if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
+            char *endptr = NULL;
+            s->reuse_socket = strtol(buf, &endptr, 10);
+            /* assume if no digits were found it is a request to enable it */
+            if (buf == endptr)
+                s->reuse_socket = 1;
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
+            char *endptr = NULL;
+            s->overrun_nonfatal = strtol(buf, &endptr, 10);
+            /* assume if no digits were found it is a request to enable it */
+            if (buf == endptr)
+                s->overrun_nonfatal = 1;
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'overrun_nonfatal' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
+            s->local_port = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
+            s->pkt_size = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
+            s->buffer_size = strtol(buf, NULL, 10);
+        }
+        s->is_connected = 1;
+        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
+            s->circular_buffer_size = strtol(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'circular_buffer_size' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+            s->bitrate = strtoll(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'bitrate' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+            s->burst_bits = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
+            av_strlcpy(localaddr, buf, sizeof(localaddr));
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
+            if (parse_source_list(buf, include_sources, &num_include_sources,
+                                  FF_ARRAY_ELEMS(include_sources)))
+                goto fail;
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
+            if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
+                                  FF_ARRAY_ELEMS(exclude_sources)))
+                goto fail;
+        }
+        if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
+            s->timeout = strtol(buf, NULL, 10);
+
+        /* SRT options (srt/srt.h) */
+        if (av_find_info_tag(buf, sizeof(buf), "maxbw", p)) {
+            s->maxbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "pbkeylen", p)) {
+            s->pbkeylen = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "passphrase", p)) {
+            av_strlcpy(s->passphrase, buf, sizeof(s->passphrase));
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "mss", p)) {
+            s->mss = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "fc", p)) {
+            s->fc = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "ipttl", p)) {
+            s->ipttl = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "iptos", p)) {
+            s->iptos = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "inputbw", p)) {
+            s->inputbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "oheadbw", p)) {
+            s->oheadbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "tsbpddelay", p)) {
+            s->tsbpddelay = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "tlpktdrop", p)) {
+            s->tlpktdrop = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "nakreport", p)) {
+            s->nakreport = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "conntimeo", p)) {
+            s->conntimeo = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "mode", p)) {
+            if (!strcmp(buf, "caller")) {
+                s->mode = SRT_MODE_CALLER;
+            } else if (!strcmp(buf, "listener")) {
+                s->mode = SRT_MODE_LISTENER;
+            } else if (!strcmp(buf, "rendezvous")) {
+                s->mode = SRT_MODE_RENDEZVOUS;
+            }
+        }
+    }
+    /* handling needed to support options picking from both AVOption and URL */
+    s->circular_buffer_size *= 188;
+    if (flags & AVIO_FLAG_WRITE) {
+        h->max_packet_size = s->pkt_size;
+    } else {
+        h->max_packet_size = UDP_MAX_PKT_SIZE;
+    }
+    h->rw_timeout = s->timeout;
+
+    /* fill the dest addr */
+    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    /* XXX: fix av_url_split */
+    if (hostname[0] == '\0' || hostname[0] == '?') {
+        /* only accepts null hostname if input */
+        if (!(flags & AVIO_FLAG_READ))
+            goto fail;
+    } else {
+        if (ff_srt_set_remote_url(h, uri) < 0)
+            goto fail;
+    }
+
+    if ((s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
+        s->local_port = port;
+
+    /* a localaddr given in the URL takes precedence over the AVOption */
+    if (localaddr[0]) {
+        srt_fd = srt_socket_create(h, &my_addr, &len, localaddr);
+    } else {
+        srt_fd = srt_socket_create(h, &my_addr, &len, s->localaddr);
+    }
+    if (srt_fd < 0)
+        goto fail;
+
+    if (srt_set_options_pre(h, srt_fd) < 0)
+        goto fail;
+
+    /* Follow the requested reuse option */
+    if (s->reuse_socket > 0) {
+        s->reuse_socket = 1;
+        if (srt_setsockopt (srt_fd, SOL_SOCKET, SRTO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)))
+            goto fail;
+    }
+    if (is_output) {
+        /* limit the tx buf size to limit latency */
+        tmp = s->buffer_size;
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
+            goto fail;
+        }
+    } else {
+        /* set srt recv buffer size to the requested value (default 64K) */
+        tmp = s->buffer_size;
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
+            log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
+        }
+        tmplen = sizeof(tmp);
+        if (srt_getsockopt(srt_fd, SOL_SOCKET, SRTO_RCVBUF, &tmp, &tmplen) < 0) {
+            log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
+        } else {
+            av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
+            if (tmp < s->buffer_size)
+                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
+        }
+
+        /* make the socket non-blocking */
+        srt_socket_nonblock(srt_fd, 1);
+    }
+    /* the bind is needed to give a port to the socket now */
+    if (srt_bind(srt_fd,(struct sockaddr *)&my_addr, len) < 0) {
+        log_net_error(h, AV_LOG_ERROR, "bind failed");
+        goto fail;
+    }
+
+    if (s->mode == SRT_MODE_LISTENER) {
+        listen_fd = srt_fd;
+        srt_fd = SRT_INVALID_SOCK;
+
+        if (srt_listen(listen_fd, 1)) {
+            log_net_error(h, AV_LOG_ERROR, "listen");
+            goto fail;
+        }
+        srt_fd = srt_accept(listen_fd, NULL, NULL);
+        if (srt_fd == SRT_INVALID_SOCK) {
+            log_net_error(h, AV_LOG_ERROR, "accept");
+            goto fail;
+        }
+        /* Exactly one peer is served and the context keeps no reference to
+         * the listening socket, so release it here instead of leaking it. */
+        srt_close(listen_fd);
+        listen_fd = SRT_INVALID_SOCK;
+    }
+
+    len = sizeof(my_addr);
+    srt_getsockname(srt_fd, (struct sockaddr *)&my_addr, &len);
+    s->local_port = srt_port(&my_addr, len);
+
+    if (s->mode != SRT_MODE_LISTENER) {
+        if (srt_connect(srt_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
+            log_net_error(h, AV_LOG_ERROR, "connect");
+            goto fail;
+        }
+    }
+    if (srt_set_options_post(h, srt_fd) < 0)
+        goto fail;
+
+    for (i = 0; i < num_include_sources; i++)
+        av_freep(&include_sources[i]);
+    for (i = 0; i < num_exclude_sources; i++)
+        av_freep(&exclude_sources[i]);
+
+    s->srt_fd = srt_fd;
+
+#if HAVE_PTHREAD_CANCEL
+    /*
+      Create thread in case of:
+      1. Input and circular_buffer_size is set
+      2. Output and bitrate and circular_buffer_size is set
+    */
+
+    if (is_output && s->bitrate && !s->circular_buffer_size) {
+        /* Warn user in case of 'circular_buffer_size' is not set */
+        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
+        int ret;
+
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        if (!s->fifo) {
+            /* the worker threads dereference s->fifo unconditionally */
+            av_log(h, AV_LOG_ERROR, "Failed to allocate the circular buffer\n");
+            goto fail;
+        }
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            goto fail;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
+
+    return 0;
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
+ fail:
+    if (srt_fd >= 0)
+        srt_close(srt_fd);
+    if (listen_fd >= 0)
+        srt_close(listen_fd);
+    av_fifo_freep(&s->fifo);
+    for (i = 0; i < num_include_sources; i++)
+        av_freep(&include_sources[i]);
+    for (i = 0; i < num_exclude_sources; i++)
+        av_freep(&exclude_sources[i]);
+    return AVERROR(EIO);
+}
+
+/**
+ * Read one SRT message into buf.
+ *
+ * When a circular buffer (s->fifo) is in use, the next record is taken from
+ * it. Each record is a 4-byte little-endian length followed by the payload;
+ * a payload longer than size is truncated and the excess is discarded.
+ * Without a FIFO the message is received directly with srt_recvmsg().
+ *
+ * @param h    URL context; h->priv_data is the SRTContext
+ * @param buf  destination buffer
+ * @param size capacity of buf in bytes
+ * @return number of bytes stored in buf on success; on failure the stored
+ *         circular_buffer_error, AVERROR(EAGAIN) when the FIFO stays empty,
+ *         an AVERROR() built from a pthread error, the error returned by
+ *         srt_network_wait_fd(), or the value of srt_neterrno()
+ */
+ static int srt_read(URLContext *h, uint8_t *buf, int size)
+{
+    SRTContext *s = h->priv_data;
+    int ret;
+#if HAVE_PTHREAD_CANCEL
+    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
+
+    if (s->fifo) {
+        pthread_mutex_lock(&s->mutex);
+        do {
+            avail = av_fifo_size(s->fifo);
+            if (avail) {
+                uint8_t tmp[4];
+
+                /* record header: payload length, 32-bit little-endian */
+                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+                avail = AV_RL32(tmp);
+                if (avail > size) {
+                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+                    avail = size;
+                }
+
+                av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                /* drop the part of the payload that did not fit into buf */
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+                pthread_mutex_unlock(&s->mutex);
+                return avail;
+            } else if (s->circular_buffer_error) {
+                int err = s->circular_buffer_error;
+                pthread_mutex_unlock(&s->mutex);
+                return err;
+            } else if (nonblock) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
+            } else {
+                /* FIXME: using the monotonic clock would be better,
+                   but it does not exist on all supported platforms. */
+                int64_t t = av_gettime() + 100000;
+                struct timespec tv = { .tv_sec  = t / 1000000,
+                                       .tv_nsec = (t % 1000000) * 1000 };
+                /* pthread_cond_timedwait() reports failure through its
+                   return value (a positive error number), not via errno */
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+
+                if (err && err != ETIMEDOUT) {
+                    pthread_mutex_unlock(&s->mutex);
+                    return AVERROR(err);
+                }
+                /* woken up or timed out: look at the FIFO once more and
+                   return EAGAIN instead of waiting again if it is empty */
+                nonblock = 1;
+            }
+        } while (1);
+    }
+#endif
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = srt_network_wait_fd(s->srt_fd, 0);
+        if (ret < 0)
+            return ret;
+    }
+    ret = srt_recvmsg(s->srt_fd, buf, size);
+
+    return ret < 0 ? srt_neterrno() : ret;
+}
+
+/**
+ * Send one message of size bytes taken from buf.
+ *
+ * With a circular buffer (s->fifo) the message is only queued: a 4-byte
+ * little-endian length is written to the FIFO, followed by the payload, and
+ * s->cond is signalled. Without a FIFO the data is passed to srt_sendmsg().
+ *
+ * @return size when the message was queued, the number returned by
+ *         srt_sendmsg() on a direct send, or an error: a previously stored
+ *         circular_buffer_error, AVERROR(ENOMEM) when the FIFO lacks room,
+ *         the error from srt_network_wait_fd(), or srt_neterrno()
+ */
+ static int srt_write(URLContext *h, const uint8_t *buf, int size)
+{
+    SRTContext *s = h->priv_data;
+    int ret;
+
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t len_prefix[4];
+        int status = size;
+
+        pthread_mutex_lock(&s->mutex);
+        if (s->circular_buffer_error < 0) {
+            /* An earlier transmission failed. It is not known which packet
+               was affected, but the caller has to learn about the error. */
+            status = s->circular_buffer_error;
+        } else if (av_fifo_space(s->fifo) < size + 4) {
+            /* What about a partial packet tx ? */
+            status = AVERROR(ENOMEM);
+        } else {
+            AV_WL32(len_prefix, size);
+            /* length prefix first, then the payload itself */
+            av_fifo_generic_write(s->fifo, len_prefix, 4, NULL);
+            av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL);
+            pthread_cond_signal(&s->cond);
+        }
+        pthread_mutex_unlock(&s->mutex);
+        return status;
+    }
+#endif
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = srt_network_wait_fd(s->srt_fd, 1);
+        if (ret < 0)
+            return ret;
+    }
+
+    ret = srt_sendmsg(s->srt_fd, buf, size, -1, 0);
+    if (ret >= 0)
+        return ret;
+
+    return srt_neterrno();
+}
+
+/*
+ * Close the connection: stop the circular buffer thread if one was started,
+ * close the SRT socket and free the FIFO. Always returns 0.
+ *
+ * Named do_srt_close because srt_close is already declared by srt/srt.h.
+ */
+ static int do_srt_close(URLContext *h)
+{
+    SRTContext *s = h->priv_data;
+
+#if HAVE_PTHREAD_CANCEL
+    if (s->thread_started) {
+        int ret;
+
+        if (h->flags & AVIO_FLAG_READ) {
+            /* Only the reading thread is cancelled; writes were already
+               reported to the user as successful. */
+            pthread_cancel(s->circular_buffer_thread);
+        } else {
+            /* Request close once writing is finished */
+            pthread_mutex_lock(&s->mutex);
+            s->close_req = 1;
+            pthread_cond_signal(&s->cond);
+            pthread_mutex_unlock(&s->mutex);
+        }
+
+        ret = pthread_join(s->circular_buffer_thread, NULL);
+        if (ret)
+            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
+        pthread_mutex_destroy(&s->mutex);
+        pthread_cond_destroy(&s->cond);
+    }
+#endif
+    srt_close(s->srt_fd);
+    av_fifo_freep(&s->fifo);
+    return 0;
+}
+
+ const URLProtocol ff_opensrt_protocol = { /* protocol table entry wiring up the SRT handlers of this file */
+ .name = "srt", /* name the protocol is registered under */
+ .url_open = srt_open,
+ .url_read = srt_read,
+ .url_write = srt_write,
+ .url_close = do_srt_close, /* not named srt_close: that identifier belongs to srt/srt.h */
+ .url_get_file_handle = srt_get_file_handle,
+ .priv_data_size = sizeof(SRTContext), /* the handlers read their SRTContext from h->priv_data */
+ .priv_data_class = &srt_class,
+ .flags = URL_PROTOCOL_FLAG_NETWORK,
+};
+
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index 8d3555e..f8f65cf 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -62,6 +62,7 @@ extern const URLProtocol ff_tls_securetransport_protocol;
extern const URLProtocol ff_tls_openssl_protocol;
extern const URLProtocol ff_udp_protocol;
extern const URLProtocol ff_udplite_protocol;
+extern const URLProtocol ff_opensrt_protocol;
extern const URLProtocol ff_unix_protocol;
extern const URLProtocol ff_librtmp_protocol;
extern const URLProtocol ff_librtmpe_protocol;
diff --git a/libavformat/url.h b/libavformat/url.h
index 4750bff..d6e034e 100644
--- a/libavformat/url.h
+++ b/libavformat/url.h
@@ -279,6 +279,10 @@ int ff_check_interrupt(AVIOInterruptCB *cb);
int ff_udp_set_remote_url(URLContext *h, const char *uri);
int ff_udp_get_local_port(URLContext *h);
+/* opensrt.c */
+int ff_srt_set_remote_url(URLContext *h, const char *uri);
+int ff_srt_get_local_port(URLContext *h);
+
/**
* Assemble a URL string from components. This is the reverse operation
* of av_url_split.
--
2.7.4
More information about the ffmpeg-devel
mailing list