[FFmpeg-devel] Fix for poor bandwidth usage on Windows
Farhad Roueintan
farhad at justin.tv
Fri Oct 5 03:47:12 CEST 2012
Hi,
We had noticed that when we streamed a video using ffmpeg we'd get much
lower bitrates than what we expected based on our bandwidth. We did some
investigation and tracked down the problem to the way ffmpeg's
networking code calls send(). It was calling the function many times
with small chunks of data (including lots of calls with 1 byte, 8 bytes,
etc.). This appears to be fine on Linux but on Windows this causes a
significant slowdown due to the way the send() function behaves. To fix
this we changed the ffmpeg RTMP code to buffer the data and only call
send() when the buffer fills up (we use a 64K buffer). This improved the
speed by over 10x. Our patch was specific to RTMP since that's the only
thing we were using; however, you probably want to do this buffering in
a more generic way at the TCP level.
I've attached the affected files. The buffering happens in the
ff_buffered_urlwrite function in rtmppkt.c.
Regards,
Farhad
-------------- next part --------------
/*
* RTMP input format
* Copyright (c) 2009 Kostya Shishkov
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "libavcodec/bytestream.h"
#include "libavutil/avstring.h"
#include "libavutil/intfloat.h"
#include "avformat.h"
#include "rtmppkt.h"
#include "flv.h"
#include "url.h"
/**
 * Serialize a boolean as an AMF value: the BOOL type marker followed by
 * a single byte holding val. *dst is moved past the written bytes.
 */
void ff_amf_write_bool(uint8_t **dst, int val)
{
    uint8_t *out = *dst;

    bytestream_put_byte(&out, AMF_DATA_TYPE_BOOL);
    bytestream_put_byte(&out, val);
    *dst = out;
}
/**
 * Serialize a double as an AMF value: the NUMBER type marker followed by
 * the 64-bit integer image of val, written big-endian.
 * *dst is moved past the written bytes.
 */
void ff_amf_write_number(uint8_t **dst, double val)
{
    uint8_t *out = *dst;

    bytestream_put_byte(&out, AMF_DATA_TYPE_NUMBER);
    bytestream_put_be64(&out, av_double2int(val));
    *dst = out;
}
/**
 * Serialize a C string as an AMF value: the STRING type marker, a 16-bit
 * big-endian length, then the string bytes (no terminator is written).
 */
void ff_amf_write_string(uint8_t **dst, const char *str)
{
    size_t len = strlen(str);

    bytestream_put_byte(dst, AMF_DATA_TYPE_STRING);
    bytestream_put_be16(dst, len);
    bytestream_put_buffer(dst, str, len);
}
/**
 * Serialize an AMF NULL: only the one-byte NULL type marker is written.
 */
void ff_amf_write_null(uint8_t **dst)
{
    bytestream_put_byte(dst, AMF_DATA_TYPE_NULL);
}
/**
 * Open an AMF object by writing the one-byte OBJECT type marker.
 * Fields and the object terminator are written by separate calls.
 */
void ff_amf_write_object_start(uint8_t **dst)
{
    bytestream_put_byte(dst, AMF_DATA_TYPE_OBJECT);
}
/**
 * Write the name of an object field: a 16-bit big-endian length followed by
 * the name bytes. Unlike a string value, no type marker precedes it.
 */
void ff_amf_write_field_name(uint8_t **dst, const char *str)
{
    size_t len = strlen(str);

    bytestream_put_be16(dst, len);
    bytestream_put_buffer(dst, str, len);
}
/**
 * Close an AMF object.
 *
 * The terminator is three bytes written as one 24-bit big-endian value:
 * the two high bytes are zero (an empty field name) and the low byte is
 * the OBJECT_END marker.
 */
void ff_amf_write_object_end(uint8_t **dst)
{
    bytestream_put_be24(dst, AMF_DATA_TYPE_OBJECT_END);
}
/**
 * Read an AMF boolean from bc.
 *
 * The type marker is consumed even when it does not match; *val is only
 * written when the marker is BOOL.
 *
 * @return 0 on success, AVERROR_INVALIDDATA if the next value is not a boolean
 */
int ff_amf_read_bool(GetByteContext *bc, int *val)
{
    const int marker = bytestream2_get_byte(bc);

    if (marker == AMF_DATA_TYPE_BOOL) {
        *val = bytestream2_get_byte(bc);
        return 0;
    }
    return AVERROR_INVALIDDATA;
}
/**
 * Read an AMF number from bc.
 *
 * The type marker is consumed even when it does not match; *val is only
 * written when the marker is NUMBER.
 *
 * @return 0 on success, AVERROR_INVALIDDATA if the next value is not a number
 */
int ff_amf_read_number(GetByteContext *bc, double *val)
{
    if (bytestream2_get_byte(bc) == AMF_DATA_TYPE_NUMBER) {
        uint64_t bits = bytestream2_get_be64(bc);

        *val = av_int2double(bits);
        return 0;
    }
    return AVERROR_INVALIDDATA;
}
/**
 * Read an AMF string from bc into str and NUL-terminate it.
 *
 * @param bc      byte reader positioned at the string's type marker
 * @param str     destination buffer
 * @param strsize capacity of str; the signaled length plus the terminator
 *                must fit
 * @param length  receives the number of string bytes stored
 * @return 0 on success, AVERROR_INVALIDDATA if the next value is not a
 *         string, AVERROR(EINVAL) if the string does not fit in str
 */
int ff_amf_read_string(GetByteContext *bc, uint8_t *str,
                       int strsize, int *length)
{
    int declared;
    int copied;

    if (bytestream2_get_byte(bc) != AMF_DATA_TYPE_STRING)
        return AVERROR_INVALIDDATA;

    declared = bytestream2_get_be16(bc);
    if (declared >= strsize) /* no room left for the terminator */
        return AVERROR(EINVAL);

    copied = bytestream2_get_buffer(bc, str, declared);
    if (copied != declared)
        av_log(NULL, AV_LOG_WARNING,
               "Unable to read as many bytes as AMF string signaled\n");

    /* A short read is tolerated: terminate after what was actually copied. */
    str[copied] = '\0';
    *length = copied < declared ? copied : declared;
    return 0;
}
/**
 * Consume one byte from bc and verify that it is the AMF NULL marker.
 *
 * @return 0 on success, AVERROR_INVALIDDATA otherwise
 */
int ff_amf_read_null(GetByteContext *bc)
{
    const int marker = bytestream2_get_byte(bc);

    return marker == AMF_DATA_TYPE_NULL ? 0 : AVERROR_INVALIDDATA;
}
/**
 * Read one RTMP packet from h.
 *
 * Fetches the first header byte and hands the rest of the work to
 * ff_rtmp_packet_read_internal().
 *
 * @return whatever ff_rtmp_packet_read_internal() returns, or AVERROR(EIO)
 *         if the first byte could not be read
 */
int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
                        int chunk_size, RTMPPacket *prev_pkt)
{
    uint8_t first_byte;
    int nread = ffurl_read(h, &first_byte, 1);

    if (nread == 1)
        return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt,
                                            first_byte);
    return AVERROR(EIO);
}
/**
 * Read the remainder of an RTMP packet whose first header byte has already
 * been consumed.
 *
 * @param h          context the remaining bytes are read from
 * @param p          packet to fill in; its payload is allocated here via
 *                   ff_rtmp_packet_create()
 * @param chunk_size current size of the incoming chunks
 * @param prev_pkt   per-channel history, indexed by channel id, used to
 *                   restore header fields that a compressed header omits;
 *                   updated with the values of this packet
 * @param hdr        first header byte: header type in the top two bits,
 *                   channel id in the low six bits
 * @return number of bytes consumed for this packet (including hdr),
 *         negative value on error; on error after allocation the packet
 *         payload has been released
 */
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
                                 RTMPPacket *prev_pkt, uint8_t hdr)
{
    uint8_t t, buf[16];
    int channel_id, timestamp, data_size, offset = 0;
    uint32_t extra = 0;
    enum RTMPPacketType type;
    int size = 1; /* hdr itself, already read by the caller */
    int ret;

    channel_id = hdr & 0x3F;
    if (channel_id < 2) { //special case for channel number >= 64
        /* 0 -> one extra id byte, 1 -> two extra id bytes (little-endian) */
        buf[1] = 0;
        if (ffurl_read_complete(h, buf, channel_id + 1) != channel_id + 1)
            return AVERROR(EIO);
        size += channel_id + 1;
        channel_id = AV_RL16(buf) + 64;
    }
    /* NOTE(review): the largest id decodable here is 65535 + 64 = 65599;
     * confirm that prev_pkt is large enough to be indexed with it. */
    data_size = prev_pkt[channel_id].data_size;
    type      = prev_pkt[channel_id].type;
    extra     = prev_pkt[channel_id].extra;

    hdr >>= 6;
    if (hdr == RTMP_PS_ONEBYTE) {
        timestamp = prev_pkt[channel_id].ts_delta;
    } else {
        if (ffurl_read_complete(h, buf, 3) != 3)
            return AVERROR(EIO);
        size += 3;
        timestamp = AV_RB24(buf);
        if (hdr != RTMP_PS_FOURBYTES) {
            if (ffurl_read_complete(h, buf, 3) != 3)
                return AVERROR(EIO);
            size += 3;
            data_size = AV_RB24(buf);
            if (ffurl_read_complete(h, buf, 1) != 1)
                return AVERROR(EIO);
            size++;
            type = buf[0];
            if (hdr == RTMP_PS_TWELVEBYTES) {
                if (ffurl_read_complete(h, buf, 4) != 4)
                    return AVERROR(EIO);
                size += 4;
                extra = AV_RL32(buf);
            }
        }
        if (timestamp == 0xFFFFFF) {
            /* 24-bit field saturated: the real value follows as 32 bits */
            if (ffurl_read_complete(h, buf, 4) != 4)
                return AVERROR(EIO);
            size += 4; /* these bytes were previously left out of the count */
            timestamp = AV_RB32(buf);
        }
    }
    /* Only the full 12-byte header carries an absolute timestamp. */
    if (hdr != RTMP_PS_TWELVEBYTES)
        timestamp += prev_pkt[channel_id].timestamp;

    if ((ret = ff_rtmp_packet_create(p, channel_id, type, timestamp,
                                     data_size)) < 0)
        return ret;
    p->extra = extra;
    // save history
    prev_pkt[channel_id].channel_id = channel_id;
    prev_pkt[channel_id].type       = type;
    prev_pkt[channel_id].data_size  = data_size;
    prev_pkt[channel_id].ts_delta   = timestamp - prev_pkt[channel_id].timestamp;
    prev_pkt[channel_id].timestamp  = timestamp;
    prev_pkt[channel_id].extra      = extra;

    while (data_size > 0) {
        int toread = FFMIN(data_size, chunk_size);
        if (ffurl_read_complete(h, p->data + offset, toread) != toread) {
            ff_rtmp_packet_destroy(p);
            return AVERROR(EIO);
        }
        /* Account for what was really read; the final chunk is usually
         * shorter than chunk_size. */
        data_size -= toread;
        offset    += toread;
        size      += toread;
        if (data_size > 0) {
            if ((ret = ffurl_read_complete(h, &t, 1)) < 0) { // marker
                ff_rtmp_packet_destroy(p);
                return ret;
            }
            size++;
            if (t != (0xC0 + channel_id)) {
                /* Unexpected continuation header: drop the partial payload
                 * instead of leaking it. */
                ff_rtmp_packet_destroy(p);
                return -1;
            }
        }
    }
    return size;
}
/* Capacity in bytes of the send buffer (64 KiB). */
#define RTMP_SEND_BUFFER_SIZE 65536
/* File-scope send buffer and its current fill level in bytes.
 * There is a single instance for the whole file, so every caller of
 * ff_buffered_urlwrite() shares it; nothing here serializes access. */
static unsigned char* rtmp_send_buffer = 0;
static unsigned int rtmp_send_buffer_data_size = 0;
/**
 * Allocate the shared send buffer if it does not exist yet.
 *
 * Calling this while a buffer already exists is a no-op and leaves any
 * pending data untouched.
 *
 * @return 0 on success, AVERROR(ENOMEM) if the buffer could not be allocated
 */
int ff_create_rtmp_send_buffer(void)
{
    if (rtmp_send_buffer)
        return 0;

    rtmp_send_buffer           = av_malloc(RTMP_SEND_BUFFER_SIZE);
    rtmp_send_buffer_data_size = 0;
    /* Report the failure instead of pretending the buffer exists. */
    if (!rtmp_send_buffer)
        return AVERROR(ENOMEM);
    return 0;
}
/**
 * Release the shared send buffer.
 *
 * Bytes still waiting in the buffer are discarded, not written out.
 *
 * @return always 0
 */
int ff_destroy_rtmp_send_buffer(void)
{
    rtmp_send_buffer_data_size = 0;
    av_freep(&rtmp_send_buffer);
    return 0;
}
/**
 * Write out whatever is pending in the send buffer and mark it empty.
 *
 * The buffer is marked empty even when the write fails.
 *
 * @return 0 on success (or if nothing was pending), negative value on error
 */
static int rtmp_send_buffer_flush(URLContext *h)
{
    int ret = 0;

    if (rtmp_send_buffer_data_size > 0) {
        ret = ffurl_write(h, rtmp_send_buffer, rtmp_send_buffer_data_size);
        rtmp_send_buffer_data_size = 0;
    }
    return ret < 0 ? ret : 0;
}

/**
 * Queue data in the send buffer and hand it to ffurl_write() in large pieces.
 *
 * Audio and video data stays in the buffer until the buffer cannot take the
 * next piece. Any other packet type forces the buffer (including the new
 * data) to be written out before returning.
 *
 * The buffer is a single file-scope object with no locking, so this is only
 * safe when one writer uses it at a time.
 *
 * @param h        context the data is eventually written to
 * @param buf      data to send
 * @param size     number of bytes in buf
 * @param pkt_type RTMP packet type the data belongs to
 * @return size on success; a negative value if the buffer has not been
 *         created, size is negative, or a write failed
 */
int ff_buffered_urlwrite(URLContext *h, const unsigned char *buf, int size, int pkt_type)
{
    int ret;

    /* A negative size would turn into a huge length inside memcpy(). */
    if (!rtmp_send_buffer || size < 0)
        return -1;

    /* Not enough room left: push out what is pending first. */
    if ((unsigned int)size > RTMP_SEND_BUFFER_SIZE - rtmp_send_buffer_data_size) {
        if ((ret = rtmp_send_buffer_flush(h)) < 0)
            return ret;
    }

    if (size > RTMP_SEND_BUFFER_SIZE) {
        /* Larger than the whole buffer: write it directly. The buffer is
         * empty at this point because the flush above must have run. */
        if ((ret = ffurl_write(h, buf, size)) < 0)
            return ret;
    } else {
        memcpy(rtmp_send_buffer + rtmp_send_buffer_data_size, buf, size);
        rtmp_send_buffer_data_size += size;
    }

    /* Non-audio/video packets must go out immediately. */
    if (pkt_type != RTMP_PT_AUDIO && pkt_type != RTMP_PT_VIDEO) {
        if ((ret = rtmp_send_buffer_flush(h)) < 0)
            return ret;
    }
    return size;
}
/**
 * Serialize an RTMP packet and pass it to ff_buffered_urlwrite().
 *
 * The header is compressed against the previous packet sent on the same
 * channel, and the payload is split into chunks of at most chunk_size
 * bytes separated by one-byte continuation markers.
 *
 * @param h          context the packet is written to
 * @param pkt        packet to send; pkt->ts_delta is updated
 * @param chunk_size current size of the outgoing chunks
 * @param prev_pkt   per-channel history, indexed by channel id; updated
 *                   with the values of this packet
 * @return number of bytes produced for this packet, negative value on error
 */
int ff_rtmp_packet_write(URLContext *h, RTMPPacket *pkt,
                         int chunk_size, RTMPPacket *prev_pkt)
{
    /* Worst case: 3-byte basic header + 11-byte message header
     * + 4-byte extended timestamp = 18 bytes. */
    uint8_t pkt_hdr[18], *p = pkt_hdr;
    int mode = RTMP_PS_TWELVEBYTES;
    int off = 0;
    int size = 0;
    int ret;

    pkt->ts_delta = pkt->timestamp - prev_pkt[pkt->channel_id].timestamp;

    //if channel_id = 0, this is first presentation of prev_pkt, send full hdr.
    if (prev_pkt[pkt->channel_id].channel_id &&
        pkt->extra == prev_pkt[pkt->channel_id].extra) {
        if (pkt->type == prev_pkt[pkt->channel_id].type &&
            pkt->data_size == prev_pkt[pkt->channel_id].data_size) {
            mode = RTMP_PS_FOURBYTES;
            if (pkt->ts_delta == prev_pkt[pkt->channel_id].ts_delta)
                mode = RTMP_PS_ONEBYTE;
        } else {
            mode = RTMP_PS_EIGHTBYTES;
        }
    }

    /* Basic header: header type in the top two bits, then the channel id
     * in one of three encodings depending on its magnitude. */
    if (pkt->channel_id < 64) {
        bytestream_put_byte(&p, pkt->channel_id | (mode << 6));
    } else if (pkt->channel_id < 64 + 256) {
        bytestream_put_byte(&p, 0 | (mode << 6));
        bytestream_put_byte(&p, pkt->channel_id - 64);
    } else {
        bytestream_put_byte(&p, 1 | (mode << 6));
        bytestream_put_le16(&p, pkt->channel_id - 64);
    }
    if (mode != RTMP_PS_ONEBYTE) {
        uint32_t timestamp = pkt->timestamp;
        /* Compressed headers carry a delta instead of the absolute value. */
        if (mode != RTMP_PS_TWELVEBYTES)
            timestamp = pkt->ts_delta;
        bytestream_put_be24(&p, timestamp >= 0xFFFFFF ? 0xFFFFFF : timestamp);
        if (mode != RTMP_PS_FOURBYTES) {
            bytestream_put_be24(&p, pkt->data_size);
            bytestream_put_byte(&p, pkt->type);
            if (mode == RTMP_PS_TWELVEBYTES)
                bytestream_put_le32(&p, pkt->extra);
        }
        /* Value did not fit in 24 bits: append it in full. */
        if (timestamp >= 0xFFFFFF)
            bytestream_put_be32(&p, timestamp);
    }
    // save history
    prev_pkt[pkt->channel_id].channel_id = pkt->channel_id;
    prev_pkt[pkt->channel_id].type       = pkt->type;
    prev_pkt[pkt->channel_id].data_size  = pkt->data_size;
    prev_pkt[pkt->channel_id].timestamp  = pkt->timestamp;
    if (mode != RTMP_PS_TWELVEBYTES) {
        prev_pkt[pkt->channel_id].ts_delta = pkt->ts_delta;
    } else {
        prev_pkt[pkt->channel_id].ts_delta = pkt->timestamp;
    }
    prev_pkt[pkt->channel_id].extra = pkt->extra;

    if ((ret = ff_buffered_urlwrite(h, pkt_hdr, p - pkt_hdr, pkt->type)) < 0)
        return ret;
    size = p - pkt_hdr + pkt->data_size;
    while (off < pkt->data_size) {
        int towrite = FFMIN(chunk_size, pkt->data_size - off);
        if ((ret = ff_buffered_urlwrite(h, pkt->data + off, towrite, pkt->type)) < 0)
            return ret;
        off += towrite;
        if (off < pkt->data_size) {
            /* NOTE(review): this one-byte marker only encodes channel ids
             * below 64 correctly; larger ids are truncated by the cast. */
            uint8_t marker = 0xC0 | pkt->channel_id;
            if ((ret = ff_buffered_urlwrite(h, &marker, 1, pkt->type)) < 0)
                return ret;
            size++;
        }
    }
    return size;
}
/**
 * Initialize an RTMP packet and allocate its payload.
 *
 * @param pkt        packet to initialize
 * @param channel_id RTMP channel the packet belongs to
 * @param type       packet type
 * @param timestamp  packet timestamp
 * @param size       payload size in bytes; no buffer is allocated for 0
 * @return 0 on success, AVERROR(ENOMEM) if the payload cannot be allocated
 */
int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
                          int timestamp, int size)
{
    /* Always leave data in a defined state so that a later
     * ff_rtmp_packet_destroy() is safe for empty packets too. */
    pkt->data = NULL;
    if (size) {
        pkt->data = av_malloc(size);
        if (!pkt->data)
            return AVERROR(ENOMEM);
    }
    pkt->data_size  = size;
    pkt->channel_id = channel_id;
    pkt->type       = type;
    pkt->timestamp  = timestamp;
    pkt->extra      = 0;
    pkt->ts_delta   = 0;
    return 0;
}
/**
 * Release the payload of an RTMP packet and reset its size.
 * A NULL packet is accepted and ignored.
 */
void ff_rtmp_packet_destroy(RTMPPacket *pkt)
{
    if (pkt) {
        av_freep(&pkt->data);
        pkt->data_size = 0;
    }
}
/**
 * Compute the encoded size of the first AMF value in [data, data_end).
 *
 * Length fields are only read when they lie inside the buffer. For scalar
 * types the returned size is the declared one and may still exceed the
 * space available, so callers must compare it against the buffer.
 *
 * @param data     start of the AMF value (its type marker)
 * @param data_end end of the input buffer
 * @return size in bytes of the value including its type marker, or -1 if
 *         the value is unknown, truncated or malformed
 */
int ff_amf_tag_size(const uint8_t *data, const uint8_t *data_end)
{
    const uint8_t *base = data;

    if (data >= data_end)
        return -1;
    switch (*data++) {
    case AMF_DATA_TYPE_NUMBER:      return 9;
    case AMF_DATA_TYPE_BOOL:        return 2;
    case AMF_DATA_TYPE_STRING:
        if (data_end - data < 2)
            return -1;
        return 3 + AV_RB16(data);
    case AMF_DATA_TYPE_LONG_STRING: {
        uint32_t len;

        if (data_end - data < 4)
            return -1;
        len = AV_RB32(data);
        /* Reject lengths beyond the buffer; this also keeps the sum below
         * from overflowing int. */
        if (len > (uint64_t)(data_end - data - 4))
            return -1;
        return 5 + (int)len;
    }
    case AMF_DATA_TYPE_NULL:        return 1;
    case AMF_DATA_TYPE_ARRAY:
        /* skip the 32-bit element count, then parse like an object */
        if (data_end - data < 4)
            return -1;
        data += 4;
        /* fall through */
    case AMF_DATA_TYPE_OBJECT:
        for (;;) {
            int size;
            int t;

            if (data_end - data < 2)
                return -1;
            size = bytestream_get_be16(&data);
            if (!size) {
                /* empty field name: skip the end-of-object marker */
                data++;
                break;
            }
            if (size >= data_end - data)
                return -1;
            data += size;
            t = ff_amf_tag_size(data, data_end);
            if (t < 0 || t >= data_end - data)
                return -1;
            data += t;
        }
        return data - base;
    case AMF_DATA_TYPE_OBJECT_END:  return 1;
    default:                        return -1;
    }
}
/**
 * Look up a field in the first AMF object found in [data, data_end) and
 * store its value in dst as text.
 *
 * Numbers are formatted with "%g", booleans as "true"/"false" and strings
 * are copied; other value types are not supported.
 *
 * @param data     AMF data; values preceding the first object are skipped
 * @param data_end end of the input buffer
 * @param name     NUL-terminated name of the field to retrieve
 * @param dst      buffer receiving the value as a string
 * @param dst_size size of dst in bytes
 * @return 0 if the field was found and converted, -1 otherwise (field
 *         missing, unsupported type, or truncated/malformed data)
 */
int ff_amf_get_field_value(const uint8_t *data, const uint8_t *data_end,
                           const uint8_t *name, uint8_t *dst, int dst_size)
{
    int namelen = strlen(name);
    int len;

    /* Skip values until the first object; check the bound before looking
     * at the byte. */
    while (data < data_end && *data != AMF_DATA_TYPE_OBJECT) {
        len = ff_amf_tag_size(data, data_end);
        if (len < 0 || len > data_end - data)
            len = data_end - data;
        data += len;
    }
    /* Need the object marker plus at least a 16-bit name length. */
    if (data_end - data < 3)
        return -1;
    data++;
    for (;;) {
        int size;

        if (data_end - data < 2)
            return -1;
        size = bytestream_get_be16(&data);
        if (!size)
            break; /* empty name: end of object */
        if (size >= data_end - data)
            return -1;
        data += size;
        if (size == namelen && !memcmp(data-size, name, namelen)) {
            switch (*data++) {
            case AMF_DATA_TYPE_NUMBER:
                if (data_end - data < 8)
                    return -1;
                snprintf(dst, dst_size, "%g", av_int2double(AV_RB64(data)));
                break;
            case AMF_DATA_TYPE_BOOL:
                if (data_end - data < 1)
                    return -1;
                snprintf(dst, dst_size, "%s", *data ? "true" : "false");
                break;
            case AMF_DATA_TYPE_STRING:
                if (data_end - data < 2)
                    return -1;
                len = bytestream_get_be16(&data);
                if (len > data_end - data)
                    return -1;
                av_strlcpy(dst, data, FFMIN(len+1, dst_size));
                break;
            default:
                return -1;
            }
            return 0;
        }
        /* Not the field we want: step over its value. */
        len = ff_amf_tag_size(data, data_end);
        if (len < 0 || len >= data_end - data)
            return -1;
        data += len;
    }
    return -1;
}
/**
 * Map an RTMP packet type to a human-readable label for log output.
 * Types without a label yield "unknown".
 */
static const char* rtmp_packet_type(int type)
{
    const char *label = "unknown";

    switch (type) {
    case RTMP_PT_CHUNK_SIZE:   label = "chunk size";          break;
    case RTMP_PT_BYTES_READ:   label = "bytes read";          break;
    case RTMP_PT_PING:         label = "ping";                break;
    case RTMP_PT_SERVER_BW:    label = "server bandwidth";    break;
    case RTMP_PT_CLIENT_BW:    label = "client bandwidth";    break;
    case RTMP_PT_AUDIO:        label = "audio packet";        break;
    case RTMP_PT_VIDEO:        label = "video packet";        break;
    case RTMP_PT_FLEX_STREAM:  label = "Flex shared stream";  break;
    case RTMP_PT_FLEX_OBJECT:  label = "Flex shared object";  break;
    case RTMP_PT_FLEX_MESSAGE: label = "Flex shared message"; break;
    case RTMP_PT_NOTIFY:       label = "notification";        break;
    case RTMP_PT_SHARED_OBJ:   label = "shared object";       break;
    case RTMP_PT_INVOKE:       label = "invoke";              break;
    case RTMP_PT_METADATA:     label = "metadata";            break;
    default:                                                  break;
    }
    return label;
}
/**
 * Log the first AMF value in [data, data_end) at debug level, descending
 * into objects and arrays.
 *
 * Strings and field names longer than the local buffer are shown
 * truncated. Output stops silently at the first truncated or malformed
 * element.
 *
 * @param ctx      logging context passed to av_log()
 * @param data     start of the AMF value (its type marker)
 * @param data_end end of the input buffer
 */
static void ff_amf_tag_contents(void *ctx, const uint8_t *data, const uint8_t *data_end)
{
    unsigned int size;
    char buf[1024];

    if (data >= data_end)
        return;
    switch (*data++) {
    case AMF_DATA_TYPE_NUMBER:
        if (data_end - data < 8)
            return;
        av_log(ctx, AV_LOG_DEBUG, " number %g\n", av_int2double(AV_RB64(data)));
        return;
    case AMF_DATA_TYPE_BOOL:
        if (data_end - data < 1)
            return;
        av_log(ctx, AV_LOG_DEBUG, " bool %d\n", *data);
        return;
    case AMF_DATA_TYPE_STRING:
    case AMF_DATA_TYPE_LONG_STRING:
        /* short strings carry a 16-bit length, long strings a 32-bit one */
        if (data[-1] == AMF_DATA_TYPE_STRING) {
            if (data_end - data < 2)
                return;
            size = bytestream_get_be16(&data);
        } else {
            if (data_end - data < 4)
                return;
            size = bytestream_get_be32(&data);
        }
        /* Clamp to both the local buffer and the bytes really available;
         * the length is unsigned so a huge value cannot go negative. */
        if (size > sizeof(buf) - 1)
            size = sizeof(buf) - 1;
        if (size > (size_t)(data_end - data))
            size = data_end - data;
        memcpy(buf, data, size);
        buf[size] = 0;
        av_log(ctx, AV_LOG_DEBUG, " string '%s'\n", buf);
        return;
    case AMF_DATA_TYPE_NULL:
        av_log(ctx, AV_LOG_DEBUG, " NULL\n");
        return;
    case AMF_DATA_TYPE_ARRAY:
        /* skip the 32-bit element count, then print like an object */
        if (data_end - data < 4)
            return;
        data += 4;
        /* fall through */
    case AMF_DATA_TYPE_OBJECT:
        av_log(ctx, AV_LOG_DEBUG, " {\n");
        for (;;) {
            unsigned int name_len;
            unsigned int shown;
            int t;

            if (data_end - data < 2)
                return;
            name_len = bytestream_get_be16(&data);
            if (!name_len) {
                /* empty field name terminates the object */
                av_log(ctx, AV_LOG_DEBUG, " }\n");
                break;
            }
            if (name_len >= (size_t)(data_end - data))
                return;
            /* A name can be up to 65535 bytes; never copy more than fits. */
            shown = name_len;
            if (shown > sizeof(buf) - 1)
                shown = sizeof(buf) - 1;
            memcpy(buf, data, shown);
            buf[shown] = 0;
            data += name_len;
            av_log(ctx, AV_LOG_DEBUG, " %s: ", buf);
            ff_amf_tag_contents(ctx, data, data_end);
            t = ff_amf_tag_size(data, data_end);
            if (t < 0 || t >= data_end - data)
                return;
            data += t;
        }
        return;
    case AMF_DATA_TYPE_OBJECT_END:
        av_log(ctx, AV_LOG_DEBUG, " }\n");
        return;
    default:
        return;
    }
}
/**
 * Log a description of an RTMP packet at debug level.
 *
 * Invoke and notify packets are decoded as a sequence of AMF values,
 * bandwidth packets print their 32-bit value, audio/video/metadata
 * payloads are not printed, and everything else is hex-dumped.
 *
 * @param ctx logging context passed to av_log()
 * @param p   packet to dump
 */
void ff_rtmp_packet_dump(void *ctx, RTMPPacket *p)
{
    av_log(ctx, AV_LOG_DEBUG, "RTMP packet type '%s'(%d) for channel %d, timestamp %d, extra field %d size %d\n",
           rtmp_packet_type(p->type), p->type, p->channel_id, p->timestamp, p->extra, p->data_size);
    if (p->type == RTMP_PT_INVOKE || p->type == RTMP_PT_NOTIFY) {
        uint8_t *src = p->data, *src_end = p->data + p->data_size;
        while (src < src_end) {
            int sz;
            ff_amf_tag_contents(ctx, src, src_end);
            sz = ff_amf_tag_size(src, src_end);
            if (sz < 0)
                break;
            src += sz;
        }
    } else if (p->type == RTMP_PT_SERVER_BW && p->data_size >= 4) {
        /* Only decode when the 32-bit value is really present; shorter
         * payloads fall through to the hex dump below. */
        av_log(ctx, AV_LOG_DEBUG, "Server BW = %d\n", AV_RB32(p->data));
    } else if (p->type == RTMP_PT_CLIENT_BW && p->data_size >= 4) {
        av_log(ctx, AV_LOG_DEBUG, "Client BW = %d\n", AV_RB32(p->data));
    } else if (p->type != RTMP_PT_AUDIO && p->type != RTMP_PT_VIDEO && p->type != RTMP_PT_METADATA) {
        int i;
        for (i = 0; i < p->data_size; i++)
            av_log(ctx, AV_LOG_DEBUG, " %02X", p->data[i]);
        av_log(ctx, AV_LOG_DEBUG, "\n");
    }
}
-------------- next part --------------
/*
* RTMP packet utilities
* Copyright (c) 2009 Kostya Shishkov
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef AVFORMAT_RTMPPKT_H
#define AVFORMAT_RTMPPKT_H
#include "libavcodec/bytestream.h"
#include "avformat.h"
#include "url.h"
/** maximum possible number of different RTMP channels */
#define RTMP_CHANNELS 65599
/**
* channels used for RTMP packets with different purposes (e.g. data, network
* control, remote procedure calls, etc.)
*/
/* Fixed channel ids, starting at 2. The reader treats the values 0 and 1 in
 * the first header byte as escapes for channel ids of 64 and above. */
enum RTMPChannel {
RTMP_NETWORK_CHANNEL = 2, ///< channel for network-related messages (bandwidth report, ping, etc)
RTMP_SYSTEM_CHANNEL, ///< channel for sending server control messages
RTMP_SOURCE_CHANNEL, ///< channel for sending a/v to server
RTMP_VIDEO_CHANNEL = 8, ///< channel for video data
RTMP_AUDIO_CHANNEL, ///< channel for audio data
};
/**
* known RTMP packet types
*/
/* The numeric value is what is stored in the one-byte type field of a
 * packet header. The numbering has gaps; unlisted values are unknown here. */
typedef enum RTMPPacketType {
RTMP_PT_CHUNK_SIZE = 1, ///< chunk size change
RTMP_PT_BYTES_READ = 3, ///< number of bytes read
RTMP_PT_PING, ///< ping
RTMP_PT_SERVER_BW, ///< server bandwidth
RTMP_PT_CLIENT_BW, ///< client bandwidth
RTMP_PT_AUDIO = 8, ///< audio packet
RTMP_PT_VIDEO, ///< video packet
RTMP_PT_FLEX_STREAM = 15, ///< Flex shared stream
RTMP_PT_FLEX_OBJECT, ///< Flex shared object
RTMP_PT_FLEX_MESSAGE, ///< Flex shared message
RTMP_PT_NOTIFY, ///< some notification
RTMP_PT_SHARED_OBJ, ///< shared object
RTMP_PT_INVOKE, ///< invoke some stream action
RTMP_PT_METADATA = 22, ///< FLV metadata
} RTMPPacketType;
/**
* possible RTMP packet header sizes
*/
/* The value is the 2-bit header type stored in the top two bits of the
 * first header byte (the packet code shifts it by 6 when reading/writing). */
enum RTMPPacketSize {
RTMP_PS_TWELVEBYTES = 0, ///< packet has 12-byte header
RTMP_PS_EIGHTBYTES, ///< packet has 8-byte header
RTMP_PS_FOURBYTES, ///< packet has 4-byte header
RTMP_PS_ONEBYTE ///< packet is really a next chunk of a packet
};
/**
* structure for holding RTMP packets
*/
/* The payload buffer is allocated by ff_rtmp_packet_create() and released by
 * ff_rtmp_packet_destroy(). The same struct is also used, without payload,
 * as the per-channel header history (prev_pkt) when reading and writing. */
typedef struct RTMPPacket {
int channel_id; ///< RTMP channel ID (nothing to do with audio/video channels though)
RTMPPacketType type; ///< packet payload type
uint32_t timestamp; ///< packet full timestamp
uint32_t ts_delta; ///< timestamp increment to the previous one in milliseconds (latter only for media packets)
uint32_t extra; ///< probably an additional channel ID used during streaming data
uint8_t *data; ///< packet payload
int data_size; ///< packet payload size
} RTMPPacket;
/**
* Create new RTMP packet with given attributes.
*
* @param pkt packet
* @param channel_id packet channel ID
* @param type packet type
* @param timestamp packet timestamp
* @param size packet size
* @return zero on success, negative value otherwise
*/
int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id, RTMPPacketType type,
int timestamp, int size);
/**
* Free RTMP packet.
*
* @param pkt packet
*/
void ff_rtmp_packet_destroy(RTMPPacket *pkt);
/**
* Read RTMP packet sent by the server.
*
* @param h reader context
* @param p packet
* @param chunk_size current chunk size
* @param prev_pkt previously read packet headers for all channels
* (may be needed for restoring incomplete packet header)
* @return number of bytes read on success, negative value otherwise
*/
int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
int chunk_size, RTMPPacket *prev_pkt);
/**
* Read internal RTMP packet sent by the server.
*
* @param h reader context
* @param p packet
* @param chunk_size current chunk size
* @param prev_pkt previously read packet headers for all channels
* (may be needed for restoring incomplete packet header)
* @param c the first byte already read
* @return number of bytes read on success, negative value otherwise
*/
int ff_rtmp_packet_read_internal(URLContext *h, RTMPPacket *p, int chunk_size,
RTMPPacket *prev_pkt, uint8_t c);
/**
* Create the send buffer used to batch outgoing RTMP packet data
*/
int ff_create_rtmp_send_buffer(void);
/**
* Destroy the send buffer used to batch outgoing RTMP packet data
*/
int ff_destroy_rtmp_send_buffer(void);
/**
* Buffer outgoing RTMP packet data before calling ffurl_write
*
* @param h reader context
* @param buf buffer to send
* @param size size of the data in the buffer
* @param pkt_type type of the RTMP packet data
* @return number of bytes written on success, negative value otherwise
*/
int ff_buffered_urlwrite(URLContext *h, const unsigned char *buf,
int size, int pkt_type);
/**
* Send RTMP packet to the server.
*
* @param h reader context
* @param p packet to send
* @param chunk_size current chunk size
* @param prev_pkt previously sent packet headers for all channels
* (may be used for packet header compressing)
* @return number of bytes written on success, negative value otherwise
*/
int ff_rtmp_packet_write(URLContext *h, RTMPPacket *p,
int chunk_size, RTMPPacket *prev_pkt);
/**
* Print information and contents of RTMP packet.
*
* @param ctx output context
* @param p packet to dump
*/
void ff_rtmp_packet_dump(void *ctx, RTMPPacket *p);
/**
* @name Functions used to work with the AMF format (which is also used in .flv)
* @see amf_* funcs in libavformat/flvdec.c
* @{
*/
/**
* Calculate number of bytes taken by first AMF entry in data.
*
* @param data input data
* @param data_end input buffer end
* @return number of bytes used by first AMF entry
*/
int ff_amf_tag_size(const uint8_t *data, const uint8_t *data_end);
/**
* Retrieve value of given AMF object field in string form.
*
* @param data AMF object data
* @param data_end input buffer end
* @param name name of field to retrieve
* @param dst buffer for storing result
* @param dst_size output buffer size
* @return 0 if search and retrieval succeeded, negative value otherwise
*/
int ff_amf_get_field_value(const uint8_t *data, const uint8_t *data_end,
const uint8_t *name, uint8_t *dst, int dst_size);
/**
* Write boolean value in AMF format to buffer.
*
* @param dst pointer to the input buffer (will be modified)
* @param val value to write
*/
void ff_amf_write_bool(uint8_t **dst, int val);
/**
* Write number in AMF format to buffer.
*
* @param dst pointer to the input buffer (will be modified)
* @param num value to write
*/
void ff_amf_write_number(uint8_t **dst, double num);
/**
* Write string in AMF format to buffer.
*
* @param dst pointer to the input buffer (will be modified)
* @param str string to write
*/
void ff_amf_write_string(uint8_t **dst, const char *str);
/**
* Write AMF NULL value to buffer.
*
* @param dst pointer to the input buffer (will be modified)
*/
void ff_amf_write_null(uint8_t **dst);
/**
* Write marker for AMF object to buffer.
*
* @param dst pointer to the input buffer (will be modified)
*/
void ff_amf_write_object_start(uint8_t **dst);
/**
* Write string used as field name in AMF object to buffer.
*
* @param dst pointer to the input buffer (will be modified)
* @param str string to write
*/
void ff_amf_write_field_name(uint8_t **dst, const char *str);
/**
* Write marker for end of AMF object to buffer.
*
* @param dst pointer to the input buffer (will be modified)
*/
void ff_amf_write_object_end(uint8_t **dst);
/**
* Read AMF boolean value.
*
*@param[in,out] gbc GetByteContext initialized with AMF-formatted data
*@param[out] val 0 or 1
*@return 0 on success or an AVERROR code on failure
*/
int ff_amf_read_bool(GetByteContext *gbc, int *val);
/**
* Read AMF number value.
*
*@param[in,out] gbc GetByteContext initialized with AMF-formatted data
*@param[out] val read value
*@return 0 on success or an AVERROR code on failure
*/
int ff_amf_read_number(GetByteContext *gbc, double *val);
/**
* Read AMF string value.
*
* Appends a trailing \0 to output string in order to
* ease later parsing.
*
*@param[in,out] gbc GetByteContext initialized with AMF-formatted data
*@param[out] str read string
*@param[in] strsize buffer size available to store the read string
*@param[out] length read string length
*@return 0 on success or an AVERROR code on failure
*/
int ff_amf_read_string(GetByteContext *gbc, uint8_t *str,
int strsize, int *length);
/**
* Read AMF NULL value.
*
*@param[in,out] gbc GetByteContext initialized with AMF-formatted data
*@return 0 on success or an AVERROR code on failure
*/
int ff_amf_read_null(GetByteContext *gbc);
/** @} */ // AMF funcs
#endif /* AVFORMAT_RTMPPKT_H */
-------------- next part --------------
/*
* RTMP network protocol
* Copyright (c) 2009 Kostya Shishkov
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* @file
* RTMP protocol
*/
#include "libavcodec/bytestream.h"
#include "libavutil/avstring.h"
#include "libavutil/intfloat.h"
#include "libavutil/lfg.h"
#include "libavutil/opt.h"
#include "libavutil/random_seed.h"
#include "libavutil/sha.h"
#include "avformat.h"
#include "internal.h"
#include "network.h"
#include "flv.h"
#include "rtmp.h"
#include "rtmpcrypt.h"
#include "rtmppkt.h"
#include "url.h"
#if CONFIG_ZLIB
#include <zlib.h>
#endif
//#define DEBUG
#define APP_MAX_LENGTH 128
#define PLAYPATH_MAX_LENGTH 256
#define TCURL_MAX_LENGTH 512
#define FLASHVER_MAX_LENGTH 64
#define RTMP_PKTDATA_DEFAULT_SIZE 4096
#define RTMP_OUTGOING_CHUNK_SIZE 128
/** RTMP protocol handler state */
/* States of the RTMP protocol handler; stored in RTMPContext.state. */
typedef enum {
STATE_START, ///< client has not done anything yet
STATE_HANDSHAKED, ///< client has performed handshake
STATE_FCPUBLISH, ///< client FCPublishing stream (for output)
STATE_PLAYING, ///< client has started receiving multimedia data from server
STATE_PUBLISHING, ///< client has started sending multimedia data to server (for output)
STATE_RECEIVING, ///< received a publish command (for input)
STATE_STOPPED, ///< the broadcast has been stopped
} ClientState;
/* One invoke that was sent with tracking enabled, so that a later packet
 * carrying the same id can be matched back to the method name. */
typedef struct TrackedMethod {
char *name; /* method name, duplicated with av_strdup() when tracked */
int id;     /* id read from the invoke packet */
} TrackedMethod;
/** protocol handler context */
/* Per-connection state of the RTMP protocol handler.
 * prev_pkt[1] is the header history that rtmp_send_packet() passes to
 * ff_rtmp_packet_write(); presumably prev_pkt[0] is the read-side history
 * (confirm against the read path). Each history holds one RTMPPacket per
 * possible channel id (RTMP_CHANNELS entries). */
typedef struct RTMPContext {
const AVClass *class;
URLContext* stream; ///< TCP stream used in interactions with RTMP server
RTMPPacket prev_pkt[2][RTMP_CHANNELS]; ///< packet history used when reading and sending packets
int in_chunk_size; ///< size of the chunks incoming RTMP packets are divided into
int out_chunk_size; ///< size of the chunks outgoing RTMP packets are divided into
int is_input; ///< input/output flag
char *playpath; ///< stream identifier to play (with possible "mp4:" prefix)
int live; ///< 0: recorded, -1: live, -2: both
char *app; ///< name of application
char *conn; ///< append arbitrary AMF data to the Connect message
ClientState state; ///< current state
int main_channel_id; ///< an additional channel ID which is used for some invocations
uint8_t* flv_data; ///< buffer with data for demuxer
int flv_size; ///< current buffer size
int flv_off; ///< number of bytes read from current buffer
int flv_nb_packets; ///< number of flv packets published
RTMPPacket out_pkt; ///< rtmp packet, created from flv a/v or metadata (for output)
uint32_t client_report_size; ///< number of bytes after which client should report to server
uint32_t bytes_read; ///< number of bytes read from server
uint32_t last_bytes_read; ///< number of bytes read last reported to server
int skip_bytes; ///< number of bytes to skip from the input FLV stream in the next write call
uint8_t flv_header[11]; ///< partial incoming flv packet header
int flv_header_bytes; ///< number of initialized bytes in flv_header
int nb_invokes; ///< keeps track of invoke messages
char* tcurl; ///< url of the target stream
char* flashver; ///< version of the flash plugin
char* swfhash; ///< SHA256 hash of the decompressed SWF file (32 bytes)
int swfhash_len; ///< length of the SHA256 hash
int swfsize; ///< size of the decompressed SWF file
char* swfurl; ///< url of the swf player
char* swfverify; ///< URL to player swf file, compute hash/size automatically
char swfverification[42]; ///< hash of the SWF verification
char* pageurl; ///< url of the web page
char* subscribe; ///< name of live stream to subscribe
int server_bw; ///< server bandwidth
int client_buffer_time; ///< client buffer time in ms
int flush_interval; ///< number of packets flushed in the same request (RTMPT only)
int encrypted; ///< use an encrypted connection (RTMPE only)
TrackedMethod*tracked_methods; ///< tracked methods buffer
int nb_tracked_methods; ///< number of tracked methods
int tracked_methods_size; ///< size of the tracked methods buffer
int listen; ///< listen mode flag
int listen_timeout; ///< listen timeout to wait for new connections
int nb_streamid; ///< The next stream id to return on createStream calls
} RTMPContext;
#define PLAYER_KEY_OPEN_PART_LEN 30 ///< length of partial key used for first client digest signing
/** Client key used for digest signing */
/* 62 bytes: the 30-character ASCII text "Genuine Adobe Flash Player 001"
 * (the part covered by PLAYER_KEY_OPEN_PART_LEN) followed by 32 binary bytes. */
static const uint8_t rtmp_player_key[] = {
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'P', 'l', 'a', 'y', 'e', 'r', ' ', '0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
};
#define SERVER_KEY_OPEN_PART_LEN 36 ///< length of partial key used for first server digest signing
/** Key used for RTMP server digest signing */
/* 68 bytes: the 36-character ASCII text "Genuine Adobe Flash Media Server 001"
 * (the part covered by SERVER_KEY_OPEN_PART_LEN) followed by the same 32
 * binary bytes that end rtmp_player_key. */
static const uint8_t rtmp_server_key[] = {
'G', 'e', 'n', 'u', 'i', 'n', 'e', ' ', 'A', 'd', 'o', 'b', 'e', ' ',
'F', 'l', 'a', 's', 'h', ' ', 'M', 'e', 'd', 'i', 'a', ' ',
'S', 'e', 'r', 'v', 'e', 'r', ' ', '0', '0', '1',
0xF0, 0xEE, 0xC2, 0x4A, 0x80, 0x68, 0xBE, 0xE8, 0x2E, 0x00, 0xD0, 0xD1, 0x02,
0x9E, 0x7E, 0x57, 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8,
0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE
};
/**
 * Append a (name, id) pair to the list of tracked methods, growing the list
 * when it is full.
 *
 * @param rt   protocol context owning the list
 * @param name method name; a private copy is stored
 * @param id   id associated with the method
 * @return 0 on success, AVERROR(ENOMEM) if memory could not be allocated
 *         (the list is left unchanged in that case)
 */
static int add_tracked_method(RTMPContext *rt, const char *name, int id)
{
    int pos = rt->nb_tracked_methods;

    if (pos + 1 > rt->tracked_methods_size) {
        int new_size = (pos + 1) * 2;
        void *ptr = av_realloc(rt->tracked_methods,
                               new_size * sizeof(*rt->tracked_methods));
        if (!ptr)
            return AVERROR(ENOMEM);
        /* Record the new capacity only once the buffer really has it;
         * otherwise a failed realloc would let later calls write past the
         * old allocation. */
        rt->tracked_methods      = ptr;
        rt->tracked_methods_size = new_size;
    }

    rt->tracked_methods[pos].name = av_strdup(name);
    if (!rt->tracked_methods[pos].name)
        return AVERROR(ENOMEM);
    rt->tracked_methods[pos].id = id;
    rt->nb_tracked_methods++;
    return 0;
}
/**
 * Remove the entry at index from the tracked-method list by shifting the
 * following entries down by one. The entry's name is not freed here.
 */
static void del_tracked_method(RTMPContext *rt, int index)
{
    const int tail = rt->nb_tracked_methods - index - 1;

    memmove(rt->tracked_methods + index, rt->tracked_methods + index + 1,
            sizeof(*rt->tracked_methods) * tail);
    rt->nb_tracked_methods--;
}
/**
 * Find the tracked method whose id matches the AMF number stored at offset
 * in the packet payload, and remove it from the list.
 *
 * @param s              URL context whose private data is the RTMPContext
 * @param pkt            packet to inspect
 * @param offset         payload offset of the AMF number holding the id
 * @param tracked_method receives the name of the matching entry; left
 *                       untouched when nothing matches. The entry is
 *                       dropped from the list without freeing the name.
 * @return 0 on success (match or not), negative value if the number could
 *         not be read
 */
static int find_tracked_method(URLContext *s, RTMPPacket *pkt, int offset,
                               char **tracked_method)
{
    RTMPContext *rt = s->priv_data;
    GetByteContext gbc;
    double pkt_id;
    int ret;
    int i;

    bytestream2_init(&gbc, pkt->data + offset, pkt->data_size - offset);
    ret = ff_amf_read_number(&gbc, &pkt_id);
    if (ret < 0)
        return ret;

    for (i = 0; i < rt->nb_tracked_methods; i++) {
        if (rt->tracked_methods[i].id == pkt_id) {
            *tracked_method = rt->tracked_methods[i].name;
            del_tracked_method(rt, i);
            break;
        }
    }
    return 0;
}
/**
 * Release every tracked method name and the list itself.
 *
 * The pointers and counters are reset afterwards so that the context does
 * not keep dangling pointers and a later add or second free stays safe.
 *
 * @param rt RTMP context owning the tracked-method list
 */
static void free_tracked_methods(RTMPContext *rt)
{
    int i;

    for (i = 0; i < rt->nb_tracked_methods; i++)
        av_freep(&rt->tracked_methods[i].name);
    av_freep(&rt->tracked_methods);
    rt->tracked_methods_size = 0;
    rt->nb_tracked_methods   = 0;
}
/**
 * Write a packet to the stream and destroy it, optionally recording the
 * invoked method so that the reply can be matched later.
 *
 * When the packet is an invoke and track is non-zero, the method name and
 * transaction id are parsed from the payload and stored in the tracked-method
 * list before sending. The packet is destroyed on every path.
 *
 * @param rt    RTMP context
 * @param pkt   packet to send; destroyed before returning
 * @param track non-zero to remember the invoke for reply matching
 * @return result of ff_rtmp_packet_write(), or a negative value if the
 *         payload could not be parsed or tracked
 */
static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
{
    int status = 0;

    if (pkt->type == RTMP_PT_INVOKE && track) {
        GetByteContext reader;
        char method[128];
        double invoke_id;
        int method_len;

        bytestream2_init(&reader, pkt->data, pkt->data_size);
        status = ff_amf_read_string(&reader, method, sizeof(method),
                                    &method_len);
        if (status >= 0)
            status = ff_amf_read_number(&reader, &invoke_id);
        if (status >= 0)
            status = add_tracked_method(rt, method, invoke_id);
    }

    if (status >= 0)
        status = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
                                      rt->prev_pkt[1]);

    ff_rtmp_packet_destroy(pkt);
    return status;
}
/**
 * Serialize one user-supplied AMF parameter into the output buffer.
 *
 * Accepted forms are "T:value" and, for named items, "NT:name:value", where
 * T is B (boolean), N (number), S (string), O (object) or Z (null).
 * For booleans any value not starting with '0' is true; for objects a value
 * not starting with '0' begins an object and '0' ends it. For the named form
 * the ':' after the name is overwritten with a terminator, so param is
 * modified in place, and the field name is emitted before the type letter
 * is checked.
 *
 * @param s     context used for logging
 * @param param parameter text (may be modified)
 * @param p     write cursor, advanced past the emitted data
 * @return 0 on success, AVERROR(EINVAL) if the parameter is malformed
 */
static int rtmp_write_amf_data(URLContext *s, char *param, uint8_t **p)
{
    char kind;
    char *payload;

    if (param[0] && param[1] == ':') {
        /* Unnamed item: "T:value". */
        kind    = param[0];
        payload = param + 2;
    } else if (param[0] == 'N' && param[1] && param[2] == ':') {
        /* Named item: "NT:name:value". */
        char *key   = param + 3;
        char *colon = strchr(key, ':');

        if (!colon)
            goto invalid;
        *colon  = '\0';
        payload = colon + 1;
        kind    = param[1];
        ff_amf_write_field_name(p, key);
    } else {
        goto invalid;
    }

    if (kind == 'B') {
        ff_amf_write_bool(p, payload[0] != '0');
    } else if (kind == 'S') {
        ff_amf_write_string(p, payload);
    } else if (kind == 'N') {
        ff_amf_write_number(p, strtod(payload, NULL));
    } else if (kind == 'Z') {
        ff_amf_write_null(p);
    } else if (kind == 'O') {
        if (payload[0] != '0')
            ff_amf_write_object_start(p);
        else
            ff_amf_write_object_end(p);
    } else {
        goto invalid;
    }

    return 0;

invalid:
    av_log(s, AV_LOG_ERROR, "Invalid AMF parameter: %s\n", param);
    return AVERROR(EINVAL);
}
/**
 * Build the 'connect' invoke and send it to the server.
 *
 * The command object always carries app, flashVer and tcUrl; "type" is added
 * when publishing, swfUrl when configured, and the player capability fields
 * (plus pageUrl when configured) when playing. Any space-separated items in
 * rt->conn are appended afterwards as extra AMF data; the separators in
 * rt->conn are overwritten while doing so.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created or an rt->conn item is invalid
 */
static int gen_connect(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    char *next;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 4096);
    if (err < 0)
        return err;

    out = pkt.data;
    ff_amf_write_string(&out, "connect");
    ff_amf_write_number(&out, ++rt->nb_invokes);

    /* Command object. */
    ff_amf_write_object_start(&out);

    ff_amf_write_field_name(&out, "app");
    ff_amf_write_string(&out, rt->app);

    if (!rt->is_input) {
        ff_amf_write_field_name(&out, "type");
        ff_amf_write_string(&out, "nonprivate");
    }

    ff_amf_write_field_name(&out, "flashVer");
    ff_amf_write_string(&out, rt->flashver);

    if (rt->swfurl) {
        ff_amf_write_field_name(&out, "swfUrl");
        ff_amf_write_string(&out, rt->swfurl);
    }

    ff_amf_write_field_name(&out, "tcUrl");
    ff_amf_write_string(&out, rt->tcurl);

    if (rt->is_input) {
        ff_amf_write_field_name(&out, "fpad");
        ff_amf_write_bool(&out, 0);

        ff_amf_write_field_name(&out, "capabilities");
        ff_amf_write_number(&out, 15.0);

        /* Advertise every audio codec except SUPPORT_SND_INTEL (0x0008)
         * and SUPPORT_SND_UNUSED (0x0010), which the RTMP protocol
         * implementation does not use. */
        ff_amf_write_field_name(&out, "audioCodecs");
        ff_amf_write_number(&out, 4071.0);

        ff_amf_write_field_name(&out, "videoCodecs");
        ff_amf_write_number(&out, 252.0);

        ff_amf_write_field_name(&out, "videoFunction");
        ff_amf_write_number(&out, 1.0);

        if (rt->pageurl) {
            ff_amf_write_field_name(&out, "pageUrl");
            ff_amf_write_string(&out, rt->pageurl);
        }
    }

    ff_amf_write_object_end(&out);

    /* Append the arbitrary AMF items requested by the user, one per
     * space-separated token. */
    next = rt->conn;
    while (next) {
        char *item = next + strspn(next, " ");
        char *end;

        if (!*item)
            break;

        end = strchr(item, ' ');
        if (end) {
            *end = '\0';
            next = end + 1;
        } else {
            next = NULL;
        }

        err = rtmp_write_amf_data(s, item, &out);
        if (err < 0) {
            /* Invalid AMF parameter. */
            ff_rtmp_packet_destroy(&pkt);
            return err;
        }
    }

    pkt.data_size = out - pkt.data;

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Read the peer's 'connect' invoke and answer it.
 *
 * After checking that the first received packet is a "connect" command, the
 * following packets are sent in order: window acknowledgement size, peer
 * bandwidth, a ping (stream begin), the outgoing chunk size, the "_result"
 * reply carrying NetConnection.Connect.Success, and an "onBWDone" invoke.
 *
 * A missing sequence number or a missing/mismatching "app" field only
 * produces a warning; in the former case the "_result" reply uses 0 as its
 * transaction id.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of the last packet write on success, negative value if the
 *         connect packet is invalid or any read/write fails
 */
static int read_connect(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt = { 0 };
    uint8_t *p;
    const uint8_t *cp;
    int ret;
    char command[64];
    int stringlen;
    /* Initialized so that "_result" never carries an indeterminate value
     * when the sequence number cannot be read below. */
    double seqnum = 0;
    uint8_t tmpstr[256];
    GetByteContext gbc;

    if ((ret = ff_rtmp_packet_read(rt->stream, &pkt, rt->in_chunk_size,
                                   rt->prev_pkt[1])) < 0)
        return ret;
    cp = pkt.data;
    bytestream2_init(&gbc, cp, pkt.data_size);
    if (ff_amf_read_string(&gbc, command, sizeof(command), &stringlen)) {
        av_log(s, AV_LOG_ERROR, "Unable to read command string\n");
        ff_rtmp_packet_destroy(&pkt);
        return AVERROR_INVALIDDATA;
    }
    if (strcmp(command, "connect")) {
        av_log(s, AV_LOG_ERROR, "Expecting connect, got %s\n", command);
        ff_rtmp_packet_destroy(&pkt);
        return AVERROR_INVALIDDATA;
    }
    ret = ff_amf_read_number(&gbc, &seqnum);
    if (ret)
        av_log(s, AV_LOG_WARNING, "SeqNum not found\n");
    /* Here one could parse an AMF Object with data as flashVers and others. */
    ret = ff_amf_get_field_value(gbc.buffer,
                                 gbc.buffer + bytestream2_get_bytes_left(&gbc),
                                 "app", tmpstr, sizeof(tmpstr));
    if (ret)
        av_log(s, AV_LOG_WARNING, "App field not found in connect\n");
    if (!ret && strcmp(tmpstr, rt->app))
        av_log(s, AV_LOG_WARNING, "App field don't match up: %s <-> %s\n",
               tmpstr, rt->app);
    ff_rtmp_packet_destroy(&pkt);

    // Send Window Acknowledgement Size (as defined in specification)
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                     RTMP_PT_SERVER_BW, 0, 4)) < 0)
        return ret;
    p = pkt.data;
    bytestream_put_be32(&p, rt->server_bw);
    pkt.data_size = p - pkt.data;
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);
    if (ret < 0)
        return ret;

    // Send Peer Bandwidth
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                     RTMP_PT_CLIENT_BW, 0, 5)) < 0)
        return ret;
    p = pkt.data;
    bytestream_put_be32(&p, rt->server_bw);
    bytestream_put_byte(&p, 2); // dynamic
    pkt.data_size = p - pkt.data;
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);
    if (ret < 0)
        return ret;

    // Ping request
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                     RTMP_PT_PING, 0, 6)) < 0)
        return ret;
    p = pkt.data;
    bytestream_put_be16(&p, 0); // 0 -> Stream Begin
    bytestream_put_be32(&p, 0);
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);
    if (ret < 0)
        return ret;

    // Chunk size
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
                                     RTMP_PT_CHUNK_SIZE, 0, 4)) < 0)
        return ret;
    p = pkt.data;
    bytestream_put_be32(&p, rt->out_chunk_size);
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);
    if (ret < 0)
        return ret;

    // Send result_ NetConnection.Connect.Success to connect
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
                                     RTMP_PT_INVOKE, 0,
                                     RTMP_PKTDATA_DEFAULT_SIZE)) < 0)
        return ret;
    p = pkt.data;
    ff_amf_write_string(&p, "_result");
    ff_amf_write_number(&p, seqnum);
    ff_amf_write_object_start(&p);
    ff_amf_write_field_name(&p, "fmsVer");
    ff_amf_write_string(&p, "FMS/3,0,1,123");
    ff_amf_write_field_name(&p, "capabilities");
    ff_amf_write_number(&p, 31);
    ff_amf_write_object_end(&p);
    ff_amf_write_object_start(&p);
    ff_amf_write_field_name(&p, "level");
    ff_amf_write_string(&p, "status");
    ff_amf_write_field_name(&p, "code");
    ff_amf_write_string(&p, "NetConnection.Connect.Success");
    ff_amf_write_field_name(&p, "description");
    ff_amf_write_string(&p, "Connection succeeded.");
    ff_amf_write_field_name(&p, "objectEncoding");
    ff_amf_write_number(&p, 0);
    ff_amf_write_object_end(&p);
    pkt.data_size = p - pkt.data;
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);
    if (ret < 0)
        return ret;

    // Send onBWDone
    if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL,
                                     RTMP_PT_INVOKE, 0, 30)) < 0)
        return ret;
    p = pkt.data;
    ff_amf_write_string(&p, "onBWDone");
    ff_amf_write_number(&p, 0);
    ff_amf_write_null(&p);
    ff_amf_write_number(&p, 8192);
    pkt.data_size = p - pkt.data;
    ret = ff_rtmp_packet_write(rt->stream, &pkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&pkt);

    return ret;
}
/**
 * Build the 'releaseStream' invoke for rt->playpath and send it to the
 * server. It should make the server release some channel for media streams.
 *
 * The call is tracked so that its reply can be matched.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_release_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 29 + strlen(rt->playpath));
    if (err < 0)
        return err;

    av_log(s, AV_LOG_DEBUG, "Releasing stream...\n");

    out = pkt.data;
    ff_amf_write_string(&out, "releaseStream");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, rt->playpath);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Build the 'FCPublish' invoke for rt->playpath and send it to the server.
 * It should make the server prepare for receiving media streams.
 *
 * The call is tracked so that its reply can be matched.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_fcpublish_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 25 + strlen(rt->playpath));
    if (err < 0)
        return err;

    av_log(s, AV_LOG_DEBUG, "FCPublish stream...\n");

    out = pkt.data;
    ff_amf_write_string(&out, "FCPublish");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, rt->playpath);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Build the 'FCUnpublish' invoke for rt->playpath and send it to the server.
 * It should make the server destroy the stream.
 *
 * The call is not tracked.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_fcunpublish_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 27 + strlen(rt->playpath));
    if (err < 0)
        return err;

    av_log(s, AV_LOG_DEBUG, "UnPublishing stream...\n");

    out = pkt.data;
    ff_amf_write_string(&out, "FCUnpublish");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, rt->playpath);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Build the 'createStream' invoke and send it to the server. It should make
 * the server allocate some channel for media streams.
 *
 * The call is tracked so that its reply can be matched.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_create_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    av_log(s, AV_LOG_DEBUG, "Creating stream...\n");

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 25);
    if (err < 0)
        return err;

    out = pkt.data;
    ff_amf_write_string(&out, "createStream");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Build the 'deleteStream' invoke for rt->main_channel_id and send it to the
 * server. It should make the server remove some channel for media streams.
 *
 * The call is not tracked.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_delete_stream(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    av_log(s, AV_LOG_DEBUG, "Deleting stream...\n");

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 34);
    if (err < 0)
        return err;

    out = pkt.data;
    ff_amf_write_string(&out, "deleteStream");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_number(&out, rt->main_channel_id);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Send the client buffer time to the server.
 *
 * The ping packet carries the 16-bit value 3 followed by the main channel id
 * and rt->client_buffer_time, both as 32-bit big-endian values.
 *
 * @param s  unused
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_buffer_time(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING,
                                1, 10);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be16(&out, 3);
    bytestream_put_be32(&out, rt->main_channel_id);
    bytestream_put_be32(&out, rt->client_buffer_time);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Build the 'play' invoke for rt->playpath and send it to the server.
 *
 * The packet goes out on the video channel with rt->main_channel_id as its
 * extra field, and carries rt->live as the trailing number. The call is
 * tracked so that its reply can be matched.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_play(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath);

    err = ff_rtmp_packet_create(&pkt, RTMP_VIDEO_CHANNEL, RTMP_PT_INVOKE,
                                0, 29 + strlen(rt->playpath));
    if (err < 0)
        return err;

    pkt.extra = rt->main_channel_id;

    out = pkt.data;
    ff_amf_write_string(&out, "play");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, rt->playpath);
    ff_amf_write_number(&out, rt->live);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Build the 'publish' invoke for rt->playpath and send it to the server.
 *
 * The packet goes out on the source channel with rt->main_channel_id as its
 * extra field, and always requests the "live" publishing type. The call is
 * tracked so that its reply can be matched.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_publish(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    av_log(s, AV_LOG_DEBUG, "Sending publish command for '%s'\n", rt->playpath);

    err = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE,
                                0, 30 + strlen(rt->playpath));
    if (err < 0)
        return err;

    pkt.extra = rt->main_channel_id;

    out = pkt.data;
    ff_amf_write_string(&out, "publish");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, rt->playpath);
    ff_amf_write_string(&out, "live");

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Answer a ping packet from the server.
 *
 * The reply carries the 16-bit value 7 followed by the 32-bit value found at
 * offset 2 of the received ping, and uses the ping's timestamp plus one.
 *
 * @param s    context used for logging
 * @param rt   RTMP context
 * @param ppkt received ping packet; must hold at least 6 bytes
 * @return result of rtmp_send_packet(), AVERROR_INVALIDDATA if the ping is
 *         too short, or a negative value if the reply cannot be created
 */
static int gen_pong(URLContext *s, RTMPContext *rt, RTMPPacket *ppkt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    if (ppkt->data_size < 6) {
        av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n",
               ppkt->data_size);
        return AVERROR_INVALIDDATA;
    }

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING,
                                ppkt->timestamp + 1, 6);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be16(&out, 7);
    bytestream_put_be32(&out, AV_RB32(ppkt->data + 2));

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Send the SWF verification message to the server.
 *
 * The ping packet carries the 16-bit value 27 followed by the 42 bytes
 * stored in rt->swfverification.
 *
 * @param s  context used for logging
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_swf_verification(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    av_log(s, AV_LOG_DEBUG, "Sending SWF verification...\n");

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_PING,
                                0, 44);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be16(&out, 27);
    memcpy(out, rt->swfverification, 42);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Announce rt->out_chunk_size to the server in a chunk size message.
 *
 * @param s  unused
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_outgoing_chunk_size(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                RTMP_PT_CHUNK_SIZE, 0, 4);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be32(&out, rt->out_chunk_size);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Send rt->server_bw to the server in a server bandwidth message.
 *
 * @param s  unused
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_server_bw(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                RTMP_PT_SERVER_BW, 0, 4);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be32(&out, rt->server_bw);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Build the '_checkbw' invoke and send it to the server.
 *
 * The call is tracked so that its reply can be matched.
 *
 * @param s  unused
 * @param rt RTMP context
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_check_bw(URLContext *s, RTMPContext *rt)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 21);
    if (err < 0)
        return err;

    out = pkt.data;
    ff_amf_write_string(&out, "_checkbw");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Report rt->bytes_read (the number of bytes read so far) to the server.
 *
 * @param s  unused
 * @param rt RTMP context
 * @param ts timestamp to put on the report packet
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_bytes_read(URLContext *s, RTMPContext *rt, uint32_t ts)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL,
                                RTMP_PT_BYTES_READ, ts, 4);
    if (err < 0)
        return err;

    out = pkt.data;
    bytestream_put_be32(&out, rt->bytes_read);

    return rtmp_send_packet(rt, &pkt, 0);
}
/**
 * Build the 'FCSubscribe' invoke for the given stream name and send it to
 * the server.
 *
 * The call is tracked so that its reply can be matched.
 *
 * @param s         unused
 * @param rt        RTMP context
 * @param subscribe name of the stream to subscribe to
 * @return result of rtmp_send_packet(), or a negative value if the packet
 *         cannot be created
 */
static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
                                  const char *subscribe)
{
    RTMPPacket pkt;
    uint8_t *out;
    int err;

    err = ff_rtmp_packet_create(&pkt, RTMP_SYSTEM_CHANNEL, RTMP_PT_INVOKE,
                                0, 27 + strlen(subscribe));
    if (err < 0)
        return err;

    out = pkt.data;
    ff_amf_write_string(&out, "FCSubscribe");
    ff_amf_write_number(&out, ++rt->nb_invokes);
    ff_amf_write_null(&out);
    ff_amf_write_string(&out, subscribe);

    return rtmp_send_packet(rt, &pkt, 1);
}
/**
 * Calculate the HMAC-SHA256 digest of a buffer, optionally skipping the
 * 32-byte slot in which the digest itself is stored.
 *
 * Keys up to the 64-byte block size are used as they are (zero padded);
 * only longer keys are replaced by their SHA-256 hash.
 *
 * @param src    input data
 * @param len    length of the input data in bytes
 * @param gap    offset of a 32-byte region to leave out of the digest;
 *               values <= 0 mean the whole input is digested
 * @param key    HMAC key
 * @param keylen length of the key in bytes
 * @param dst    receives the 32-byte digest
 * @return 0 on success, AVERROR(ENOMEM) if the hash context cannot be
 *         allocated
 */
int ff_rtmp_calc_digest(const uint8_t *src, int len, int gap,
                        const uint8_t *key, int keylen, uint8_t *dst)
{
    struct AVSHA *sha;
    uint8_t hmac_buf[64+32] = {0};
    int i;

    sha = av_mallocz(av_sha_size);
    if (!sha)
        return AVERROR(ENOMEM);

    /* A key of exactly one block (64 bytes) must not be hashed. */
    if (keylen <= 64) {
        memcpy(hmac_buf, key, keylen);
    } else {
        av_sha_init(sha, 256);
        av_sha_update(sha, key, keylen);
        av_sha_final(sha, hmac_buf);
    }

    /* Inner hash: H((key ^ ipad) || message). */
    for (i = 0; i < 64; i++)
        hmac_buf[i] ^= HMAC_IPAD_VAL;

    av_sha_init(sha, 256);
    av_sha_update(sha, hmac_buf, 64);
    if (gap <= 0) {
        av_sha_update(sha, src, len);
    } else { //skip 32 bytes used for storing digest
        av_sha_update(sha, src, gap);
        av_sha_update(sha, src + gap + 32, len - gap - 32);
    }
    av_sha_final(sha, hmac_buf + 64);

    /* Outer hash: H((key ^ opad) || inner). */
    for (i = 0; i < 64; i++)
        hmac_buf[i] ^= HMAC_IPAD_VAL ^ HMAC_OPAD_VAL; //reuse XORed key for opad
    av_sha_init(sha, 256);
    av_sha_update(sha, hmac_buf, 64+32);
    av_sha_final(sha, dst);

    av_free(sha);

    return 0;
}
/**
 * Compute the position of the digest inside a handshake packet.
 *
 * The four bytes starting at buf + off are summed, reduced modulo mod_val
 * and shifted by add_val.
 *
 * @param buf     handshake data
 * @param off     offset of the four bytes to sum
 * @param mod_val modulus applied to the sum
 * @param add_val constant added after the reduction
 * @return the resulting digest position
 */
int ff_rtmp_calc_digest_pos(const uint8_t *buf, int off, int mod_val,
                            int add_val)
{
    const uint8_t *base = buf + off;
    const int sum = base[0] + base[1] + base[2] + base[3];

    return sum % mod_val + add_val;
}
/**
 * Put HMAC-SHA2 digest of packet data (except for the bytes where this digest
 * will be stored) into that packet.
 *
 * The digest position is derived from the bytes at offset 772 for encrypted
 * connections and from the bytes at offset 8 otherwise; the digest is keyed
 * with the first PLAYER_KEY_OPEN_PART_LEN bytes of rtmp_player_key.
 *
 * @param buf       handshake data (1536 bytes)
 * @param encrypted use an encrypted connection (RTMPE)
 * @return offset to the digest inside input data, negative value if the
 *         digest cannot be computed
 */
static int rtmp_handshake_imprint_with_digest(uint8_t *buf, int encrypted)
{
    const int seed_off = encrypted ? 772 : 8;
    const int base_off = encrypted ? 776 : 12;
    const int pos      = ff_rtmp_calc_digest_pos(buf, seed_off, 728, base_off);
    const int err      = ff_rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE,
                                             pos, rtmp_player_key,
                                             PLAYER_KEY_OPEN_PART_LEN,
                                             buf + pos);

    return err < 0 ? err : pos;
}
/**
 * Verify that the received server response has the expected digest value.
 *
 * The digest is recomputed with the first SERVER_KEY_OPEN_PART_LEN bytes of
 * rtmp_server_key and compared with the 32 bytes stored in the packet.
 *
 * @param buf handshake data received from the server (1536 bytes)
 * @param off position to search digest offset from
 * @return digest position if the digest is valid, 0 if it does not match,
 *         negative value if the digest cannot be computed
 */
static int rtmp_validate_digest(uint8_t *buf, int off)
{
    uint8_t expected[32];
    const int pos = ff_rtmp_calc_digest_pos(buf, off, 728, off + 4);
    const int err = ff_rtmp_calc_digest(buf, RTMP_HANDSHAKE_PACKET_SIZE, pos,
                                        rtmp_server_key,
                                        SERVER_KEY_OPEN_PART_LEN, expected);

    if (err < 0)
        return err;

    return memcmp(expected, buf + pos, 32) ? 0 : pos;
}
/**
 * Fill rt->swfverification with the SWF verification response.
 *
 * The response consists of two bytes with value 1, the SWF size written
 * twice as 32-bit big-endian values, and the digest of rt->swfhash keyed
 * with the 32 bytes at buf.
 *
 * @param s   context used for logging
 * @param rt  RTMP context; rt->swfhash_len must be 32
 * @param buf 32-byte key for the digest
 * @return 0 on success, AVERROR(EINVAL) if the SWF hash has the wrong
 *         length, negative value if the digest cannot be computed
 */
static int rtmp_calc_swf_verification(URLContext *s, RTMPContext *rt,
                                      uint8_t *buf)
{
    uint8_t *out = &rt->swfverification[0];
    int err;

    if (rt->swfhash_len != 32) {
        av_log(s, AV_LOG_ERROR,
               "Hash of the decompressed SWF file is not 32 bytes long.\n");
        return AVERROR(EINVAL);
    }

    bytestream_put_byte(&out, 1);
    bytestream_put_byte(&out, 1);
    bytestream_put_be32(&out, rt->swfsize);
    bytestream_put_be32(&out, rt->swfsize);

    err = ff_rtmp_calc_digest(rt->swfhash, 32, 0, buf, 32, out);

    return err < 0 ? err : 0;
}
#if CONFIG_ZLIB
/**
 * Inflate zlib-compressed SWF data and append it to a growing buffer.
 *
 * Output is produced in 16384-byte chunks; each chunk is appended to
 * *out_data (reallocated as needed) and *out_size is advanced. Inflation
 * stops as soon as a chunk is not completely filled.
 *
 * @param in_data  compressed input
 * @param in_size  size of the compressed input in bytes
 * @param out_data buffer to append to; may be reallocated
 * @param out_size current size of *out_data, updated on return
 * @return the last inflate() status (non-negative) on success,
 *         AVERROR_UNKNOWN on a zlib error, AVERROR(ENOMEM) if the output
 *         buffer cannot grow
 */
static int rtmp_uncompress_swfplayer(uint8_t *in_data, int64_t in_size,
                                     uint8_t **out_data, int64_t *out_size)
{
    z_stream zs = { 0 };
    int status;

    zs.avail_in = in_size;
    zs.next_in  = in_data;
    if (inflateInit(&zs) != Z_OK)
        return AVERROR_UNKNOWN;

    for (;;) {
        uint8_t chunk[16384];
        void *grown;
        int produced;

        zs.avail_out = sizeof(chunk);
        zs.next_out  = chunk;

        status = inflate(&zs, Z_NO_FLUSH);
        if (status != Z_OK && status != Z_STREAM_END) {
            status = AVERROR_UNKNOWN;
            break;
        }

        produced = sizeof(chunk) - zs.avail_out;
        grown    = av_realloc(*out_data, *out_size + produced);
        if (!grown) {
            status = AVERROR(ENOMEM);
            break;
        }
        *out_data = grown;

        memcpy(*out_data + *out_size, chunk, produced);
        *out_size += produced;

        /* A partially filled chunk means inflate() had nothing more to
         * emit for now. */
        if (zs.avail_out != 0)
            break;
    }

    inflateEnd(&zs);
    return status;
}
#endif
/**
 * Download the SWF player named by rt->swfverify and derive the parameters
 * needed for SWF verification.
 *
 * A file starting with "CWS" is treated as compressed: its first byte is
 * changed to 'F', the 8-byte header is copied and the rest is inflated
 * (requires zlib). The digest of the resulting data, keyed with
 * "Genuine Adobe Flash Player 001", is stored through the "rtmp_swfhash"
 * option and the data size in rt->swfsize.
 *
 * @param s URL context whose priv_data is the RTMPContext
 * @return 0 on success, negative value if the file cannot be fetched, is
 *         too short, cannot be decompressed or the hash cannot be stored
 */
static int rtmp_calc_swfhash(URLContext *s)
{
    RTMPContext *rt = s->priv_data;
    uint8_t *in_data = NULL, *out_data = NULL, *swfdata;
    int64_t in_size, out_size;
    /* NULL so that the cleanup path never closes an indeterminate pointer. */
    URLContext *stream = NULL;
    char swfhash[32];
    int swfsize;
    int ret = 0;

    /* Get the SWF player file. */
    if ((ret = ffurl_open(&stream, rt->swfverify, AVIO_FLAG_READ,
                          &s->interrupt_callback, NULL)) < 0) {
        av_log(s, AV_LOG_ERROR, "Cannot open connection %s.\n", rt->swfverify);
        goto fail;
    }

    if ((in_size = ffurl_seek(stream, 0, AVSEEK_SIZE)) < 0) {
        ret = AVERROR(EIO);
        goto fail;
    }

    if (!(in_data = av_malloc(in_size))) {
        ret = AVERROR(ENOMEM);
        goto fail;
    }

    if ((ret = ffurl_read_complete(stream, in_data, in_size)) < 0)
        goto fail;

    if (in_size < 3) {
        ret = AVERROR_INVALIDDATA;
        goto fail;
    }

    if (!memcmp(in_data, "CWS", 3)) {
        /* The 8-byte header is copied verbatim below, so a compressed file
         * must hold at least that much. */
        if (in_size < 8) {
            ret = AVERROR_INVALIDDATA;
            goto fail;
        }

        /* Decompress the SWF player file using Zlib. */
        if (!(out_data = av_malloc(8))) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }
        *in_data = 'F'; // magic stuff
        memcpy(out_data, in_data, 8);
        out_size = 8;

#if CONFIG_ZLIB
        if ((ret = rtmp_uncompress_swfplayer(in_data + 8, in_size - 8,
                                             &out_data, &out_size)) < 0)
            goto fail;
#else
        av_log(s, AV_LOG_ERROR,
               "Zlib is required for decompressing the SWF player file.\n");
        ret = AVERROR(EINVAL);
        goto fail;
#endif
        swfsize = out_size;
        swfdata = out_data;
    } else {
        swfsize = in_size;
        swfdata = in_data;
    }

    /* Compute the SHA256 hash of the SWF player file. */
    if ((ret = ff_rtmp_calc_digest(swfdata, swfsize, 0,
                                   "Genuine Adobe Flash Player 001", 30,
                                   swfhash)) < 0)
        goto fail;

    /* Set SWFVerification parameters. */
    if ((ret = av_opt_set_bin(rt, "rtmp_swfhash", swfhash, 32, 0)) < 0)
        goto fail;
    rt->swfsize = swfsize;

fail:
    av_freep(&in_data);
    av_freep(&out_data);
    ffurl_close(stream);
    return ret;
}
/**
 * Perform handshake with the server by means of exchanging pseudorandom data
 * signed with HMAC-SHA2 digest.
 *
 * One command byte plus a handshake packet is sent, then the server's command
 * byte plus packet and a second packet are read. When rt->is_input is set and
 * the first server version byte is >= 3, the server digest and the signature
 * of the second packet are validated and a freshly signed packet is sent
 * back; otherwise the server's packet is echoed back unchanged.
 *
 * @param s  context used for logging
 * @param rt RTMP context; rt->stream carries the handshake data
 * @return 0 if handshake succeeds, negative value otherwise
 */
static int rtmp_handshake(URLContext *s, RTMPContext *rt)
{
AVLFG rnd;
/* tosend[0] is the command byte; the handshake packet starts at tosend + 1. */
uint8_t tosend [RTMP_HANDSHAKE_PACKET_SIZE+1] = {
3, // unencrypted data
0, 0, 0, 0, // client uptime
RTMP_CLIENT_VER1,
RTMP_CLIENT_VER2,
RTMP_CLIENT_VER3,
RTMP_CLIENT_VER4,
};
/* Second packet read from the server; its last 32 bytes are compared with
 * the locally computed signature in the validating branch. */
uint8_t clientdata[RTMP_HANDSHAKE_PACKET_SIZE];
/* serverdata[0] is the server's command byte; its packet starts at
 * serverdata + 1. */
uint8_t serverdata[RTMP_HANDSHAKE_PACKET_SIZE+1];
int i;
int server_pos, client_pos;
uint8_t digest[32], signature[32];
int ret, type = 0;
av_log(s, AV_LOG_DEBUG, "Handshaking...\n");
/* Fixed seed: the generated sequence is the same on every run. */
av_lfg_init(&rnd, 0xDEADC0DE);
// generate handshake packet - 1536 bytes of pseudorandom data
/* Bytes 0..8 keep the command byte, uptime and version set above. */
for (i = 9; i <= RTMP_HANDSHAKE_PACKET_SIZE; i++)
tosend[i] = av_lfg_get(&rnd) >> 24;
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* When the client wants to use RTMPE, we have to change the command
* byte to 0x06 which means to use encrypted data and we have to set
* the flash version to at least 9.0.115.0. */
tosend[0] = 6;
tosend[5] = 128;
tosend[6] = 0;
tosend[7] = 3;
tosend[8] = 2;
/* Initialize the Diffie-Hellmann context and generate the public key
* to send to the server. */
if ((ret = ff_rtmpe_gen_pub_key(rt->stream, tosend + 1)) < 0)
return ret;
}
/* Store the digest inside the outgoing packet and remember where it is;
 * the position is needed again when computing the expected signature. */
client_pos = rtmp_handshake_imprint_with_digest(tosend + 1, rt->encrypted);
if (client_pos < 0)
return client_pos;
if ((ret = ffurl_write(rt->stream, tosend,
RTMP_HANDSHAKE_PACKET_SIZE + 1)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot write RTMP handshake request\n");
return ret;
}
if ((ret = ffurl_read_complete(rt->stream, serverdata,
RTMP_HANDSHAKE_PACKET_SIZE + 1)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
return ret;
}
if ((ret = ffurl_read_complete(rt->stream, clientdata,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0) {
av_log(s, AV_LOG_ERROR, "Cannot read RTMP handshake response\n");
return ret;
}
av_log(s, AV_LOG_DEBUG, "Type answer %d\n", serverdata[0]);
av_log(s, AV_LOG_DEBUG, "Server version %d.%d.%d.%d\n",
serverdata[5], serverdata[6], serverdata[7], serverdata[8]);
if (rt->is_input && serverdata[5] >= 3) {
/* Try the digest location derived from offset 772 first, then fall back
 * to the one derived from offset 8; type records which one matched
 * (0 for the first, 1 for the fallback). */
server_pos = rtmp_validate_digest(serverdata + 1, 772);
if (server_pos < 0)
return server_pos;
if (!server_pos) {
type = 1;
server_pos = rtmp_validate_digest(serverdata + 1, 8);
if (server_pos < 0)
return server_pos;
if (!server_pos) {
av_log(s, AV_LOG_ERROR, "Server response validating failed\n");
return AVERROR(EIO);
}
}
/* Generate SWFVerification token (SHA256 HMAC hash of decompressed SWF,
* key are the last 32 bytes of the server handshake. */
if (rt->swfsize) {
if ((ret = rtmp_calc_swf_verification(s, rt, serverdata + 1 +
RTMP_HANDSHAKE_PACKET_SIZE - 32)) < 0)
return ret;
}
/* Expected signature of the second server packet: first derive a key from
 * our own digest with the full server key, then sign everything but the
 * last 32 bytes of that packet with it. */
ret = ff_rtmp_calc_digest(tosend + 1 + client_pos, 32, 0,
rtmp_server_key, sizeof(rtmp_server_key),
digest);
if (ret < 0)
return ret;
ret = ff_rtmp_calc_digest(clientdata, RTMP_HANDSHAKE_PACKET_SIZE - 32,
0, digest, 32, signature);
if (ret < 0)
return ret;
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Compute the shared secret key sent by the server and initialize
* the RC4 encryption. */
if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1,
tosend + 1, type)) < 0)
return ret;
/* Encrypt the signature received by the server. */
ff_rtmpe_encrypt_sig(rt->stream, signature, digest, serverdata[0]);
}
if (memcmp(signature, clientdata + RTMP_HANDSHAKE_PACKET_SIZE - 32, 32)) {
av_log(s, AV_LOG_ERROR, "Signature mismatch\n");
return AVERROR(EIO);
}
/* Build the reply: fresh pseudorandom bytes whose last 32 bytes are
 * replaced by a signature keyed from the server's digest and the full
 * player key. From here on tosend holds the packet without a command
 * byte. */
for (i = 0; i < RTMP_HANDSHAKE_PACKET_SIZE; i++)
tosend[i] = av_lfg_get(&rnd) >> 24;
ret = ff_rtmp_calc_digest(serverdata + 1 + server_pos, 32, 0,
rtmp_player_key, sizeof(rtmp_player_key),
digest);
if (ret < 0)
return ret;
ret = ff_rtmp_calc_digest(tosend, RTMP_HANDSHAKE_PACKET_SIZE - 32, 0,
digest, 32,
tosend + RTMP_HANDSHAKE_PACKET_SIZE - 32);
if (ret < 0)
return ret;
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Encrypt the signature to be send to the server. */
ff_rtmpe_encrypt_sig(rt->stream, tosend +
RTMP_HANDSHAKE_PACKET_SIZE - 32, digest,
serverdata[0]);
}
// write reply back to the server
if ((ret = ffurl_write(rt->stream, tosend,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0)
return ret;
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Set RC4 keys for encryption and update the keystreams. */
if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0)
return ret;
}
} else {
/* No digest validation on this path: the server's packet is sent back
 * as received. */
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Compute the shared secret key sent by the server and initialize
* the RC4 encryption. */
if ((ret = ff_rtmpe_compute_secret_key(rt->stream, serverdata + 1,
tosend + 1, 1)) < 0)
return ret;
if (serverdata[0] == 9) {
/* NOTE(review): signature and digest have not been written on this
 * path before being passed here - confirm this is intended. */
/* Encrypt the signature received by the server. */
ff_rtmpe_encrypt_sig(rt->stream, signature, digest,
serverdata[0]);
}
}
if ((ret = ffurl_write(rt->stream, serverdata + 1,
RTMP_HANDSHAKE_PACKET_SIZE)) < 0)
return ret;
if (CONFIG_FFRTMPCRYPT_PROTOCOL && rt->encrypted) {
/* Set RC4 keys for encryption and update the keystreams. */
if ((ret = ff_rtmpe_update_keystream(rt->stream)) < 0)
return ret;
}
}
return 0;
}
/**
 * Read one handshake packet and extract its two leading 32-bit fields.
 *
 * Exactly RTMP_HANDSHAKE_PACKET_SIZE bytes are requested from rt->stream;
 * the size argument is not used.
 *
 * @param rt         RTMP context
 * @param first_int  receives the big-endian value at offset 0
 * @param second_int receives the big-endian value at offset 4
 * @param arraydata  buffer receiving the packet
 * @param size       unused
 * @return 0 on success, AVERROR(EIO) if nothing could be read,
 *         AVERROR(EINVAL) if the packet is incomplete
 */
static int rtmp_receive_hs_packet(RTMPContext* rt, uint32_t *first_int,
                                  uint32_t *second_int, char *arraydata,
                                  int size)
{
    int nread = ffurl_read_complete(rt->stream, arraydata,
                                    RTMP_HANDSHAKE_PACKET_SIZE);

    if (nread <= 0)
        return AVERROR(EIO);

    if (nread != RTMP_HANDSHAKE_PACKET_SIZE) {
        av_log(rt, AV_LOG_ERROR,
               "Erroneous Message size %d not following standard\n",
               (int)nread);
        return AVERROR(EINVAL);
    }

    *first_int  = AV_RB32(arraydata);
    *second_int = AV_RB32(arraydata + 4);

    return 0;
}
/**
 * Stamp the two leading 32-bit fields of a handshake packet and send it.
 *
 * Exactly RTMP_HANDSHAKE_PACKET_SIZE bytes are written to rt->stream; the
 * size argument is not used.
 *
 * @param rt         RTMP context
 * @param first_int  value stored big-endian at offset 0
 * @param second_int value stored big-endian at offset 4
 * @param arraydata  packet buffer; its first 8 bytes are overwritten
 * @param size       unused
 * @return 0 on success, AVERROR(EIO) if the packet could not be written
 *         completely
 */
static int rtmp_send_hs_packet(RTMPContext* rt, uint32_t first_int,
                               uint32_t second_int, char *arraydata, int size)
{
    int inoutsize;

    AV_WB32(arraydata, first_int);
    /* The second field carries second_int, not a copy of first_int. */
    AV_WB32(arraydata + 4, second_int);
    inoutsize = ffurl_write(rt->stream, arraydata,
                            RTMP_HANDSHAKE_PACKET_SIZE);
    if (inoutsize != RTMP_HANDSHAKE_PACKET_SIZE) {
        av_log(rt, AV_LOG_ERROR, "Unable to write answer\n");
        return AVERROR(EIO);
    }

    return 0;
}
/**
 * rtmp handshake server side
 *
 * Reads C0 and requires version 3, echoes it as S0, reads C1, sends S1
 * (the client's epoch plus random data) and S2 (the client's C1 packet),
 * then reads C2. Mismatches found in C1/C2 contents are only logged as
 * warnings.
 *
 * @param s  context used for logging
 * @param rt RTMP context; rt->stream carries the handshake data
 * @return 0 on success, negative value if a packet cannot be read or
 *         written or the version does not match
 */
static int rtmp_server_handshake(URLContext *s, RTMPContext *rt)
{
    uint8_t buffer[RTMP_HANDSHAKE_PACKET_SIZE];
    uint32_t hs_epoch;
    uint32_t hs_my_epoch;
    uint8_t hs_c1[RTMP_HANDSHAKE_PACKET_SIZE];
    uint8_t hs_s1[RTMP_HANDSHAKE_PACKET_SIZE];
    uint32_t zeroes;
    uint32_t temp       = 0;
    int randomidx       = 0;
    int inoutsize       = 0;
    int ret;

    inoutsize = ffurl_read_complete(rt->stream, buffer, 1);       // Receive C0
    if (inoutsize <= 0) {
        av_log(s, AV_LOG_ERROR, "Unable to read handshake\n");
        return AVERROR(EIO);
    }
    // Check Version
    if (buffer[0] != 3) {
        av_log(s, AV_LOG_ERROR, "RTMP protocol version mismatch\n");
        return AVERROR(EIO);
    }
    if (ffurl_write(rt->stream, buffer, 1) <= 0) {                 // Send S0
        av_log(s, AV_LOG_ERROR,
               "Unable to write answer - RTMP S0\n");
        return AVERROR(EIO);
    }
    /* Receive C1 */
    ret = rtmp_receive_hs_packet(rt, &hs_epoch, &zeroes, hs_c1,
                                 RTMP_HANDSHAKE_PACKET_SIZE);
    if (ret) {
        av_log(s, AV_LOG_ERROR, "RTMP Handshake C1 Error\n");
        return ret;
    }
    if (zeroes)
        av_log(s, AV_LOG_WARNING, "Erroneous C1 Message zero != 0\n");
    /* Send S1 */
    /* By now same epoch will be sent */
    hs_my_epoch = hs_epoch;
    /* Generate random */
    /* Fill bytes 8 .. RTMP_HANDSHAKE_PACKET_SIZE - 1 only: the first 8 bytes
     * are stamped by rtmp_send_hs_packet(), and the index must stay inside
     * hs_s1. */
    for (randomidx = 8; randomidx < (RTMP_HANDSHAKE_PACKET_SIZE);
         randomidx += 4)
        AV_WB32(hs_s1 + randomidx, av_get_random_seed());

    ret = rtmp_send_hs_packet(rt, hs_my_epoch, 0, hs_s1,
                              RTMP_HANDSHAKE_PACKET_SIZE);
    if (ret) {
        av_log(s, AV_LOG_ERROR, "RTMP Handshake S1 Error\n");
        return ret;
    }
    /* Send S2 */
    ret = rtmp_send_hs_packet(rt, hs_epoch, 0, hs_c1,
                              RTMP_HANDSHAKE_PACKET_SIZE);
    if (ret) {
        av_log(s, AV_LOG_ERROR, "RTMP Handshake S2 Error\n");
        return ret;
    }
    /* Receive C2 */
    ret = rtmp_receive_hs_packet(rt, &temp, &zeroes, buffer,
                                 RTMP_HANDSHAKE_PACKET_SIZE);
    if (ret) {
        av_log(s, AV_LOG_ERROR, "RTMP Handshake C2 Error\n");
        return ret;
    }
    if (temp != hs_my_epoch)
        av_log(s, AV_LOG_WARNING,
               "Erroneous C2 Message epoch does not match up with C1 epoch\n");
    if (memcmp(buffer + 8, hs_s1 + 8,
               RTMP_HANDSHAKE_PACKET_SIZE - 8))
        av_log(s, AV_LOG_WARNING,
               "Erroneous C2 Message random does not match up\n");

    return 0;
}
/**
 * Process a chunk size change packet.
 *
 * When publishing (rt->is_input is 0) the outgoing chunk size is first set
 * to RTMP_OUTGOING_CHUNK_SIZE and announced to the server. The incoming
 * chunk size is then taken from the first four payload bytes.
 *
 * @param s   URL context whose priv_data is the RTMPContext
 * @param pkt received packet; must hold at least 4 bytes
 * @return 0 on success, AVERROR_INVALIDDATA if the packet is too short or
 *         the size is not positive, negative value if the announcement
 *         cannot be sent
 */
static int handle_chunk_size(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;

    if (pkt->data_size < 4) {
        av_log(s, AV_LOG_ERROR,
               "Too short chunk size change packet (%d)\n",
               pkt->data_size);
        return AVERROR_INVALIDDATA;
    }

    if (!rt->is_input) {
        int err;

        /* Pick our own outgoing chunk size and tell the server about it. */
        rt->out_chunk_size = RTMP_OUTGOING_CHUNK_SIZE;
        err = gen_outgoing_chunk_size(s, rt);
        if (err < 0)
            return err;
    }

    rt->in_chunk_size = AV_RB32(pkt->data);
    if (rt->in_chunk_size <= 0) {
        av_log(s, AV_LOG_ERROR, "Incorrect chunk size %d\n",
               rt->in_chunk_size);
        return AVERROR_INVALIDDATA;
    }

    av_log(s, AV_LOG_DEBUG, "New incoming chunk size = %d\n",
           rt->in_chunk_size);

    return 0;
}
/**
 * Process a ping packet according to its 16-bit type field.
 *
 * Type 6 is answered with a pong. Type 26 is answered with the SWF
 * verification message when rt->swfsize is set and ignored with a warning
 * otherwise. All other types are ignored.
 *
 * @param s   URL context whose priv_data is the RTMPContext
 * @param pkt received packet; must hold at least 2 bytes
 * @return 0 on success, AVERROR_INVALIDDATA if the packet is too short,
 *         negative value if the answer cannot be sent
 */
static int handle_ping(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;
    int err = 0;

    if (pkt->data_size < 2) {
        av_log(s, AV_LOG_ERROR, "Too short ping packet (%d)\n",
               pkt->data_size);
        return AVERROR_INVALIDDATA;
    }

    switch (AV_RB16(pkt->data)) {
    case 6:
        err = gen_pong(s, rt, pkt);
        break;
    case 26:
        if (rt->swfsize)
            err = gen_swf_verification(s, rt);
        else
            av_log(s, AV_LOG_WARNING, "Ignoring SWFVerification request.\n");
        break;
    default:
        break;
    }

    return err < 0 ? err : 0;
}
/**
 * Process a client bandwidth packet.
 *
 * The 32-bit value from the payload is logged and half of it is kept in
 * rt->client_report_size.
 *
 * @param s   URL context whose priv_data is the RTMPContext
 * @param pkt received packet; must hold at least 4 bytes
 * @return 0 on success, AVERROR_INVALIDDATA if the packet is too short or
 *         the value is not positive
 */
static int handle_client_bw(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;

    if (pkt->data_size >= 4) {
        rt->client_report_size = AV_RB32(pkt->data);
        if (rt->client_report_size > 0) {
            av_log(s, AV_LOG_DEBUG, "Client bandwidth = %d\n",
                   rt->client_report_size);
            rt->client_report_size >>= 1;
            return 0;
        }

        av_log(s, AV_LOG_ERROR, "Incorrect client bandwidth %d\n",
               rt->client_report_size);
        return AVERROR_INVALIDDATA;
    }

    av_log(s, AV_LOG_ERROR,
           "Client bandwidth report packet is less than 4 bytes long (%d)\n",
           pkt->data_size);
    return AVERROR_INVALIDDATA;
}
/**
 * Process a server bandwidth packet.
 *
 * The 32-bit value from the payload is stored in rt->server_bw and logged.
 *
 * @param s   URL context whose priv_data is the RTMPContext
 * @param pkt received packet; must hold at least 4 bytes
 * @return 0 on success, AVERROR_INVALIDDATA if the packet is too short or
 *         the value is not positive
 */
static int handle_server_bw(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;

    if (pkt->data_size >= 4) {
        rt->server_bw = AV_RB32(pkt->data);
        if (rt->server_bw > 0) {
            av_log(s, AV_LOG_DEBUG, "Server bandwidth = %d\n", rt->server_bw);
            return 0;
        }

        av_log(s, AV_LOG_ERROR, "Incorrect server bandwidth %d\n",
               rt->server_bw);
        return AVERROR_INVALIDDATA;
    }

    av_log(s, AV_LOG_ERROR,
           "Too short server bandwidth report packet (%d)\n",
           pkt->data_size);
    return AVERROR_INVALIDDATA;
}
/**
 * Handle an "_error" invoke reply.
 *
 * Looks up the tracked method the reply refers to and, if the reply
 * carries a "description" field, logs it. Errors for a fixed set of
 * methods are logged as warnings and tolerated; every other described
 * error yields -1.
 *
 * @param s   RTMP URL context
 * @param pkt received invoke packet
 * @return 0 (or the non-negative result of find_tracked_method()) when
 *         the error is tolerated or has no description, a negative
 *         value otherwise
 */
static int handle_invoke_error(URLContext *s, RTMPPacket *pkt)
{
    /* Methods whose failure is logged as a warning only. */
    static const char *const tolerated[] = {
        "_checkbw", "releaseStream", "FCSubscribe", "FCPublish",
    };
    const uint8_t *payload_end = pkt->data + pkt->data_size;
    char *method = NULL;
    uint8_t description[256];
    int ret;

    ret = find_tracked_method(s, pkt, 9, &method);
    if (ret < 0)
        return ret;

    if (!ff_amf_get_field_value(pkt->data + 9, payload_end,
                                "description", description,
                                sizeof(description))) {
        int benign = 0;
        size_t i;

        if (method) {
            for (i = 0; i < sizeof(tolerated) / sizeof(tolerated[0]); i++) {
                if (!strcmp(method, tolerated[i])) {
                    benign = 1;
                    break;
                }
            }
        }

        if (benign) {
            /* Gracefully ignore Adobe-specific historical artifact errors. */
            ret = 0;
            av_log(s, AV_LOG_WARNING, "Server error: %s\n", description);
        } else {
            ret = -1;
            av_log(s, AV_LOG_ERROR, "Server error: %s\n", description);
        }
    }

    av_free(method);
    return ret;
}
/**
 * Answer an invoke command received from the peer.
 *
 * The command name, sequence number and a null value are parsed from the
 * packet. Depending on the command:
 *  - "FCPublish": an "onFCPublish" invoke is sent;
 *  - "publish":   a Stream Begin ping followed by an
 *                 onStatus(NetStream.Publish.Start) invoke is sent;
 *  - anything else: a "_result" invoke echoing the sequence number is
 *                 sent; for "createStream" a new stream id is appended.
 * For "FCPublish" and "publish" the stream name is also read, compared
 * against the last path component of the URL, and the state is set to
 * STATE_RECEIVING.
 *
 * @param s   RTMP URL context
 * @param pkt received invoke packet
 * @return the result of the final ff_rtmp_packet_write() on success,
 *         a negative error code on failure
 */
static int send_invoke_response(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;
    double seqnum;
    char filename[64];
    char command[64];
    char statusmsg[128];
    int stringlen;
    char *pchar;
    const uint8_t *p = pkt->data;
    uint8_t *pp      = NULL;
    RTMPPacket spkt  = { 0 };
    GetByteContext gbc;
    int ret;

    bytestream2_init(&gbc, p, pkt->data_size);
    if (ff_amf_read_string(&gbc, command, sizeof(command),
                           &stringlen)) {
        av_log(s, AV_LOG_ERROR, "Error in PT_INVOKE\n");
        return AVERROR_INVALIDDATA;
    }

    ret = ff_amf_read_number(&gbc, &seqnum);
    if (ret)
        return ret;
    ret = ff_amf_read_null(&gbc);
    if (ret)
        return ret;

    if (!strcmp(command, "FCPublish") ||
        !strcmp(command, "publish")) {
        ret = ff_amf_read_string(&gbc, filename,
                                 sizeof(filename), &stringlen);
        /* filename is used below in strcmp() and snprintf(); bail out
         * instead of reading an uninitialized buffer. */
        if (ret) {
            av_log(s, AV_LOG_ERROR, "Unable to parse stream name\n");
            return ret < 0 ? ret : AVERROR_INVALIDDATA;
        }
        // check with url
        if (s->filename) {
            pchar = strrchr(s->filename, '/');
            if (pchar) {
                /* step past the separator */
                pchar++;
            } else {
                av_log(s, AV_LOG_WARNING,
                       "Unable to find / in url %s, bad format\n",
                       s->filename);
                /* No separator: compare against the whole URL rather
                 * than skipping its first character. */
                pchar = s->filename;
            }
            if (strcmp(pchar, filename))
                av_log(s, AV_LOG_WARNING, "Unexpected stream %s, expecting"
                       " %s\n", filename, pchar);
        }
        rt->state = STATE_RECEIVING;
    }

    if (!strcmp(command, "FCPublish")) {
        if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
                                         RTMP_PT_INVOKE, 0,
                                         RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
            av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
            return ret;
        }
        pp = spkt.data;
        ff_amf_write_string(&pp, "onFCPublish");
    } else if (!strcmp(command, "publish")) {
        PutByteContext pbc;
        // Send Stream Begin 1
        if ((ret = ff_rtmp_packet_create(&spkt, RTMP_NETWORK_CHANNEL,
                                         RTMP_PT_PING, 0, 6)) < 0) {
            av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
            return ret;
        }
        pp = spkt.data;
        bytestream2_init_writer(&pbc, pp, spkt.data_size);
        bytestream2_put_be16(&pbc, 0);          // 0 -> Stream Begin
        bytestream2_put_be32(&pbc, rt->nb_streamid);
        ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
                                   rt->prev_pkt[1]);
        ff_rtmp_packet_destroy(&spkt);
        if (ret < 0)
            return ret;

        // Send onStatus(NetStream.Publish.Start)
        if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
                                         RTMP_PT_INVOKE, 0,
                                         RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
            av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
            return ret;
        }
        spkt.extra = pkt->extra;
        pp = spkt.data;
        ff_amf_write_string(&pp, "onStatus");
        ff_amf_write_number(&pp, 0);
        ff_amf_write_null(&pp);

        ff_amf_write_object_start(&pp);
        ff_amf_write_field_name(&pp, "level");
        ff_amf_write_string(&pp, "status");
        ff_amf_write_field_name(&pp, "code");
        ff_amf_write_string(&pp, "NetStream.Publish.Start");
        ff_amf_write_field_name(&pp, "description");
        snprintf(statusmsg, sizeof(statusmsg),
                 "%s is now published", filename);
        ff_amf_write_string(&pp, statusmsg);
        ff_amf_write_field_name(&pp, "details");
        ff_amf_write_string(&pp, filename);
        ff_amf_write_field_name(&pp, "clientid");
        snprintf(statusmsg, sizeof(statusmsg), "%s", LIBAVFORMAT_IDENT);
        ff_amf_write_string(&pp, statusmsg);
        ff_amf_write_object_end(&pp);
    } else {
        if ((ret = ff_rtmp_packet_create(&spkt, RTMP_SYSTEM_CHANNEL,
                                         RTMP_PT_INVOKE, 0,
                                         RTMP_PKTDATA_DEFAULT_SIZE)) < 0) {
            av_log(s, AV_LOG_ERROR, "Unable to create response packet\n");
            return ret;
        }
        pp = spkt.data;
        ff_amf_write_string(&pp, "_result");
        ff_amf_write_number(&pp, seqnum);
        ff_amf_write_null(&pp);
        if (!strcmp(command, "createStream")) {
            rt->nb_streamid++;
            if (rt->nb_streamid == 0 || rt->nb_streamid == 2)
                rt->nb_streamid++; /* Values 0 and 2 are reserved */
            ff_amf_write_number(&pp, rt->nb_streamid);
            /* By now we don't control which streams are removed in
             * deleteStream. There is no stream creation control
             * if a client creates more than 2^32 - 2 streams. */
        }
    }

    /* The payload size is however much the AMF writers produced. */
    spkt.data_size = pp - spkt.data;
    ret = ff_rtmp_packet_write(rt->stream, &spkt, rt->out_chunk_size,
                               rt->prev_pkt[1]);
    ff_rtmp_packet_destroy(&spkt);
    return ret;
}
/**
 * Handle a "_result" invoke reply.
 *
 * The reply is matched against the tracked method it answers:
 *  - "connect": sends the follow-up commands (releaseStream + FCPublish
 *    when publishing, server bandwidth when playing), then createStream,
 *    and FCSubscribe when playing with a subscribe name or a live stream;
 *  - "createStream": extracts the numeric result into
 *    rt->main_channel_id and sends publish (output) or play + buffer
 *    time (input).
 * Replies to untracked or other methods are ignored.
 *
 * @param s   RTMP URL context
 * @param pkt received invoke packet
 * @return 0 or a non-negative value on success, a negative error code
 *         on failure
 */
static int handle_invoke_result(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;
    char *tracked_method = NULL;
    int ret = 0;

    if ((ret = find_tracked_method(s, pkt, 10, &tracked_method)) < 0)
        return ret;

    if (!tracked_method) {
        /* Ignore this reply when the current method is not tracked. */
        return ret;
    }

    /* strcmp() rather than memcmp(): the tracked name may be shorter
     * than the literal, and memcmp() would read past its terminator. */
    if (!strcmp(tracked_method, "connect")) {
        if (!rt->is_input) {
            if ((ret = gen_release_stream(s, rt)) < 0)
                goto fail;
            if ((ret = gen_fcpublish_stream(s, rt)) < 0)
                goto fail;
        } else {
            if ((ret = gen_server_bw(s, rt)) < 0)
                goto fail;
        }
        if ((ret = gen_create_stream(s, rt)) < 0)
            goto fail;
        if (rt->is_input) {
            /* Send the FCSubscribe command when the name of live
             * stream is defined by the user or if it's a live stream. */
            if (rt->subscribe) {
                if ((ret = gen_fcsubscribe_stream(s, rt, rt->subscribe)) < 0)
                    goto fail;
            } else if (rt->live == -1) {
                if ((ret = gen_fcsubscribe_stream(s, rt, rt->playpath)) < 0)
                    goto fail;
            }
        }
    } else if (!strcmp(tracked_method, "createStream")) {
        //extract a number from the result
        /* The number occupies bytes 21..28, so at least 29 bytes of
         * payload are required before any of them may be inspected. */
        if (pkt->data_size < 29 ||
            pkt->data[10] || pkt->data[19] != 5 || pkt->data[20]) {
            av_log(s, AV_LOG_WARNING, "Unexpected reply on connect()\n");
        } else {
            rt->main_channel_id = av_int2double(AV_RB64(pkt->data + 21));
        }
        if (!rt->is_input) {
            if ((ret = gen_publish(s, rt)) < 0)
                goto fail;
        } else {
            if ((ret = gen_play(s, rt)) < 0)
                goto fail;
            if ((ret = gen_buffer_time(s, rt)) < 0)
                goto fail;
        }
    }

fail:
    av_free(tracked_method);
    return ret;
}
/**
 * Handle an "onStatus" invoke.
 *
 * Skips two AMF values following the command name, then inspects the
 * status object: a "level" of "error" is logged (with its description
 * when present) and reported as failure; otherwise the "code" field
 * drives the state machine (Play.Start, Play.Stop,
 * Play.UnpublishNotify, Publish.Start).
 *
 * @param s   RTMP URL context
 * @param pkt received invoke packet
 * @return 0 on success, 1 when the leading AMF values cannot be
 *         skipped, -1 when the server reported an error
 */
static int handle_invoke_status(URLContext *s, RTMPPacket *pkt)
{
    RTMPContext *rt = s->priv_data;
    const uint8_t *payload_end = pkt->data + pkt->data_size;
    const uint8_t *cur = pkt->data + 11;
    uint8_t value[256];
    int skipped, len;

    for (skipped = 0; skipped < 2; skipped++) {
        len = ff_amf_tag_size(cur, payload_end);
        if (len < 0)
            return 1;
        cur += len;
    }

    if (!ff_amf_get_field_value(cur, payload_end, "level",
                                value, sizeof(value)) &&
        !strcmp(value, "error")) {
        if (!ff_amf_get_field_value(cur, payload_end,
                                    "description", value, sizeof(value)))
            av_log(s, AV_LOG_ERROR, "Server error: %s\n", value);
        return -1;
    }

    /* Without a readable status code there is nothing more to do. */
    if (ff_amf_get_field_value(cur, payload_end, "code",
                               value, sizeof(value)))
        return 0;

    if (!strcmp(value, "NetStream.Play.Start"))
        rt->state = STATE_PLAYING;
    else if (!strcmp(value, "NetStream.Play.Stop") ||
             !strcmp(value, "NetStream.Play.UnpublishNotify"))
        rt->state = STATE_STOPPED;
    else if (!strcmp(value, "NetStream.Publish.Start"))
        rt->state = STATE_PUBLISHING;

    return 0;
}
static int handle_invoke(URLContext *s, RTMPPacket *pkt)
{
RTMPContext *rt = s->priv_data;
int ret = 0;
//TODO: check for the messages sent for wrong state?
if (!memcmp(pkt->data, "\002\000\006_error", 9)) {
if ((ret = handle_invoke_error(s, pkt)) < 0)
return ret;
} else if (!memcmp(pkt->data, "\002\000\007_result", 10)) {
if ((ret = handle_invoke_result(s, pkt)) < 0)
return ret;
} else if (!memcmp(pkt->data, "\002\000\010onStatus", 11)) {
if ((ret = handle_invoke_status(s, pkt)) < 0)
return ret;
} else if (!memcmp(pkt->data, "\002\000\010onBWDone", 11)) {
if ((ret = gen_check_bw(s, rt)) < 0)
return ret;
} else if (!memcmp(pkt->data, "\002\000\015releaseStream", 16) ||
!memcmp(pkt->data, "\002\000\011FCPublish", 12) ||
!memcmp(pkt->data, "\002\000\007publish", 10) ||
!memcmp(pkt->data, "\002\000\010_checkbw", 11) ||
!memcmp(pkt->data, "\002\000\014createStream", 15)) {
if (ret = send_invoke_response(s, pkt) < 0)
return ret;
}
return ret;
}
/**
 * Handle a notify packet.
 *
 * Only "@setDataFrame" notifications whose next string is "onMetaData"
 * are acted upon: the remaining payload (starting at the "onMetaData"
 * string) is wrapped in an 11-byte tag header plus a 4-byte trailer and
 * stored in rt->flv_data, appended to unread data if any is pending.
 *
 * @param s   RTMP URL context
 * @param pkt received notify packet
 * @return 0 on success or when the packet is ignored, a negative error
 *         code on failure
 */
static int handle_notify(URLContext *s, RTMPPacket *pkt) {
    RTMPContext *rt  = s->priv_data;
    const uint8_t *p = NULL;
    uint8_t *cp      = NULL;
    uint8_t commandbuffer[64];
    char statusmsg[128];
    int stringlen;
    GetByteContext gbc;
    PutByteContext pbc;
    uint32_t ts;
    int old_flv_size;
    int new_flv_size;
    int append;
    const uint8_t *datatowrite;
    unsigned datatowritelength;

    p = pkt->data;
    bytestream2_init(&gbc, p, pkt->data_size);
    if (ff_amf_read_string(&gbc, commandbuffer, sizeof(commandbuffer),
                           &stringlen))
        return AVERROR_INVALIDDATA;

    if (!strcmp(commandbuffer, "@setDataFrame")) {
        /* Remember where the payload to forward starts before parsing
         * further; it includes the "onMetaData" string itself. */
        datatowrite       = gbc.buffer;
        datatowritelength = bytestream2_get_bytes_left(&gbc);
        if (ff_amf_read_string(&gbc, statusmsg,
                               sizeof(statusmsg), &stringlen))
            return AVERROR_INVALIDDATA;
        if (strcmp(statusmsg, "onMetaData")) {
            av_log(s, AV_LOG_INFO, "Expecting onMetadata but got %s\n",
                   statusmsg);
            return 0;
        }

        /* Provide ECMAArray to flv */
        ts = pkt->timestamp;

        // generate packet header and put data into buffer for FLV demuxer
        append = rt->flv_off < rt->flv_size;
        if (append) {
            old_flv_size = rt->flv_size;
            new_flv_size = rt->flv_size + datatowritelength + 15;
        } else {
            old_flv_size = 0;
            new_flv_size = datatowritelength + 15;
        }

        /* Update the context only after the allocation succeeded, so a
         * failure leaves flv_data/flv_size/flv_off consistent. */
        cp = av_realloc(rt->flv_data, new_flv_size);
        if (!cp)
            return AVERROR(ENOMEM);
        rt->flv_data = cp;
        rt->flv_size = new_flv_size;
        if (!append)
            rt->flv_off = 0;

        bytestream2_init_writer(&pbc, cp, rt->flv_size);
        bytestream2_skip_p(&pbc, old_flv_size);
        bytestream2_put_byte(&pbc, pkt->type);
        bytestream2_put_be24(&pbc, datatowritelength);
        bytestream2_put_be24(&pbc, ts);
        bytestream2_put_byte(&pbc, ts >> 24);
        bytestream2_put_be24(&pbc, 0);
        bytestream2_put_buffer(&pbc, datatowrite, datatowritelength);
        bytestream2_put_be32(&pbc, 0);
    }
    return 0;
}
/**
 * Inspect a received packet and run the handler matching its type.
 *
 * Chunk size, ping, client/server bandwidth and invoke packets are
 * passed to their handlers; media, metadata and notify packets are left
 * for get_packet(); unknown types are only logged.
 *
 * @return 0 when the packet was processed (including non-critical
 *         handler results), a negative value for serious errors which
 *         prevent further communication
 */
static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
{
    int status = 0;

#ifdef DEBUG
    ff_rtmp_packet_dump(s, pkt);
#endif

    switch (pkt->type) {
    case RTMP_PT_BYTES_READ:
        av_dlog(s, "received bytes read report\n");
        break;
    case RTMP_PT_CHUNK_SIZE:
        status = handle_chunk_size(s, pkt);
        break;
    case RTMP_PT_PING:
        status = handle_ping(s, pkt);
        break;
    case RTMP_PT_CLIENT_BW:
        status = handle_client_bw(s, pkt);
        break;
    case RTMP_PT_SERVER_BW:
        status = handle_server_bw(s, pkt);
        break;
    case RTMP_PT_INVOKE:
        status = handle_invoke(s, pkt);
        break;
    case RTMP_PT_VIDEO:
    case RTMP_PT_AUDIO:
    case RTMP_PT_METADATA:
    case RTMP_PT_NOTIFY:
        /* Audio, Video and Metadata packets are parsed in get_packet() */
        break;
    default:
        av_log(s, AV_LOG_VERBOSE, "Unknown packet type received 0x%02X\n", pkt->type);
        break;
    }

    /* Positive handler results are not errors for the caller. */
    return status < 0 ? status : 0;
}
/**
 * Interact with the server by receiving and sending RTMP packets until
 * there is some significant data (media data or expected status notification).
 *
 * Every received packet is first run through rtmp_parse_result(). Media
 * packets are then repackaged as FLV tags into rt->flv_data for the FLV
 * demuxer; RTMP_PT_METADATA packets carry ready-made FLV tags whose
 * timestamps are rewritten relative to the packet timestamp.
 *
 * @param s          reading context
 * @param for_header non-zero value tells function to work until it
 * gets notification from the server that playing has been started,
 * otherwise function will work until some media data is received (or
 * an error happens)
 * @return 0 for successful operation, negative value in case of error
 */
static int get_packet(URLContext *s, int for_header)
{
    RTMPContext *rt = s->priv_data;
    int ret;
    uint8_t *p;
    const uint8_t *next;
    uint32_t data_size;
    uint32_t ts, cts, pts=0;

    if (rt->state == STATE_STOPPED)
        return AVERROR_EOF;

    for (;;) {
        RTMPPacket rpkt = { 0 };
        if ((ret = ff_rtmp_packet_read(rt->stream, &rpkt,
                                       rt->in_chunk_size, rt->prev_pkt[0])) <= 0) {
            if (ret == 0) {
                return AVERROR(EAGAIN);
            } else {
                return AVERROR(EIO);
            }
        }
        rt->bytes_read += ret;
        if (rt->bytes_read - rt->last_bytes_read > rt->client_report_size) {
            av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
            if ((ret = gen_bytes_read(s, rt, rpkt.timestamp + 1)) < 0) {
                /* do not leak the packet that was just read */
                ff_rtmp_packet_destroy(&rpkt);
                return ret;
            }
            rt->last_bytes_read = rt->bytes_read;
        }

        ret = rtmp_parse_result(s, rt, &rpkt);
        if (ret < 0) {//serious error in current packet
            ff_rtmp_packet_destroy(&rpkt);
            return ret;
        }
        if (rt->state == STATE_STOPPED) {
            ff_rtmp_packet_destroy(&rpkt);
            return AVERROR_EOF;
        }
        if (for_header && (rt->state == STATE_PLAYING    ||
                           rt->state == STATE_PUBLISHING ||
                           rt->state == STATE_RECEIVING)) {
            ff_rtmp_packet_destroy(&rpkt);
            return 0;
        }
        if (!rpkt.data_size || !rt->is_input) {
            ff_rtmp_packet_destroy(&rpkt);
            continue;
        }
        if (rpkt.type == RTMP_PT_VIDEO || rpkt.type == RTMP_PT_AUDIO ||
            (rpkt.type == RTMP_PT_NOTIFY && rpkt.data_size >= 13 &&
             !memcmp("\002\000\012onMetaData", rpkt.data, 13))) {
            ts = rpkt.timestamp;

            // generate packet header and put data into buffer for FLV demuxer
            /* 11 bytes of tag header + payload + 4 trailing bytes */
            p = av_realloc(rt->flv_data, rpkt.data_size + 15);
            if (!p) {
                /* the previous buffer is still owned by rt->flv_data */
                ff_rtmp_packet_destroy(&rpkt);
                return AVERROR(ENOMEM);
            }
            rt->flv_data = p;
            rt->flv_off  = 0;
            rt->flv_size = rpkt.data_size + 15;
            bytestream_put_byte(&p, rpkt.type);
            bytestream_put_be24(&p, rpkt.data_size);
            bytestream_put_be24(&p, ts);
            bytestream_put_byte(&p, ts >> 24);
            bytestream_put_be24(&p, 0);
            bytestream_put_buffer(&p, rpkt.data, rpkt.data_size);
            bytestream_put_be32(&p, 0);
            ff_rtmp_packet_destroy(&rpkt);
            return 0;
        } else if (rpkt.type == RTMP_PT_NOTIFY) {
            ret = handle_notify(s, &rpkt);
            ff_rtmp_packet_destroy(&rpkt);
            if (ret) {
                av_log(s, AV_LOG_ERROR, "Handle notify error\n");
                return ret;
            }
            return 0;
        } else if (rpkt.type == RTMP_PT_METADATA) {
            // we got raw FLV data, make it available for FLV demuxer
            p = av_realloc(rt->flv_data, rpkt.data_size);
            if (!p) {
                ff_rtmp_packet_destroy(&rpkt);
                return AVERROR(ENOMEM);
            }
            rt->flv_data = p;
            rt->flv_off  = 0;
            rt->flv_size = rpkt.data_size;

            /* rewrite timestamps */
            next = rpkt.data;
            ts = rpkt.timestamp;
            while (next - rpkt.data < rpkt.data_size - 11) {
                next++;
                data_size = bytestream_get_be24(&next);
                /* the timestamp is patched in place inside rpkt.data */
                p = (uint8_t *)next;
                cts = bytestream_get_be24(&next);
                /* widen before shifting to avoid shifting into the
                 * sign bit of a promoted int */
                cts |= (uint32_t)bytestream_get_byte(&next) << 24;
                if (pts==0)
                    pts=cts;
                ts += cts - pts;
                pts = cts;
                bytestream_put_be24(&p, ts);
                bytestream_put_byte(&p, ts >> 24);
                /* skip stream id (3), payload and trailing size (4) */
                next += data_size + 3 + 4;
            }
            memcpy(rt->flv_data, rpkt.data, rpkt.data_size);
            ff_rtmp_packet_destroy(&rpkt);
            return 0;
        }
        ff_rtmp_packet_destroy(&rpkt);
    }
}
/**
 * Shut down an RTMP connection and release what the context holds.
 *
 * When publishing, the pending output packet is destroyed and an
 * FCUnpublish is sent if the state is past STATE_FCPUBLISH. A
 * deleteStream is sent when the state is past STATE_HANDSHAKED. Then
 * the tracked methods, the FLV buffer, the underlying stream and the
 * RTMP send buffer are released.
 *
 * @param h RTMP URL context
 * @return result of the last command sent, 0 if none was sent
 */
static int rtmp_close(URLContext *h)
{
    RTMPContext *rt = h->priv_data;
    int status = 0;

    if (!rt->is_input) {
        /* When publishing, rtmp_write() points flv_data into the
         * out_pkt buffer, so it must not be freed on its own below. */
        rt->flv_data = NULL;

        if (rt->out_pkt.data_size)
            ff_rtmp_packet_destroy(&rt->out_pkt);

        if (rt->state > STATE_FCPUBLISH)
            status = gen_fcunpublish_stream(h, rt);
    }

    if (rt->state > STATE_HANDSHAKED)
        status = gen_delete_stream(h, rt);

    free_tracked_methods(rt);
    av_freep(&rt->flv_data);
    ffurl_close(rt->stream);
    ff_destroy_rtmp_send_buffer();

    return status;
}
/**
 * Open RTMP connection and verify that the stream can be played.
 *
 * URL syntax: rtmp://server[:port][/app][/playpath]
 *             where 'app' is first one or two directories in the path
 *             (e.g. /ondemand/, /flash/live/, etc.)
 *             and 'playpath' is a file name (the rest of the path,
 *             may be prefixed with "mp4:")
 *
 * @param s     RTMP URL context; s->filename holds the URL to open
 * @param uri   unused, the URL is taken from s->filename
 * @param flags AVIO flags; AVIO_FLAG_WRITE selects publishing
 * @return 0 on success, a negative error code on failure
 */
static int rtmp_open(URLContext *s, const char *uri, int flags)
{
    RTMPContext *rt = s->priv_data;
    char proto[8], hostname[256], path[1024], *fname;
    char *old_app;
    uint8_t buf[2048];
    uint8_t *flv_header;
    int port;
    AVDictionary *opts = NULL;
    int ret;

    /* Kept after the declarations so the file also builds with
     * compilers that reject declarations after statements. */
    if (ff_create_rtmp_send_buffer() < 0)
        return AVERROR(ENOMEM);

    if (rt->listen_timeout > 0)
        rt->listen = 1;

    rt->is_input = !(flags & AVIO_FLAG_WRITE);

    av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname), &port,
                 path, sizeof(path), s->filename);

    if (rt->listen && strcmp(proto, "rtmp")) {
        av_log(s, AV_LOG_ERROR, "rtmp_listen not available for %s\n",
               proto);
        /* Nothing else has been set up yet; release the send buffer
         * created above (rtmp_close() does the same on other paths). */
        ff_destroy_rtmp_send_buffer();
        return AVERROR(EINVAL);
    }
    if (!strcmp(proto, "rtmpt") || !strcmp(proto, "rtmpts")) {
        if (!strcmp(proto, "rtmpts"))
            av_dict_set(&opts, "ffrtmphttp_tls", "1", 1);

        /* open the http tunneling connection */
        ff_url_join(buf, sizeof(buf), "ffrtmphttp", NULL, hostname, port, NULL);
    } else if (!strcmp(proto, "rtmps")) {
        /* open the tls connection */
        if (port < 0)
            port = RTMPS_DEFAULT_PORT;
        ff_url_join(buf, sizeof(buf), "tls", NULL, hostname, port, NULL);
    } else if (!strcmp(proto, "rtmpe") || (!strcmp(proto, "rtmpte"))) {
        if (!strcmp(proto, "rtmpte"))
            av_dict_set(&opts, "ffrtmpcrypt_tunneling", "1", 1);

        /* open the encrypted connection */
        ff_url_join(buf, sizeof(buf), "ffrtmpcrypt", NULL, hostname, port, NULL);
        rt->encrypted = 1;
    } else {
        /* open the tcp connection */
        if (port < 0)
            port = RTMP_DEFAULT_PORT;
        if (rt->listen)
            ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port,
                        "?listen&listen_timeout=%d",
                        rt->listen_timeout * 1000);
        else
            ff_url_join(buf, sizeof(buf), "tcp", NULL, hostname, port, NULL);
    }

    if ((ret = ffurl_open(&rt->stream, buf, AVIO_FLAG_READ_WRITE,
                          &s->interrupt_callback, &opts)) < 0) {
        av_log(s , AV_LOG_ERROR, "Cannot open connection %s\n", buf);
        goto fail;
    }

    if (rt->swfverify) {
        if ((ret = rtmp_calc_swfhash(s)) < 0)
            goto fail;
    }

    rt->state = STATE_START;
    if (!rt->listen && (ret = rtmp_handshake(s, rt)) < 0)
        goto fail;
    if (rt->listen && (ret = rtmp_server_handshake(s, rt)) < 0)
        goto fail;

    rt->out_chunk_size = 128;
    rt->in_chunk_size  = 128; // Probably overwritten later
    rt->state = STATE_HANDSHAKED;

    // Keep the application name when it has been defined by the user.
    old_app = rt->app;

    rt->app = av_malloc(APP_MAX_LENGTH);
    if (!rt->app) {
        ret = AVERROR(ENOMEM);
        goto fail;
    }

    //extract "app" part from path
    if (!strncmp(path, "/ondemand/", 10)) {
        fname = path + 10;
        memcpy(rt->app, "ondemand", 9);
    } else {
        char *next = *path ? path + 1 : path;
        char *p = strchr(next, '/');
        if (!p) {
            fname = next;
            rt->app[0] = '\0';
        } else {
            // make sure we do not mismatch a playpath for an application instance
            char *c = strchr(p + 1, ':');
            fname = strchr(p + 1, '/');
            /* path may hold up to 1023 characters while rt->app only
             * has APP_MAX_LENGTH bytes, so clamp the copy size. */
            if (!fname || (c && c < fname)) {
                fname = p + 1;
                av_strlcpy(rt->app, path + 1,
                           FFMIN(p - path, APP_MAX_LENGTH));
            } else {
                fname++;
                av_strlcpy(rt->app, path + 1,
                           FFMIN(fname - path - 1, APP_MAX_LENGTH));
            }
        }
    }

    if (old_app) {
        // The name of application has been defined by the user, override it.
        av_free(rt->app);
        rt->app = old_app;
    }

    if (!rt->playpath) {
        int len = strlen(fname);

        rt->playpath = av_malloc(PLAYPATH_MAX_LENGTH);
        if (!rt->playpath) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }

        if (!strchr(fname, ':') && len >= 4 &&
            (!strcmp(fname + len - 4, ".f4v") ||
             !strcmp(fname + len - 4, ".mp4"))) {
            memcpy(rt->playpath, "mp4:", 5);
        } else {
            /* strip a ".flv" extension */
            if (len >= 4 && !strcmp(fname + len - 4, ".flv"))
                fname[len - 4] = '\0';
            /* playpath must be an empty string in every case where no
             * prefix was written, since strncat() appends to it */
            rt->playpath[0] = 0;
        }
        strncat(rt->playpath, fname, PLAYPATH_MAX_LENGTH - 5);
    }

    if (!rt->tcurl) {
        rt->tcurl = av_malloc(TCURL_MAX_LENGTH);
        if (!rt->tcurl) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }
        ff_url_join(rt->tcurl, TCURL_MAX_LENGTH, proto, NULL, hostname,
                    port, "/%s", rt->app);
    }

    if (!rt->flashver) {
        rt->flashver = av_malloc(FLASHVER_MAX_LENGTH);
        if (!rt->flashver) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }
        if (rt->is_input) {
            snprintf(rt->flashver, FLASHVER_MAX_LENGTH, "%s %d,%d,%d,%d",
                    RTMP_CLIENT_PLATFORM, RTMP_CLIENT_VER1, RTMP_CLIENT_VER2,
                    RTMP_CLIENT_VER3, RTMP_CLIENT_VER4);
        } else {
            snprintf(rt->flashver, FLASHVER_MAX_LENGTH,
                    "FMLE/3.0 (compatible; %s)", LIBAVFORMAT_IDENT);
        }
    }

    rt->client_report_size = 1048576;
    rt->bytes_read = 0;
    rt->last_bytes_read = 0;
    rt->server_bw = 2500000;

    av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
           proto, path, rt->app, rt->playpath);
    if (!rt->listen) {
        if ((ret = gen_connect(s, rt)) < 0)
            goto fail;
    } else {
        /* keep the error code so the failure is actually reported */
        if ((ret = read_connect(s, s->priv_data)) < 0)
            goto fail;
        rt->is_input = 1;
    }

    /* get_packet() reports "no data yet" as the negative
     * AVERROR(EAGAIN), not as the raw errno value. */
    do {
        ret = get_packet(s, 1);
    } while (ret == AVERROR(EAGAIN));
    if (ret < 0)
        goto fail;

    if (rt->is_input) {
        // generate FLV header for demuxer
        flv_header = av_realloc(rt->flv_data, 13);
        if (!flv_header) {
            ret = AVERROR(ENOMEM);
            goto fail;
        }
        rt->flv_data = flv_header;
        rt->flv_size = 13;
        rt->flv_off  = 0;
        memcpy(rt->flv_data, "FLV\1\5\0\0\0\011\0\0\0\0", rt->flv_size);
    } else {
        rt->flv_size   = 0;
        rt->flv_data   = NULL;
        rt->flv_off    = 0;
        rt->skip_bytes = 13;
    }

    s->max_packet_size = rt->stream->max_packet_size;
    s->is_streamed     = 1;
    /* release any option entries ffurl_open() left in the dictionary */
    av_dict_free(&opts);
    return 0;

fail:
    av_dict_free(&opts);
    rtmp_close(s);
    return ret;
}
/**
 * Read FLV data produced from the RTMP stream.
 *
 * Data is served from rt->flv_data starting at rt->flv_off. When the
 * buffer is exhausted, get_packet() is called to refill it.
 *
 * @param s    RTMP URL context
 * @param buf  destination buffer
 * @param size number of bytes requested
 * @return number of bytes copied (may be less than size when the
 *         buffer holds fewer bytes), or a negative error code from
 *         get_packet()
 */
static int rtmp_read(URLContext *s, uint8_t *buf, int size)
{
    RTMPContext *rt = s->priv_data;
    int err;

    /* Nothing to do for an empty (or invalid) request. */
    if (size <= 0)
        return size;

    for (;;) {
        int available = rt->flv_size - rt->flv_off;

        if (available >= size) {
            /* the whole request can be satisfied from the buffer */
            memcpy(buf, rt->flv_data + rt->flv_off, size);
            rt->flv_off += size;
            return size;
        }
        if (available > 0) {
            /* hand out what is left and report a short read */
            memcpy(buf, rt->flv_data + rt->flv_off, available);
            rt->flv_off = rt->flv_size;
            return available;
        }

        err = get_packet(s, 0);
        if (err < 0)
            return err;
    }
}
/**
 * Consume FLV data from the muxer and send it as RTMP packets.
 *
 * The input is an FLV byte stream that may be split at arbitrary
 * positions between calls: bytes flagged by rt->skip_bytes are dropped,
 * the 11-byte tag header is collected in rt->flv_header, an RTMP packet
 * of the announced size is created, filled with the tag payload and
 * sent once complete. After rt->flush_interval packets, one
 * non-blocking read is attempted to process a pending packet from the
 * peer.
 *
 * @param s    RTMP URL context
 * @param buf  FLV data to send
 * @param size number of bytes in buf
 * @return size on success, otherwise the (non-positive) result of the
 *         failing operation
 */
static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
{
    RTMPContext *rt = s->priv_data;
    int size_temp = size;
    int pktsize, pkttype;
    uint32_t ts;
    const uint8_t *buf_temp = buf;
    uint8_t c;
    int ret;

    do {
        if (rt->skip_bytes) {
            int skip = FFMIN(rt->skip_bytes, size_temp);
            buf_temp       += skip;
            size_temp      -= skip;
            rt->skip_bytes -= skip;
            continue;
        }

        if (rt->flv_header_bytes < 11) {
            const uint8_t *header = rt->flv_header;
            int copy = FFMIN(11 - rt->flv_header_bytes, size_temp);
            bytestream_get_buffer(&buf_temp, rt->flv_header + rt->flv_header_bytes, copy);
            rt->flv_header_bytes += copy;
            size_temp            -= copy;
            /* header still incomplete: wait for the next call */
            if (rt->flv_header_bytes < 11)
                break;

            pkttype = bytestream_get_byte(&header);
            pktsize = bytestream_get_be24(&header);
            ts = bytestream_get_be24(&header);
            ts |= bytestream_get_byte(&header) << 24;
            bytestream_get_be24(&header);
            rt->flv_size = pktsize;

            //force 12bytes header
            if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO) && ts == 0) ||
                pkttype == RTMP_PT_NOTIFY) {
                /* room for the "@setDataFrame" string written below */
                if (pkttype == RTMP_PT_NOTIFY)
                    pktsize += 16;
                rt->prev_pkt[1][RTMP_SOURCE_CHANNEL].channel_id = 0;
            }

            //this can be a big packet, it's better to send it right here
            if ((ret = ff_rtmp_packet_create(&rt->out_pkt, RTMP_SOURCE_CHANNEL,
                                             pkttype, ts, pktsize)) < 0)
                return ret;

            rt->out_pkt.extra = rt->main_channel_id;
            rt->flv_data = rt->out_pkt.data;

            if (pkttype == RTMP_PT_NOTIFY)
                ff_amf_write_string(&rt->flv_data, "@setDataFrame");
        }

        if (rt->flv_size - rt->flv_off > size_temp) {
            bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, size_temp);
            rt->flv_off += size_temp;
            size_temp = 0;
        } else {
            bytestream_get_buffer(&buf_temp, rt->flv_data + rt->flv_off, rt->flv_size - rt->flv_off);
            size_temp   -= rt->flv_size - rt->flv_off;
            rt->flv_off += rt->flv_size - rt->flv_off;
        }

        if (rt->flv_off == rt->flv_size) {
            /* the 4 bytes following each FLV tag are not sent */
            rt->skip_bytes = 4;

            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
                return ret;
            rt->flv_size = 0;
            rt->flv_off = 0;
            rt->flv_header_bytes = 0;
            rt->flv_nb_packets++;
        }
    } while (buf_temp - buf < size);

    if (rt->flv_nb_packets < rt->flush_interval)
        return size;
    rt->flv_nb_packets = 0;

    /* set stream into nonblocking mode */
    rt->stream->flags |= AVIO_FLAG_NONBLOCK;

    /* try to read one byte from the stream */
    ret = ffurl_read(rt->stream, &c, 1);

    /* switch the stream back into blocking mode */
    rt->stream->flags &= ~AVIO_FLAG_NONBLOCK;

    if (ret == AVERROR(EAGAIN)) {
        /* no incoming data to handle */
        return size;
    } else if (ret < 0) {
        return ret;
    } else if (ret == 1) {
        RTMPPacket rpkt = { 0 };

        if ((ret = ff_rtmp_packet_read_internal(rt->stream, &rpkt,
                                                rt->in_chunk_size,
                                                rt->prev_pkt[0], c)) <= 0)
             return ret;

        /* release the packet on the error path as well */
        ret = rtmp_parse_result(s, rt, &rpkt);
        ff_rtmp_packet_destroy(&rpkt);
        if (ret < 0)
            return ret;
    }

    return size;
}
/* Byte offset of a field inside RTMPContext, used by the option table. */
#define OFFSET(x) offsetof(RTMPContext, x)
/* Short aliases for the decoding/encoding option flags. */
#define DEC AV_OPT_FLAG_DECODING_PARAM
#define ENC AV_OPT_FLAG_ENCODING_PARAM
/*
 * User-settable options shared by all RTMP protocol flavors. Each entry
 * stores its value in the RTMPContext field named by OFFSET(); the
 * "any"/"live"/"recorded" entries are named constants for "rtmp_live".
 * The table ends with a NULL entry.
 */
static const AVOption rtmp_options[] = {
{"rtmp_app", "Name of application to connect to on the RTMP server", OFFSET(app), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_buffer", "Set buffer time in milliseconds. The default is 3000.", OFFSET(client_buffer_time), AV_OPT_TYPE_INT, {.i64 = 3000}, 0, INT_MAX, DEC|ENC},
{"rtmp_conn", "Append arbitrary AMF data to the Connect message", OFFSET(conn), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_flashver", "Version of the Flash plugin used to run the SWF player.", OFFSET(flashver), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_flush_interval", "Number of packets flushed in the same request (RTMPT only).", OFFSET(flush_interval), AV_OPT_TYPE_INT, {.i64 = 10}, 0, INT_MAX, ENC},
{"rtmp_live", "Specify that the media is a live stream.", OFFSET(live), AV_OPT_TYPE_INT, {.i64 = -2}, INT_MIN, INT_MAX, DEC, "rtmp_live"},
{"any", "both", 0, AV_OPT_TYPE_CONST, {.i64 = -2}, 0, 0, DEC, "rtmp_live"},
{"live", "live stream", 0, AV_OPT_TYPE_CONST, {.i64 = -1}, 0, 0, DEC, "rtmp_live"},
{"recorded", "recorded stream", 0, AV_OPT_TYPE_CONST, {.i64 = 0}, 0, 0, DEC, "rtmp_live"},
{"rtmp_pageurl", "URL of the web page in which the media was embedded. By default no value will be sent.", OFFSET(pageurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_playpath", "Stream identifier to play or to publish", OFFSET(playpath), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_subscribe", "Name of live stream to subscribe to. Defaults to rtmp_playpath.", OFFSET(subscribe), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_swfhash", "SHA256 hash of the decompressed SWF file (32 bytes).", OFFSET(swfhash), AV_OPT_TYPE_BINARY, .flags = DEC},
{"rtmp_swfsize", "Size of the decompressed SWF file, required for SWFVerification.", OFFSET(swfsize), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, DEC},
{"rtmp_swfurl", "URL of the SWF player. By default no value will be sent", OFFSET(swfurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_swfverify", "URL to player swf file, compute hash/size automatically.", OFFSET(swfverify), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC},
{"rtmp_tcurl", "URL of the target stream. Defaults to proto://host[:port]/app.", OFFSET(tcurl), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, DEC|ENC},
{"rtmp_listen", "Listen for incoming rtmp connections", OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{"timeout", "Maximum timeout (in seconds) to wait for incoming connections. -1 is infinite. Implies -rtmp_listen 1", OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX, DEC, "rtmp_listen" },
{ NULL },
};
/*
 * Define the AVClass and the URLProtocol for one RTMP flavor.
 *
 * For a given flavor name this expands to a <flavor>_class exposing
 * rtmp_options and to ff_<flavor>_protocol, which is named after the
 * flavor and wires up rtmp_open/rtmp_read/rtmp_write/rtmp_close. All
 * flavors therefore share the same implementation and option table.
 */
#define RTMP_PROTOCOL(flavor) \
static const AVClass flavor##_class = { \
.class_name = #flavor, \
.item_name = av_default_item_name, \
.option = rtmp_options, \
.version = LIBAVUTIL_VERSION_INT, \
}; \
\
URLProtocol ff_##flavor##_protocol = { \
.name = #flavor, \
.url_open = rtmp_open, \
.url_read = rtmp_read, \
.url_write = rtmp_write, \
.url_close = rtmp_close, \
.priv_data_size = sizeof(RTMPContext), \
.flags = URL_PROTOCOL_FLAG_NETWORK, \
.priv_data_class= &flavor##_class, \
};
/* One protocol instance per supported URL scheme. */
RTMP_PROTOCOL(rtmp)
RTMP_PROTOCOL(rtmpe)
RTMP_PROTOCOL(rtmps)
RTMP_PROTOCOL(rtmpt)
RTMP_PROTOCOL(rtmpte)
RTMP_PROTOCOL(rtmpts)
More information about the ffmpeg-devel
mailing list