diff --git a/trunk/librtmp/rtmp.c b/trunk/librtmp/rtmp.c index a9ee1cb..5292dec 100644 --- a/trunk/librtmp/rtmp.c +++ b/trunk/librtmp/rtmp.c @@ -49,6 +49,8 @@ TLS_CTX RTMP_TLS_ctx; static const int packetSize[] = { 12, 8, 4, 1 }; int RTMP_ctrlC; +static int default_interrupt_cb(void); +RTMPInterruptCB *rtmp_interrupt_cb = default_interrupt_cb; const char RTMPProtocolStrings[][7] = { "RTMP", @@ -146,6 +148,21 @@ RTMP_UserInterrupt() RTMP_ctrlC = TRUE; } +int +default_interrupt_cb(void) +{ + return RTMP_ctrlC; +} + +void +RTMP_SetInterruptCB(RTMPInterruptCB *cb) +{ + if(cb) + rtmp_interrupt_cb = cb; + else + rtmp_interrupt_cb = default_interrupt_cb; +} + void RTMPPacket_Reset(RTMPPacket *p) { @@ -775,6 +792,9 @@ int RTMP_Connect0(RTMP *r, struct sockaddr * service) { int on = 1; + int fd_max, ret; + fd_set wfds; + struct timeval tv; r->m_sb.sb_timedout = FALSE; r->m_pausing = 0; r->m_fDuration = 0.0; @@ -782,13 +802,51 @@ RTMP_Connect0(RTMP *r, struct sockaddr * service) r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (r->m_sb.sb_socket != -1) { - if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0) + socket_nonblock(r->m_sb.sb_socket, 1); + while ((ret = connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr))) < 0) { - int err = GetSockError(); - RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket. %d (%s)", - __FUNCTION__, err, strerror(err)); - RTMP_Close(r); - return FALSE; + int err = GetSockError(); + if (err == EINTR) + continue; + if (err == EINPROGRESS || + err == EAGAIN) + { + /* wait until we are connected or until abort */ + for(;;) + { + if (!rtmp_interrupt_cb()) + { + fd_max = r->m_sb.sb_socket; + FD_ZERO(&wfds); + FD_SET(r->m_sb.sb_socket, &wfds); + tv.tv_sec = 0; + tv.tv_usec = 100 * 1000; + ret = select(fd_max + 1, NULL, &wfds, NULL, &tv); + if (ret > 0 && FD_ISSET(r->m_sb.sb_socket, &wfds)) + break; + } + else + { + ret = -1; + break; + } + } + if (ret) + { + /* test error */ + socklen_t optlen = sizeof(ret); + getsockopt (r->m_sb.sb_socket, SOL_SOCKET, SO_ERROR, &ret, &optlen); + } + } + if (ret != 0) + { + int err = GetSockError(); + RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket. %d (%s)", + __FUNCTION__, err, strerror(err)); + RTMP_Close(r); + return FALSE; + } + break; } if (r->Link.socksport) @@ -3463,7 +3521,36 @@ RTMPSockBuf_Fill(RTMPSockBuf *sb) else #endif { - nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0); + char *buf = sb->sb_start + sb->sb_size; + int fd_max, ret; + fd_set rfds; + struct timeval tv; + for (;;) + { + if (rtmp_interrupt_cb()) + { + sb->sb_timedout = TRUE; + return -1; + } + fd_max = sb->sb_socket; + FD_ZERO(&rfds); + FD_SET(sb->sb_socket, &rfds); + tv.tv_sec = 0; + tv.tv_usec = 100 * 1000; + ret = select(fd_max + 1, &rfds, NULL, NULL, &tv); + if (ret > 0 && FD_ISSET(sb->sb_socket, &rfds)) + { + nBytes = recv(sb->sb_socket, buf, nBytes, 0); + break; + } + else if (ret < 0) + { + if (GetSockError() == EINTR) + continue; + nBytes = -1; + break; + } + } } if (nBytes != -1) { @@ -3493,6 +3580,9 @@ int RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len) { int rc; + int ret, size1, fd_max; + fd_set wfds; + struct timeval tv; #ifdef _DEBUG fwrite(buf, 1, len, netstackdump); @@ -3506,7 +3596,42 @@ RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len) else #endif { - rc = send(sb->sb_socket, buf, len, 0); + size1 = len; + while (len > 0) + { + if (rtmp_interrupt_cb()) + { + SetSockError(ETIMEDOUT); + return -1; + } + fd_max = sb->sb_socket; + FD_ZERO(&wfds); + FD_SET(sb->sb_socket, &wfds); + tv.tv_sec = 0; + tv.tv_usec = 100 * 1000; + ret = select(fd_max + 1, NULL, &wfds, NULL, &tv); + if (ret > 0 && FD_ISSET(sb->sb_socket, &wfds)) + { + rc = send(sb->sb_socket, buf, len, 0); + if (rc < 0) + { + int sockerr = GetSockError(); + if (sockerr != EINTR && + sockerr != EAGAIN) + return sockerr; + continue; + } + len -= rc; + buf += rc; + } + else if (ret < 0) + { + if (GetSockError() == EINTR) + continue; + return -1; + } + } + rc = size1 - len; } return rc; } diff --git a/trunk/librtmp/rtmp.h b/trunk/librtmp/rtmp.h index 12fc4e7..3ea82b6 100644 --- a/trunk/librtmp/rtmp.h +++ b/trunk/librtmp/rtmp.h @@ -69,6 +69,9 @@ extern "C" extern const AVal RTMP_DefaultFlashVer; extern int RTMP_ctrlC; + typedef int RTMPInterruptCB(void); + extern RTMPInterruptCB *rtmp_interrupt_cb; + uint32_t RTMP_GetTime(void); #define RTMP_PACKET_TYPE_AUDIO 0x08 @@ -308,6 +311,7 @@ extern "C" int RTMP_LibVersion(void); void RTMP_UserInterrupt(void); /* user typed Ctrl-C */ + void RTMP_SetInerruptCB(RTMPInterruptCB *interrupt_cb); int RTMP_SendCtrl(RTMP *r, short nType, unsigned int nObject, unsigned int nTime); diff --git a/trunk/librtmp/rtmp_sys.h b/trunk/librtmp/rtmp_sys.h index 0874cbe..aad3ab5 100644 --- a/trunk/librtmp/rtmp_sys.h +++ b/trunk/librtmp/rtmp_sys.h @@ -44,6 +44,7 @@ #define sleep(n) Sleep(n*1000) #define msleep(n) Sleep(n) #define SET_RCVTIMEO(tv,s) int tv = s*1000 +#define socket_nonblock(s,e) ioctlsocket(s, FIONBIO, &e) #else /* !_WIN32 */ #include #include @@ -53,12 +54,14 @@ #include #include #include +#include #define GetSockError() errno #define SetSockError(e) errno = e #undef closesocket #define closesocket(s) close(s) #define msleep(n) usleep(n*1000) #define SET_RCVTIMEO(tv,s) struct timeval tv = {s,0} +#define socket_nonblock(s,e) e?fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK):fcntl(s, F_SETFL, fcntl(s, F_GETFL) & ~O_NONBLOCK) #endif #include "rtmp.h"