[FFmpeg-devel] [PATCH] zmq filters

Clément Bœsch ubitux at gmail.com
Mon May 13 00:08:24 CEST 2013


On Sat, May 11, 2013 at 02:46:24PM +0200, Stefano Sabatini wrote:
> On date Friday 2013-05-10 12:04:37 +0200, Stefano Sabatini encoded:
> > On date Wednesday 2013-04-24 17:35:51 +0200, Nicolas George encoded:
> > > Le tridi 3 floréal, an CCXXI, Stefano Sabatini a écrit :
> > > > I can't honestly say, but at least it seems deployed in Debuntu, and
> > > > the library itself looks cool and slick. It has a simple and flexible
> > > > API, with lots of high level bindings which makes it particularly
> > > > suited for scripting. See in attachment a C client application I'm
> > > > using to test the filter (which I can eventually clean and put in
> > > > tools in the same commit).
> > > 
> > > I am afraid that a library that is too obscure will only bitrot.
> > > 
> > > > I opted for 3.2 because I was following the official tutorial and the
> > > > previous API seems a bit more complex. I can switch back to the 2.x
> > > > (incompatible) API.
> > > 
> > > If the 2.x series is really incompatible, then I believe you were right to
> > > choose the most recent and simplest one.
> > > 
> > > >     zmq_connect(requester, "tcp://localhost:5555");
> > > 
> > > Is there any access control possible? If not, that limits the usability a
> > > lot.
> > 
> > Updated.
> > 
> > Still missing: message API?
> > 
> > Also check the communication "protocol", please suggest if you have
> > better ideas (but we could break-extend it later, or add support to
> > versioning).
> > 
> > The zmqshell.c patch depends on the av_dynarray_alloc_elem() patch. I
> > don't know if we should drop the C client in favor of the python
> > version (which would need to be extended, e.g. to make the bind
> > address configurable).
> 
> Updated with message API. I'm assuming that the client will always
> send a single message containing all the command, in order to avoid
> more complications.
> 
> I also added a zmqsend.c tool, which can be used in scripts, and in
> general seems more useful than zmqshell.c, especially considering that
> zmqshell is already implemented in a Python script. Thus I think I'll
> just drop zmqshell.c in favor of zmqsend.c, and let Clement improve
> his zmqshell script.
> -- 
> FFmpeg = Foolish Friendly Martial Powerful Epic Gadget

> From ffe93b24c533a316ff767091684aaa80dd8ad55c Mon Sep 17 00:00:00 2001
> From: Stefano Sabatini <stefasab at gmail.com>
> Date: Sun, 21 Apr 2013 15:00:11 +0200
> Subject: [PATCH] lavfi: add zmq filters
> 
> TODO: add ChangeLog entry, bump minor
> ---
>  configure                |    7 ++
>  doc/filters.texi         |   41 +++++++
>  libavfilter/Makefile     |    2 +
>  libavfilter/allfilters.c |    2 +
>  libavfilter/f_zmq.c      |  280 ++++++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 332 insertions(+)
>  create mode 100644 libavfilter/f_zmq.c
> 
> diff --git a/configure b/configure
> index 291fb2d..66e3688 100755
> --- a/configure
> +++ b/configure
> @@ -236,6 +236,7 @@ External library support:
>    --enable-libxavs         enable AVS encoding via xavs [no]
>    --enable-libxvid         enable Xvid encoding via xvidcore,
>                             native MPEG-4/Xvid encoder exists [no]
> +  --enable-libzmq          enable message passing via libzmq [no]
>    --enable-openal          enable OpenAL 1.1 capture support [no]
>    --enable-opencl          enable OpenCL code
>    --enable-openssl         enable openssl [no]
> @@ -1195,6 +1196,7 @@ EXTERNAL_LIBRARY_LIST="
>      libx264
>      libxavs
>      libxvid
> +    libzmq
>      openal
>      opencl
>      openssl
> @@ -2122,6 +2124,7 @@ aresample_filter_deps="swresample"
>  ass_filter_deps="libass"
>  asyncts_filter_deps="avresample"
>  atempo_filter_deps="avcodec rdft"
> +azmq_filter_deps="libzmq"
>  blackframe_filter_deps="gpl"
>  boxblur_filter_deps="gpl"
>  colormatrix_filter_deps="gpl"
> @@ -2166,6 +2169,7 @@ yadif_filter_deps="gpl"
>  pixfmts_super2xsai_test_deps="super2xsai_filter"
>  tinterlace_merge_test_deps="tinterlace_filter"
>  tinterlace_pad_test_deps="tinterlace_filter"
> +zmq_filter_deps="libzmq"
>  
>  # libraries
>  avcodec_deps="avutil"
> @@ -4051,6 +4055,9 @@ enabled libx264    && require  libx264 x264.h x264_encoder_encode -lx264 &&
>                          die "ERROR: libx264 must be installed and version must be >= 0.118."; }
>  enabled libxavs    && require  libxavs xavs.h xavs_encoder_encode -lxavs
>  enabled libxvid    && require  libxvid xvid.h xvid_global -lxvidcore
> +enabled libzmq     && require  libzmq zmq.h zmq_ctx_new -lzmq &&

Possibly use require_pkgconfig libzmq here.

> +                      { check_cpp_condition zmq.h "ZMQ_VERSION_MAJOR >= 3 && ZMQ_VERSION_MINOR >= 2" ||
> +                        die "ERROR: libzmq must be installed and version must be >= 3.2."; }

nit: trailing '.' in version

>  enabled openal     && { { for al_libs in "${OPENAL_LIBS}" "-lopenal" "-lOpenAL32"; do
>                          check_lib 'AL/al.h' alGetError "${al_libs}" && break; done } ||
>                          die "ERROR: openal not found"; } &&
> diff --git a/doc/filters.texi b/doc/filters.texi
> index fadae1b..950721f 100644
> --- a/doc/filters.texi
> +++ b/doc/filters.texi
> @@ -8287,6 +8287,47 @@ ffmpeg -i INPUT -filter_complex asplit=5 OUTPUT
>  @end example
>  @end itemize
>  
> +@section zmq, azmq
> +
> +Receive commands sent through a libzmq client, and forward them to
> +filters in the filtergraph.
> +

> +@code{zmq} must be inserted between two video filters, @code{azmq}
> +between two audio filters.
> +

So they are pass-through filters, with one input and one output?

> +To enable this filters you need to install the libzmq library and
> +headers and configure FFmpeg with @code{--enable-libzmq}.
> +
> +For more information about libzmq see:
> +@url{http://www.zeromq.org/}
> +
> +The @code{zmq} and @code{azmq} filters work as a libzmq server, which
> +receives messages sent through a network interface defined by the
> +@option{bind_address} option.
> +
> +The received message must be in the form:
> +@example

> +@var{TARGET} @var{COMMAND} @var{ARG}

possibly better:
  @var{TARGET} @var{COMMAND} [@var{ARG}]

> +@end example
> +
> +@var{TARGET} specifies the target of the command, usually the name of
> +the filter class or a specific filter instance name.
> +
> +@var{COMMAND} specifies the name of the command for the target filter.
> +
> +@var{ARG} is optional and specifies the optional list of argument for

arguments

> +the given @var{COMMAND}.
> +

An example would be welcome (typically one addressing escaping/quoting
concerns).
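
For instance, a client could build the message along these lines. This is
only a sketch in Python: quote_arg() and build_message() are names made up
here, and it assumes the argument is read with av_get_token() as in this
patch, so that the usual ' and \ escaping applies. The second target and
command are placeholders, not real filter commands.

```python
def quote_arg(arg):
    # Assumption: the receiver unescapes with av_get_token(), where text
    # inside '...' is literal and a literal ' is written as '\''.
    return "'" + arg.replace("'", "'\\''") + "'"


def build_message(target, command, arg=None):
    """Return 'TARGET COMMAND [ARG]' as a single message string."""
    parts = [target, command]
    if arg is not None:
        parts.append(quote_arg(arg))
    return " ".join(parts)


# Illustrative calls; the second one shows an argument with a space and a quote.
print(build_message("Parsed_color_0", "c", "yellow"))
print(build_message("some_filter", "some_command", "it's two words"))
```

Quoting unconditionally keeps the helper simple; a plain word such as
yellow would also work unquoted.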

> +Upon reception, the message is processed and the corresponding command
> +is injected into the filtergraph. Depending on the result, the filter
> +will send a reply to the client, adopting the format:
> +@example
> +@var{ERROR_CODE} @var{ERROR_REASON}
> +@var{MESSAGE}
> +@end example
> +
> +@var{MESSAGE} is optional.
> +
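
On the client side, such a reply can be split on the first newline and
then on the first space. A sketch in Python, where parse_reply() is a
made-up name and the sample strings are illustrative, not actual filter
output:

```python
def parse_reply(reply):
    """Split 'ERROR_CODE ERROR_REASON[\\nMESSAGE]' into its three parts."""
    # The first line carries the status, anything after it is the message.
    status, _, message = reply.partition("\n")
    code, _, reason = status.partition(" ")
    # MESSAGE is optional, so report it as None when absent.
    return int(code), reason, (message or None)


print(parse_reply("0 Success"))
print(parse_reply("22 Invalid argument\nsome detail"))
```

A malformed status line makes int() raise ValueError, which is probably
what a script wants anyway.
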
>  @c man end MULTIMEDIA FILTERS
>  
>  @chapter Multimedia Sources
[...]
> diff --git a/libavfilter/f_zmq.c b/libavfilter/f_zmq.c
> new file mode 100644
> index 0000000..9e3b457
> --- /dev/null
> +++ b/libavfilter/f_zmq.c
> @@ -0,0 +1,280 @@
> +/*
> + * Copyright (c) 2013 Stefano Sabatini
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +/**
> + * @file

> + * receive commands through zeromq and broke them to filters

I'm not a native English speaker, but I'm not sure "broke" can be used
like this. Maybe you meant "broadcast"?

> + */
> +
> +#include <zmq.h>
> +#include "libavutil/avstring.h"
> +#include "libavutil/bprint.h"
> +#include "libavutil/opt.h"
> +#include "avfilter.h"
> +#include "internal.h"
> +#include "avfiltergraph.h"
> +#include "audio.h"
> +#include "video.h"
> +
> +typedef struct {
> +    const AVClass *class;
> +    void *zmq;
> +    void *responder;
> +    char *bind_address;
> +    int command_count;
> +} ZMQContext;
> +
> +#define OFFSET(x) offsetof(ZMQContext, x)
> +#define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
> +static const AVOption options[] = {
> +    { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
> +    { "b",            "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },

> +    { NULL },

nit: useless trailing comma

> +};
> +
> +static av_cold int init(AVFilterContext *ctx)
> +{
> +    ZMQContext *zmq = ctx->priv;
> +
> +    zmq->zmq = zmq_ctx_new();
> +    if (!zmq->zmq) {
> +        av_log(ctx, AV_LOG_ERROR,
> +               "Could not create ZMQ context: %s\n", zmq_strerror(errno));
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
> +    if (!zmq->responder) {
> +        av_log(ctx, AV_LOG_ERROR,
> +               "Could not create ZMQ responder: %s\n", zmq_strerror(errno));
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
> +        av_log(ctx, AV_LOG_ERROR,
> +               "Could not bind ZMQ responder to address '%s': %s\n",
> +               zmq->bind_address, zmq_strerror(errno));
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    zmq->command_count = -1;
> +    return 0;
> +}
> +
> +static void av_cold uninit(AVFilterContext *ctx)
> +{
> +    ZMQContext *zmq = ctx->priv;
> +
> +    zmq_close(zmq->responder);
> +    zmq_ctx_destroy(zmq->zmq);
> +}
> +
> +typedef struct {
> +    char *target, *command, *arg;
> +} Command;
> +
> +#define SPACES " \f\t\n\r"
> +
> +static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
> +{
> +    const char **buf = &command_str;
> +    int ret;
> +
> +    cmd->target = av_get_token(buf, SPACES);
> +    if (!cmd->target || !cmd->target[0]) {
> +        av_log(log_ctx, AV_LOG_ERROR,
> +               "No target specified in command '%s'\n", command_str);
> +        ret = AVERROR(EINVAL);
> +        goto end;
> +    }
> +
> +    cmd->command = av_get_token(buf, SPACES);
> +    if (!cmd->command || !cmd->command[0]) {
> +        av_log(log_ctx, AV_LOG_ERROR,
> +               "No command specified in command '%s'\n", command_str);
> +        ret = AVERROR(EINVAL);
> +        goto end;
> +    }
> +
> +    cmd->arg = av_get_token(buf, SPACES);
> +
> +end:
> +    return ret;

Note: if no cleanup is required, maybe you can just drop that label and
return directly.

> +}
> +
> +static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
> +{
> +    ZMQContext *zmq = ctx->priv;
> +    zmq_msg_t msg;
> +    int ret = 0;
> +
> +    if (zmq_msg_init(&msg) == -1) {
> +        av_log(ctx, AV_LOG_WARNING,
> +               "Could not receive message: %s\n", zmq_strerror(errno));
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
> +        if (errno != EAGAIN)
> +            av_log(ctx, AV_LOG_WARNING,
> +                   "Could not receive message: %s\n", zmq_strerror(errno));
> +        ret = AVERROR_EXTERNAL;
> +        goto end;
> +    }
> +
> +    *buf_size = zmq_msg_size(&msg) + 1;
> +    *buf = av_malloc(*buf_size);
> +    if (!*buf) {
> +        ret = AVERROR(ENOMEM);
> +        goto end;
> +    }
> +    memcpy(*buf, zmq_msg_data(&msg), *buf_size);
> +    (*buf)[*buf_size-1] = 0;
> +
> +end:
> +    zmq_msg_close(&msg);
> +    return ret;
> +}
> +
> +static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
> +{
> +    AVFilterContext *ctx = inlink->dst;
> +    ZMQContext *zmq = ctx->priv;
> +
> +    while (1) {

> +        char cmd_buf[256];

I'd pick a larger buffer, possibly 1024 (think of a drawtext with a number
of parameters and UTF-8 text, for instance).

> +        char *recv_buf;
> +        int recv_buf_size;
> +        AVBPrint bp;
> +        Command cmd = {0};
> +        int ret;
> +
> +        /* receive command */
> +        if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
> +            break;
> +        zmq->command_count++;
> +
> +        /* parse command */
> +        if (parse_command(&cmd, recv_buf, ctx) < 0) {
> +            av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
> +            goto end;
> +        }
> +
> +        /* process command */
> +        av_log(ctx, AV_LOG_VERBOSE,
> +               "Processing command #%d target:%s command:%s arg:%s\n",
> +               zmq->command_count, cmd.target, cmd.command, cmd.arg);
> +        ret = avfilter_graph_send_command(inlink->graph,
> +                                          cmd.target, cmd.command, cmd.arg,
> +                                          cmd_buf, sizeof(cmd_buf),
> +                                          AVFILTER_CMD_FLAG_ONE);
> +        av_bprint_init(&bp, 0, AV_BPRINT_SIZE_UNLIMITED);
> +        av_bprintf(&bp, "%d %s", -ret, av_err2str(ret));
> +        if (cmd_buf[0])
> +            av_bprintf(&bp, "\n%s", cmd_buf);
> +        av_log(ctx, AV_LOG_VERBOSE,
> +               "Sending command reply for command #%d:\n%s\n", zmq->command_count, bp.str);
> +        if (zmq_send(zmq->responder, bp.str, strlen(bp.str), 0) == -1)
> +            av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
> +                   zmq->command_count, zmq_strerror(ret));

The bprint looks like extreme overkill. Can't you just use av_asprintf()?

If you insist on using AVBPrint, maybe you could put it in the context,
and use av_bprint_clear() before usage.

> +
> +    end:
> +        av_bprint_finalize(&bp, NULL);
> +        av_freep(&recv_buf);
> +        recv_buf_size = 0;
> +        av_freep(&cmd.target);
> +        av_freep(&cmd.command);
> +        av_freep(&cmd.arg);
> +    }
> +

> +    return ff_filter_frame(inlink->dst->outputs[0], ref);

ctx->outputs[0]

> +}
> +
> +#if CONFIG_ZMQ_FILTER
> +
> +#define zmq_options options
> +AVFILTER_DEFINE_CLASS(zmq);
> +
> +static const AVFilterPad zmq_inputs[] = {
> +    {
> +        .name             = "default",
> +        .type             = AVMEDIA_TYPE_VIDEO,

> +        .get_video_buffer = ff_null_get_video_buffer,

Does this make any difference?

> +        .filter_frame     = filter_frame,
> +    },
> +    { NULL }
> +};
> +
> +static const AVFilterPad zmq_outputs[] = {
> +    {
> +        .name = "default",
> +        .type = AVMEDIA_TYPE_VIDEO,
> +    },
> +    { NULL }
> +};
> +
> +AVFilter avfilter_vf_zmq = {
> +    .name        = "zmq",

> +    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),

ditto broke

[...]

> From 9034654487a0233ac4c6f6fd829b443986c33d12 Mon Sep 17 00:00:00 2001
> From: Stefano Sabatini <stefasab at gmail.com>
> Date: Fri, 3 May 2013 18:05:18 +0200
> Subject: [PATCH] tools: add zmqsend.c, zmqshell.c and zmqshell.py tools
> 
> The zmqshell.py script is by Ubitux.
> ---
>  doc/filters.texi     |   21 +++++++
>  libavfilter/Makefile |    2 +-
>  tools/zmqsend.c      |  167 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  tools/zmqshell.c     |  157 +++++++++++++++++++++++++++++++++++++++++++++++
>  tools/zmqshell.py    |   23 +++++++
>  5 files changed, 369 insertions(+), 1 deletion(-)
>  create mode 100644 tools/zmqsend.c
>  create mode 100644 tools/zmqshell.c
>  create mode 100755 tools/zmqshell.py
> 
> diff --git a/doc/filters.texi b/doc/filters.texi
> index 950721f..2d0425b 100644
> --- a/doc/filters.texi
> +++ b/doc/filters.texi
> @@ -8328,6 +8328,27 @@ will send a reply to the client, adopting the format:
>  
>  @var{MESSAGE} is optional.
>  
> +@subsection Examples
> +
> +Look at @file{tools/zmqsend} for an example of a zmq client which can
> +be used to send commands processed by these filters.
> +
> +Consider the following filtergraph generated by @command{ffplay}
> +@example
> +ffplay -dumpgraph 1 -f lavfi "color=s=100x100:c=red [l]; color=s=100x100:c=blue [r]; nullsrc=s=200x100, zmq [bg]; [bg][l]overlay[bg+l]; [bg+l][r]overlay=x=100"

Possibly clearer:

    ffplay -dumpgraph 1 -f lavfi "
        color=s=100x100:c=red  [l];
        color=s=100x100:c=blue [r];
        nullsrc=s=200x100, zmq [bg];
        [bg]  [l] overlay        [bg+l];
        [bg+l][r] overlay=x=100"

Also, I realize it would possibly be more elegant to have zmq working
(optionally?) as a source filter, maybe using an option, or simply
defining an extra zmqsrc filter or whatever.

> +@end example
> +
> +To change the color of the left side of the video, the following
> +command can be used:
> +@example
> +echo Parsed_color_0 c yellow | tools/zmqsend
> +@end example
> +
> +To change the right side:
> +@example
> +echo Parsed_color_1 c pink | tools/zmqsend
> +@end example
> +

I must say this is pretty nice.

>  @c man end MULTIMEDIA FILTERS
>  
>  @chapter Multimedia Sources
> diff --git a/libavfilter/Makefile b/libavfilter/Makefile
> index e3d01de..7c76539 100644
> --- a/libavfilter/Makefile
> +++ b/libavfilter/Makefile
> @@ -237,7 +237,7 @@ OBJS-$(CONFIG_MOVIE_FILTER)                  += src_movie.o
>  SKIPHEADERS-$(CONFIG_LIBVIDSTAB)             += vidstabutils.h
>  SKIPHEADERS-$(CONFIG_OPENCL)                 += opencl_internal.h deshake_opencl_kernel.h unsharp_opencl_kernel.h
>  
> -TOOLS     = graph2dot
> +TOOLS     = graph2dot zmqshell zmqsend
>  TESTPROGS = drawutils filtfmts formats
>  
>  clean::
> diff --git a/tools/zmqsend.c b/tools/zmqsend.c
> new file mode 100644
> index 0000000..738a5e5
> --- /dev/null
> +++ b/tools/zmqsend.c
> @@ -0,0 +1,167 @@
> +/*
> + * Copyright (c) 2013 Stefano Sabatini
> + *
> + * This file is part of FFmpeg.
> + *
> + * FFmpeg is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * FFmpeg is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with FFmpeg; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + */
> +
> +#include "config.h"
> +
> +#include <zmq.h>
> +
> +#include "libavutil/mem.h"
> +#include "libavutil/bprint.h"
> +
> +#if HAVE_UNISTD_H
> +#include <unistd.h>             /* getopt */
> +#endif
> +

> +#if !HAVE_GETOPT
> +#include "compat/getopt.c"
> +#endif
> +

Note: this is kind of rare violence.

> +/**
> + * @file
> + * zmq shell example, to be used with the zmq filters
> + */
> +

This doesn't look like a shell.

BTW, why is this required when you actually have "shell" code? Can't you
have a single tool working in both modes (sending a command or shell)?
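
Roughly what I have in mind, sketched in Python with the transport left
out: iter_commands(), run() and fake_send() are made-up names, and
fake_send() merely stands in for the REQ send/recv on the zmq socket.

```python
import io


def iter_commands(argv, stdin):
    """Yield the commands to send.

    With arguments: one-shot mode, the arguments form a single command.
    Without: read one command per line from stdin (pipe or interactive).
    """
    if argv:
        yield " ".join(argv)
        return
    for line in stdin:
        line = line.strip()
        if line:  # skip blank lines
            yield line


def run(argv, stdin, send):
    """Send every command through send() and collect the replies."""
    return [send(cmd) for cmd in iter_commands(argv, stdin)]


def fake_send(cmd):
    # Stand-in transport for illustration only.
    return "ok: " + cmd


# One-shot mode, then line-per-command mode.
print(run(["Parsed_color_0", "c", "yellow"], io.StringIO(""), fake_send))
print(run([], io.StringIO("Parsed_color_0 c yellow\nParsed_color_1 c pink\n"), fake_send))
```

The same loop serves "echo ... | tool", "tool TARGET COMMAND ARG" and an
interactive session, so the two C programs would collapse into one.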

[...]

Note: don't forget to update .gitignore.

[...]
> diff --git a/tools/zmqshell.py b/tools/zmqshell.py
> new file mode 100755
> index 0000000..87c7267
> --- /dev/null
> +++ b/tools/zmqshell.py
> @@ -0,0 +1,23 @@
> +#!/usr/bin/env python2
> +
> +import sys, zmq, cmd
> +
> +class LavfiCmd(cmd.Cmd):
> +    prompt = 'lavfi> '
> +
> +    def __init__(self):
> +        context = zmq.Context()
> +        self.requester = context.socket(zmq.REQ)
> +        self.requester.connect("tcp://localhost:5555")
> +        cmd.Cmd.__init__(self)
> +
> +    def onecmd(self, cmd):
> +        if cmd == 'EOF':
> +            sys.exit(0)
> +        print 'sending command: %s' % cmd
> +        self.requester.send(cmd)
> +        message = self.requester.recv()

> +        print "Received reply:[", message, "]"

nit: print 'Received reply:[%s]' % message

> +
> +LavfiCmd().cmdloop('FFmpeg libavfilter interactive shell')
> +

What a sexy shell script ;)
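
If the script is to be improved later, here is one possible direction, as
a Python 3 sketch rather than a drop-in replacement. It hooks into
cmd.Cmd through default() and do_EOF() instead of overriding onecmd(),
and takes the socket as a parameter. FakeRequester is a made-up stand-in
so the sketch runs without a server; send_string()/recv_string() are the
pyzmq socket methods I assume a real REQ socket would provide.

```python
import cmd


class LavfiCmd(cmd.Cmd):
    prompt = 'lavfi> '

    def __init__(self, requester, **kwargs):
        # requester: any object with send_string()/recv_string(),
        # e.g. a pyzmq REQ socket connected to the filter.
        super().__init__(**kwargs)
        self.requester = requester

    def default(self, line):
        # Called by cmd.Cmd for every line that is not a do_* command.
        self.requester.send_string(line)
        reply = self.requester.recv_string()
        print('Received reply:[%s]' % reply, file=self.stdout)

    def emptyline(self):
        # Do nothing instead of repeating the previous command.
        return False

    def do_EOF(self, arg):
        # Returning True ends cmdloop() on end of input.
        return True


class FakeRequester:
    """Stand-in for a socket so the sketch runs without a server."""

    def __init__(self):
        self.sent = []

    def send_string(self, s):
        self.sent.append(s)

    def recv_string(self):
        return '0 Success'


shell = LavfiCmd(FakeRequester())
shell.onecmd('Parsed_color_0 c yellow')
```

With a real socket, the object passed in would be the REQ socket
connected to tcp://localhost:5555 as in the script above, followed by a
call to cmdloop(). One caveat: cmd.Cmd intercepts lines starting with
"help" or "?", so a target with such a name would not be forwarded.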

[...]

Nice work overall.

Did you have the opportunity to discuss such design & code with the zmq
developers? AFAIK they have a fairly active IRC channel; it might be
relevant to discuss this with them.

-- 
Clément B.