[rtmpdump] r200 - trunk/rtmpsuck.c
hyc
subversion at mplayerhq.hu
Wed Jan 6 11:29:48 CET 2010
Author: hyc
Date: Wed Jan 6 11:29:47 2010
New Revision: 200
Log:
Relay data in chunks instead of full packets, to reduce latency. Support
multiple outstanding Play requests.
Modified:
trunk/rtmpsuck.c
Modified: trunk/rtmpsuck.c
==============================================================================
--- trunk/rtmpsuck.c Wed Jan 6 11:23:59 2010 (r199)
+++ trunk/rtmpsuck.c Wed Jan 6 11:29:47 2010 (r200)
@@ -70,6 +70,13 @@ enum
STREAMING_STOPPED
};
+typedef struct Flist /* one output file opened for a Play request; entries form a singly linked list */
+{
+ struct Flist *f_next; /* next entry in the list, NULL for the most recently appended one */
+ FILE *f_file; /* stream the FLV header and media data are written to; set to NULL after fclose */
+ AVal f_path; /* copy of the request's playpath, compared with the onStatus "details" value */
+} Flist;
+
typedef struct Plist
{
struct Plist *p_next;
@@ -85,7 +92,8 @@ typedef struct
RTMP rc;
Plist *rs_pkt[2]; /* head, tail */
Plist *rc_pkt[2]; /* head, tail */
- FILE *out;
+ Flist *f_head, *f_tail;
+ Flist *f_cur;
} STREAMING_SERVER;
@@ -129,9 +137,9 @@ SAVC(fmsVer);
SAVC(mode);
SAVC(level);
SAVC(code);
-SAVC(description);
SAVC(secureToken);
SAVC(onStatus);
+SAVC(details);
SAVC(close);
static const AVal av_NetStream_Failed = AVC("NetStream.Failed");
static const AVal av_NetStream_Play_Failed = AVC("NetStream.Play.Failed");
@@ -190,13 +198,7 @@ ServeInvoke(STREAMING_SERVER *server, in
if (cobj.o_props[i].p_type == AMF_STRING)
{
pval = cobj.o_props[i].p_vu.p_aval;
- if (pval.av_val)
- {
- pval.av_val = malloc(pval.av_len+1);
- memcpy(pval.av_val, cobj.o_props[i].p_vu.p_aval.av_val, pval.av_len);
- pval.av_val[pval.av_len] = '\0';
- }
- LogPrintf("%.*s: %s\n", pname.av_len, pname.av_val, pval.av_val);
+ LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val);
}
if (AVMATCH(&pname, &av_app))
{
@@ -289,12 +291,7 @@ ServeInvoke(STREAMING_SERVER *server, in
server->rc.Link.authflag = AMFProp_GetBoolean(&obj.o_props[3]);
if (obj.o_num > 4)
{
- AVal tmp;
- AMFProp_GetString(&obj.o_props[4], &tmp);
- server->rc.Link.auth.av_len = tmp.av_len;
- server->rc.Link.auth.av_val = malloc(tmp.av_len+1);
- memcpy(server->rc.Link.auth.av_val, tmp.av_val, tmp.av_len);
- server->rc.Link.auth.av_val[tmp.av_len] = '\0';
+ AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth);
}
}
@@ -307,6 +304,7 @@ ServeInvoke(STREAMING_SERVER *server, in
else if (AVMATCH(&method, &av_play))
{
AVal av;
+ FILE *out;
char *file, *p, *q;
char flvHeader[] = { 'F', 'L', 'V', 0x01,
0x05, // video + audio, we finalize later if the value is different
@@ -317,7 +315,8 @@ ServeInvoke(STREAMING_SERVER *server, in
server->rc.m_stream_id = pack->m_nInfoField2;
AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av);
server->rc.Link.playpath = av;
- q = strchr(av.av_val, '?');
+
+ q = memchr(av.av_val, '?', av.av_len);
if (q)
av.av_len = q - av.av_val;
for (p=av.av_val+av.av_len-1; p>=av.av_val; p--)
@@ -337,20 +336,38 @@ ServeInvoke(STREAMING_SERVER *server, in
LogPrintf("Playpath: %.*s\nSaving as: %s\n",
server->rc.Link.playpath.av_len, server->rc.Link.playpath.av_val,
file);
- server->out = fopen(file, "wb");
- if (!server->out)
+ out = fopen(file, "wb");
+ free(file);
+ if (!out)
ret = 1;
else
- fwrite(flvHeader, 1, sizeof(flvHeader), server->out);
- free(file);
+ {
+ Flist *fl;
+ fwrite(flvHeader, 1, sizeof(flvHeader), out);
+ av = server->rc.Link.playpath;
+ fl = malloc(sizeof(Flist)+av.av_len+1);
+ fl->f_file = out;
+ fl->f_path.av_len = av.av_len;
+ fl->f_path.av_val = (char *)(fl+1);
+ memcpy(fl->f_path.av_val, av.av_val, av.av_len);
+ fl->f_path.av_val[av.av_len] = '\0';
+ fl->f_next = NULL;
+ if (server->f_tail)
+ server->f_tail->f_next = fl;
+ else
+ server->f_head = fl;
+ server->f_tail = fl;
+ server->f_cur = fl;
+ }
}
else if (AVMATCH(&method, &av_onStatus))
{
AMFObject obj2;
- AVal code, level;
+ AVal code, level, details;
AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
+ AMFProp_GetString(AMF_GetProp(&obj2, &av_details, -1), &details);
Log(LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
if (AVMATCH(&code, &av_NetStream_Failed)
@@ -363,6 +380,15 @@ ServeInvoke(STREAMING_SERVER *server, in
if (AVMATCH(&code, &av_NetStream_Play_Start))
{
+ Flist *fl;
+ /* If multiple streams were queued up, find the one
+ to make current. */
+ for (fl = server->f_head; fl; fl=fl->f_next)
+ if (AVMATCH(&fl->f_path, &details) && fl->f_file)
+ {
+ server->f_cur = fl;
+ break;
+ }
server->rc.m_bPlaying = true;
}
@@ -370,6 +396,18 @@ ServeInvoke(STREAMING_SERVER *server, in
if (AVMATCH(&code, &av_NetStream_Play_Complete)
|| AVMATCH(&code, &av_NetStream_Play_Stop))
{
+ Flist **fl;
+ /* Remove this file from the play queue */
+ for (fl = &server->f_head; *fl; fl = &(*fl)->f_next)
+ if (*fl == server->f_cur)
+ {
+ Flist *f = *fl;
+ *fl = f->f_next;
+ f->f_file = NULL;
+ free(f);
+ server->f_cur = NULL;
+ break;
+ }
ret = 1;
}
}
@@ -643,6 +681,7 @@ controlServerThread(void *unused)
case 'q':
LogPrintf("Exiting\n");
stopStreaming(rtmpServer);
+ free(rtmpServer);
exit(0);
break;
default:
@@ -657,7 +696,7 @@ void doServe(STREAMING_SERVER * server,
)
{
RTMPPacket pc = { 0 }, ps = { 0 };
- char *buf;
+ char *buf, hbuf[RTMP_MAX_HEADER_SIZE];
unsigned int buflen = 131072;
bool paused = false;
@@ -703,6 +742,8 @@ void doServe(STREAMING_SERVER * server,
break;
}
+ pc.m_header = hbuf;
+
/* We have our own timeout in select() */
server->rc.Link.timeout = 10;
server->rs.Link.timeout = 10;
@@ -729,12 +770,12 @@ void doServe(STREAMING_SERVER * server,
FD_SET(server->rc.m_socket, &rfds);
/* give more time to start up if we're not playing yet */
- tv.tv_sec = server->out ? 30 : 60;
+ tv.tv_sec = server->f_cur ? 30 : 60;
tv.tv_usec = 0;
if (select(n + 1, &rfds, NULL, NULL, &tv) <= 0)
{
- if (server->out && server->rc.m_mediaChannel && !paused)
+ if (server->f_cur && server->rc.m_mediaChannel && !paused)
{
server->rc.m_pauseStamp = server->rc.m_channelTimestamp[server->rc.m_mediaChannel];
if (RTMP_ToggleStream(&server->rc))
@@ -795,10 +836,11 @@ void doServe(STREAMING_SERVER * server,
}
}
else if (ps.m_packetType == 0x11 || ps.m_packetType == 0x14)
- if (ServePacket(server, 0, &ps) && server->out)
+ if (ServePacket(server, 0, &ps) && server->f_cur)
{
- fclose(server->out);
- server->out = NULL;
+ fclose(server->f_cur->f_file);
+ server->f_cur->f_file = NULL;
+ server->f_cur = NULL;
}
RTMP_SendPacket(&server->rc, &ps, false);
RTMPPacket_Free(&ps);
@@ -807,98 +849,107 @@ void doServe(STREAMING_SERVER * server,
}
if (cr)
{
+ int n = pc.m_nBytesRead;
while (RTMP_ReadPacket(&server->rc, &pc))
- if (RTMPPacket_IsReady(&pc))
- {
- int sendit = 1;
- if (paused)
- {
- if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
- continue;
- paused = 0;
- server->rc.m_pausing = 0;
- }
- /* change chunk size */
- if (pc.m_packetType == 0x01)
- {
- if (pc.m_nBodySize >= 4)
- {
- server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
- Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
- server->rc.m_inChunkSize);
- server->rs.m_outChunkSize = server->rc.m_inChunkSize;
- }
- }
- else if (pc.m_packetType == 0x04)
- {
- short nType = AMF_DecodeInt16(pc.m_body);
- /* SWFverification */
- if (nType == 0x1a)
-#ifdef CRYPTO
- if (server->rc.Link.SWFHash.av_len)
- {
- RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
- sendit = 0;
- }
-#else
- /* The session will certainly fail right after this */
- Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
-#endif
- }
- else if (server->out && (
- pc.m_packetType == 0x08 ||
- pc.m_packetType == 0x09 ||
- pc.m_packetType == 0x12 ||
- pc.m_packetType == 0x16) &&
- RTMP_ClientPacket(&server->rc, &pc))
- {
- int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
- if (len > 0 && fwrite(buf, 1, len, server->out) != len)
- goto cleanup;
- }
- else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14)
- {
- if (ServePacket(server, 1, &pc) && server->out)
- {
- fclose(server->out);
- server->out = NULL;
- }
- }
-
- if (sendit && RTMP_IsConnected(&server->rs))
- RTMP_SendPacket(&server->rs, &pc, false);
- RTMPPacket_Free(&pc);
- break;
- }
+ {
+ int sendit = 1;
+ if (RTMPPacket_IsReady(&pc))
+ {
+ if (paused)
+ {
+ if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
+ continue;
+ paused = 0;
+ server->rc.m_pausing = 0;
+ }
+ /* change chunk size */
+ if (pc.m_packetType == 0x01)
+ {
+ if (pc.m_nBodySize >= 4)
+ {
+ server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
+ Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
+ server->rc.m_inChunkSize);
+ server->rs.m_outChunkSize = server->rc.m_inChunkSize;
+ }
+ }
+ else if (pc.m_packetType == 0x04)
+ {
+ short nType = AMF_DecodeInt16(pc.m_body);
+ /* SWFverification */
+ if (nType == 0x1a)
+ #ifdef CRYPTO
+ if (server->rc.Link.SWFHash.av_len)
+ {
+ RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
+ sendit = 0;
+ }
+ #else
+ /* The session will certainly fail right after this */
+ Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
+ #endif
+ }
+ else if (server->f_cur && (
+ pc.m_packetType == 0x08 ||
+ pc.m_packetType == 0x09 ||
+ pc.m_packetType == 0x12 ||
+ pc.m_packetType == 0x16) &&
+ RTMP_ClientPacket(&server->rc, &pc))
+ {
+ int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
+ if (len > 0 && fwrite(buf, 1, len, server->f_cur->f_file) != len)
+ goto cleanup;
+ }
+ else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14)
+ {
+ if (ServePacket(server, 1, &pc) && server->f_cur)
+ {
+ fclose(server->f_cur->f_file);
+ server->f_cur->f_file = NULL;
+ server->f_cur = NULL;
+ }
+ }
+ }
+ if (sendit && RTMP_IsConnected(&server->rs))
+ RTMP_SendChunk(&server->rs, &pc, n);
+ if (RTMPPacket_IsReady(&pc))
+ {
+ RTMPPacket_Free(&pc);
+ pc.m_nBytesRead = 0;
+ }
+ break;
+ }
}
if (!RTMP_IsConnected(&server->rs) && RTMP_IsConnected(&server->rc)
- && !server->out)
+ && !server->f_cur)
RTMP_Close(&server->rc);
}
cleanup:
LogPrintf("Closing connection... ");
RTMP_Close(&server->rs);
- if (server->out)
+ while (server->f_head)
{
- fclose(server->out);
- server->out = NULL;
+ Flist *fl = server->f_head;
+ server->f_head = fl->f_next;
+ if (fl->f_file)
+ fclose(fl->f_file);
+ free(fl);
}
+ free(buf);
/* Should probably be done by RTMP_Close() ... */
free((void *)server->rc.Link.hostname);
server->rc.Link.hostname = NULL;
- free(server->rc.Link.tcUrl.av_val);
server->rc.Link.tcUrl.av_val = NULL;
- free(server->rc.Link.swfUrl.av_val);
server->rc.Link.swfUrl.av_val = NULL;
- free(server->rc.Link.pageUrl.av_val);
server->rc.Link.pageUrl.av_val = NULL;
- free(server->rc.Link.app.av_val);
server->rc.Link.app.av_val = NULL;
- free(server->rc.Link.auth.av_val);
server->rc.Link.auth.av_val = NULL;
- free(server->rc.Link.flashVer.av_val);
server->rc.Link.flashVer.av_val = NULL;
+#ifdef CRYPTO
+ free(server->rc.Link.SWFHash.av_val);
+ server->rc.Link.SWFHash.av_val = NULL;
+#endif
LogPrintf("done!\n\n");
quit:
@@ -1044,7 +1095,7 @@ main(int argc, char **argv)
LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_VERSION);
LogPrintf("(c) 2010 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
- debuglevel = LOGINFO;
+ debuglevel = LOGDEBUG;
if (argc > 1 && !strcmp(argv[1], "-z"))
debuglevel = LOGALL;
@@ -1080,6 +1131,8 @@ main(int argc, char **argv)
}
Log(LOGDEBUG, "Done, exiting...");
+ free(rtmpServer);
+
CleanupSockets();
#ifdef _DEBUG
More information about the rtmpdump
mailing list