[FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support

Marton Balint cus at passwd.hu
Sun Mar 8 13:25:29 EET 2020


> Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 protocol support
> 
> From: Andriy Gelman <andriy.gelman at gmail.com>
> 
> Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
> 
> Signed-off-by: Andriy Gelman <andriy.gelman at gmail.com>
> ---
> 
> Changes in v2:
>     - Addressed comments from Marton
>     - Updated documentation
> 
> Compilation notes:
>     - Requires librabbitmq-dev package (on ubuntu).
>     - The pkg-config libprabbitmq.pc has a corrupt entry.
>       **update: fixed on the github master branch**
>       The line "Libs.private: rt; -lpthread" should be changed to
>       "Libs.private: -lrt -pthread".
>     - Compile FFmpeg with --enable-librabbitmq
>

[...]

> + at item connection_timeout
> +The timeout in microseconds during the initial connection to the broker. The

In *seconds* (because it is an AV_OPT_TYPE_DURATION)

> +default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.

5 seconds

[...]

> +static const AVOption options[] = {
> +    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
> +    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
> +    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
> +    { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},

INT64_MAX can be the maximum.

> +    { NULL }
> +};
> +
> +static int amqp_proto_open(URLContext *h, const char *uri, int flags)
> +{
> +    int ret, server_msg;
> +    char hostname[STR_LEN], credentials[STR_LEN];
> +    char *credentials_decoded;
> +    int port;
> +    const char *user, *password = NULL;
> +    char *p;
> +    amqp_rpc_reply_t broker_reply;
> +    struct timeval tval = { 0 };
> +
> +    AMQPContext *s = h->priv_data;
> +
> +    h->is_streamed     = 1;
> +    h->max_packet_size = s->pkt_size;
> +
> +    av_url_split(NULL, 0, credentials, sizeof(credentials),
> +                 hostname, sizeof(hostname), &port, NULL, 0, uri);
> +
> +    if (port < 0)
> +        port = 5672;
> +
> +    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
> +        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
> +        return AVERROR(EINVAL);
> +    }
> +
> +    credentials_decoded = ff_urldecode(credentials, 0);

This is not entirely correct, becase the username may contain ':'
characters... So you should split first and urldecode the splitted
components...

> +    if (!credentials_decoded)
> +        return AVERROR(ENOMEM);
> +
> +    p = strchr(credentials_decoded, ':');
> +    if (p) {
> +        *p = '\0';
> +        password = p + 1;
> +    }
> +
> +    if (!password || *password == '\0')
> +        password = "guest";
> +
> +    user = credentials_decoded;
> +    if (*user == '\0')
> +        user = "guest";
> +
> +    s->conn = amqp_new_connection();
> +    if (!s->conn) {
> +        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    s->socket = amqp_tcp_socket_new(s->conn);
> +    if (!s->socket) {
> +        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
> +        goto destroy_connection;
> +    }
> +
> +    if (s->connection_timeout < 0)
> +        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
> +
> +    tval.tv_sec  = s->connection_timeout / 1000000;
> +    tval.tv_usec = s->connection_timeout % 1000000;
> +    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");

This should log the useful error, e.g:
         av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", amqp_error_string2(ret));

[...]

> +static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
> +{
> +    int ret;
> +    AMQPContext *s = h->priv_data;
> +    int fd = amqp_socket_get_sockfd(s->socket);
> +
> +    amqp_bytes_t message = { size, (void *)buf };
> +    amqp_basic_properties_t props;
> +
> +    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
> +    if (ret)
> +        return ret;
> +
> +    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
> +    props.content_type = amqp_cstring_bytes("octet/stream");
> +    props.delivery_mode = 2; /* persistent delivery mode */
> +
> +    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
> +                             amqp_cstring_bytes(s->routing_key), 0, 0,
> +                             &props, message);
> +
> +    if (ret) {
> +        av_log(h, AV_LOG_ERROR, "Error publish\n");

Same here

> +        return AVERROR_EXTERNAL;
> +    }
> +
> +    return size;
> +}
> +

[...]

Thanks,
Marton



More information about the ffmpeg-devel mailing list