diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-02-17 16:40:01 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-02-17 16:40:01 +0000 |
commit | 85b846e8066528c3c43374e4f98316a3eae7ed07 (patch) | |
tree | 9cec48d47e7e6269ba3a8cc16c5223c832ee3e52 /cppe/src/TcpTransport/Transceiver.cpp | |
parent | Refactored TAO tests. (diff) | |
download | ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.bz2 ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.xz ice-85b846e8066528c3c43374e4f98316a3eae7ed07.zip |
- Changes to use blocking sockets instead of non-blocking sockets.
- Removed timeout in accept() method.
Diffstat (limited to 'cppe/src/TcpTransport/Transceiver.cpp')
-rw-r--r-- | cppe/src/TcpTransport/Transceiver.cpp | 421 |
1 files changed, 200 insertions, 221 deletions
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp index 65c9bc7da30..465dc4d7c29 100644 --- a/cppe/src/TcpTransport/Transceiver.cpp +++ b/cppe/src/TcpTransport/Transceiver.cpp @@ -23,7 +23,6 @@ using namespace IceInternal; void IceInternal::incRef(Transceiver* p) { p->__incRef(); } void IceInternal::decRef(Transceiver* p) { p->__decRef(); } - SOCKET IceInternal::Transceiver::fd() { @@ -92,6 +91,8 @@ IceInternal::Transceiver::shutdownReadWrite() void IceInternal::Transceiver::write(Buffer& buf, int timeout) { + assert(timeout != 0); + Buffer::Container::difference_type packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); @@ -107,6 +108,16 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) while(buf.i != buf.b.end()) { +#ifndef ICEE_USE_SOCKET_TIMEOUT + if(timeout > 0) + { + doSelect(false, timeout); + } +#else + setTimeout(_fd, false, timeout); +#endif + + repeatSend: assert(_fd != INVALID_SOCKET); ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); @@ -119,128 +130,24 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) if(ret == SOCKET_ERROR) { -#ifdef _WIN32 - repeatError: -#endif if(interrupted()) { - continue; + goto repeatSend; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; - continue; + goto repeatSend; } - + +#ifdef ICEE_USE_SOCKET_TIMEOUT if(wouldBlock()) { -#ifdef _WIN32 - WSAEVENT events[2]; - events[0] = _event; - events[1] = _writeEvent; - long tout = (timeout >= 0) ? timeout : WSA_INFINITE; - DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE); - if(rc == WSA_WAIT_FAILED) - { - // - // This an error from WSAWaitForMultipleEvents - // itself (similar to an error from select). None - // of these errors are recoverable (such as - // EINTR). - // - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - if(rc == WSA_WAIT_TIMEOUT) - { - assert(timeout >= 0); - throw TimeoutException(__FILE__, __LINE__); - } - - if(rc == WSA_WAIT_EVENT_0) - { - WSANETWORKEVENTS nevents; - if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - // - // If we have consumed a READ event, set the - // _readEvent event. - // - if(nevents.lNetworkEvents & FD_READ) - { - WSASetEvent(_readEvent); - } - - // - // This checks for an error on the fd (this would - // be same as recv itself returning an error). In - // the event of an error we set the error code, - // and repeat the error handling. - // - if(nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]); - goto repeatError; - } - if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); - goto repeatError; - } - } - else - { - // - // Otherwise the _writeEvent is set, reset it. - // - WSAResetEvent(_writeEvent); - } -#else - repeatSelect: - int rs; - assert(_fd != INVALID_SOCKET); - FD_SET(_fd, &_wFdSet); - - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - rs = ::select(_fd + 1, 0, &_wFdSet, 0, &tv); - } - else - { - rs = ::select(_fd + 1, 0, &_wFdSet, 0, 0); - } - - if(rs == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeatSelect; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - if(rs == 0) - { - throw TimeoutException(__FILE__, __LINE__); - } -#endif - continue; + throw TimeoutException(__FILE__, __LINE__); } - +#endif + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -273,14 +180,26 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) void IceInternal::Transceiver::read(Buffer& buf, int timeout) { + assert(timeout != 0); + Buffer::Container::difference_type packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { +#ifndef ICEE_USE_SOCKET_TIMEOUT + if(timeout > 0) + { + doSelect(true, timeout); + } +#else + setTimeout(_fd, true, timeout); +#endif + + repeatRead: assert(_fd != INVALID_SOCKET); ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); - + if(ret == 0) { // @@ -301,126 +220,23 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout) if(ret == SOCKET_ERROR) { -#ifdef _WIN32 - repeatError: -#endif if(interrupted()) { - continue; + goto repeatRead; } if(noBuffers() && packetSize > 1024) { packetSize /= 2; - continue; + goto repeatRead; } +#ifdef ICEE_USE_SOCKET_TIMEOUT if(wouldBlock()) { -#ifdef _WIN32 - // - // This code is basically the same as the code in - // ::send above. Check that for detailed comments. - // - WSAEVENT events[2]; - events[0] = _event; - events[1] = _readEvent; - long tout = (timeout >= 0) ? timeout : WSA_INFINITE; - DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE); - if(rc == WSA_WAIT_FAILED) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - if(rc == WSA_WAIT_TIMEOUT) - { - assert(timeout >= 0); - throw TimeoutException(__FILE__, __LINE__); - } - - if(rc == WSA_WAIT_EVENT_0) - { - WSANETWORKEVENTS nevents; - if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) - { - SocketException ex(__FILE__, __LINE__); - ex.error = WSAGetLastError(); - throw ex; - } - - // - // If we have consumed a WRITE event, set the - // _writeEvent event. - // - if(nevents.lNetworkEvents & FD_WRITE) - { - WSASetEvent(_writeEvent); - } - - // - // This checks for an error on the fd (this would - // be same as recv itself returning an error). In - // the event of an error we set the error code, - // and repeat the error handling. - // - if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_READ_BIT]); - goto repeatError; - } - if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) - { - WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); - goto repeatError; - } - } - else - { - // - // Otherwise the _readEvent is set, reset it. - // - WSAResetEvent(_readEvent); - } -#else - repeatSelect: - - int rs; - assert(_fd != INVALID_SOCKET); - FD_SET(_fd, &_rFdSet); - - if(timeout >= 0) - { - struct timeval tv; - tv.tv_sec = timeout / 1000; - tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; - rs = ::select(_fd + 1, &_rFdSet, 0, 0, &tv); - } - else - { - rs = ::select(_fd + 1, &_rFdSet, 0, 0, 0); - } - - if(rs == SOCKET_ERROR) - { - if(interrupted()) - { - goto repeatSelect; - } - - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - - if(rs == 0) - { - throw TimeoutException(__FILE__, __LINE__); - } -#endif - continue; + throw TimeoutException(__FILE__, __LINE__); } - +#endif if(connectionLost()) { // @@ -536,3 +352,166 @@ IceInternal::Transceiver::~Transceiver() assert(_writeEvent == 0); #endif } + +void +IceInternal::Transceiver::doSelect(bool read, int timeout) +{ + assert(timeout >= 0); + while(true) + { +#ifdef _WIN32 + // + // This code is basically the same as the code in + // ::send above. Check that for detailed comments. + // + WSAEVENT events[2]; + events[0] = _event; + events[1] = read ? _readEvent : _writeEvent; + DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, timeout, FALSE); + if(rc == WSA_WAIT_FAILED) + { + SocketException ex(__FILE__, __LINE__); + ex.error = WSAGetLastError(); + throw ex; + } + if(rc == WSA_WAIT_TIMEOUT) + { + assert(timeout >= 0); + throw TimeoutException(__FILE__, __LINE__); + } + + if(rc == WSA_WAIT_EVENT_0) + { + WSANETWORKEVENTS nevents; + if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR) + { + SocketException ex(__FILE__, __LINE__); + ex.error = WSAGetLastError(); + throw ex; + } + + // + // If we're selecting for reading and have consumed a WRITE + // event, set the _writeEvent event. Otherwise, if we're + // selecting for writing have consumed a READ event, set the + // _readEvent event. + // + if(read && nevents.lNetworkEvents & FD_WRITE) + { + WSASetEvent(_writeEvent); + } + else if(!read && nevents.lNetworkEvents & FD_READ) + { + WSASetEvent(_readEvent); + } + + + // + // This checks for an error on the fd (this would + // be same as recv itself returning an error). In + // the event of an error we set the error code, + // and repeat the error handling. + // + if(read && nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); + } + else if(!read && nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]); + } + else if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0) + { + WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]); + } + else + { + return; // No errors: we're done. + } + + if(interrupted()) + { + continue; + } + + if(connectionLost()) + { + // + // See the commment above about shutting down the + // socket if the connection is lost while reading + // data. + // + //assert(_fd != INVALID_SOCKET); + //shutdownSocketReadWrite(_fd); + + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + } + else + { + // + // Otherwise the _readEvent or _writeEvent is set, reset it. + // + if(read) + { + WSAResetEvent(_readEvent); + } + else + { + WSAResetEvent(_writeEvent); + } + return; + } +#else + int rs; + assert(_fd != INVALID_SOCKET); + if(read) + { + FD_SET(_fd, &_rFdSet); + } + else + { + FD_SET(_fd, &_wFdSet); + } + + if(timeout >= 0) + { + struct timeval tv; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000; + rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, &tv); + } + else + { + rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, 0); + } + + if(rs == SOCKET_ERROR) + { + if(interrupted()) + { + continue; + } + + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + + if(rs == 0) + { + throw TimeoutException(__FILE__, __LINE__); + } + + return; +#endif + } +} |