diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-02-17 16:40:01 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-02-17 16:40:01 +0000 |
commit | 85b846e8066528c3c43374e4f98316a3eae7ed07 (patch) | |
tree | 9cec48d47e7e6269ba3a8cc16c5223c832ee3e52 /cppe/src | |
parent | Refactored TAO tests. (diff) | |
download | ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.bz2 ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.xz ice-85b846e8066528c3c43374e4f98316a3eae7ed07.zip |
- Changes to use blocking sockets instead of non-blocking sockets.
- Removed timeout in accept() method.
Diffstat (limited to 'cppe/src')
-rw-r--r-- | cppe/src/IceE/Acceptor.h | 2 | ||||
-rw-r--r-- | cppe/src/IceE/IncomingConnectionFactory.cpp | 2 | ||||
-rw-r--r-- | cppe/src/IceE/Network.cpp | 59 | ||||
-rw-r--r-- | cppe/src/IceE/Network.h | 3 | ||||
-rw-r--r-- | cppe/src/IceE/Transceiver.h | 2 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Acceptor.cpp | 9 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Connector.cpp | 1 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Transceiver.cpp | 421 |
8 files changed, 230 insertions, 269 deletions
diff --git a/cppe/src/IceE/Acceptor.h b/cppe/src/IceE/Acceptor.h index 08f5c874f74..70f18c57020 100644 --- a/cppe/src/IceE/Acceptor.h +++ b/cppe/src/IceE/Acceptor.h @@ -38,7 +38,7 @@ public: SOCKET fd(); void close(); void listen(); - TransceiverPtr accept(int); + TransceiverPtr accept(); void connectToSelf(); std::string toString() const; diff --git a/cppe/src/IceE/IncomingConnectionFactory.cpp b/cppe/src/IceE/IncomingConnectionFactory.cpp index e2e400a37d6..2bc094f03be 100644 --- a/cppe/src/IceE/IncomingConnectionFactory.cpp +++ b/cppe/src/IceE/IncomingConnectionFactory.cpp @@ -318,7 +318,7 @@ IceInternal::IncomingConnectionFactory::run() TransceiverPtr transceiver; try { - transceiver = _acceptor->accept(-1); + transceiver = _acceptor->accept(); } catch(const SocketException&) { diff --git a/cppe/src/IceE/Network.cpp b/cppe/src/IceE/Network.cpp index 1c9b430d1cf..b7448fa4843 100644 --- a/cppe/src/IceE/Network.cpp +++ b/cppe/src/IceE/Network.cpp @@ -334,6 +334,24 @@ IceInternal::setBlock(SOCKET fd, bool block) } } +#ifdef ICEE_USE_SOCKET_TIMEOUT +void +IceInternal::setTimeout(SOCKET fd, bool recv, int timeout) +{ + assert(timeout != 0); + struct timeval tv; + tv.tv_sec = timeout > 0 ? timeout / 1000 : 0; + tv.tv_usec = timeout > 0 ? (timeout - tv.tv_sec * 1000) * 1000 : 0; + if(setsockopt(fd, SOL_SOCKET, recv ? SO_RCVTIMEO : SO_SNDTIMEO, (char*)&tv, (int)sizeof(timeval)) == SOCKET_ERROR) + { + closeSocketNoThrow(fd); + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } +} +#endif + void IceInternal::setTcpNoDelay(SOCKET fd) { @@ -924,7 +942,7 @@ IceInternal::acceptInterrupted() } SOCKET -IceInternal::doAccept(SOCKET fd, int timeout) +IceInternal::doAccept(SOCKET fd) { int ret; @@ -936,45 +954,6 @@ repeatAccept: goto repeatAccept; } - if(wouldBlock()) - { - repeatSelect: - int rs; - fd_set fdSet; - FD_ZERO(&fdSet); - FD_SET(fd, &fdSet); - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - rs = ::select(fd + 1, &fdSet, 0, 0, &tv); - } - else - { - rs = ::select(fd + 1, &fdSet, 0, 0, 0); - } - - if(rs == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeatSelect; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - if(rs == 0) - { - throw TimeoutException(__FILE__, __LINE__); - } - - goto repeatAccept; - } - SocketException ex(__FILE__, __LINE__); ex.error = getSocketErrno(); throw ex; diff --git a/cppe/src/IceE/Network.h b/cppe/src/IceE/Network.h index cc6ab9fd432..7f421e98cd5 100644 --- a/cppe/src/IceE/Network.h +++ b/cppe/src/IceE/Network.h @@ -85,6 +85,7 @@ void shutdownSocketWrite(SOCKET); void shutdownSocketReadWrite(SOCKET); void setBlock(SOCKET, bool); +void setTimeout(SOCKET, bool, int); void setTcpNoDelay(SOCKET); void setKeepAlive(SOCKET); void setSendBufferSize(SOCKET, int); @@ -92,7 +93,7 @@ void setSendBufferSize(SOCKET, int); void doBind(SOCKET, struct sockaddr_in&); void doListen(SOCKET, int); void doConnect(SOCKET, struct sockaddr_in&, int); -SOCKET doAccept(SOCKET, int); +SOCKET doAccept(SOCKET); void getAddress(const std::string&, int, struct sockaddr_in&); std::string getLocalHost(bool); diff --git a/cppe/src/IceE/Transceiver.h b/cppe/src/IceE/Transceiver.h index 34b0f89f079..7e59326e021 100644 --- a/cppe/src/IceE/Transceiver.h +++ b/cppe/src/IceE/Transceiver.h @@ -50,6 +50,8 @@ private: friend class Connector; friend class Acceptor; + void doSelect(bool, int); + const TraceLevelsPtr _traceLevels; const Ice::LoggerPtr _logger; diff --git a/cppe/src/TcpTransport/Acceptor.cpp b/cppe/src/TcpTransport/Acceptor.cpp index 5056cfebab0..eae64c0a4aa 100644 --- a/cppe/src/TcpTransport/Acceptor.cpp +++ b/cppe/src/TcpTransport/Acceptor.cpp @@ -63,10 +63,10 @@ IceInternal::Acceptor::listen() } TransceiverPtr -IceInternal::Acceptor::accept(int timeout) +IceInternal::Acceptor::accept() { - SOCKET fd = doAccept(_fd, timeout); - setBlock(fd, false); + SOCKET fd = doAccept(_fd); + setBlock(fd, true); if(_traceLevels->network >= 1) { @@ -81,7 +81,7 @@ void IceInternal::Acceptor::connectToSelf() { SOCKET fd = createSocket(); - setBlock(fd, false); +// setBlock(fd, false); // No need to use a non blocking socket here. doConnect(fd, _addr, -1); closeSocket(fd); } @@ -120,7 +120,6 @@ IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host, try { _fd = createSocket(); - setBlock(_fd, false); getAddress(host, port, _addr); if(_traceLevels->network >= 2) { diff --git a/cppe/src/TcpTransport/Connector.cpp b/cppe/src/TcpTransport/Connector.cpp index 1750d7fe117..1825301e8fe 100644 --- a/cppe/src/TcpTransport/Connector.cpp +++ b/cppe/src/TcpTransport/Connector.cpp @@ -34,6 +34,7 @@ Connector::connect(int timeout) SOCKET fd = createSocket(); setBlock(fd, false); doConnect(fd, _addr, timeout); + setBlock(fd, true); if(_traceLevels->network >= 1) { diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp index 65c9bc7da30..465dc4d7c29 100644 --- a/cppe/src/TcpTransport/Transceiver.cpp +++ b/cppe/src/TcpTransport/Transceiver.cpp @@ -23,7 +23,6 @@ using namespace IceInternal; void IceInternal::incRef(Transceiver* p) { p->__incRef(); } void IceInternal::decRef(Transceiver* p) { p->__decRef(); } - SOCKET IceInternal::Transceiver::fd() { @@ -92,6 +91,8 @@ IceInternal::Transceiver::shutdownReadWrite() void IceInternal::Transceiver::write(Buffer& buf, int timeout) { + assert(timeout != 0); + Buffer::Container::difference_type packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); @@ -107,6 +108,16 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) while(buf.i != buf.b.end()) { +#ifndef ICEE_USE_SOCKET_TIMEOUT + if(timeout > 0) + { + doSelect(false, timeout); + } +#else + setTimeout(_fd, false, timeout); +#endif + + repeatSend: assert(_fd != INVALID_SOCKET); ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); @@ -119,128 +130,24 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) if(ret == SOCKET_ERROR) { -#ifdef _WIN32 - repeatError: -#endif if(interrupted()) { - continue; + goto repeatSend; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; - continue; + goto repeatSend; } - + +#ifdef ICEE_USE_SOCKET_TIMEOUT if(wouldBlock()) { -#ifdef _WIN32 - WSAEVENT events[2]; - events[0] = _event; - events[1] = _writeEvent; - long tout = (timeout >= 0) ? timeout : WSA_INFINITE; - DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE); - if(rc == WSA_WAIT_FAILED) - { - // - // This an error from WSAWaitForMultipleEvents - // itself (similar to an error from select). None - // of these errors are recoverable (such as - // EINTR). - // - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - if(rc == WSA_WAIT_TIMEOUT) - { - assert(timeout >= 0); - throw TimeoutException(__FILE__, __LINE__); - } - - if(rc == WSA_WAIT_EVENT_0) - { - WSANETWORKEVENTS nevents; - if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - // - // If we have consumed a READ event, set the - // _readEvent event. - // - if(nevents.lNetworkEvents & FD_READ) - { - WSASetEvent(_readEvent); - } - - // - // This checks for an error on the fd (this would - // be same as recv itself returning an error). In - // the event of an error we set the error code, - // and repeat the error handling. - // - if(nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]); - goto repeatError; - } - if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); - goto repeatError; - } - } - else - { - // - // Otherwise the _writeEvent is set, reset it. - // - WSAResetEvent(_writeEvent); - } -#else - repeatSelect: - int rs; - assert(_fd != INVALID_SOCKET); - FD_SET(_fd, &_wFdSet); - - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - rs = ::select(_fd + 1, 0, &_wFdSet, 0, &tv); - } - else - { - rs = ::select(_fd + 1, 0, &_wFdSet, 0, 0); - } - - if(rs == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeatSelect; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - if(rs == 0) - { - throw TimeoutException(__FILE__, __LINE__); - } -#endif - continue; + throw TimeoutException(__FILE__, __LINE__); } - +#endif + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -273,14 +180,26 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) void IceInternal::Transceiver::read(Buffer& buf, int timeout) { + assert(timeout != 0); + Buffer::Container::difference_type packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { +#ifndef ICEE_USE_SOCKET_TIMEOUT + if(timeout > 0) + { + doSelect(true, timeout); + } +#else + setTimeout(_fd, true, timeout); +#endif + + repeatRead: assert(_fd != INVALID_SOCKET); ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); - + if(ret == 0) { // @@ -301,126 +220,23 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout) if(ret == SOCKET_ERROR) { -#ifdef _WIN32 - repeatError: -#endif if(interrupted()) { - continue; + goto repeatRead; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; - continue; + goto repeatRead; } +#ifdef ICEE_USE_SOCKET_TIMEOUT if(wouldBlock()) { -#ifdef _WIN32 - // - // This code is basically the same as the code in - // ::send above. Check that for detailed comments. - // - WSAEVENT events[2]; - events[0] = _event; - events[1] = _readEvent; - long tout = (timeout >= 0) ? timeout : WSA_INFINITE; - DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE); - if(rc == WSA_WAIT_FAILED) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - if(rc == WSA_WAIT_TIMEOUT) - { - assert(timeout >= 0); - throw TimeoutException(__FILE__, __LINE__); - } - - if(rc == WSA_WAIT_EVENT_0) - { - WSANETWORKEVENTS nevents; - if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - // - // If we have consumed a WRITE event, set the - // _writeEvent event. - // - if(nevents.lNetworkEvents & FD_WRITE) - { - WSASetEvent(_writeEvent); - } - - // - // This checks for an error on the fd (this would - // be same as recv itself returning an error). In - // the event of an error we set the error code, - // and repeat the error handling. - // - if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_READ_BIT]); - goto repeatError; - } - if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); - goto repeatError; - } - } - else - { - // - // Otherwise the _readEvent is set, reset it. - // - WSAResetEvent(_readEvent); - } -#else - repeatSelect: - - int rs; - assert(_fd != INVALID_SOCKET); - FD_SET(_fd, &_rFdSet); - - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - rs = ::select(_fd + 1, &_rFdSet, 0, 0, &tv); - } - else - { - rs = ::select(_fd + 1, &_rFdSet, 0, 0, 0); - } - - if(rs == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeatSelect; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - if(rs == 0) - { - throw TimeoutException(__FILE__, __LINE__); - } -#endif - continue; + throw TimeoutException(__FILE__, __LINE__); } - +#endif if(connectionLost()) { // @@ -536,3 +352,166 @@ IceInternal::Transceiver::~Transceiver() assert(_writeEvent == 0); #endif } + +void +IceInternal::Transceiver::doSelect(bool read, int timeout) +{ + assert(timeout >= 0); + while(true) + { +#ifdef _WIN32 + // + // This code is basically the same as the code in + // ::send above. Check that for detailed comments. + // + WSAEVENT events[2]; + events[0] = _event; + events[1] = read ? _readEvent : _writeEvent; + DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, timeout, FALSE); + if(rc == WSA_WAIT_FAILED) + { + SocketException ex(__FILE__, __LINE__); + ex.error = WSAGetLastError(); + throw ex; + } + if(rc == WSA_WAIT_TIMEOUT) + { + assert(timeout >= 0); + throw TimeoutException(__FILE__, __LINE__); + } + + if(rc == WSA_WAIT_EVENT_0) + { + WSANETWORKEVENTS nevents; + if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) + { + SocketException ex(__FILE__, __LINE__); + ex.error = WSAGetLastError(); + throw ex; + } + + // + // If we're selecting for reading and have consumed a WRITE + // event, set the _writeEvent event. Otherwise, if we're + // selecting for writing have consumed a READ event, set the + // _readEvent event. + // + if(read && nevents.lNetworkEvents & FD_WRITE) + { + WSASetEvent(_writeEvent); + } + else if(!read && nevents.lNetworkEvents & FD_READ) + { + WSASetEvent(_readEvent); + } + + + // + // This checks for an error on the fd (this would + // be same as recv itself returning an error). In + // the event of an error we set the error code, + // and repeat the error handling. + // + if(read && nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); + } + else if(!read && nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]); + } + else if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); + } + else + { + return; // No errors: we're done. + } + + if(interrupted()) + { + continue; + } + + if(connectionLost()) + { + // + // See the commment above about shutting down the + // socket if the connection is lost while reading + // data. + // + //assert(_fd != INVALID_SOCKET); + //shutdownSocketReadWrite(_fd); + + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + } + else + { + // + // Otherwise the _readEvent or _writeEvent is set, reset it. + // + if(read) + { + WSAResetEvent(_readEvent); + } + else + { + WSAResetEvent(_writeEvent); + } + return; + } +#else + int rs; + assert(_fd != INVALID_SOCKET); + if(read) + { + FD_SET(_fd, &_rFdSet); + } + else + { + FD_SET(_fd, &_wFdSet); + } + + if(timeout >= 0) + { + struct timeval tv; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; + rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, &tv); + } + else + { + rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, 0); + } + + if(rs == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + + if(rs == 0) + { + throw TimeoutException(__FILE__, __LINE__); + } + + return; +#endif + } +} |