[FFmpeg-devel] [PATCH v3] libavformat: Add ZeroMQ as a protocol option

Marton Balint cus at passwd.hu
Sat Aug 24 20:33:09 EEST 2019



On Fri, 23 Aug 2019, Andriy Gelman wrote:

> On Mon, 19. Aug 17:28, Andriy Gelman wrote:
>> Minor changes in v3: 
>> 1. Removed tab character from as per feedback 
>> 2. Removed unused timeout variable from ZMQContext 
>> 
>> Andriy
>
>> From 66c11c12fcfa8a7fbb3c8c09d23c017992229a99 Mon Sep 17 00:00:00 2001
>> From: Andriy Gelman <andriy.gelman at gmail.com>
>> Date: Tue, 30 Jul 2019 14:39:32 -0400
>> Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option
>> 
>> Currently multiple clients are only supported by using a multicast
>> destination address. An alternative is to stream to a server which
>> re-distributes the content. This commit adds ZeroMQ as a protocol
>> option, which allows multiple clients to connect to a single ffmpeg
>> instance.
>> ---
>>  configure               |   2 +
>>  doc/general.texi        |   1 +
>>  doc/protocols.texi      |  32 ++++++++
>>  libavformat/Makefile    |   1 +
>>  libavformat/libzmq.c    | 158 ++++++++++++++++++++++++++++++++++++++++
>>  libavformat/protocols.c |   1 +
>>  6 files changed, 195 insertions(+)
>>  create mode 100644 libavformat/libzmq.c

Missing changelog entry and libavformat minor version bump.

>> 
>> diff --git a/configure b/configure
>> index c09c842809..a4134024c2 100755
>> --- a/configure
>> +++ b/configure
>> @@ -3411,6 +3411,8 @@ libsrt_protocol_deps="libsrt"
>>  libsrt_protocol_select="network"
>>  libssh_protocol_deps="libssh"
>>  libtls_conflict="openssl gnutls mbedtls"
>> +libzmq_protocol_deps="libzmq"
>> +libzmq_protocol_select="network"

You may want to enforce a minimum version requirement for libzmq in 
the pkg_config part of configure depending on the API you use.

>>
>>  # filters
>>  afftdn_filter_deps="avcodec"
>> diff --git a/doc/general.texi b/doc/general.texi
>> index 3c0c803449..b8e063268c 100644
>> --- a/doc/general.texi
>> +++ b/doc/general.texi
>> @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point support).
>>  @item TCP          @tab X
>>  @item TLS          @tab X
>>  @item UDP          @tab X
>> + at item ZMQ          @tab E
>>  @end multitable
>>
>>  @code{X} means that the protocol is supported.
>> diff --git a/doc/protocols.texi b/doc/protocols.texi
>> index 3e4e7af3d4..174eaacd0f 100644
>> --- a/doc/protocols.texi
>> +++ b/doc/protocols.texi
>> @@ -1728,4 +1728,36 @@ Timeout in ms.
>>  Create the Unix socket in listening mode.
>>  @end table
>> 
>> + at section libzmq
>> +
>> +ZeroMQ asynchronous messaging library.
>> +
>> +This library supports unicast streaming to multiple clients without relying on
>> +an external server.
>> +
>> +The required syntax for streaming or connecting to a stream is:
>> + at example
>> +zmq:tcp://ip-address:port
>> + at end example
>> +
>> +Example:
>> +Create a localhost stream on port 5555:
>> + at example
>> +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555
>> + at end example
>> +
>> +Multiple clients may connect to the stream using:
>> + at example
>> +ffplay zmq:tcp://127.0.0.1:5555
>> + at end example
>> +
>> +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern.
>> +The server binds to a port and publishes data. Clients connect to the
>> +server (IP address/port) and subscribe to the stream. The order in which
>> +the server and client start generally does not matter.
>> +
>> +ffmpeg must be compiled with the --enable-libzmq option to support
>> +this protocol option. See the compilation guide @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu}
>> +for an example on how this option may be set.

I think I'd rather not reference the compilation guide, as there are no 
specific instructions there to compile with libzmq. If you insist, then at 
least loose the "Ubuntu" part from the link.

>> +
>>  @c man end PROTOCOLS
>> diff --git a/libavformat/Makefile b/libavformat/Makefile
>> index a434b005a4..efa3a112ae 100644
>> --- a/libavformat/Makefile
>> +++ b/libavformat/Makefile
>> @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL)        += librtmp.o
>>  OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL)     += libsmbclient.o
>>  OBJS-$(CONFIG_LIBSRT_PROTOCOL)           += libsrt.o
>>  OBJS-$(CONFIG_LIBSSH_PROTOCOL)           += libssh.o
>> +OBJS-$(CONFIG_LIBZMQ_PROTOCOL)           += libzmq.o
>>
>>  # libavdevice dependencies
>>  OBJS-$(CONFIG_IEC61883_INDEV)            += dv.o
>> diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c
>> new file mode 100644
>> index 0000000000..ac35c01cf8
>> --- /dev/null
>> +++ b/libavformat/libzmq.c
>> @@ -0,0 +1,158 @@
>> +/*
>> + * ZMQ URLProtocol
>> + *
>> + * This file is part of FFmpeg.
>> + *
>> + * FFmpeg is free software; you can redistribute it and/or
>> + * modify it under the terms of the GNU Lesser General Public
>> + * License as published by the Free Software Foundation; either
>> + * version 2.1 of the License, or (at your option) any later version.
>> + *
>> + * FFmpeg is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>> + * Lesser General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU Lesser General Public
>> + * License along with FFmpeg; if not, write to the Free Software
>> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>> + */
>> +
>> +#include <zmq.h>
>> +#include "url.h"
>> +#include "network.h"
>> +#include "libavutil/avstring.h"
>> +#include "libavutil/opt.h"
>> +
>> +typedef struct ZMQContext {
>> +    const AVClass *class;
>> +    void *context;
>> +    void *socket;
>> +} ZMQContext;
>> +
>> +static const AVOption options[] = {
>> +    { NULL }
>> +};
>> +
>> +static int ff_zmq_open(URLContext *h, const char *uri, int flags)
>> +{
>> +    int ret;
>> +    ZMQContext *s   = h->priv_data;
>> +    s->context      = zmq_ctx_new();

You should check if the context creation was successful.

>> +    h->is_streamed  = 1;
>> +
>> +    av_strstart(uri, "zmq:", &uri);
>> +
>> +    /*publish during write*/
>> +    if (h->flags & AVIO_FLAG_WRITE) {
>> +        s->socket = zmq_socket(s->context, ZMQ_PUB);
>> +        if (!s->socket) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));

zmq_errno() instead of errno? Same goes for all similar cases.

>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +
>> +        ret = zmq_bind(s->socket, uri);
>> +        if (ret == -1) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", zmq_strerror(errno));
>> +            zmq_close(s->socket);
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +    }
>> +
>> +    /*subscribe for read*/
>> +    if (h->flags & AVIO_FLAG_READ) {
>> +        s->socket = zmq_socket(s->context, ZMQ_SUB);
>> +        if (!s->socket) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", zmq_strerror(errno));
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +
>> +        zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
>> +        ret = zmq_connect(s->socket, uri);
>> +        if (ret == -1) {
>> +            av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", zmq_strerror(errno));
>> +            zmq_close(s->socket);
>> +            zmq_ctx_destroy(s->context);
>> +            return AVERROR_EXTERNAL;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>> +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size)
>> +{
>> +    int ret;
>> +    ZMQContext *s = h->priv_data;
>> +
>> +    ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT);

I can see that you are using non-blocking mode. A polling with timeout 
approach is preferred, see how tcp or unix does it.

>> +    if (ret >= 0)
>> +        return ret; /*number of sent bytes*/
>> +
>> +    /*errno = EAGAIN if messages cannot be pushed*/
>> +    if (ret == -1 && errno == EAGAIN) {
>> +        return AVERROR(EAGAIN);
>> +    } else
>> +        return AVERROR_EXTERNAL;
>> +}
>> +
>> +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size)
>> +{
>> +    int ret;
>> +    ZMQContext *s = h->priv_data;
>> +    zmq_msg_t msg;
>> +    int msg_size;
>> +
>> +    ret = zmq_msg_init(&msg);
>> +    if (ret == -1) {
>> +      av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", zmq_strerror(errno));
>> +      return AVERROR_EXTERNAL;
>> +    }
>> +
>> +    ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT);

Same here, a polling with timeout is preferred.

>> +    if (ret == -1) {
>> +        ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL;
>> +        if (ret == AVERROR_EXTERNAL)
>> +          av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): %s\n", zmq_strerror(errno));

identation

>> +        goto finish;
>> +    }
>> +
>> +    msg_size = zmq_msg_size(&msg);
>> +    if (msg_size > size) {
>> +        msg_size = size;
>> +        av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in the buffer. Message will be truncated\n");

Probably a user settable pkt_size option would be useful which sets the 
URLContext max_packet_size which basically controls the size of the 
allocated IO buffer.

>> +    }
>> +    memcpy(buf, zmq_msg_data(&msg), msg_size);

If you are truncating anyway then please use zmq_recv directly, so you can 
avoid the memcpy.

>> +
>> +finish:
>> +    zmq_msg_close(&msg);
>> +    return ret;
>> +}
>> +
>> +static int ff_zmq_close(URLContext *h)
>> +{
>> +    ZMQContext *s = h->priv_data;
>> +    zmq_close(s->socket);
>> +    zmq_ctx_destroy(s->context);
>> +    return 0;
>> +}
>> +
>> +static const AVClass zmq_context_class = {
>> +    .class_name = "zmq",
>> +    .item_name  = av_default_item_name,
>> +    .option     = options,
>> +    .version    = LIBAVUTIL_VERSION_INT,
>> +};
>> +
>> +const URLProtocol ff_libzmq_protocol = {
>> +    .name            = "zmq",
>> +    .url_close       = ff_zmq_close,
>> +    .url_open        = ff_zmq_open,
>> +    .url_read        = ff_zmq_read,
>> +    .url_write       = ff_zmq_write,
>> +    .priv_data_size  = sizeof(ZMQContext),
>> +    .priv_data_class = &zmq_context_class,
>> +    .flags           = URL_PROTOCOL_FLAG_NETWORK,
>> +};
>> diff --git a/libavformat/protocols.c b/libavformat/protocols.c
>> index ad95659795..face5b29b5 100644
>> --- a/libavformat/protocols.c
>> +++ b/libavformat/protocols.c
>> @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol;
>>  extern const URLProtocol ff_libsrt_protocol;
>>  extern const URLProtocol ff_libssh_protocol;
>>  extern const URLProtocol ff_libsmbclient_protocol;
>> +extern const URLProtocol ff_libzmq_protocol;
>>
>>  #include "libavformat/protocol_list.c"
>> 
>> -- 
>> 2.22.0
>> 
>
> ping

Regards,
Marton


More information about the ffmpeg-devel mailing list