[FFmpeg-cvslog] avformat: Add ZeroMQ as a protocol

Andriy Gelman git at videolan.org
Tue Sep 3 00:11:57 EEST 2019


ffmpeg | branch: master | Andriy Gelman <andriy.gelman at gmail.com> | Tue Jul 30 14:39:32 2019 -0400| [ef43a4d6b38de941dd2ede0711d4fd5d811127ed] | committer: Marton Balint

avformat: Add ZeroMQ as a protocol

When ffmpeg was streaming, multiple clients were only supported by using a
multicast destination address. An alternative was to stream to a server which
re-distributes the content. This commit adds ZeroMQ as a protocol, which allows
multiple clients to connect to a single ffmpeg instance.

Signed-off-by: Marton Balint <cus at passwd.hu>

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=ef43a4d6b38de941dd2ede0711d4fd5d811127ed
---

 Changelog               |   1 +
 configure               |   4 +-
 doc/general.texi        |   1 +
 doc/protocols.texi      |  47 ++++++++++++
 libavformat/Makefile    |   1 +
 libavformat/libzmq.c    | 199 ++++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 libavformat/version.h   |   2 +-
 8 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/Changelog b/Changelog
index 20e42964da..4b29e015a0 100644
--- a/Changelog
+++ b/Changelog
@@ -8,6 +8,7 @@ version <next>:
 - support for TrueHD in mp4
 - Supoort AMD AMF encoder on Linux (via Vulkan)
 - IMM5 video decoder
+- ZeroMQ protocol
 
 
 version 4.2:
diff --git a/configure b/configure
index 9d70375036..4c77e1cab1 100755
--- a/configure
+++ b/configure
@@ -3415,6 +3415,8 @@ libsrt_protocol_deps="libsrt"
 libsrt_protocol_select="network"
 libssh_protocol_deps="libssh"
 libtls_conflict="openssl gnutls mbedtls"
+libzmq_protocol_deps="libzmq"
+libzmq_protocol_select="network"
 
 # filters
 afftdn_filter_deps="avcodec"
@@ -6325,7 +6327,7 @@ enabled libxavs           && require libxavs "stdint.h xavs.h" xavs_encoder_enco
 enabled libxavs2          && require_pkg_config libxavs2 "xavs2 >= 1.3.0" "stdint.h xavs2.h" xavs2_api_get
 enabled libxvid           && require libxvid xvid.h xvid_global -lxvidcore
 enabled libzimg           && require_pkg_config libzimg "zimg >= 2.7.0" zimg.h zimg_get_api_version
-enabled libzmq            && require_pkg_config libzmq libzmq zmq.h zmq_ctx_new
+enabled libzmq            && require_pkg_config libzmq "libzmq >= 4.2.1" zmq.h zmq_ctx_new
 enabled libzvbi           && require_pkg_config libzvbi zvbi-0.2 libzvbi.h vbi_decoder_new &&
                              { test_cpp_condition libzvbi.h "VBI_VERSION_MAJOR > 0 || VBI_VERSION_MINOR > 2 || VBI_VERSION_MINOR == 2 && VBI_VERSION_MICRO >= 28" ||
                                enabled gpl || die "ERROR: libzvbi requires version 0.2.28 or --enable-gpl."; }
diff --git a/doc/general.texi b/doc/general.texi
index d0c3525e02..2744c238cf 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1339,6 +1339,7 @@ performance on systems without hardware floating point support).
 @item TCP          @tab X
 @item TLS          @tab X
 @item UDP          @tab X
+ at item ZMQ          @tab E
 @end multitable
 
 @code{X} means that the protocol is supported.
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 3e4e7af3d4..b03432e3e5 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -1728,4 +1728,51 @@ Timeout in ms.
 Create the Unix socket in listening mode.
 @end table
 
+ at section zmq
+
+ZeroMQ asynchronous messaging using the libzmq library.
+
+This library supports unicast streaming to multiple clients without relying on
+an external server.
+
+The required syntax for streaming or connecting to a stream is:
+ at example
+zmq:tcp://ip-address:port
+ at end example
+
+Example:
+Create a localhost stream on port 5555:
+ at example
+ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
+ at end example
+
+Multiple clients may connect to the stream using:
+ at example
+ffplay zmq:tcp://127.0.0.1:5555
+ at end example
+
+Streaming to multiple clients is implemented using a ZeroMQ Pub-Sub pattern.
+The server side binds to a port and publishes data. Clients connect to the
+server (via IP address/port) and subscribe to the stream. The order in which
+the server and client start generally does not matter.
+
+ffmpeg must be compiled with the --enable-libzmq option to support
+this protocol.
+
+Options can be set on the @command{ffmpeg}/@command{ffplay} command
+line. The following options are supported:
+
+ at table @option
+
+ at item pkt_size
+Forces the maximum packet size for sending/receiving data. The default value is
+32,768 bytes. On the server side, this sets the maximum size of sent packets
+via ZeroMQ. On the clients, it sets an internal buffer size for receiving
+packets. Note that pkt_size on the clients should be equal to or greater than
+pkt_size on the server. Otherwise the received message may be truncated causing
+decoding errors.
+
+ at end table
+
+
 @c man end PROTOCOLS
diff --git a/libavformat/Makefile b/libavformat/Makefile
index a434b005a4..efa3a112ae 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
 OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
 OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
 OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
+OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
 
 # libavdevice dependencies
 OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
new file mode 100644
index 0000000000..d86488293f
--- /dev/null
+++ b/libavformat/libzmq.c
@@ -0,0 +1,199 @@
+/*
+ * ZeroMQ Protocol
+ * Copyright (c) 2019 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <zmq.h>
+#include "url.h"
+#include "network.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+
+#define ZMQ_STRERROR zmq_strerror(zmq_errno())
+
+typedef struct ZMQContext {
+    const AVClass *class;
+    void *context;
+    void *socket;
+    int   pkt_size;
+    int   pkt_size_overflow; /*keep track of the largest packet during overflow*/
+} ZMQContext;
+
+#define OFFSET(x) offsetof(ZMQContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+static const AVOption options[] = {
+    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 32768 }, -1, INT_MAX, .flags = D | E },
+    { NULL }
+};
+
+static int zmq_proto_wait(URLContext *h, void *socket, int write)
+{
+    int ret;
+    int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
+    zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
+    ret = zmq_poll(&items, 1, POLLING_TIME);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    return items.revents & ev ? 0 : AVERROR(EAGAIN);
+}
+
+static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
+{
+    int ret;
+    int64_t wait_start = 0;
+
+    while (1) {
+        if (ff_check_interrupt(int_cb))
+            return AVERROR_EXIT;
+        ret = zmq_proto_wait(h, socket, write);
+        if (ret != AVERROR(EAGAIN))
+            return ret;
+        if (timeout > 0) {
+            if (!wait_start)
+                wait_start = av_gettime_relative();
+            else if (av_gettime_relative() - wait_start > timeout)
+                return AVERROR(ETIMEDOUT);
+        }
+    }
+}
+
+static int zmq_proto_open(URLContext *h, const char *uri, int flags)
+{
+    int ret;
+    ZMQContext *s        = h->priv_data;
+    s->pkt_size_overflow = 0;
+    h->is_streamed       = 1;
+
+    if (s->pkt_size > 0)
+        h->max_packet_size = s->pkt_size;
+
+    s->context = zmq_ctx_new();
+    if (!s->context) {
+        /*errno not set on failure during zmq_ctx_new()*/
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    av_strstart(uri, "zmq:", &uri);
+
+    /*publish during write*/
+    if (h->flags & AVIO_FLAG_WRITE) {
+        s->socket = zmq_socket(s->context, ZMQ_PUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+            zmq_ctx_term(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        ret = zmq_bind(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
+            zmq_close(s->socket);
+            zmq_ctx_term(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+
+    /*subscribe for read*/
+    if (h->flags & AVIO_FLAG_READ) {
+        s->socket = zmq_socket(s->context, ZMQ_SUB);
+        if (!s->socket) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
+            zmq_ctx_term(s->context);
+            return AVERROR_EXTERNAL;
+        }
+
+        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
+        ret = zmq_connect(s->socket, uri);
+        if (ret == -1) {
+            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
+            zmq_close(s->socket);
+            zmq_ctx_term(s->context);
+            return AVERROR_EXTERNAL;
+        }
+    }
+    return 0;
+}
+
+static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    ret = zmq_proto_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback);
+    if (ret)
+        return ret;
+    ret = zmq_send(s->socket, buf, size, 0);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    return ret; /*number of bytes sent*/
+}
+
+static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
+{
+    int ret;
+    ZMQContext *s = h->priv_data;
+
+    ret = zmq_proto_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback);
+    if (ret)
+        return ret;
+    ret = zmq_recv(s->socket, buf, size, 0);
+    if (ret == -1) {
+        av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
+        return AVERROR_EXTERNAL;
+    }
+    if (ret > size) {
+        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
+        av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
+        ret = size;
+    }
+    return ret; /*number of bytes read*/
+}
+
+static int zmq_proto_close(URLContext *h)
+{
+    ZMQContext *s = h->priv_data;
+    zmq_close(s->socket);
+    zmq_ctx_term(s->context);
+    return 0;
+}
+
+static const AVClass zmq_context_class = {
+    .class_name = "zmq",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libzmq_protocol = {
+    .name            = "zmq",
+    .url_close       = zmq_proto_close,
+    .url_open        = zmq_proto_open,
+    .url_read        = zmq_proto_read,
+    .url_write       = zmq_proto_write,
+    .priv_data_size  = sizeof(ZMQContext),
+    .priv_data_class = &zmq_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index ad95659795..face5b29b5 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
 extern const URLProtocol ff_libsrt_protocol;
 extern const URLProtocol ff_libssh_protocol;
 extern const URLProtocol ff_libsmbclient_protocol;
+extern const URLProtocol ff_libzmq_protocol;
 
 #include "libavformat/protocol_list.c"
 
diff --git a/libavformat/version.h b/libavformat/version.h
index af0db1e970..edfa73fb97 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,7 +32,7 @@
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with Chromium)
 // Also please add any ticket numbers that you believe might be affected here
 #define LIBAVFORMAT_VERSION_MAJOR  58
-#define LIBAVFORMAT_VERSION_MINOR  31
+#define LIBAVFORMAT_VERSION_MINOR  32
 #define LIBAVFORMAT_VERSION_MICRO 104
 
 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \



More information about the ffmpeg-cvslog mailing list