[rtmpdump] r127 - in trunk: Makefile rtmpsuck.c
hyc
subversion at mplayerhq.hu
Tue Dec 29 09:12:10 CET 2009
Author: hyc
Date: Tue Dec 29 09:12:09 2009
New Revision: 127
Log:
Add transparent proxy server, work in progress.
Added:
trunk/rtmpsuck.c
Modified:
trunk/Makefile
Modified: trunk/Makefile
==============================================================================
--- trunk/Makefile Tue Dec 29 09:10:10 2009 (r126)
+++ trunk/Makefile Tue Dec 29 09:12:09 2009 (r127)
@@ -46,6 +46,9 @@ rtmpdump: log.o rtmp.o amf.o rtmpdump.o
rtmpsrv: log.o rtmp.o amf.o rtmpsrv.o
$(CC) $(LDFLAGS) $^ -o $@$(EXT) $(SLIBS)
+rtmpsuck: log.o rtmp.o amf.o rtmpsuck.o swfvfy.o
+ $(CC) $(LDFLAGS) $^ -o $@$(EXT) $(SLIBS)
+
log.o: log.c log.h Makefile
parseurl.o: parseurl.c parseurl.h log.h Makefile
streams.o: streams.c rtmp.h log.h swfvfy.o Makefile
Added: trunk/rtmpsuck.c
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ trunk/rtmpsuck.c Tue Dec 29 09:12:09 2009 (r127)
@@ -0,0 +1,1051 @@
+/* RTMP Proxy Server
+ * Copyright (C) 2009 Andrej Stepanchuk
+ * Copyright (C) 2009 Howard Chu
+ *
+ * This Program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This Program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with RTMPDump; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+
+/* This is a Proxy Server that displays the connection parameters from a
+ * client and then saves any data streamed to the client.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+#include <limits.h>
+
+#include <signal.h>
+#include <getopt.h>
+
+#include <assert.h>
+
+#include "rtmp.h"
+#include "parseurl.h"
+
+#ifdef WIN32
+#include <process.h>
+#else
+#ifdef linux
+#include <linux/netfilter_ipv4.h>
+#endif
+#include <pthread.h>
+#endif
+
+#ifdef CRYPTO
+#include <curl/curl.h>
+#define HASHLEN 32
+extern int SWFVerify(const char *url, unsigned int *size, unsigned char *hash);
+#define InitCurl() curl_global_init(CURL_GLOBAL_ALL)
+#define FreeCurl() curl_global_cleanup()
+#else
+#define InitCurl()
+#define FreeCurl()
+#endif
+
+#define RTMPDUMP_PROXY_VERSION "v2.0"
+
+#define RD_SUCCESS 0
+#define RD_FAILED 1
+#define RD_INCOMPLETE 2
+
+#define PACKET_SIZE 1024*1024
+
+#ifdef WIN32
+#define InitSockets() {\
+ WORD version; \
+ WSADATA wsaData; \
+ \
+ version = MAKEWORD(1,1); \
+ WSAStartup(version, &wsaData); }
+
+#define CleanupSockets() WSACleanup()
+#else
+#define InitSockets()
+#define CleanupSockets()
+#endif
+
+/* Values stored in STREAMING_SERVER.state. */
+enum
+{
+ STREAMING_ACCEPTING, /* set by serverThread while it loops on accept() */
+ STREAMING_IN_PROGRESS, /* set by doServe while a client is being handled */
+ STREAMING_STOPPING, /* set by stopStreaming when a connection is active */
+ STREAMING_STOPPED /* set when serverThread leaves its loop, or by stopStreaming */
+};
+
+/* Singly linked list node holding one queued packet (see QueuePkt/DequeuePkt). */
+typedef struct Plist
+{
+ struct Plist *p_next; /* next node, NULL at the tail */
+ RTMPPacket p_pkt; /* packet header copied by value; QueuePkt takes over its m_body */
+} Plist;
+
+/* State of the proxy: one listening socket plus the session being served. */
+typedef struct
+{
+ int socket; /* listening socket created in startStreaming */
+ int state; /* one of the STREAMING_* values */
+ uint32_t stamp; /* timestamp slot handed to WriteStream by doServe */
+ RTMP rs; /* session with the accepted client (RTMP_Serve side) */
+ RTMP rc; /* outgoing session opened with RTMP_Connect after the client's connect */
+ Plist *rs_pkt[2]; /* head, tail */
+ Plist *rc_pkt[2]; /* head, tail */
+ FILE *out; /* output file opened by ServeInvoke on "play" */
+
+} STREAMING_SERVER;
+
+STREAMING_SERVER *rtmpServer = 0; // server structure pointer
+
+STREAMING_SERVER *startStreaming(const char *address, int port);
+void stopStreaming(STREAMING_SERVER * server);
+
+/* Request settings record. In this file only defaultRTMPRequest uses it, and
+ * main() fills in rtmpport, protocol, bLiveStream, timeout and bufferTime. */
+typedef struct
+{
+ char *hostname;
+ int rtmpport;
+ int protocol;
+ bool bLiveStream; // is it a live stream? then we can't seek/resume
+
+ long int timeout; // timeout connection after 300 seconds
+ uint32_t bufferTime;
+
+ char *rtmpurl;
+ AVal playpath;
+ AVal swfUrl;
+ AVal tcUrl;
+ AVal pageUrl;
+ AVal app;
+ AVal auth;
+ AVal swfHash;
+ AVal flashVer;
+ AVal subscribepath;
+ uint32_t swfSize;
+
+ uint32_t dStartOffset;
+ uint32_t dStopOffset;
+ uint32_t nTimeStamp;
+} RTMP_REQUEST;
+
+#define STR2AVAL(av,str) av.av_val = str; av.av_len = strlen(av.av_val)
+
+/* This request is formed from the parameters and used to initialize a new request,
+ * thus it is a default settings list. All settings can be overridden by specifying the
+ * parameters in the GET request. */
+RTMP_REQUEST defaultRTMPRequest;
+
+#ifdef _DEBUG
+uint32_t debugTS = 0;
+
+int pnum = 0;
+
+FILE *netstackdump = NULL;
+FILE *netstackdump_read = NULL;
+#endif
+
+/* Append packet *p to the queue q, where q[0] is the head and q[1] the tail.
+ *
+ * The packet header is copied by value and the queue takes over p->m_body,
+ * so p->m_body is cleared before returning. If no list node can be
+ * allocated the packet is released and dropped rather than dereferencing a
+ * NULL node.
+ */
+static void
+QueuePkt(Plist **q, RTMPPacket *p)
+{
+  Plist *k = malloc(sizeof *k);
+
+  if (!k)
+    {
+      Log(LOGERROR, "%s, out of memory, dropping packet", __FUNCTION__);
+      RTMPPacket_Free(p);
+      p->m_body = NULL;
+      return;
+    }
+
+  k->p_pkt = *p;
+  k->p_next = NULL;
+  if (!q[0])
+    q[0] = k;
+  else
+    q[1]->p_next = k;
+  q[1] = k;
+
+  /* ownership of the body now lies with the queued copy */
+  p->m_body = NULL;
+}
+
+/* Remove the head of queue q (q[0] head, q[1] tail) and copy its packet
+ * into *p. The queue must not be empty; callers test q[0] first.
+ */
+static void
+DequeuePkt(Plist **q, RTMPPacket *p)
+{
+  Plist *head = q[0];
+  Plist *rest = head->p_next;
+
+  *p = head->p_pkt;
+  free(head);
+
+  q[0] = rest;
+  if (rest == NULL)
+    q[1] = NULL;
+}
+
+/* SAVC(x) declares a file-local constant AVal named av_x, initialised with
+ * AVC("x"). AVC is not defined in this file — presumably it builds an AVal
+ * from a string literal; confirm in the project headers.
+ * ServeInvoke compares method and property names against these constants.
+ */
+#define SAVC(x) static const AVal av_##x = AVC(#x)
+
+SAVC(app);
+SAVC(connect);
+SAVC(flashVer);
+SAVC(swfUrl);
+SAVC(pageUrl);
+SAVC(tcUrl);
+SAVC(fpad);
+SAVC(capabilities);
+SAVC(audioCodecs);
+SAVC(videoCodecs);
+SAVC(videoFunction);
+SAVC(objectEncoding);
+SAVC(_result);
+SAVC(createStream);
+SAVC(play);
+SAVC(fmsVer);
+SAVC(mode);
+SAVC(level);
+SAVC(code);
+SAVC(description);
+SAVC(secureToken);
+
+// Returns 0 for OK/Failed/error, 1 for 'Stop or Complete'
+int
+ServeInvoke(STREAMING_SERVER *server, RTMPPacket *pack, const char *body)
+{
+ int ret = 0, nRes;
+ int nBodySize = pack->m_nBodySize;
+
+ if (body > pack->m_body)
+ nBodySize--;
+
+ if (body[0] != 0x02) // make sure it is a string method name we start with
+ {
+ Log(LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
+ __FUNCTION__);
+ return 0;
+ }
+
+ AMFObject obj;
+ nRes = AMF_Decode(&obj, body, nBodySize, false);
+ if (nRes < 0)
+ {
+ Log(LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
+ return 0;
+ }
+
+ AMF_Dump(&obj);
+ AVal method;
+ AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
+ Log(LOGDEBUG, "%s, client invoking <%s>", __FUNCTION__, method.av_val);
+
+ if (AVMATCH(&method, &av_connect))
+ {
+ AMFObject cobj;
+ AVal pname, pval;
+ int i;
+ AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &cobj);
+ LogPrintf("Processing connect\n");
+ for (i=0; i<cobj.o_num; i++)
+ {
+ pname = cobj.o_props[i].p_name;
+ pval.av_val = NULL;
+ pval.av_len = 0;
+ if (cobj.o_props[i].p_type == AMF_STRING)
+ {
+ pval = cobj.o_props[i].p_vu.p_aval;
+ if (pval.av_val)
+ pval.av_val = strdup(pval.av_val);
+ LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val);
+ }
+ if (AVMATCH(&pname, &av_app))
+ {
+ server->rc.Link.app = pval;
+ pval.av_val = NULL;
+ }
+ else if (AVMATCH(&pname, &av_flashVer))
+ {
+ server->rc.Link.flashVer = pval;
+ pval.av_val = NULL;
+ }
+ else if (AVMATCH(&pname, &av_swfUrl))
+ {
+ unsigned char hash[HASHLEN];
+ server->rc.Link.swfUrl = pval;
+ if (SWFVerify(pval.av_val, &server->rc.Link.SWFSize, hash) == 0)
+ {
+ server->rc.Link.SWFHash.av_val = malloc(HASHLEN);
+ memcpy(server->rc.Link.SWFHash.av_val, hash, HASHLEN);
+ server->rc.Link.SWFHash.av_len = HASHLEN;
+ }
+ pval.av_val = NULL;
+ }
+ else if (AVMATCH(&pname, &av_tcUrl))
+ {
+ char *r1 = NULL, *r2;
+ int len;
+
+ server->rc.Link.tcUrl = pval;
+ if ((pval.av_val[0] | 0x40) == 'r' &&
+ (pval.av_val[1] | 0x40) == 't' &&
+ (pval.av_val[2] | 0x40) == 'm' &&
+ (pval.av_val[3] | 0x40) == 'p')
+ {
+ if (pval.av_val[4] == ':')
+ {
+ server->rc.Link.protocol = RTMP_PROTOCOL_RTMP;
+ r1 = pval.av_val+7;
+ }
+ else if ((pval.av_val[4] | 0x40) == 'e' && pval.av_val[5] == ':')
+ {
+ server->rc.Link.protocol = RTMP_PROTOCOL_RTMPE;
+ r1 = pval.av_val+8;
+ }
+ r2 = strchr(r1, '/');
+ len = r2 - r1;
+ r2 = malloc(len+1);
+ memcpy(r2, r1, len);
+ r2[len] = '\0';
+ server->rc.Link.hostname = (const char *)r2;
+ r1 = strrchr(server->rc.Link.hostname, ':');
+ if (r1)
+ {
+ *r1++ = '\0';
+ server->rc.Link.port = atoi(r1);
+ }
+ else
+ {
+ server->rc.Link.port = 1935;
+ }
+ }
+ pval.av_val = NULL;
+ }
+ else if (AVMATCH(&pname, &av_pageUrl))
+ {
+ server->rc.Link.pageUrl = pval;
+ pval.av_val = NULL;
+ }
+ else if (AVMATCH(&pname, &av_audioCodecs))
+ {
+ server->rc.m_fAudioCodecs = cobj.o_props[i].p_vu.p_number;
+ }
+ else if (AVMATCH(&pname, &av_videoCodecs))
+ {
+ server->rc.m_fVideoCodecs = cobj.o_props[i].p_vu.p_number;
+ }
+ else if (AVMATCH(&pname, &av_objectEncoding))
+ {
+ server->rc.m_fEncoding = cobj.o_props[i].p_vu.p_number;
+ }
+ /* Dup'd a string we didn't recognize? */
+ if (pval.av_val)
+ free(pval.av_val);
+ }
+
+ if (!RTMP_Connect(&server->rc))
+ {
+ /* failed */
+ return 1;
+ }
+ }
+ else if (AVMATCH(&method, &av_play))
+ {
+ char *file, *p;
+ char flvHeader[] = { 'F', 'L', 'V', 0x01,
+ 0x05, // video + audio, we finalize later if the value is different
+ 0x00, 0x00, 0x00, 0x09,
+ 0x00, 0x00, 0x00, 0x00 // first prevTagSize=0
+ };
+
+ AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &server->rc.Link.playpath);
+ file = malloc(server->rc.Link.playpath.av_len+1);
+ memcpy(file, server->rc.Link.playpath.av_val, server->rc.Link.playpath.av_len);
+ file[server->rc.Link.playpath.av_len] = '\0';
+ for (p=file; *p; p++)
+ if (*p == '/')
+ *p = '_';
+ LogPrintf("Playpath: %.*s, writing to %s\n", server->rc.Link.playpath.av_len,
+ server->rc.Link.playpath.av_val, file);
+ server->out = fopen(file, "wb");
+ if (!server->out)
+ ret = 1;
+ else
+ fwrite(flvHeader, 1, sizeof(flvHeader), server->out);
+ }
+ AMF_Reset(&obj);
+ return ret;
+}
+
+/* Dispatch one packet received from the client.
+ *
+ * Only invokes (0x14) and flex messages (0x11) are acted upon, by handing
+ * them to ServeInvoke; a non-zero result from an invoke closes the client
+ * session. All other known types are accepted silently and unknown types
+ * are logged. Always returns 0.
+ */
+int
+ServePacket(STREAMING_SERVER *server, RTMPPacket *packet)
+{
+  Log(LOGDEBUG, "%s, received packet type %02X, size %lu bytes", __FUNCTION__,
+      packet->m_packetType, packet->m_nBodySize);
+
+  switch (packet->m_packetType)
+    {
+    case 0x01: /* chunk size */
+    case 0x03: /* bytes read report */
+    case 0x04: /* ctrl */
+    case 0x05: /* server bw */
+    case 0x06: /* client bw */
+    case 0x08: /* audio data */
+    case 0x09: /* video data */
+    case 0x0F: /* flex stream send */
+    case 0x10: /* flex shared object */
+    case 0x12: /* metadata (notify) */
+    case 0x13: /* shared object */
+    case 0x16: /* flv */
+      /* nothing to do for these types */
+      break;
+
+    case 0x11: /* flex message */
+      Log(LOGDEBUG, "%s, flex message, size %lu bytes, not fully supported",
+          __FUNCTION__, packet->m_nBodySize);
+      /* the payload starts one byte into the body; the result is ignored */
+      ServeInvoke(server, packet, packet->m_body + 1);
+      break;
+
+    case 0x14: /* invoke */
+      Log(LOGDEBUG, "%s, received: invoke %lu bytes", __FUNCTION__,
+          packet->m_nBodySize);
+      if (ServeInvoke(server, packet, packet->m_body))
+        {
+          RTMP_Close(&server->rs);
+        }
+      break;
+
+    default:
+      Log(LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
+          packet->m_packetType);
+#ifdef _DEBUG
+      LogHex(LOGDEBUG, packet->m_body, packet->m_nBodySize);
+#endif
+      break;
+    }
+
+  return 0;
+}
+
+/* Convert one RTMP media packet into FLV bytes in *buf.
+ *
+ * Audio (0x08), video (0x09) and metadata (0x12) packets get an 11 byte FLV
+ * tag header in front and a 4 byte previous-tag-size behind. Packets of
+ * type 0x16 already contain FLV tags; their tag sizes are checked and
+ * rewritten where inconsistent, and a missing final tag size is appended.
+ *
+ * buf        - output buffer; grown with realloc() when too small. On
+ *              allocation failure the old buffer is left untouched.
+ * plen       - capacity of *buf; updated whenever the buffer grows
+ * nTimeStamp - receives the timestamp of the last tag written
+ * packet     - packet to convert
+ *
+ * Returns the number of bytes placed in *buf, 0 when the packet is skipped,
+ * -1 when memory could not be allocated and -2 when an embedded FLV tag
+ * claims more data than the packet holds.
+ */
+int
+WriteStream(char **buf, // target pointer, maybe preallocated
+            unsigned int *plen, // length of buffer if preallocated
+            uint32_t *nTimeStamp,
+            RTMPPacket *packet)
+{
+  uint32_t prevTagSize = 0;
+  char *packetBody = packet->m_body;
+  unsigned int nPacketLen = packet->m_nBodySize;
+  unsigned int size, copied;
+  char *ptr, *pend;
+  /* packet types that need a tag header built here */
+  int needsHeader = (packet->m_packetType == 0x08
+                     || packet->m_packetType == 0x09
+                     || packet->m_packetType == 0x12);
+
+  // skip video info/command packets
+  if (packet->m_packetType == 0x09 &&
+      nPacketLen == 2 && ((*packetBody & 0xf0) == 0x50))
+    return 0;
+
+  if (packet->m_packetType == 0x09 && nPacketLen <= 5)
+    {
+      Log(LOGWARNING, "ignoring too small video packet: size: %d",
+          nPacketLen);
+      return 0;
+    }
+  if (packet->m_packetType == 0x08 && nPacketLen <= 1)
+    {
+      Log(LOGWARNING, "ignoring too small audio packet: size: %d",
+          nPacketLen);
+      return 0;
+    }
+#ifdef _DEBUG
+  Log(LOGDEBUG, "type: %02X, size: %d, TS: %d ms", packet->m_packetType,
+      nPacketLen, packet->m_nTimeStamp);
+  if (packet->m_packetType == 0x09)
+    Log(LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));
+#endif
+
+  // calculate packet size and reallocate buffer if necessary
+  size = nPacketLen + (needsHeader ? 11 : 0)
+    + (packet->m_packetType != 0x16 ? 4 : 0);
+
+  if (size + 4 > *plen)
+    { // the extra 4 is for the case of an FLV stream without a last prevTagSize (we need extra 4 bytes to append it)
+      char *grown = realloc(*buf, size + 4);
+      if (grown == NULL)
+        {
+          Log(LOGERROR, "Couldn't reallocate memory!");
+          return -1; // fatal error; *buf is still valid
+        }
+      *buf = grown;
+      *plen = size + 4; /* remember the capacity so we only ever grow */
+    }
+  ptr = *buf;
+  pend = ptr + size + 4;
+
+  // audio (0x08), video (0x09) or metadata (0x12) packets :
+  // construct 11 byte header then add rtmp packet's data
+  if (needsHeader)
+    {
+      (*nTimeStamp) = packet->m_nTimeStamp;
+      prevTagSize = 11 + nPacketLen;
+
+      *ptr++ = packet->m_packetType;
+      ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);
+      ptr = AMF_EncodeInt24(ptr, pend, *nTimeStamp);
+      /* bits 24..31 of the timestamp go into the byte after the low 24 */
+      *ptr = (char) (((*nTimeStamp) & 0xFF000000) >> 24);
+      ptr++;
+
+      // stream id
+      ptr = AMF_EncodeInt24(ptr, pend, 0);
+    }
+
+  memcpy(ptr, packetBody, nPacketLen);
+  copied = nPacketLen;
+
+  // correct tagSize and obtain timestamp if we have an FLV stream
+  if (packet->m_packetType == 0x16)
+    {
+      unsigned int pos = 0;
+
+      while (pos + 11 < nPacketLen)
+        {
+          uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1); // size without header (11) and without prevTagSize (4)
+          *nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);
+          /* go through unsigned char so a set top bit is not sign-extended */
+          *nTimeStamp |= ((uint32_t) (unsigned char) packetBody[pos + 7] << 24);
+
+          if (pos + 11 + dataSize + 4 > nPacketLen)
+            {
+              if (pos + 11 + dataSize > nPacketLen)
+                {
+                  Log(LOGERROR,
+                      "Wrong data size (%lu), stream corrupted, aborting!",
+                      (unsigned long) dataSize);
+                  /* report the corruption instead of a byte count */
+                  return -2;
+                }
+              Log(LOGWARNING, "No tagSize found, appending!");
+
+              // we have to append a last tagSize!
+              prevTagSize = dataSize + 11;
+              AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
+              size += 4;
+              copied += 4;
+            }
+          else
+            {
+              prevTagSize =
+                AMF_DecodeInt32(packetBody + pos + 11 + dataSize);
+
+#ifdef _DEBUG
+              Log(LOGDEBUG,
+                  "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",
+                  (unsigned char) packetBody[pos], (unsigned long) dataSize,
+                  (unsigned long) prevTagSize, (unsigned long) *nTimeStamp);
+#endif
+
+              if (prevTagSize != (dataSize + 11))
+                {
+#ifdef _DEBUG
+                  Log(LOGWARNING,
+                      "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",
+                      dataSize + 11);
+#endif
+
+                  prevTagSize = dataSize + 11;
+                  AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
+                }
+            }
+
+          pos += prevTagSize + 4; //(11+dataSize+4);
+        }
+    }
+  ptr += copied;
+
+  if (packet->m_packetType != 0x16)
+    { // FLV tag packets contain their own prevTagSize
+      AMF_EncodeInt32(ptr, pend, prevTagSize);
+    }
+
+  return size;
+}
+
+/* Start routine(args) on a new thread and return its handle/id.
+ * A failure is only logged; the returned value is then whatever the
+ * platform call left behind.
+ */
+#ifdef WIN32
+HANDLE
+ThreadCreate(void *(*routine) (void *), void *args)
+{
+  HANDLE thd;
+
+  /* NOTE(review): _beginthread's start routine type and the -1L comparison
+   * look mismatched with this signature — confirm on a Windows build. */
+  thd = (HANDLE) _beginthread(routine, 0, args);
+  if (thd == -1L)
+    LogPrintf("%s, _beginthread failed with %d\n", __FUNCTION__, errno);
+
+  return thd;
+}
+#else
+pthread_t
+ThreadCreate(void *(*routine) (void *), void *args)
+{
+  pthread_t id = 0;
+  pthread_attr_t attributes;
+  int ret;
+
+  /* the thread is created detached, so it is never joined */
+  pthread_attr_init(&attributes);
+  pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
+
+  ret =
+    pthread_create(&id, &attributes, (void *(*)(void *)) routine,
+                   (void *) args);
+  if (ret != 0)
+    LogPrintf("%s, pthread_create failed with %d\n", __FUNCTION__, ret);
+
+  /* the attribute object is no longer needed once the thread exists */
+  pthread_attr_destroy(&attributes);
+
+  return id;
+}
+#endif
+
+/* Text UI thread: reads single-character commands from stdin.
+ * 'q' stops the server and exits the process; anything else is reported
+ * as unknown. The thread ends when stdin reaches end of file.
+ * The argument is unused. Returns 0.
+ */
+void *
+controlServerThread(void *unused)
+{
+  int ich; /* int, so that EOF can be told apart from a character */
+
+  while ((ich = getchar()) != EOF)
+    {
+      switch (ich)
+        {
+        case 'q':
+          LogPrintf("Exiting\n");
+          stopStreaming(rtmpServer);
+          exit(0);
+          break;
+        default:
+          LogPrintf("Unknown command \'%c\', ignoring\n", ich);
+        }
+    }
+  /* stdin closed: nothing more to read, so stop instead of spinning */
+  return 0;
+}
+
+/* Serve one accepted client connection.
+ *
+ * Performs the handshake with the client, processes its packets until the
+ * client's connect has opened the outgoing session (server->rc), then
+ * relays packets in both directions. Media packets arriving from the
+ * outgoing side are also written to server->out once "play" has opened it.
+ * On return every per-connection resource is released and, if the state is
+ * still STREAMING_IN_PROGRESS, it is set back to STREAMING_ACCEPTING.
+ */
+void doServe(STREAMING_SERVER * server, // server socket and state (our listening socket)
+  int sockfd // client connection socket
+  )
+{
+  RTMPPacket pc = { 0 }, ps = { 0 };
+  char *buf = NULL;
+  unsigned int buflen = 131072;
+
+  // timeout for http requests
+  fd_set rfds, wfds;
+  struct timeval tv;
+
+  server->state = STREAMING_IN_PROGRESS;
+
+  memset(&tv, 0, sizeof(struct timeval));
+  tv.tv_sec = 5;
+
+  FD_ZERO(&rfds);
+  /* wfds can be consulted before the first select() on it */
+  FD_ZERO(&wfds);
+  FD_SET(sockfd, &rfds);
+
+  if (select(sockfd + 1, &rfds, NULL, NULL, &tv) <= 0)
+    {
+      Log(LOGERROR, "Request timeout/select failed, ignoring request");
+      /* no RTMP session owns the descriptor yet */
+      close(sockfd);
+      goto quit;
+    }
+  else
+    {
+      RTMP_Init(&server->rs);
+      RTMP_Init(&server->rc);
+      server->rs.m_socket = sockfd;
+      if (!RTMP_Serve(&server->rs))
+        {
+          Log(LOGERROR, "Handshake failed");
+          goto cleanup;
+        }
+    }
+
+  buf = malloc(buflen);
+  if (!buf)
+    {
+      Log(LOGERROR, "%s, out of memory", __FUNCTION__);
+      goto cleanup;
+    }
+
+  /* Just process the Connect request */
+  while (RTMP_IsConnected(&server->rs) && RTMP_ReadPacket(&server->rs, &ps))
+    {
+      if (!RTMPPacket_IsReady(&ps))
+        continue;
+      ServePacket(server, &ps);
+      RTMPPacket_Free(&ps);
+      if (RTMP_IsConnected(&server->rc))
+        break;
+    }
+
+  while (RTMP_IsConnected(&server->rs) || RTMP_IsConnected(&server->rc))
+    {
+      int n;
+      int sr, cr;
+
+      cr = server->rc.m_nBufferSize;
+      sr = server->rs.m_nBufferSize;
+
+      if (cr || sr)
+        {
+          /* buffered input is pending: skip select() and mark both
+           * sockets as writable */
+          FD_SET(server->rc.m_socket, &wfds);
+          FD_SET(server->rs.m_socket, &wfds);
+        }
+      else
+        {
+          n = server->rs.m_socket;
+          if (server->rc.m_socket > n)
+            n = server->rc.m_socket;
+          FD_ZERO(&rfds);
+          FD_ZERO(&wfds);
+          if (RTMP_IsConnected(&server->rs))
+            FD_SET(sockfd, &rfds);
+          if (RTMP_IsConnected(&server->rc))
+            FD_SET(server->rc.m_socket, &rfds);
+
+          /* only ask for writability where something is queued */
+          if (server->rs_pkt[0])
+            FD_SET(server->rc.m_socket, &wfds);
+          if (server->rc_pkt[0])
+            FD_SET(server->rs.m_socket, &wfds);
+
+          tv.tv_sec = 5;
+          tv.tv_usec = 0;
+
+          if (select(n + 1, &rfds, &wfds, NULL, &tv) <= 0)
+            {
+              Log(LOGERROR, "Request timeout/select failed, ignoring request");
+              goto cleanup;
+            }
+          if (FD_ISSET(server->rs.m_socket, &rfds))
+            sr = 1;
+          if (FD_ISSET(server->rc.m_socket, &rfds))
+            cr = 1;
+        }
+      if (sr)
+        {
+          while (RTMP_ReadPacket(&server->rs, &ps))
+            if (RTMPPacket_IsReady(&ps))
+              {
+                QueuePkt(server->rs_pkt, &ps);
+                break;
+              }
+        }
+      if (cr)
+        {
+          while (RTMP_ReadPacket(&server->rc, &pc))
+            if (RTMPPacket_IsReady(&pc))
+              {
+                QueuePkt(server->rc_pkt, &pc);
+                break;
+              }
+        }
+      if (server->rs_pkt[0] && FD_ISSET(server->rc.m_socket, &wfds))
+        {
+          DequeuePkt(server->rs_pkt, &ps);
+          /* until "play" has opened the output file, keep inspecting
+           * the client's invokes */
+          if (!server->out && (ps.m_packetType == 0x11 || ps.m_packetType == 0x14))
+            ServePacket(server, &ps);
+          RTMP_SendPacket(&server->rc, &ps, false);
+          RTMPPacket_Free(&ps);
+        }
+      if (server->rc_pkt[0] && FD_ISSET(server->rs.m_socket, &wfds))
+        {
+          int sendit = 1;
+          DequeuePkt(server->rc_pkt, &pc);
+          if (pc.m_packetType == 0x04)
+            {
+              short nType = AMF_DecodeInt16(pc.m_body);
+              /* SWFverification */
+              if (nType == 0x1a && server->rc.Link.SWFHash.av_len)
+                {
+                  /* answered here; the request is not passed to the client */
+                  RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
+                  sendit = 0;
+                }
+            }
+          else if (server->out && (
+               pc.m_packetType == 0x08 ||
+               pc.m_packetType == 0x09 ||
+               pc.m_packetType == 0x12 ||
+               pc.m_packetType == 0x16))
+            {
+              int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
+              if (len > 0 && fwrite(buf, 1, len, server->out) != (size_t) len)
+                goto cleanup;
+              pc.m_headerType = 1;
+            }
+          if (sendit && RTMP_IsConnected(&server->rs))
+            RTMP_SendPacket(&server->rs, &pc, false);
+          RTMPPacket_Free(&pc);
+        }
+    }
+
+cleanup:
+  LogPrintf("Closing connection... ");
+  RTMP_Close(&server->rs);
+  RTMP_Close(&server->rc);
+  if (server->out)
+    {
+      fclose(server->out);
+      /* the next connection must not see a closed FILE */
+      server->out = NULL;
+    }
+  /* drop whatever is still queued in either direction */
+  while (server->rs_pkt[0])
+    {
+      RTMPPacket dropped;
+      DequeuePkt(server->rs_pkt, &dropped);
+      RTMPPacket_Free(&dropped);
+    }
+  while (server->rc_pkt[0])
+    {
+      RTMPPacket dropped;
+      DequeuePkt(server->rc_pkt, &dropped);
+      RTMPPacket_Free(&dropped);
+    }
+  free(buf);
+  /* Should probably be done by RTMP_Close() ... */
+  free((void *)server->rc.Link.hostname);
+  server->rc.Link.hostname = NULL;
+  free(server->rc.Link.tcUrl.av_val);
+  server->rc.Link.tcUrl.av_val = NULL;
+  free(server->rc.Link.swfUrl.av_val);
+  server->rc.Link.swfUrl.av_val = NULL;
+  free(server->rc.Link.pageUrl.av_val);
+  server->rc.Link.pageUrl.av_val = NULL;
+  free(server->rc.Link.app.av_val);
+  server->rc.Link.app.av_val = NULL;
+  free(server->rc.Link.auth.av_val);
+  server->rc.Link.auth.av_val = NULL;
+  free(server->rc.Link.flashVer.av_val);
+  server->rc.Link.flashVer.av_val = NULL;
+  /* allocated by ServeInvoke when the SWF could be verified */
+  free(server->rc.Link.SWFHash.av_val);
+  server->rc.Link.SWFHash.av_val = NULL;
+  server->rc.Link.SWFHash.av_len = 0;
+  LogPrintf("done!\n\n");
+
+quit:
+  if (server->state == STREAMING_IN_PROGRESS)
+    server->state = STREAMING_ACCEPTING;
+
+  return;
+}
+
+/* Accept loop of the proxy; runs on its own thread (see startStreaming).
+ *
+ * While the state is STREAMING_ACCEPTING, accepts a connection on
+ * server->socket and serves it synchronously with doServe(). The state is
+ * set to STREAMING_STOPPED when the loop ends. Returns 0.
+ */
+void *
+serverThread(STREAMING_SERVER * server)
+{
+  server->state = STREAMING_ACCEPTING;
+
+  while (server->state == STREAMING_ACCEPTING)
+    {
+      struct sockaddr_in addr;
+      socklen_t addrlen = sizeof(struct sockaddr_in);
+      int sockfd =
+        accept(server->socket, (struct sockaddr *) &addr, &addrlen);
+
+      /* 0 is a valid descriptor; only negative values signal failure */
+      if (sockfd >= 0)
+        {
+#ifdef linux
+          struct sockaddr_in dest;
+          char destch[16];
+          socklen_t destlen = sizeof(struct sockaddr_in);
+          /* the original destination is only known for redirected
+           * connections; do not read dest when the lookup fails */
+          if (getsockopt(sockfd, SOL_IP, SO_ORIGINAL_DST, &dest, &destlen) == 0)
+            snprintf(destch, sizeof(destch), "%s", inet_ntoa(dest.sin_addr));
+          else
+            snprintf(destch, sizeof(destch), "%s", "unknown");
+          Log(LOGDEBUG, "%s: accepted connection from %s to %s\n", __FUNCTION__,
+              inet_ntoa(addr.sin_addr), destch);
+#else
+          Log(LOGDEBUG, "%s: accepted connection from %s\n", __FUNCTION__,
+              inet_ntoa(addr.sin_addr));
+#endif
+          /* served on this thread; no further connection is accepted
+           * until doServe returns */
+          doServe(server, sockfd);
+          Log(LOGDEBUG, "%s: processed request\n", __FUNCTION__);
+        }
+      else
+        {
+          Log(LOGERROR, "%s: accept failed", __FUNCTION__);
+        }
+    }
+  server->state = STREAMING_STOPPED;
+  return 0;
+}
+
+/* Create the listening socket and start the accept thread.
+ *
+ * address - dotted IPv4 address to bind to
+ * port    - TCP port to listen on
+ *
+ * Returns a newly allocated server structure, or 0 when the socket could
+ * not be created, bound or listened on, or when memory ran out. The socket
+ * is closed again on every failure path.
+ */
+STREAMING_SERVER *
+startStreaming(const char *address, int port)
+{
+  struct sockaddr_in addr;
+  int sockfd, tmp;
+  STREAMING_SERVER *server;
+
+  sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  if (sockfd == -1)
+    {
+      Log(LOGERROR, "%s, couldn't create socket", __FUNCTION__);
+      return 0;
+    }
+
+  /* allow a quick restart on the same port */
+  tmp = 1;
+  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
+             (char *) &tmp, sizeof(tmp) );
+
+  /* clear padding and any platform-specific members before filling in */
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = inet_addr(address); //htonl(INADDR_ANY);
+  addr.sin_port = htons(port);
+
+  if (bind(sockfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) ==
+      -1)
+    {
+      Log(LOGERROR, "%s, TCP bind failed for port number: %d", __FUNCTION__,
+          port);
+      close(sockfd);
+      return 0;
+    }
+
+  if (listen(sockfd, 10) == -1)
+    {
+      Log(LOGERROR, "%s, listen failed", __FUNCTION__);
+      close(sockfd);
+      return 0;
+    }
+
+  server = (STREAMING_SERVER *) calloc(1, sizeof(STREAMING_SERVER));
+  if (!server)
+    {
+      Log(LOGERROR, "%s, out of memory", __FUNCTION__);
+      close(sockfd);
+      return 0;
+    }
+  server->socket = sockfd;
+
+  ThreadCreate((void *(*)(void *)) serverThread, server);
+
+  return server;
+}
+
+/* Stop the server: if a connection is being served, request a stop and
+ * wait until the server thread reports STREAMING_STOPPED, then close the
+ * listening socket. Does nothing when the server is already stopped.
+ */
+void
+stopStreaming(STREAMING_SERVER * server)
+{
+  assert(server);
+
+  if (server->state != STREAMING_STOPPED)
+    {
+      if (server->state == STREAMING_IN_PROGRESS)
+        {
+          server->state = STREAMING_STOPPING;
+
+          // wait for streaming threads to exit
+          while (server->state != STREAMING_STOPPED)
+            usleep(1 * 1000);
+        }
+
+      /* the format takes the function name and the error code */
+      if (close(server->socket))
+        Log(LOGERROR, "%s: Failed to close listening socket, error %d",
+            __FUNCTION__, GetSockError());
+
+      server->state = STREAMING_STOPPED;
+    }
+}
+
+
+/* SIGINT handler: raises RTMP_ctrlC, stops the server if one was started
+ * and restores the default SIGINT action so a second interrupt terminates
+ * the process.
+ */
+void
+sigIntHandler(int sig)
+{
+  RTMP_ctrlC = true;
+  LogPrintf("Caught signal: %d, cleaning up, just a second...\n", sig);
+
+  if (rtmpServer != 0)
+    {
+      stopStreaming(rtmpServer);
+    }
+
+  signal(SIGINT, SIG_DFL);
+}
+
+/* Program entry: prints the banner, installs the signal handlers, starts
+ * the text UI thread and the RTMP listener on 0.0.0.0:1935, then waits
+ * until the server reports STREAMING_STOPPED.
+ *
+ * Returns RD_SUCCESS, or RD_FAILED when the listener could not be started.
+ * Library and socket cleanup runs on both paths.
+ */
+int
+main(int argc, char **argv)
+{
+  int nStatus = RD_SUCCESS;
+
+  // rtmp streaming server
+  char DEFAULT_RTMP_STREAMING_DEVICE[] = "0.0.0.0"; // 0.0.0.0 is any device
+
+  char *rtmpStreamingDevice = DEFAULT_RTMP_STREAMING_DEVICE; // streaming device, default 0.0.0.0
+  int nRtmpStreamingPort = 1935; // port
+
+  LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_PROXY_VERSION);
+  LogPrintf("(c) 2009 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
+
+  debuglevel = LOGALL;
+
+  // init request
+  memset(&defaultRTMPRequest, 0, sizeof(RTMP_REQUEST));
+
+  defaultRTMPRequest.rtmpport = -1;
+  defaultRTMPRequest.protocol = RTMP_PROTOCOL_UNDEFINED;
+  defaultRTMPRequest.bLiveStream = false; // is it a live stream? then we can't seek/resume
+
+  defaultRTMPRequest.timeout = 300; // timeout connection after 300 seconds
+  defaultRTMPRequest.bufferTime = 20 * 1000;
+
+  signal(SIGINT, sigIntHandler);
+#ifdef SIGPIPE
+  /* not every platform this file has branches for defines SIGPIPE */
+  signal(SIGPIPE, SIG_IGN);
+#endif
+
+#ifdef _DEBUG
+  netstackdump = fopen("netstackdump", "wb");
+  netstackdump_read = fopen("netstackdump_read", "wb");
+#endif
+
+  InitSockets();
+
+  InitCurl();
+
+  // start text UI
+  ThreadCreate(controlServerThread, 0);
+
+  // start rtmp streaming
+  if ((rtmpServer =
+       startStreaming(rtmpStreamingDevice, nRtmpStreamingPort)) == 0)
+    {
+      Log(LOGERROR, "Failed to start RTMP server, exiting!");
+      nStatus = RD_FAILED;
+    }
+  else
+    {
+      LogPrintf("Streaming on rtmp://%s:%d\n", rtmpStreamingDevice,
+                nRtmpStreamingPort);
+
+      while (rtmpServer->state != STREAMING_STOPPED)
+        {
+          sleep(1);
+        }
+      Log(LOGDEBUG, "Done, exiting...");
+    }
+
+  /* shared by the success and the failure path */
+  FreeCurl();
+
+  CleanupSockets();
+
+#ifdef _DEBUG
+  if (netstackdump != 0)
+    fclose(netstackdump);
+  if (netstackdump_read != 0)
+    fclose(netstackdump_read);
+#endif
+  return nStatus;
+}
More information about the rtmpdump
mailing list