[FFmpeg-devel] [PATCH 2/3] fate/api: test threadmessage

Nicolas George george at nsup.org
Sun Dec 6 12:38:09 CET 2015


Le duodi 12 frimaire, an CCXXIV, Clement Boesch a écrit :
> From: Clément Bœsch <clement at stupeflix.com>
> 
> ---
>  tests/api/Makefile                 |   1 +
>  tests/api/api-threadmessage-test.c | 263 +++++++++++++++++++++++++++++++++++++
>  tests/fate/api.mak                 |   6 +
>  3 files changed, 270 insertions(+)
>  create mode 100644 tests/api/api-threadmessage-test.c
> 
> diff --git a/tests/api/Makefile b/tests/api/Makefile
> index c48c34a..3556a9b 100644
> --- a/tests/api/Makefile
> +++ b/tests/api/Makefile
> @@ -3,6 +3,7 @@ APITESTPROGS-$(call DEMDEC, H264, H264) += api-h264
>  APITESTPROGS-yes += api-seek
>  APITESTPROGS-yes += api-codec-param
>  APITESTPROGS-$(call DEMDEC, H263, H263) += api-band
> +APITESTPROGS-yes += api-threadmessage
>  APITESTPROGS += $(APITESTPROGS-yes)
>  
>  APITESTOBJS  := $(APITESTOBJS:%=$(APITESTSDIR)%) $(APITESTPROGS:%=$(APITESTSDIR)/%-test.o)
> diff --git a/tests/api/api-threadmessage-test.c b/tests/api/api-threadmessage-test.c
> new file mode 100644
> index 0000000..5c75051
> --- /dev/null
> +++ b/tests/api/api-threadmessage-test.c
> @@ -0,0 +1,263 @@
> +/*
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +/**
> + * Thread message API test
> + */
> +
> +#include <pthread.h>
> +
> +#include "libavutil/avassert.h"
> +#include "libavutil/avstring.h"
> +#include "libavutil/frame.h"
> +#include "libavutil/threadmessage.h"
> +
> +struct worker_data {
> +    int id;
> +    pthread_t tid;
> +    int workload;
> +    AVThreadMessageQueue *queue;
> +};
> +

> +/* same as worker_data but shuffled for testing purpose */

Is it really useful? If you merge both, you can probably get rid of the
macros below with just a conditional for the function on pthread_create().

> +struct reader_data {

Nit: worker/reader sounds like a mixed metaphor.

> +    pthread_t tid;
> +    int workload;
> +    int id;
> +    AVThreadMessageQueue *queue;
> +};
> +
> +struct message {
> +    AVFrame *frame;
> +    // we add some junk in the message to make sure the message size is >
> +    // sizeof(void*)
> +    int magic;
> +};
> +
> +#define MAGIC 0xdeadc0de
> +
> +static void free_frame(void *arg)
> +{
> +    struct message *msg = arg;
> +    av_assert0(msg->magic == MAGIC);
> +    av_frame_free(&msg->frame);
> +}
> +
> +static void *worker_thread(void *arg)
> +{
> +    int i, ret = 0;
> +    struct worker_data *wd = arg;
> +
> +    av_log(NULL, AV_LOG_INFO, "worker #%d: workload=%d\n", wd->id, wd->workload);
> +    for (i = 0; i < wd->workload; i++) {

> +        if (rand() % wd->workload < wd->workload / 10) {

Nit: using module for PRNG is not recommended, it gives very bad results
with LCG.

> +            av_log(NULL, AV_LOG_INFO, "worker #%d: flushing the queue\n", wd->id);
> +            av_thread_message_flush(wd->queue);
> +        } else {

Nit: inconsistent structure: if/else here, if+continue for readers.

> +            char *val;
> +            AVDictionary *meta = NULL;
> +            struct message msg = {
> +                .magic = MAGIC,
> +                .frame = av_frame_alloc(),
> +            };
> +
> +            if (!msg.frame) {
> +                ret = AVERROR(ENOMEM);
> +                break;
> +            }
> +
> +            /* we add some metadata to identify the frames */
> +            val = av_asprintf("frame from worker %d", wd->id);
> +            if (!val) {
> +                av_frame_free(&msg.frame);
> +                ret = AVERROR(ENOMEM);
> +                break;
> +            }
> +            ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
> +            if (ret < 0) {
> +                av_frame_free(&msg.frame);
> +                break;
> +            }
> +            av_frame_set_metadata(msg.frame, meta);
> +
> +            /* allocate a real frame in order to simulate "real" work */
> +            msg.frame->format = AV_PIX_FMT_RGBA;
> +            msg.frame->width  = 320;
> +            msg.frame->height = 240;
> +            ret = av_frame_get_buffer(msg.frame, 32);
> +            if (ret < 0) {
> +                av_frame_free(&msg.frame);
> +                break;
> +            }
> +
> +            /* push the frame in the common queue */
> +            av_log(NULL, AV_LOG_INFO, "worker #%d: sending my work (frame:%p), %d left\n",
> +                   wd->id, msg.frame, wd->workload - i - 1);
> +            ret = av_thread_message_queue_send(wd->queue, &msg, 0);
> +            if (ret < 0) {
> +                av_frame_free(&msg.frame);
> +                break;
> +            }
> +        }
> +    }
> +    av_log(NULL, AV_LOG_INFO, "worker #%d: my work is done here (%s)\n",
> +           wd->id, av_err2str(ret));
> +    av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
> +    return NULL;
> +}
> +
> +static void *reader_thread(void *arg)
> +{
> +    int i, ret = 0;
> +    struct reader_data *rd = arg;
> +
> +    for (i = 0; i < rd->workload; i++) {
> +        struct message msg;
> +        AVDictionary *meta;
> +        AVDictionaryEntry *e;
> +
> +        if (rand() % rd->workload < rd->workload / 10) {
> +            av_log(NULL, AV_LOG_INFO, "reader #%d: flushing the queue\n", rd->id);
> +            av_thread_message_flush(rd->queue);
> +            continue;
> +        }
> +
> +        ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
> +        if (ret < 0)
> +            break;
> +        av_assert0(msg.magic == MAGIC);
> +        meta = av_frame_get_metadata(msg.frame);
> +        e = av_dict_get(meta, "sig", NULL, 0);
> +        av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
> +        av_frame_free(&msg.frame);
> +    }
> +
> +    av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
> +    av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
> +
> +    return NULL;
> +}
> +
> +static int get_workload(int minv, int maxv)
> +{
> +    return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
> +}
> +
> +int main(int ac, char **av)
> +{
> +    int i, ret = 0;
> +    int max_queue_size;
> +    int nb_workers, worker_min_load, worker_max_load;
> +    int nb_readers, reader_min_load, reader_max_load;
> +    struct worker_data *workers;
> +    struct reader_data *readers;
> +    AVThreadMessageQueue *queue = NULL;
> +

> +    if (ac != 8) {
> +        av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
> +               "<nb_workers> <worker_min_send> <worker_max_send> "
> +               "<nb_readers> <reader_min_recv> <reader_max_recv>\n", av[0]);
> +        return 1;
> +    }

Nit: huge number of arguments is annoying. Maybe give sensible default
values and use getopt()?

> +
> +    max_queue_size  = atoi(av[1]);
> +    nb_workers      = atoi(av[2]);
> +    worker_min_load = atoi(av[3]);
> +    worker_max_load = atoi(av[4]);
> +    nb_readers      = atoi(av[5]);
> +    reader_min_load = atoi(av[6]);
> +    reader_max_load = atoi(av[7]);
> +
> +    if (max_queue_size <= 0 ||
> +        nb_workers <= 0 || worker_min_load <= 0 || worker_max_load <= 0 ||
> +        nb_readers <= 0 || reader_min_load <= 0 || reader_max_load <= 0) {
> +        av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
> +        return 1;
> +    }
> +
> +    av_log(NULL, AV_LOG_INFO, "qsize:%d / %d workers sending [%d-%d] / "
> +           "%d readers receiving [%d-%d]\n", max_queue_size,
> +           nb_workers, worker_min_load, worker_max_load,
> +           nb_readers, reader_min_load, reader_max_load);
> +
> +    workers = av_mallocz_array(nb_workers, sizeof(*workers));
> +    readers = av_mallocz_array(nb_readers, sizeof(*readers));
> +    if (!workers || !readers) {
> +        ret = AVERROR(ENOMEM);
> +        goto end;
> +    }
> +
> +    ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
> +    if (ret < 0)
> +        goto end;
> +

> +    ret = av_thread_message_queue_set_free_func(queue, free_frame);
> +    if (ret < 0)
> +        goto end;

Returns void now, or did I review the wrong version of the patch?

> +
> +#define SPAWN_THREADS(type) do {                                                \
> +    for (i = 0; i < nb_##type##s; i++) {                                        \
> +        struct type##_data *td = &type##s[i];                                   \
> +                                                                                \
> +        td->id = i;                                                             \
> +        td->queue = queue;                                                      \
> +        td->workload = get_workload(type##_min_load, type##_max_load);          \
> +                                                                                \
> +        ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
> +        if (ret) {                                                              \
> +            const int err = AVERROR(ret);                                       \
> +            av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
> +                   " thread: %s\n", av_err2str(err));                           \
> +            goto end;                                                           \
> +        }                                                                       \
> +    }                                                                           \
> +} while (0)
> +
> +#define WAIT_THREADS(type) do {                                                 \
> +    for (i = 0; i < nb_##type##s; i++) {                                        \
> +        struct type##_data *td = &type##s[i];                                   \
> +                                                                                \
> +        ret = pthread_join(td->tid, NULL);                                      \
> +        if (ret) {                                                              \
> +            const int err = AVERROR(ret);                                       \
> +            av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
> +                   " thread: %s\n", av_err2str(err));                           \
> +            goto end;                                                           \
> +        }                                                                       \
> +    }                                                                           \
> +} while (0)
> +
> +    SPAWN_THREADS(reader);
> +    SPAWN_THREADS(worker);
> +
> +    WAIT_THREADS(worker);
> +    WAIT_THREADS(reader);
> +
> +end:
> +    av_thread_message_queue_free(&queue);
> +    av_freep(&workers);
> +    av_freep(&readers);

Ideally, at this point the driving thread should have the readers and
writers compare notes to see that no message was lost or handled
twice. But that is annoying.

> +
> +    if (ret < 0 && ret != AVERROR_EOF) {
> +        av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
> +        return 1;
> +    }
> +    return 0;
> +}
> diff --git a/tests/fate/api.mak b/tests/fate/api.mak
> index 325f64a..e58b392 100644
> --- a/tests/fate/api.mak
> +++ b/tests/fate/api.mak
> @@ -28,6 +28,12 @@ FATE_API_SAMPLES_LIBAVFORMAT-yes += fate-api-jpeg-codec-param
>  fate-api-jpeg-codec-param: $(APITESTSDIR)/api-codec-param-test$(EXESUF)
>  fate-api-jpeg-codec-param: CMD = run $(APITESTSDIR)/api-codec-param-test $(TARGET_SAMPLES)/exif/image_small.jpg
>  
> +FATE_API-$(CONFIG_AVUTIL) += fate-api-threadmessage
> +fate-api-threadmessage: $(APITESTSDIR)/api-threadmessage-test$(EXESUF)
> +fate-api-threadmessage: CMD = run $(APITESTSDIR)/api-threadmessage-test 3 10 30 50 2 20 40
> +fate-api-threadmessage: CMP = null
> +fate-api-threadmessage: REF = /dev/null
> +
>  FATE_API_SAMPLES-$(CONFIG_AVFORMAT) += $(FATE_API_SAMPLES_LIBAVFORMAT-yes)
>  
>  ifdef SAMPLES

Regards,

-- 
  Nicolas George
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20151206/6ed8bdcc/attachment.sig>


More information about the ffmpeg-devel mailing list