[FFmpeg-devel] [PATCH v3] libavformat: Add ZeroMQ as a protocol option

Andriy Gelman andriy.gelman at gmail.com
Thu Aug 29 00:42:15 EEST 2019


Marton, 
Thanks for reviewing this patch.

On Sat, 24. Aug 19:33, Marton Balint wrote:
> 
> 
> On Fri, 23 Aug 2019, Andriy Gelman wrote:
> 
> > On Mon, 19. Aug 17:28, Andriy Gelman wrote:
> > > Minor changes in v3: 1. Removed tab character as per feedback
> > > 2. Removed unused timeout variable from ZMQContext
> > > 
> > > Andriy
> > 
> > > From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
> > > From: Andriy Gelman <andriy.gelman at gmail.com>
> > > Date: Tue, 30 Jul 2019 14:39:32 -0400
> > > Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
> > > 
> > > Currently multiple clients are only supported by using a multicast
> > > destination address. An alternative is to stream to a server which
> > > re-distributes the content. This commit adds ZeroMQ as a protocol
> > > option, which allows multiple clients to connect to a single ffmpeg
> > > instance.
> > > ---
> > >  configure               |   2 +
> > >  doc/general.texi        |   1 +
> > >  doc/protocols.texi      |  32 ++++++++
> > >  libavformat/Makefile    |   1 +
> > >  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
> > >  libavformat/protocols.c |   1 +
> > >  6 files changed, 195 insertions(+)
> > >  create mode 100644 libavformat/libzmq.c
> 
> Missing changelog entry and libavformat minor version bump.

ok, will update.

> 
> > > 
> > > diff --git a/configure b/configure
> > > index c09c842809..a4134024c2 100755
> > > --- a/configure
> > > +++ b/configure
> > > @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
> > >  libsrt_protocol_select="network"
> > >  libssh_protocol_deps="libssh"
> > >  libtls_conflict="openssl gnutls mbedtls"
> > > +libzmq_protocol_deps="libzmq"
> > > +libzmq_protocol_select="network"
> 
> You may want to enforce a minimum version requirement for libzmq in the
> pkg_config part of configure depending on the API you use.

ok

> 
> > > 
> > >  # filters
> > >  afftdn_filter_deps="avcodec"
> > > diff --git a/doc/general.texi b/doc/general.texi
> > > index 3c0c803449..b8e063268c 100644
> > > --- a/doc/general.texi
> > > +++ b/doc/general.texi
> > > @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
> > >  @item TCP          @tab X
> > >  @item TLS          @tab X
> > >  @item UDP          @tab X
> > > +@item ZMQ          @tab E
> > >  @end multitable
> > > 
> > >  @code{X} means that the protocol is supported.
> > > diff --git a/doc/protocols.texi b/doc/protocols.texi
> > > index 3e4e7af3d4..174eaacd0f 100644
> > > --- a/doc/protocols.texi
> > > +++ b/doc/protocols.texi
> > > @@ -1728,4 +1728,36 @@ Timeout in ms.
> > >  Create the Unix socket in listening mode.
> > >  @end table
> > > 
> > > +@section libzmq
> > > +
> > > +ZeroMQ asynchronous messaging library.
> > > +
> > > +This library supports unicast streaming to multiple clients without relying on
> > > +an external server.
> > > +
> > > +The required syntax for streaming or connecting to a stream is:
> > > +@example
> > > +zmq:tcp://ip-address:port
> > > +@end example
> > > +
> > > +Example:
> > > +Create a localhost stream on port 5555:
> > > +@example
> > > +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Multiple clients may connect to the stream using:
> > > +@example
> > > +ffplay zmq:tcp://127.0.0.1:5555
> > > +@end example
> > > +
> > > +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
> > > +The server binds to a port and publishes data. Clients connect to the
> > > +server (IP address/port) and subscribe to the stream. The order in which
> > > +the server and client start generally does not matter.
> > > +
> > > +ffmpeg must be compiled with the --enable-libzmq option to support
> > > +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
> > > +for an example on how this option may be set.
> 
> I think I'd rather not reference the compilation guide, as there are no
> specific instructions there to compile with libzmq. If you insist, then at
> least lose the "Ubuntu" part from the link.

I'll remove the reference to the compilation guide.

> 
> > > +
> > >  @c man end PROTOCOLS
> > > diff --git a/libavformat/Makefile b/libavformat/Makefile
> > > index a434b005a4..efa3a112ae 100644
> > > --- a/libavformat/Makefile
> > > +++ b/libavformat/Makefile
> > > @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
> > >  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
> > >  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
> > >  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
> > > +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
> > > 
> > >  # libavdevice dependencies
> > >  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
> > > diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
> > > new file mode 100644
> > > index 0000000000..ac35c01cf8
> > > --- /dev/null
> > > +++ b/libavformat/libzmq.c
> > > @@ -0,0 +1,158 @@
> > > +/*
> > > + * ZMQ URLProtocol
> > > + *
> > > + * This file is part of FFmpeg.
> > > + *
> > > + * FFmpeg is free software; you can redistribute it and/or
> > > + * modify it under the terms of the GNU Lesser General Public
> > > + * License as published by the Free Software Foundation; either
> > > + * version 2.1 of the License, or (at your option) any later version.
> > > + *
> > > + * FFmpeg is distributed in the hope that it will be useful,
> > > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > > + * Lesser General Public License for more details.
> > > + *
> > > + * You should have received a copy of the GNU Lesser General Public
> > > + * License along with FFmpeg; if not, write to the Free Software
> > > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> > > + */
> > > +
> > > +#include <zmq.h>
> > > +#include "url.h"
> > > +#include "network.h"
> > > +#include "libavutil/avstring.h"
> > > +#include "libavutil/opt.h"
> > > +
> > > +typedef struct ZMQContext {
> > > +    const AVClass *class;
> > > +    void *context;
> > > +    void *socket;
> > > +} ZMQContext;
> > > +
> > > +static const AVOption options[] = {
> > > +    { NULL }
> > > +};
> > > +
> > > +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s   = h->priv_data;
> > > +    s->context      = zmq_ctx_new();
> 
> You should check if the context creation was successful.

thanks, I'll add the check.

> 
> > > +    h->is_streamed  = 1;
> > > +
> > > +    av_strstart(uri, "zmq:", &uri);
> > > +
> > > +    /*publish during write*/
> > > +    if (h->flags & AVIO_FLAG_WRITE) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_PUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> 
> zmq_errno() instead of errno? Same goes for all similar cases.

The libzmq documentation says zmq_errno() is provided for non-POSIX systems,
but it also states:
"Users not experiencing issues with retrieving the correct value of errno should
not use this function and should instead access the errno variable directly."

On IRC J_Darnley pointed out that ffmpeg.c includes errno.h without any checks.
It seems reasonable to assume that errno is available.
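
Whichever source of the error number is chosen, the mapping to the return
code stays the same. Below is a minimal sketch of that mapping, not the code
from the patch. SKETCH_AVERROR and SKETCH_AVERROR_EXTERNAL are hypothetical
stand-ins for AVERROR() and AVERROR_EXTERNAL from libavutil/error.h, and
zmq_errnum_to_ret() is an illustrative name. The caller would pass either
errno or the value returned by zmq_errno().

```c
#include <errno.h>

/* Hypothetical stand-ins so the sketch compiles on its own; in FFmpeg the
 * real AVERROR() and AVERROR_EXTERNAL come from libavutil/error.h. */
#define SKETCH_AVERROR(e)       (-(e))
#define SKETCH_AVERROR_EXTERNAL (-0x7fff0001)

/* Map the error number left by a failed ZMQ call to a return code.
 * errnum is either errno or the result of zmq_errno(), read right after
 * the failing call and before any other call can overwrite it. */
static int zmq_errnum_to_ret(int errnum)
{
    if (errnum == EAGAIN)
        return SKETCH_AVERROR(EAGAIN);   /* would block: retry later */
    return SKETCH_AVERROR_EXTERNAL;      /* anything else: generic library error */
}
```

Taking the error number as a parameter keeps the helper independent of the
errno vs. zmq_errno() decision.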

> 
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        ret = zmq_bind(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +
> > > +    /*subscribe for read*/
> > > +    if (h->flags & AVIO_FLAG_READ) {
> > > +        s->socket = zmq_socket(s->context, ZMQ_SUB);
> > > +        if (!s->socket) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +
> > > +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
> > > +        ret = zmq_connect(s->socket, uri);
> > > +        if (ret == -1) {
> > > +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
> > > +            zmq_close(s->socket);
> > > +            zmq_ctx_destroy(s->context);
> > > +            return AVERROR_EXTERNAL;
> > > +        }
> > > +    }
> > > +    return 0;
> > > +}
> > > +
> > > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +
> > > +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);
> 
> I can see that you are using non-blocking mode. A polling with timeout
> approach is preferred, see how tcp or unix does it.

I used polling in the initial patch, but I switched to non-blocking because I
thought it was a cleaner solution. I'll revert to polling with a timeout in the
next version.
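
For reference, the shape of a polling loop with a timeout could look roughly
like the sketch below. It is only an illustration under stated assumptions:
the callback types, wait_ready(), fake_poll() and the 100 ms slice are all
hypothetical names and values, not taken from tcp.c or from this patch. In
the real protocol the poll callback would presumably wrap zmq_poll() on
s->socket, and the interrupt callback would wrap the URLContext interrupt
check.

```c
#include <stdint.h>
#include <errno.h>

/* Hypothetical callback types (assumptions for this sketch only). */
typedef int (*poll_once_fn)(void *opaque, int slice_ms); /* 1 ready, 0 not ready, <0 error */
typedef int (*interrupted_fn)(void *opaque);             /* non-zero to abort */

#define SKETCH_POLL_SLICE_MS 100   /* assumed slice length */

/* Wait until the socket is ready, polling in short slices so that an
 * interrupt request is noticed promptly.
 * timeout_ms <= 0 means wait without a deadline.
 * Elapsed time is approximated by counting slices, which assumes that
 * poll_once blocks for the whole slice when nothing is ready.
 * Returns 0 when ready, -ETIMEDOUT on timeout, -EINTR when interrupted,
 * or the negative error returned by poll_once. */
static int wait_ready(poll_once_fn poll_once, interrupted_fn interrupted,
                      void *opaque, int64_t timeout_ms)
{
    int64_t waited_ms = 0;

    for (;;) {
        int ret;

        if (interrupted && interrupted(opaque))
            return -EINTR;

        ret = poll_once(opaque, SKETCH_POLL_SLICE_MS);
        if (ret < 0)
            return ret;
        if (ret > 0)
            return 0;

        waited_ms += SKETCH_POLL_SLICE_MS;
        if (timeout_ms > 0 && waited_ms >= timeout_ms)
            return -ETIMEDOUT;
    }
}

/* Fake poller for illustration: opaque points to a countdown, and the
 * socket is reported ready once the countdown reaches zero. */
static int fake_poll(void *opaque, int slice_ms)
{
    int *calls_left = opaque;
    (void)slice_ms;
    return --(*calls_left) <= 0;
}
```

With the fake poller, a countdown of 3 becomes ready on the third call,
while a countdown of 100 with a 300 ms timeout gives up after three slices.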

> 
> > > +    if (ret >= 0)
> > > +        return ret; /*number of sent bytes*/
> > > +
> > > +    /*errno = EAGAIN if messages cannot be pushed*/
> > > +    if (ret == -1 && errno == EAGAIN) {
> > > +        return AVERROR(EAGAIN);
> > > +    } else
> > > +        return AVERROR_EXTERNAL;
> > > +}
> > > +
> > > +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
> > > +{
> > > +    int ret;
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_msg_t msg;
> > > +    int msg_size;
> > > +
> > > +    ret = zmq_msg_init(&msg);
> > > +    if (ret == -1) {
> > > +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
> > > +      return AVERROR_EXTERNAL;
> > > +    }
> > > +
> > > +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);
> 
> Same here, a polling with timeout is preferred.
> 
> > > +    if (ret == -1) {
> > > +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
> > > +        if (ret == AVERROR_EXTERNAL)
> > > +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));
> 
> indentation
> 
> > > +        goto finish;
> > > +    }
> > > +
> > > +    msg_size = zmq_msg_size(&msg);
> > > +    if (msg_size > size) {
> > > +        msg_size = size;
> > > +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");
> 
> Probably a user settable pkt_size option would be useful which sets the
> URLContext max_packet_size which basically controls the size of the
> allocated IO buffer.
> 
> > > +    }
> > > +    memcpy(buf, zmq_msg_data(&msg), msg_size);
> 
> If you are truncating anyway then please use zmq_recv directly, so you can
> avoid the memcpy.

yes, good idea on setting pkt_size and avoiding memcpy.
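
One detail to keep in mind when switching to zmq_recv(): according to the
libzmq documentation, its return value is the size of the message, which can
exceed the buffer length when the message was truncated. The v3 read function
also appears to return the unclamped value of ret rather than msg_size. A
sketch of the clamping step follows, written as a pure helper so it can be
checked without a socket. clamp_recv_size() is a hypothetical name, and ret
stands for the value returned by a call such as
zmq_recv(s->socket, buf, size, 0).

```c
/* ret:  value returned by zmq_recv(), i.e. -1 on error or the full
 *       message size, which may be larger than the buffer.
 * size: capacity of the buffer passed to zmq_recv().
 * Sets *truncated to 1 when bytes were dropped, else 0.
 * Returns the number of bytes actually stored in the buffer, or -1. */
static int clamp_recv_size(int ret, int size, int *truncated)
{
    *truncated = 0;
    if (ret < 0)
        return -1;
    if (ret > size) {
        *truncated = 1;   /* caller may log a truncation warning here */
        return size;
    }
    return ret;
}
```

The truncated flag lets the caller keep the existing warning while returning
a byte count that never exceeds the buffer size.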

> 
> > > +
> > > +finish:
> > > +    zmq_msg_close(&msg);
> > > +    return ret;
> > > +}
> > > +
> > > +static int ff_zmq_close(URLContext *h)
> > > +{
> > > +    ZMQContext *s = h->priv_data;
> > > +    zmq_close(s->socket);
> > > +    zmq_ctx_destroy(s->context);
> > > +    return 0;
> > > +}
> > > +
> > > +static const AVClass zmq_context_class = {
> > > +    .class_name = "zmq",
> > > +    .item_name  = av_default_item_name,
> > > +    .option     = options,
> > > +    .version    = LIBAVUTIL_VERSION_INT,
> > > +};
> > > +
> > > +const URLProtocol ff_libzmq_protocol = {
> > > +    .name            = "zmq",
> > > +    .url_close       = ff_zmq_close,
> > > +    .url_open        = ff_zmq_open,
> > > +    .url_read        = ff_zmq_read,
> > > +    .url_write       = ff_zmq_write,
> > > +    .priv_data_size  = sizeof(ZMQContext),
> > > +    .priv_data_class = &zmq_context_class,
> > > +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
> > > +};
> > > diff --git a/libavformat/protocols.c b/libavformat/protocols.c
> > > index ad95659795..face5b29b5 100644
> > > --- a/libavformat/protocols.c
> > > +++ b/libavformat/protocols.c
> > > @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
> > >  extern const URLProtocol ff_libsrt_protocol;
> > >  extern const URLProtocol ff_libssh_protocol;
> > >  extern const URLProtocol ff_libsmbclient_protocol;
> > > +extern const URLProtocol ff_libzmq_protocol;
> > > 
> > >  #include "libavformat/protocol_list.c"
> > > 
> > > -- 
> > > 2.22.0
> > > 
> > 
> > ping
> 
> Regards,
> Marton

Thanks, 
Andriy

