[rtmpdump] r108 - in trunk: Makefile log.h rtmpsrv.c
hyc
subversion at mplayerhq.hu
Tue Dec 22 07:10:38 CET 2009
Author: hyc
Date: Tue Dec 22 07:10:37 2009
New Revision: 108
Log:
Add dummy RTMP server. Doesn't serve any data, just sucks up
connection parameters from a client.
Added:
trunk/rtmpsrv.c
Modified:
trunk/Makefile
trunk/log.h
Modified: trunk/Makefile
==============================================================================
--- trunk/Makefile Tue Dec 22 05:55:19 2009 (r107)
+++ trunk/Makefile Tue Dec 22 07:10:37 2009 (r108)
@@ -43,9 +43,13 @@ streams: log.o rtmp.o amf.o streams.o pa
rtmpdump: log.o rtmp.o amf.o rtmpdump.o parseurl.o
$(CC) $(LDFLAGS) $^ -o $@$(EXT) $(LIBS)
+rtmpsrv: log.o rtmp.o amf.o rtmpsrv.o
+ $(CC) $(LDFLAGS) $^ -o $@$(EXT) $(SLIBS)
+
log.o: log.c log.h Makefile
parseurl.o: parseurl.c parseurl.h log.h Makefile
streams.o: streams.c rtmp.h log.h Makefile
rtmp.o: rtmp.c rtmp.h handshake.h dh.h log.h amf.h Makefile
amf.o: amf.c amf.h bytes.h log.h Makefile
rtmpdump.o: rtmpdump.c rtmp.h log.h amf.h Makefile
+rtmpsrv.o: rtmpsrv.c rtmp.h log.h amf.h Makefile
Modified: trunk/log.h
==============================================================================
--- trunk/log.h Tue Dec 22 05:55:19 2009 (r107)
+++ trunk/log.h Tue Dec 22 07:10:37 2009 (r108)
@@ -28,7 +28,7 @@
extern "C" {
#endif
/* Enable this to get full debugging output */
-/* #define _DEBUG */
+/* #define _DEBUG */
#define CRYPTO
#ifdef _DEBUG
Added: trunk/rtmpsrv.c
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ trunk/rtmpsrv.c Tue Dec 22 07:10:37 2009 (r108)
@@ -0,0 +1,663 @@
+/* Simple RTMP Server
+ * Copyright (C) 2009 Andrej Stepanchuk
+ * Copyright (C) 2009 Howard Chu
+ *
+ * This Program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This Program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with RTMPDump; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+
+/* This is just a stub for an RTMP server. It doesn't do anything
+ * beyond obtaining the connection parameters from the client.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <math.h>
+
+#include <signal.h>
+#include <getopt.h>
+
+#include <assert.h>
+
+#include "rtmp.h"
+#include "parseurl.h"
+
+#include <pthread.h>
+
+#define RTMPDUMP_SERVER_VERSION "v2.0"
+
+/* Status codes; main() returns RD_SUCCESS or RD_FAILED as its exit status. */
+#define RD_SUCCESS 0
+#define RD_FAILED 1
+#define RD_INCOMPLETE 2
+
+/* Parenthesized so the product stays a single operand wherever it expands. */
+#define PACKET_SIZE (1024*1024)
+
+/* Life-cycle states stored in STREAMING_SERVER.state. */
+enum
+{
+  STREAMING_ACCEPTING = 0,    /* accept loop is waiting for a client */
+  STREAMING_IN_PROGRESS = 1,  /* doServe() is handling a client connection */
+  STREAMING_STOPPING = 2,     /* stopStreaming() has asked for shutdown */
+  STREAMING_STOPPED = 3       /* accept loop ended or listening socket closed */
+};
+
+/* State of the listening server, shared between the accept thread and the
+ * code that starts and stops it.
+ */
+typedef struct
+{
+  int socket;    /* listening socket descriptor */
+  int state;     /* one of the STREAMING_* values */
+  int streamID;  /* incremented for every createStream call that is answered */
+} STREAMING_SERVER;
+
+/* The single server instance; main() assigns it from startStreaming(). */
+STREAMING_SERVER *rtmpServer = NULL;
+
+/* Forward declarations for the start/stop routines defined further down. */
+void stopStreaming(STREAMING_SERVER * server);
+STREAMING_SERVER *startStreaming(const char *address, int port);
+
+/* Per-request connection settings.  Within this file the type is used only
+ * for the global defaultRTMPRequest, and main() fills in just rtmpport,
+ * protocol, bLiveStream, timeout and bufferTime.
+ */
+typedef struct
+{
+  char *hostname;
+  int rtmpport;
+  int protocol;
+  bool bLiveStream;     /* is it a live stream? then we can't seek/resume */
+
+  long timeout;         /* timeout connection after 300 seconds */
+  uint32_t bufferTime;
+
+  char *rtmpurl;
+  AVal playpath, swfUrl, tcUrl, pageUrl, app, auth, swfHash, flashVer,
+    subscribepath;
+  uint32_t swfSize;
+
+  uint32_t dStartOffset, dStopOffset, nTimeStamp;
+} RTMP_REQUEST;
+
+/* Point the AVal `av` at the C string `str` and store its strlen() as the
+ * length.  Wrapped in do { } while (0) so both assignments behave as one
+ * statement, e.g. as the unbraced body of an if/else.  `av` is evaluated
+ * more than once, so it must not have side effects.
+ */
+#define STR2AVAL(av,str) \
+  do { (av).av_val = (str); (av).av_len = strlen((av).av_val); } while (0)
+
+/* Default settings used to initialize a new request; main() zeroes this
+ * structure and fills in the defaults. NOTE(review): nothing in this file
+ * parses per-request overrides (e.g. from a GET request) -- confirm. */
+RTMP_REQUEST defaultRTMPRequest;
+
+#ifdef _DEBUG
+uint32_t debugTS = 0; /* not referenced in this file; presumably read by the RTMP library's debug code -- TODO confirm */
+
+int pnum = 0; /* likewise not referenced in this file -- TODO confirm its user */
+
+FILE *netstackdump = NULL; /* main() opens the file "netstackdump" for it and closes it on exit */
+FILE *netstackdump_read = NULL; /* main() opens the file "netstackdump_read" for it and closes it on exit */
+#endif
+
+#define SAVC(x) static const AVal av_##x = AVC(#x) /* declares a file-local constant AVal av_<x> initialized with AVC("<x>") */
+
+SAVC(app); /* connect-object property names matched in ServeInvoke() start here */
+SAVC(connect); /* method name compared in ServeInvoke() */
+SAVC(flashVer);
+SAVC(swfUrl);
+SAVC(pageUrl);
+SAVC(tcUrl);
+SAVC(fpad); /* not referenced elsewhere in this file */
+SAVC(capabilities); /* property name written by SendConnectResult() */
+SAVC(audioCodecs);
+SAVC(videoCodecs);
+SAVC(videoFunction); /* not referenced elsewhere in this file */
+SAVC(objectEncoding); /* property name written by SendConnectResult() */
+SAVC(_result); /* reply method name used by both Send*Result() functions */
+SAVC(createStream); /* method name compared in ServeInvoke() */
+SAVC(play); /* method name compared in ServeInvoke() */
+SAVC(fmsVer); /* fmsVer..description: property names written by SendConnectResult() */
+SAVC(mode);
+SAVC(level);
+SAVC(code);
+SAVC(description);
+
+/* Answer a client's "connect" invoke with a "_result" message.
+ *
+ * r    session the reply is sent on
+ * txn  transaction number encoded right after "_result"
+ *
+ * The AMF body is laid out as: "_result", txn, an object with fmsVer /
+ * capabilities / mode, then an object with level / code / description /
+ * objectEncoding and a nested "data" object holding a "version" string.
+ * The encoder return values are not checked against the 384-byte buffer.
+ *
+ * Returns the result of RTMP_SendPacket().
+ */
+static bool
+SendConnectResult(RTMP *r, double txn)
+{
+  char buf[384];
+  char *const limit = buf + sizeof(buf);
+  char *cur;
+  RTMPPacket pkt;
+  AMFObject dataObj;
+  AMFObjectProperty versionProp, dataProp;
+  AVal text;
+  int i;
+
+  /* Invoke (0x14) on control channel 3; the body starts after the space
+   * reserved for the header. */
+  pkt.m_body = buf + RTMP_MAX_HEADER_SIZE;
+  pkt.m_hasAbsTimestamp = 0;
+  pkt.m_nInfoField2 = 0;
+  pkt.m_nInfoField1 = 0;
+  pkt.m_packetType = 0x14;
+  pkt.m_headerType = 1;         /* RTMP_PACKET_SIZE_MEDIUM */
+  pkt.m_nChannel = 0x03;
+
+  cur = pkt.m_body;
+  cur = AMF_EncodeString(cur, limit, &av__result);
+  cur = AMF_EncodeNumber(cur, limit, txn);
+
+  /* First object: server properties. */
+  *cur++ = AMF_OBJECT;
+  STR2AVAL(text, "FMS/3,5,1,525");
+  cur = AMF_EncodeNamedString(cur, limit, &av_fmsVer, &text);
+  cur = AMF_EncodeNamedNumber(cur, limit, &av_capabilities, 31.0);
+  cur = AMF_EncodeNamedNumber(cur, limit, &av_mode, 1.0);
+  *cur++ = 0;
+  *cur++ = 0;
+  *cur++ = AMF_OBJECT_END;
+
+  /* Second object: connection status information. */
+  *cur++ = AMF_OBJECT;
+  STR2AVAL(text, "status");
+  cur = AMF_EncodeNamedString(cur, limit, &av_level, &text);
+  STR2AVAL(text, "NetConnection.Connect.Success");
+  cur = AMF_EncodeNamedString(cur, limit, &av_code, &text);
+  STR2AVAL(text, "Connection succeeded.");
+  cur = AMF_EncodeNamedString(cur, limit, &av_description, &text);
+  cur = AMF_EncodeNamedNumber(cur, limit, &av_objectEncoding, 0.0);
+
+  /* Nested "data" property: an object with one "version" string. */
+  versionProp.p_type = AMF_STRING;
+  STR2AVAL(versionProp.p_name, "version");
+  STR2AVAL(versionProp.p_vu.p_aval, "3,5,1,525");
+  dataObj.o_props = &versionProp;
+  dataObj.o_num = 1;
+  dataProp.p_type = AMF_OBJECT;
+  STR2AVAL(dataProp.p_name, "data");
+  dataProp.p_vu.p_object = dataObj;
+  cur = AMFProp_Encode(&dataProp, cur, limit);
+
+  /* Two consecutive 0x00 0x00 AMF_OBJECT_END sequences follow the "data"
+   * property. */
+  for (i = 0; i < 2; i++)
+    {
+      *cur++ = 0;
+      *cur++ = 0;
+      *cur++ = AMF_OBJECT_END;
+    }
+
+  pkt.m_nBodySize = cur - pkt.m_body;
+
+  return RTMP_SendPacket(r, &pkt, false);
+}
+
+/* Answer a client's "createStream" invoke with a "_result" message.
+ *
+ * r    session the reply is sent on
+ * txn  transaction number encoded right after "_result"
+ * ID   stream ID encoded as the final number of the reply
+ *
+ * Body layout: "_result", txn, an AMF null, ID.
+ *
+ * Returns the result of RTMP_SendPacket().
+ */
+static bool
+SendCreateStreamResult(RTMP *r, double txn, double ID)
+{
+  char buf[256];
+  char *const limit = buf + sizeof(buf);
+  char *cur;
+  RTMPPacket pkt;
+
+  /* Invoke (0x14) on control channel 3; the body starts after the space
+   * reserved for the header. */
+  pkt.m_body = buf + RTMP_MAX_HEADER_SIZE;
+  pkt.m_hasAbsTimestamp = 0;
+  pkt.m_nInfoField2 = 0;
+  pkt.m_nInfoField1 = 0;
+  pkt.m_packetType = 0x14;
+  pkt.m_headerType = 1;         /* RTMP_PACKET_SIZE_MEDIUM */
+  pkt.m_nChannel = 0x03;
+
+  cur = pkt.m_body;
+  cur = AMF_EncodeString(cur, limit, &av__result);
+  cur = AMF_EncodeNumber(cur, limit, txn);
+  *cur++ = AMF_NULL;
+  cur = AMF_EncodeNumber(cur, limit, ID);
+
+  pkt.m_nBodySize = cur - pkt.m_body;
+
+  return RTMP_SendPacket(r, &pkt, false);
+}
+
+/* Handle one AMF invoke message received from the client.
+ *
+ * server     listening-side state; its streamID counter is advanced for
+ *            every createStream call
+ * r          client session; connect/play parameters are stored in it
+ * body       start of the AMF-encoded invoke body
+ * nBodySize  number of bytes available at body
+ *
+ * Recognized methods: "connect" (records the connect-object properties and
+ * sends a _result), "createStream" (sends a _result carrying a new stream
+ * ID) and "play" (records playpath, seek time and optional length).  Any
+ * other method is decoded, dumped and ignored.
+ *
+ * Returns 0 for OK/Failed/error, 1 for 'Stop or Complete' (only after "play").
+ */
+int
+ServeInvoke(STREAMING_SERVER *server, RTMP * r, const char *body, unsigned int nBodySize)
+{
+  int ret = 0, nRes;
+
+  /* An empty body has no type marker to inspect, so reading body[0] would
+   * run past the data.  Otherwise make sure it is a string method name we
+   * start with. */
+  if (nBodySize == 0 || body[0] != 0x02)
+    {
+      Log(LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
+          __FUNCTION__);
+      return 0;
+    }
+
+  AMFObject obj;
+  nRes = AMF_Decode(&obj, body, nBodySize, false);
+  if (nRes < 0)
+    {
+      Log(LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
+      return 0;
+    }
+
+  AMF_Dump(&obj);
+
+  /* Property 0 is the method name, property 1 the transaction number. */
+  AVal method;
+  AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
+  double txn = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));
+  /* NOTE(review): %s assumes method.av_val is NUL-terminated -- confirm. */
+  Log(LOGDEBUG, "%s, client invoking <%s>", __FUNCTION__, method.av_val);
+
+  if (AVMATCH(&method, &av_connect))
+    {
+      /* Property 2 is the connect object; copy the fields we know about.
+       * NOTE(review): the AVal values are copied by value (pointer + length);
+       * confirm the pointed-to bytes outlive AMF_Reset() below. */
+      AMFObject cobj;
+      AVal pname;
+      int i;
+      AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &cobj);
+      for (i = 0; i < cobj.o_num; i++)
+        {
+          pname = cobj.o_props[i].p_name;
+          if (AVMATCH(&pname, &av_app))
+            {
+              r->Link.app = cobj.o_props[i].p_vu.p_aval;
+            }
+          else if (AVMATCH(&pname, &av_flashVer))
+            {
+              r->Link.flashVer = cobj.o_props[i].p_vu.p_aval;
+            }
+          else if (AVMATCH(&pname, &av_swfUrl))
+            {
+              r->Link.swfUrl = cobj.o_props[i].p_vu.p_aval;
+            }
+          else if (AVMATCH(&pname, &av_tcUrl))
+            {
+              r->Link.tcUrl = cobj.o_props[i].p_vu.p_aval;
+            }
+          else if (AVMATCH(&pname, &av_pageUrl))
+            {
+              r->Link.pageUrl = cobj.o_props[i].p_vu.p_aval;
+            }
+          else if (AVMATCH(&pname, &av_audioCodecs))
+            {
+              r->m_fAudioCodecs = cobj.o_props[i].p_vu.p_number;
+            }
+          else if (AVMATCH(&pname, &av_videoCodecs))
+            {
+              r->m_fVideoCodecs = cobj.o_props[i].p_vu.p_number;
+            }
+        }
+      SendConnectResult(r, txn);
+    }
+  else if (AVMATCH(&method, &av_createStream))
+    {
+      SendCreateStreamResult(r, txn, ++server->streamID);
+    }
+  else if (AVMATCH(&method, &av_play))
+    {
+      /* Property 3 is the playpath, 4 the seek time, 5 (optional) the length. */
+      AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &r->Link.playpath);
+      r->Link.seekTime = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));
+      if (obj.o_num > 5)
+        r->Link.length = AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 5));
+      ret = 1;
+    }
+  AMF_Reset(&obj);
+  return ret;
+}
+
+/* Dispatch one complete packet received from the client.
+ *
+ * Only invoke messages are acted on: type 0x14 is passed to ServeInvoke()
+ * and the session is closed when that returns nonzero; type 0x11 (flex
+ * message) is passed to ServeInvoke() with its first body byte skipped and
+ * the result ignored.  Every other listed type is accepted and ignored by
+ * this stub, and unlisted types are logged.
+ *
+ * Always returns 0.
+ */
+int
+ServePacket(STREAMING_SERVER *server, RTMP *r, RTMPPacket *packet)
+{
+  int ret = 0;
+
+  Log(LOGDEBUG, "%s, received packet type %02X, size %lu bytes", __FUNCTION__,
+      packet->m_packetType, packet->m_nBodySize);
+
+  switch (packet->m_packetType)
+    {
+    case 0x01:                  /* chunk size */
+    case 0x03:                  /* bytes read report */
+    case 0x04:                  /* ctrl */
+    case 0x05:                  /* server bw */
+    case 0x06:                  /* client bw */
+    case 0x08:                  /* audio data */
+    case 0x09:                  /* video data */
+    case 0x0F:                  /* flex stream send */
+    case 0x10:                  /* flex shared object */
+    case 0x12:                  /* metadata (notify) */
+    case 0x13:                  /* shared object */
+    case 0x16:                  /* flv */
+      /* not handled by this stub */
+      break;
+
+    case 0x11:                  /* flex message */
+      Log(LOGDEBUG, "%s, flex message, size %lu bytes, not fully supported",
+          __FUNCTION__, packet->m_nBodySize);
+
+      /* The invoke data starts after the first body byte.  Skip empty
+       * bodies: a size of 0 would otherwise reach ServeInvoke() as a huge
+       * unsigned length. */
+      if (packet->m_nBodySize > 0)
+        ServeInvoke(server, r, packet->m_body + 1, packet->m_nBodySize - 1);
+      break;
+
+    case 0x14:                  /* invoke */
+      Log(LOGDEBUG, "%s, received: invoke %lu bytes", __FUNCTION__,
+          packet->m_nBodySize);
+
+      if (ServeInvoke(server, r, packet->m_body, packet->m_nBodySize))
+        RTMP_Close(r);
+      break;
+
+    default:
+      Log(LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
+          packet->m_packetType);
+#ifdef _DEBUG
+      LogHex(LOGDEBUG, packet->m_body, packet->m_nBodySize);
+#endif
+      break;
+    }
+  return ret;
+}
+
+/* Start a detached thread running routine(args).
+ *
+ * Returns the new thread's ID.  If pthread_create() fails, the error code is
+ * reported through LogPrintf() and the returned ID is the initial value 0.
+ */
+pthread_t
+ThreadCreate(void *(*routine) (void *), void *args)
+{
+  pthread_t id = 0;
+  pthread_attr_t attributes;
+  int ret;
+
+  pthread_attr_init(&attributes);
+  pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
+
+  ret = pthread_create(&id, &attributes, routine, args);
+  if (ret != 0)
+    LogPrintf("%s, pthread_create failed with %d\n", __FUNCTION__, ret);
+
+  /* The attribute object is only needed while the thread is being created. */
+  pthread_attr_destroy(&attributes);
+
+  return id;
+}
+
+/* Text UI thread: reads single-character commands from standard input.
+ *
+ * 'q' stops the server (if one has been started) and exits the process;
+ * any other character is reported as unknown.  The thread ends when
+ * standard input reaches end-of-file or a read error occurs.
+ *
+ * The argument is unused; the return value is always 0.
+ */
+void *
+controlServerThread(void *unused)
+{
+  int ich;                      /* int, so EOF is distinguishable from a char */
+
+  (void) unused;
+
+  while ((ich = getchar()) != EOF)
+    {
+      switch (ich)
+        {
+        case 'q':
+          LogPrintf("Exiting\n");
+          /* stopStreaming() asserts on NULL, and this thread may run before
+           * the server pointer has been assigned. */
+          if (rtmpServer)
+            stopStreaming(rtmpServer);
+          exit(0);
+          break;
+        default:
+          LogPrintf("Unknown command \'%c\', ignoring\n", ich);
+        }
+    }
+  return 0;
+}
+
+
+/* Serve a single client connection on sockfd until it ends.
+ *
+ * Waits up to 5 seconds for the socket to become readable, performs the
+ * handshake via RTMP_Serve(), then feeds every complete packet to
+ * ServePacket() until the session disconnects or a read fails.  While
+ * running, server->state is STREAMING_IN_PROGRESS; on return it is put back
+ * to STREAMING_ACCEPTING unless it was changed in the meantime.
+ *
+ * NOTE(review): once rtmp.m_socket is set, closing the descriptor is left to
+ * RTMP_Close() -- confirm that it does so.
+ */
+void doServe(STREAMING_SERVER * server, // server socket and state (our listening socket)
+             int sockfd // client connection socket
+  )
+{
+  server->state = STREAMING_IN_PROGRESS;
+
+  RTMP rtmp = { 0 };            /* our session with the real client */
+  RTMPPacket packet = { 0 };
+
+  // timeout for the client's first request
+  fd_set fds;
+  struct timeval tv;
+
+  memset(&tv, 0, sizeof(struct timeval));
+  tv.tv_sec = 5;
+
+  FD_ZERO(&fds);
+  FD_SET(sockfd, &fds);
+
+  if (select(sockfd + 1, &fds, NULL, NULL, &tv) <= 0)
+    {
+      Log(LOGERROR, "Request timeout/select failed, ignoring request");
+      /* No RTMP session has taken over the descriptor yet, so it has to be
+       * closed here or it leaks. */
+      close(sockfd);
+      goto quit;
+    }
+  else
+    {
+      RTMP_Init(&rtmp);
+      rtmp.m_socket = sockfd;
+      if (!RTMP_Serve(&rtmp))
+        {
+          Log(LOGERROR, "Handshake failed");
+          goto cleanup;
+        }
+    }
+  while (RTMP_IsConnected(&rtmp) && RTMP_ReadPacket(&rtmp, &packet))
+    {
+      /* Skip packets that RTMPPacket_IsReady() does not yet report as ready. */
+      if (!RTMPPacket_IsReady(&packet))
+        continue;
+      ServePacket(server, &rtmp, &packet);
+      RTMPPacket_Free(&packet);
+    }
+
+cleanup:
+  LogPrintf("Closing connection... ");
+  RTMP_Close(&rtmp);
+  LogPrintf("done!\n\n");
+
+quit:
+  if (server->state == STREAMING_IN_PROGRESS)
+    server->state = STREAMING_ACCEPTING;
+
+  return;
+}
+
+/* Accept loop run on its own thread (started by startStreaming()).
+ *
+ * Accepts one client at a time on server->socket and serves it inline with
+ * doServe().  The loop runs while server->state is STREAMING_ACCEPTING and
+ * sets the state to STREAMING_STOPPED when it ends.
+ *
+ * Always returns 0.
+ */
+void *
+serverThread(STREAMING_SERVER * server)
+{
+  server->state = STREAMING_ACCEPTING;
+
+  while (server->state == STREAMING_ACCEPTING)
+    {
+      struct sockaddr_in addr;
+      socklen_t addrlen = sizeof(struct sockaddr_in);
+      int sockfd =
+        accept(server->socket, (struct sockaddr *) &addr, &addrlen);
+
+      /* accept() reports failure with -1; 0 is a valid descriptor. */
+      if (sockfd >= 0)
+        {
+          /* Serve the client on this thread; no new connection is accepted
+           * until doServe() returns. */
+          Log(LOGDEBUG, "%s: accepted connection from %s\n", __FUNCTION__,
+              inet_ntoa(addr.sin_addr));
+          doServe(server, sockfd);
+          Log(LOGDEBUG, "%s: processed request\n", __FUNCTION__);
+        }
+      else
+        {
+          Log(LOGERROR, "%s: accept failed", __FUNCTION__);
+        }
+    }
+  server->state = STREAMING_STOPPED;
+  return 0;
+}
+
+/* Create the listening socket and start the accept thread.
+ *
+ * address  dotted-quad IPv4 address to bind to (passed to inet_addr())
+ * port     TCP port to listen on
+ *
+ * Returns a newly allocated STREAMING_SERVER, or NULL if the socket could
+ * not be created, bound or put into listening mode, or if the allocation
+ * failed.  The socket is closed again on every failure path.
+ */
+STREAMING_SERVER *
+startStreaming(const char *address, int port)
+{
+  struct sockaddr_in addr;
+  int sockfd;
+  STREAMING_SERVER *server;
+
+  sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  if (sockfd == -1)
+    {
+      Log(LOGERROR, "%s, couldn't create socket", __FUNCTION__);
+      return NULL;
+    }
+
+  /* Zero the whole structure so no member is passed to bind() uninitialized. */
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = inet_addr(address);    //htonl(INADDR_ANY);
+  addr.sin_port = htons(port);
+
+  if (bind(sockfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) ==
+      -1)
+    {
+      Log(LOGERROR, "%s, TCP bind failed for port number: %d", __FUNCTION__,
+          port);
+      close(sockfd);
+      return NULL;
+    }
+
+  if (listen(sockfd, 10) == -1)
+    {
+      Log(LOGERROR, "%s, listen failed", __FUNCTION__);
+      close(sockfd);
+      return NULL;
+    }
+
+  server = calloc(1, sizeof(*server));
+  if (server == NULL)
+    {
+      Log(LOGERROR, "%s, out of memory", __FUNCTION__);
+      close(sockfd);
+      return NULL;
+    }
+  server->socket = sockfd;
+
+  /* serverThread takes a STREAMING_SERVER *, hence the cast. */
+  ThreadCreate((void *(*)(void *)) serverThread, server);
+
+  return server;
+}
+
+/* Stop the server and close its listening socket.
+ *
+ * Does nothing if the state is already STREAMING_STOPPED.  If a client is
+ * being served (STREAMING_IN_PROGRESS), the state is set to
+ * STREAMING_STOPPING and this call polls in 1 ms steps until the state
+ * becomes STREAMING_STOPPED.  The listening socket is then closed and the
+ * state is set to STREAMING_STOPPED.
+ *
+ * server must not be NULL (checked with assert()).
+ * NOTE(review): state is a plain int written from several threads without
+ * synchronization -- confirm this is acceptable.
+ */
+void
+stopStreaming(STREAMING_SERVER * server)
+{
+  assert(server);
+
+  if (server->state != STREAMING_STOPPED)
+    {
+      if (server->state == STREAMING_IN_PROGRESS)
+        {
+          server->state = STREAMING_STOPPING;
+
+          // wait for streaming threads to exit
+          while (server->state != STREAMING_STOPPED)
+            usleep(1 * 1000);
+        }
+
+      /* The format has two conversions, so both the function name and the
+       * error code must be supplied. */
+      if (close(server->socket))
+        Log(LOGERROR, "%s: Failed to close listening socket, error %d",
+            __FUNCTION__, GetSockError());
+
+      server->state = STREAMING_STOPPED;
+    }
+}
+
+
+/* SIGINT handler: sets RTMP_ctrlC, announces the signal, stops the server
+ * if one has been created, then restores the default SIGINT action.
+ *
+ * NOTE(review): LogPrintf() and stopStreaming() are called from signal
+ * context -- confirm they are safe to use there.
+ */
+void
+sigIntHandler(int sig)
+{
+  RTMP_ctrlC = true;
+
+  LogPrintf("Caught signal: %d, cleaning up, just a second...\n", sig);
+
+  if (rtmpServer != NULL)
+    {
+      stopStreaming(rtmpServer);
+    }
+
+  signal(SIGINT, SIG_DFL);
+}
+
+/* Program entry point: start the RTMP stub server on 0.0.0.0:1935 and run
+ * until its state becomes STREAMING_STOPPED.
+ *
+ * Command-line arguments are not used.  Returns RD_SUCCESS, or RD_FAILED if
+ * the server could not be started.
+ */
+int
+main(int argc, char **argv)
+{
+  int nStatus = RD_SUCCESS;
+
+  // rtmp streaming server
+  char DEFAULT_HTTP_STREAMING_DEVICE[] = "0.0.0.0";     // 0.0.0.0 is any device
+
+  char *rtmpStreamingDevice = DEFAULT_HTTP_STREAMING_DEVICE;    // streaming device, default 0.0.0.0
+  int nRtmpStreamingPort = 1935;        // port
+
+  (void) argc;
+  (void) argv;
+
+  LogPrintf("RTMP Server %s\n", RTMPDUMP_SERVER_VERSION);
+  LogPrintf("(c) 2009 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
+
+  debuglevel = LOGALL;
+
+  // init request
+  memset(&defaultRTMPRequest, 0, sizeof(RTMP_REQUEST));
+
+  defaultRTMPRequest.rtmpport = -1;
+  defaultRTMPRequest.protocol = RTMP_PROTOCOL_UNDEFINED;
+  defaultRTMPRequest.bLiveStream = false;       // is it a live stream? then we can't seek/resume
+
+  defaultRTMPRequest.timeout = 300;     // timeout connection after 300 seconds
+  defaultRTMPRequest.bufferTime = 20 * 1000;
+
+
+  signal(SIGINT, sigIntHandler);
+  signal(SIGPIPE, SIG_IGN);
+
+#ifdef _DEBUG
+  netstackdump = fopen("netstackdump", "wb");
+  netstackdump_read = fopen("netstackdump_read", "wb");
+#endif
+
+  // start the rtmp server first, so that rtmpServer is set before the
+  // text UI thread can act on a 'q' command
+  if ((rtmpServer =
+       startStreaming(rtmpStreamingDevice, nRtmpStreamingPort)) == 0)
+    {
+      Log(LOGERROR, "Failed to start RTMP server, exiting!");
+      nStatus = RD_FAILED;
+      goto cleanup;
+    }
+  LogPrintf("Streaming on rtmp://%s:%d\n", rtmpStreamingDevice,
+            nRtmpStreamingPort);
+
+  // start text UI
+  ThreadCreate(controlServerThread, 0);
+
+  while (rtmpServer->state != STREAMING_STOPPED)
+    {
+      sleep(1);
+    }
+  Log(LOGDEBUG, "Done, exiting...");
+
+cleanup:
+#ifdef _DEBUG
+  if (netstackdump != 0)
+    fclose(netstackdump);
+  if (netstackdump_read != 0)
+    fclose(netstackdump_read);
+#endif
+  return nStatus;
+}
More information about the rtmpdump
mailing list