[rtmpdump] r200 - trunk/rtmpsuck.c

hyc subversion at mplayerhq.hu
Wed Jan 6 11:29:48 CET 2010


Author: hyc
Date: Wed Jan  6 11:29:47 2010
New Revision: 200

Log:
Relay data in chunks instead of full packets, to reduce latency. Support
multiple outstanding Play requests.

Modified:
   trunk/rtmpsuck.c

Modified: trunk/rtmpsuck.c
==============================================================================
--- trunk/rtmpsuck.c	Wed Jan  6 11:23:59 2010	(r199)
+++ trunk/rtmpsuck.c	Wed Jan  6 11:29:47 2010	(r200)
@@ -70,6 +70,13 @@ enum
   STREAMING_STOPPED
 };
 
+typedef struct Flist	/* node in the queue of output files, one per accepted play request */
+{
+  struct Flist *f_next;	/* next node in the singly linked queue; NULL at the tail */
+  FILE *f_file;	/* output file opened by the play handler; set to NULL once it has been closed */
+  AVal f_path;	/* NUL-terminated copy of the playpath, stored in the same allocation right after the node */
+} Flist;
+
 typedef struct Plist
 {
   struct Plist *p_next;
@@ -85,7 +92,8 @@ typedef struct
   RTMP rc;
   Plist *rs_pkt[2];	/* head, tail */
   Plist *rc_pkt[2];	/* head, tail */
-  FILE *out;
+  Flist *f_head, *f_tail;
+  Flist *f_cur;
 
 } STREAMING_SERVER;
 
@@ -129,9 +137,9 @@ SAVC(fmsVer);
 SAVC(mode);
 SAVC(level);
 SAVC(code);
-SAVC(description);
 SAVC(secureToken);
 SAVC(onStatus);
+SAVC(details);
 SAVC(close);
 static const AVal av_NetStream_Failed = AVC("NetStream.Failed");
 static const AVal av_NetStream_Play_Failed = AVC("NetStream.Play.Failed");
@@ -190,13 +198,7 @@ ServeInvoke(STREAMING_SERVER *server, in
           if (cobj.o_props[i].p_type == AMF_STRING)
             {
               pval = cobj.o_props[i].p_vu.p_aval;
-              if (pval.av_val)
-              {
-                pval.av_val = malloc(pval.av_len+1);
-                memcpy(pval.av_val, cobj.o_props[i].p_vu.p_aval.av_val, pval.av_len);
-                pval.av_val[pval.av_len] = '\0';
-              }
-              LogPrintf("%.*s: %s\n", pname.av_len, pname.av_val, pval.av_val);
+              LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val);
             }
           if (AVMATCH(&pname, &av_app))
             {
@@ -289,12 +291,7 @@ ServeInvoke(STREAMING_SERVER *server, in
           server->rc.Link.authflag = AMFProp_GetBoolean(&obj.o_props[3]);
           if (obj.o_num > 4)
           {
-            AVal tmp;
-            AMFProp_GetString(&obj.o_props[4], &tmp);
-            server->rc.Link.auth.av_len = tmp.av_len;
-            server->rc.Link.auth.av_val = malloc(tmp.av_len+1);
-            memcpy(server->rc.Link.auth.av_val, tmp.av_val, tmp.av_len);
-            server->rc.Link.auth.av_val[tmp.av_len] = '\0';
+            AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth);
           }
         }
 
@@ -307,6 +304,7 @@ ServeInvoke(STREAMING_SERVER *server, in
   else if (AVMATCH(&method, &av_play))
     {
       AVal av;
+      FILE *out;
       char *file, *p, *q;
       char flvHeader[] = { 'F', 'L', 'V', 0x01,
          0x05,                       // video + audio, we finalize later if the value is different
@@ -317,7 +315,8 @@ ServeInvoke(STREAMING_SERVER *server, in
       server->rc.m_stream_id = pack->m_nInfoField2;
       AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av);
       server->rc.Link.playpath = av;
-      q = strchr(av.av_val, '?');
+
+      q = memchr(av.av_val, '?', av.av_len);
       if (q)
         av.av_len = q - av.av_val;
       for (p=av.av_val+av.av_len-1; p>=av.av_val; p--)
@@ -337,20 +336,38 @@ ServeInvoke(STREAMING_SERVER *server, in
       LogPrintf("Playpath: %.*s\nSaving as: %s\n",
         server->rc.Link.playpath.av_len, server->rc.Link.playpath.av_val,
         file);
-      server->out = fopen(file, "wb");
-      if (!server->out)
+      out = fopen(file, "wb");
+      free(file);
+      if (!out)
         ret = 1;
       else
-        fwrite(flvHeader, 1, sizeof(flvHeader), server->out);
-      free(file);
+        {
+          Flist *fl;
+          fwrite(flvHeader, 1, sizeof(flvHeader), out);
+          av = server->rc.Link.playpath;
+          fl = malloc(sizeof(Flist)+av.av_len+1);
+          fl->f_file = out;
+          fl->f_path.av_len = av.av_len;
+          fl->f_path.av_val = (char *)(fl+1);
+          memcpy(fl->f_path.av_val, av.av_val, av.av_len);
+          fl->f_path.av_val[av.av_len] = '\0';
+          fl->f_next = NULL;
+          if (server->f_tail)
+            server->f_tail->f_next = fl;
+          else
+            server->f_head = fl;
+          server->f_tail = fl;
+          server->f_cur = fl;
+        }
     }
   else if (AVMATCH(&method, &av_onStatus))
     {
       AMFObject obj2;
-      AVal code, level;
+      AVal code, level, details;
       AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
       AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
       AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
+      AMFProp_GetString(AMF_GetProp(&obj2, &av_details, -1), &details);
 
       Log(LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
       if (AVMATCH(&code, &av_NetStream_Failed)
@@ -363,6 +380,15 @@ ServeInvoke(STREAMING_SERVER *server, in
 
       if (AVMATCH(&code, &av_NetStream_Play_Start))
 	{
+          Flist *fl;
+          /* If multiple streams were queued up, find the one
+             to make current. */
+          for (fl = server->f_head; fl; fl=fl->f_next)
+            if (AVMATCH(&fl->f_path, &details) && fl->f_file)
+              {
+                server->f_cur = fl;
+                break;
+              }
 	  server->rc.m_bPlaying = true;
 	}
 
@@ -370,6 +396,18 @@ ServeInvoke(STREAMING_SERVER *server, in
       if (AVMATCH(&code, &av_NetStream_Play_Complete)
 	  || AVMATCH(&code, &av_NetStream_Play_Stop))
 	{
+          Flist **fl;
+          /* Remove this file from the play queue */
+          for (fl = &server->f_head; *fl; fl = &(*fl)->f_next)
+            if (*fl == server->f_cur)
+              {
+                Flist *f = *fl;
+                *fl = f->f_next;
+                f->f_file = NULL;
+                free(f);
+                server->f_cur = NULL;
+                break;
+              }
 	  ret = 1;
 	}
     }
@@ -643,6 +681,7 @@ controlServerThread(void *unused)
 	case 'q':
 	  LogPrintf("Exiting\n");
 	  stopStreaming(rtmpServer);
+          free(rtmpServer);
 	  exit(0);
 	  break;
 	default:
@@ -657,7 +696,7 @@ void doServe(STREAMING_SERVER * server,	
   )
 {
   RTMPPacket pc = { 0 }, ps = { 0 };
-  char *buf;
+  char *buf, hbuf[RTMP_MAX_HEADER_SIZE];
   unsigned int buflen = 131072;
   bool paused = false;
 
@@ -703,6 +742,8 @@ void doServe(STREAMING_SERVER * server,	
         break;
     }
 
+  pc.m_header = hbuf;
+
   /* We have our own timeout in select() */
   server->rc.Link.timeout = 10;
   server->rs.Link.timeout = 10;
@@ -729,12 +770,12 @@ void doServe(STREAMING_SERVER * server,	
 	    FD_SET(server->rc.m_socket, &rfds);
 
           /* give more time to start up if we're not playing yet */
-	  tv.tv_sec = server->out ? 30 : 60;
+	  tv.tv_sec = server->f_cur ? 30 : 60;
 	  tv.tv_usec = 0;
 
 	  if (select(n + 1, &rfds, NULL, NULL, &tv) <= 0)
 	    {
-              if (server->out && server->rc.m_mediaChannel && !paused)
+              if (server->f_cur && server->rc.m_mediaChannel && !paused)
                 {
                   server->rc.m_pauseStamp = server->rc.m_channelTimestamp[server->rc.m_mediaChannel];
                   if (RTMP_ToggleStream(&server->rc))
@@ -795,10 +836,11 @@ void doServe(STREAMING_SERVER * server,	
                       }
                   }
                 else if (ps.m_packetType == 0x11 || ps.m_packetType == 0x14)
-                  if (ServePacket(server, 0, &ps) && server->out)
+                  if (ServePacket(server, 0, &ps) && server->f_cur)
                     {
-                      fclose(server->out);
-                      server->out = NULL;
+                      fclose(server->f_cur->f_file);
+                      server->f_cur->f_file = NULL;
+                      server->f_cur = NULL;
                     }
                 RTMP_SendPacket(&server->rc, &ps, false);
                 RTMPPacket_Free(&ps);
@@ -807,98 +849,107 @@ void doServe(STREAMING_SERVER * server,	
         }
       if (cr)
         {
+          int n = pc.m_nBytesRead;
           while (RTMP_ReadPacket(&server->rc, &pc))
-            if (RTMPPacket_IsReady(&pc))
-              {
-                int sendit = 1;
-                if (paused)
-                  {
-                    if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
-                      continue;
-                    paused = 0;
-                    server->rc.m_pausing = 0;
-                  }
-                /* change chunk size */
-                if (pc.m_packetType == 0x01)
-                  {
-                    if (pc.m_nBodySize >= 4)
-                      {
-                        server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
-                        Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
-                            server->rc.m_inChunkSize);
-                        server->rs.m_outChunkSize = server->rc.m_inChunkSize;
-                      }
-                  }
-                else if (pc.m_packetType == 0x04)
-                  {
-                    short nType = AMF_DecodeInt16(pc.m_body);
-                    /* SWFverification */
-                    if (nType == 0x1a)
-#ifdef CRYPTO
-                      if (server->rc.Link.SWFHash.av_len)
-                      {
-                        RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
-                        sendit = 0;
-                      }
-#else
-                      /* The session will certainly fail right after this */
-                      Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
-#endif
-                  }
-                else if (server->out && (
-                     pc.m_packetType == 0x08 ||
-                     pc.m_packetType == 0x09 ||
-                     pc.m_packetType == 0x12 ||
-                     pc.m_packetType == 0x16) &&
-                     RTMP_ClientPacket(&server->rc, &pc))
-                  {
-                    int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
-                    if (len > 0 && fwrite(buf, 1, len, server->out) != len)
-                      goto cleanup;
-                  }
-                else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14)
-                  {
-                    if (ServePacket(server, 1, &pc) && server->out)
-                      {
-                        fclose(server->out);
-                        server->out = NULL;
-                      }
-                  }
-
-                if (sendit && RTMP_IsConnected(&server->rs))
-                  RTMP_SendPacket(&server->rs, &pc, false);
-                RTMPPacket_Free(&pc);
-                break;
-              }
+            {
+              int sendit = 1;
+              if (RTMPPacket_IsReady(&pc))
+                {
+                  if (paused)
+                    {
+                      if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
+                        continue;
+                      paused = 0;
+                      server->rc.m_pausing = 0;
+                    }
+                  /* change chunk size */
+                  if (pc.m_packetType == 0x01)
+                    {
+                      if (pc.m_nBodySize >= 4)
+                        {
+                          server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
+                          Log(LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
+                              server->rc.m_inChunkSize);
+                          server->rs.m_outChunkSize = server->rc.m_inChunkSize;
+                        }
+                    }
+                  else if (pc.m_packetType == 0x04)
+                    {
+                      short nType = AMF_DecodeInt16(pc.m_body);
+                      /* SWFverification */
+                      if (nType == 0x1a)
+  #ifdef CRYPTO
+                        if (server->rc.Link.SWFHash.av_len)
+                        {
+                          RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
+                          sendit = 0;
+                        }
+  #else
+                        /* The session will certainly fail right after this */
+                        Log(LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
+  #endif
+                    }
+                  else if (server->f_cur && (
+                       pc.m_packetType == 0x08 ||
+                       pc.m_packetType == 0x09 ||
+                       pc.m_packetType == 0x12 ||
+                       pc.m_packetType == 0x16) &&
+                       RTMP_ClientPacket(&server->rc, &pc))
+                    {
+                      int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
+                      if (len > 0 && fwrite(buf, 1, len, server->f_cur->f_file) != len)
+                        goto cleanup;
+                    }
+                  else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14)
+                    {
+                      if (ServePacket(server, 1, &pc) && server->f_cur)
+                        {
+                          fclose(server->f_cur->f_file);
+                          server->f_cur->f_file = NULL;
+                          server->f_cur = NULL;
+                        }
+                    }
+                }
+              if (sendit && RTMP_IsConnected(&server->rs))
+                RTMP_SendChunk(&server->rs, &pc, n);
+              if (RTMPPacket_IsReady(&pc))
+                {
+                  RTMPPacket_Free(&pc);
+                  pc.m_nBytesRead = 0;
+                }
+              break;
+            }
         }
       if (!RTMP_IsConnected(&server->rs) && RTMP_IsConnected(&server->rc)
-        && !server->out)
+        && !server->f_cur)
         RTMP_Close(&server->rc);
     }
 
 cleanup:
   LogPrintf("Closing connection... ");
   RTMP_Close(&server->rs);
-  if (server->out)
+  while (server->f_head)
     {
-      fclose(server->out);
-      server->out = NULL;
+      Flist *fl = server->f_head;
+      server->f_head = fl->f_next;
+      if (fl->f_file)
+        fclose(fl->f_file);
+      free(fl);
     }
+  free(buf);
   /* Should probably be done by RTMP_Close() ... */
   free((void *)server->rc.Link.hostname);
   server->rc.Link.hostname = NULL;
-  free(server->rc.Link.tcUrl.av_val);
   server->rc.Link.tcUrl.av_val = NULL;
-  free(server->rc.Link.swfUrl.av_val);
   server->rc.Link.swfUrl.av_val = NULL;
-  free(server->rc.Link.pageUrl.av_val);
   server->rc.Link.pageUrl.av_val = NULL;
-  free(server->rc.Link.app.av_val);
   server->rc.Link.app.av_val = NULL;
-  free(server->rc.Link.auth.av_val);
   server->rc.Link.auth.av_val = NULL;
-  free(server->rc.Link.flashVer.av_val);
   server->rc.Link.flashVer.av_val = NULL;
+#ifdef CRYPTO
+  free(server->rc.Link.SWFHash.av_val);
+  server->rc.Link.SWFHash.av_val = NULL;
+#endif
   LogPrintf("done!\n\n");
 
 quit:
@@ -1044,7 +1095,7 @@ main(int argc, char **argv)
   LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_VERSION);
   LogPrintf("(c) 2010 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
 
-  debuglevel = LOGINFO;
+  debuglevel = LOGDEBUG;
 
   if (argc > 1 && !strcmp(argv[1], "-z"))
     debuglevel = LOGALL;
@@ -1080,6 +1131,8 @@ main(int argc, char **argv)
     }
   Log(LOGDEBUG, "Done, exiting...");
 
+  free(rtmpServer);
+
   CleanupSockets();
 
 #ifdef _DEBUG


More information about the rtmpdump mailing list