diff options
Diffstat (limited to 'cppe')
-rw-r--r-- | cppe/src/IceE/Acceptor.h | 3 | ||||
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 9 | ||||
-rwxr-xr-x | cppe/src/IceE/Connector.h | 3 | ||||
-rw-r--r-- | cppe/src/IceE/Network.cpp | 2 | ||||
-rw-r--r-- | cppe/src/IceE/Transceiver.h | 8 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Acceptor.cpp | 16 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Connector.cpp | 14 | ||||
-rw-r--r-- | cppe/src/TcpTransport/TcpEndpoint.cpp | 4 | ||||
-rw-r--r-- | cppe/src/TcpTransport/Transceiver.cpp | 318 |
9 files changed, 214 insertions, 163 deletions
diff --git a/cppe/src/IceE/Acceptor.h b/cppe/src/IceE/Acceptor.h index 70f18c57020..e42f15effe3 100644 --- a/cppe/src/IceE/Acceptor.h +++ b/cppe/src/IceE/Acceptor.h @@ -47,7 +47,7 @@ public: private: - Acceptor(const InstancePtr&, const std::string&, int); + Acceptor(const InstancePtr&, const std::string&, int, int); virtual ~Acceptor(); friend class TcpEndpoint; @@ -57,6 +57,7 @@ private: SOCKET _fd; int _backlog; struct sockaddr_in _addr; + int _timeout; }; } diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index 33446535389..e32fdbe84b8 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -1861,7 +1861,14 @@ Ice::Connection::run() // BasicStream stream(_instance.get()); - readStream(stream); + try + { + readStream(stream); + } + catch(const Ice::TimeoutException&) + { + continue; + } Int requestId = 0; #ifndef ICEE_PURE_CLIENT diff --git a/cppe/src/IceE/Connector.h b/cppe/src/IceE/Connector.h index 0f291e0e202..7ceb7e8ceb9 100755 --- a/cppe/src/IceE/Connector.h +++ b/cppe/src/IceE/Connector.h @@ -37,7 +37,7 @@ public: private: - Connector(const InstancePtr&, const std::string&, int); + Connector(const InstancePtr&, const std::string&, int, int); virtual ~Connector(); friend class TcpEndpoint; @@ -45,6 +45,7 @@ private: TraceLevelsPtr _traceLevels; ::Ice::LoggerPtr _logger; struct sockaddr_in _addr; + int _timeout; }; } diff --git a/cppe/src/IceE/Network.cpp b/cppe/src/IceE/Network.cpp index 30e38f80adf..5f042514867 100644 --- a/cppe/src/IceE/Network.cpp +++ b/cppe/src/IceE/Network.cpp @@ -334,7 +334,7 @@ IceInternal::setBlock(SOCKET fd, bool block) } } -#ifdef ICEE_USE_SOCKET_TIMEOUT +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS void IceInternal::setTimeout(SOCKET fd, bool recv, int timeout) { diff --git a/cppe/src/IceE/Transceiver.h b/cppe/src/IceE/Transceiver.h index 7e59326e021..fad156219a8 100644 --- a/cppe/src/IceE/Transceiver.h +++ b/cppe/src/IceE/Transceiver.h @@ -45,17 +45,20 @@ public: private: - Transceiver(const InstancePtr&, SOCKET); + Transceiver(const InstancePtr&, SOCKET, int); virtual ~Transceiver(); friend class Connector; friend class Acceptor; +#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS void doSelect(bool, int); +#endif const TraceLevelsPtr _traceLevels; const Ice::LoggerPtr _logger; SOCKET _fd; +#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS #ifdef _WIN32 WSAEVENT _event; WSAEVENT _readEvent; @@ -64,6 +67,9 @@ private: fd_set _wFdSet; fd_set _rFdSet; #endif +#else + const int _timeout; +#endif const std::string _desc; #ifdef _WIN32 diff --git a/cppe/src/TcpTransport/Acceptor.cpp b/cppe/src/TcpTransport/Acceptor.cpp index a2d0f4b365f..dd9a8c027eb 100644 --- a/cppe/src/TcpTransport/Acceptor.cpp +++ b/cppe/src/TcpTransport/Acceptor.cpp @@ -66,12 +66,7 @@ TransceiverPtr IceInternal::Acceptor::accept() { SOCKET fd = doAccept(_fd); -#if !defined(_WIN32) || defined(ICEE_USE_SOCKET_TIMEOUT) - // - // TODO: We can't use blocking sockets on Windows yet because - // the transceiver is using WSAEventSelect (which doesn't play - // well with blocking sockets). - // +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS setBlock(fd, true); #endif @@ -81,14 +76,14 @@ IceInternal::Acceptor::accept() out << "accepted tcp connection\n" << fdToString(fd); } - return new Transceiver(_instance, fd); + return new Transceiver(_instance, fd, _timeout); } void IceInternal::Acceptor::connectToSelf() { SOCKET fd = createSocket(); -// setBlock(fd, false); // No need to use a non blocking socket here. + setBlock(fd, false); doConnect(fd, _addr, -1); closeSocket(fd); } @@ -113,11 +108,12 @@ IceInternal::Acceptor::effectivePort() return ntohs(_addr.sin_port); } -IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host, int port) : +IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host, int port, int timeout) : _instance(instance), _traceLevels(instance->traceLevels()), _logger(instance->logger()), - _backlog(0) + _backlog(0), + _timeout(timeout) { if(_backlog <= 0) { diff --git a/cppe/src/TcpTransport/Connector.cpp b/cppe/src/TcpTransport/Connector.cpp index 33f5876b34c..de2ea96f2d9 100644 --- a/cppe/src/TcpTransport/Connector.cpp +++ b/cppe/src/TcpTransport/Connector.cpp @@ -34,12 +34,7 @@ Connector::connect(int timeout) SOCKET fd = createSocket(); setBlock(fd, false); doConnect(fd, _addr, timeout); -#if !defined(_WIN32) || defined(ICEE_USE_SOCKET_TIMEOUT) - // - // TODO: We can't use blocking sockets on Windows yet because - // the transceiver is using WSAEventSelect (which doesn't play - // well with blocking sockets). - // +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS setBlock(fd, true); #endif @@ -49,7 +44,7 @@ Connector::connect(int timeout) out << "tcp connection established\n" << fdToString(fd); } - return new Transceiver(_instance, fd); + return new Transceiver(_instance, fd, _timeout); } string @@ -58,10 +53,11 @@ Connector::toString() const return addrToString(_addr); } -Connector::Connector(const InstancePtr& instance, const string& host, int port) : +Connector::Connector(const InstancePtr& instance, const string& host, int port, int timeout) : _instance(instance), _traceLevels(instance->traceLevels()), - _logger(instance->logger()) + _logger(instance->logger()), + _timeout(timeout) { getAddress(host, port, _addr); } diff --git a/cppe/src/TcpTransport/TcpEndpoint.cpp b/cppe/src/TcpTransport/TcpEndpoint.cpp index 85f6ef4d556..62bd3b70676 100644 --- a/cppe/src/TcpTransport/TcpEndpoint.cpp +++ b/cppe/src/TcpTransport/TcpEndpoint.cpp @@ -224,7 +224,7 @@ IceInternal::TcpEndpoint::unknown() const ConnectorPtr IceInternal::TcpEndpoint::connector() const { - return new Connector(_instance, _host, _port); + return new Connector(_instance, _host, _port, _timeout); } bool @@ -376,7 +376,7 @@ IceInternal::TcpEndpoint::expand(bool includeLoopback) const AcceptorPtr IceInternal::TcpEndpoint::acceptor(EndpointPtr& endp) const { - Acceptor* p = new Acceptor(_instance, _host, _port); + Acceptor* p = new Acceptor(_instance, _host, _port, _timeout); endp = new TcpEndpoint(_instance, _host, p->effectivePort(), _timeout, _publish); return p; } diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp index ee8a61f4744..3481f20239b 100644 --- a/cppe/src/TcpTransport/Transceiver.cpp +++ b/cppe/src/TcpTransport/Transceiver.cpp @@ -39,7 +39,7 @@ IceInternal::Transceiver::close() out << "closing tcp connection\n" << toString(); } -#ifndef ICEE_USE_SOCKET_TIMEOUT +#ifdef ICEE_USE_SOCKET_TIMEOUT #ifdef _WIN32 assert(_event != 0); WSACloseEvent(_event); @@ -93,8 +93,6 @@ IceInternal::Transceiver::shutdownReadWrite() void IceInternal::Transceiver::write(Buffer& buf, int timeout) { - assert(timeout != 0); - Buffer::Container::difference_type packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); @@ -108,81 +106,99 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout) } #endif - while(buf.i != buf.b.end()) +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + if(timeout > 0 && timeout != _timeout) { -#if defined(ICEE_USE_SOCKET_TIMEOUT) setTimeout(_fd, false, timeout); -#elif !defined(_WIN32) - if(timeout > 0) - { - doSelect(false, timeout); - } -#endif - - repeatSend: - assert(_fd != INVALID_SOCKET); - ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); + } - if(ret == 0) + try + { +#endif + while(buf.i != buf.b.end()) { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } + repeatSend: + assert(_fd != INVALID_SOCKET); + ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); - if(ret == SOCKET_ERROR) - { - if(interrupted()) + if(ret == 0) { - goto repeatSend; + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; } - if(noBuffers() && packetSize > 1024) + if(ret == SOCKET_ERROR) { - packetSize /= 2; - goto repeatSend; - } + if(interrupted()) + { + goto repeatSend; + } + + if(noBuffers() && packetSize > 1024) + { + packetSize /= 2; + goto repeatSend; + } -#if defined(ICEE_USE_SOCKET_TIMEOUT) - if(wouldBlock()) - { - throw TimeoutException(__FILE__, __LINE__); - } -#elif defined(_WIN32) - if(wouldBlock()) - { - doSelect(false, timeout); - continue; - } +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + if(wouldBlock()) + { + throw TimeoutException(__FILE__, __LINE__); + } +#else + if(wouldBlock()) + { + doSelect(false, timeout > 0 ? timeout : _timeout); + continue; + } #endif - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + if(connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } } - else + + if(_traceLevels->network >= 3) { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + Trace out(_logger, _traceLevels->networkCat); + out << Ice::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); } - } - if(_traceLevels->network >= 3) - { - Trace out(_logger, _traceLevels->networkCat); - out << Ice::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); - } + buf.i += ret; - buf.i += ret; - - if(packetSize > buf.b.end() - buf.i) + if(packetSize > buf.b.end() - buf.i) + { + packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); + } + } +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + } + catch(const Ice::LocalException&) + { + if(timeout > 0 && timeout != _timeout) { - packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); + try + { + setTimeout(_fd, false, _timeout); + } + catch(const Ice::LocalException&) + { + // IGNORE + } } + throw; } +#endif } void @@ -191,102 +207,119 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout) assert(timeout != 0); Buffer::Container::difference_type packetSize = - static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); - - while(buf.i != buf.b.end()) + static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); + +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + if(timeout > 0 && timeout != _timeout) { -#if defined(ICEE_USE_SOCKET_TIMEOUT) setTimeout(_fd, true, timeout); -#elif !defined(_WIN32) - if(timeout > 0) - { - doSelect(true, timeout); - } + } + try + { #endif - - repeatRead: - assert(_fd != INVALID_SOCKET); - ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); - - if(ret == 0) + while(buf.i != buf.b.end()) { - // - // If the connection is lost when reading data, we shut - // down the write end of the socket. This helps to unblock - // threads that are stuck in send() or select() while - // sending data. Note: I don't really understand why - // send() or select() sometimes don't detect a connection - // loss. Therefore this helper to make them detect it. - // - //assert(_fd != INVALID_SOCKET); - //shutdownSocketReadWrite(_fd); + repeatRead: + assert(_fd != INVALID_SOCKET); + ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } - - if(ret == SOCKET_ERROR) - { - if(interrupted()) + if(ret == 0) { - goto repeatRead; + // + // If the connection is lost when reading data, we shut + // down the write end of the socket. This helps to unblock + // threads that are stuck in send() or select() while + // sending data. Note: I don't really understand why + // send() or select() sometimes don't detect a connection + // loss. Therefore this helper to make them detect it. + // + //assert(_fd != INVALID_SOCKET); + //shutdownSocketReadWrite(_fd); + + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; } - if(noBuffers() && packetSize > 1024) + if(ret == SOCKET_ERROR) { - packetSize /= 2; - goto repeatRead; + if(interrupted()) + { + goto repeatRead; + } + + if(noBuffers() && packetSize > 1024) + { + packetSize /= 2; + goto repeatRead; + } + +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + if(wouldBlock()) + { + throw TimeoutException(__FILE__, __LINE__); + } +#else + if(wouldBlock()) + { + doSelect(true, timeout > 0 ? timeout : _timeout); + continue; + } +#endif + + if(connectionLost()) + { + // + // See the commment above about shutting down the + // socket if the connection is lost while reading + // data. + // + //assert(_fd != INVALID_SOCKET); + //shutdownSocketReadWrite(_fd); + + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } } - -#if defined(ICEE_USE_SOCKET_TIMEOUT) - if(wouldBlock()) + + if(_traceLevels->network >= 3) { - throw TimeoutException(__FILE__, __LINE__); + Trace out(_logger, _traceLevels->networkCat); + out << Ice::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); } -#elif defined(_WIN32) - if(wouldBlock()) + + buf.i += ret; + + if(packetSize > buf.b.end() - buf.i) { - doSelect(true, timeout); - continue; + packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); } -#endif - - if(connectionLost()) + } +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + } + catch(const Ice::LocalException&) + { + if(timeout > 0 && timeout != _timeout) + { + try { - // - // See the commment above about shutting down the - // socket if the connection is lost while reading - // data. - // - //assert(_fd != INVALID_SOCKET); - //shutdownSocketReadWrite(_fd); - - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + setTimeout(_fd, true, _timeout); } - else + catch(const Ice::LocalException&) { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; + // IGNORE } } - - if(_traceLevels->network >= 3) - { - Trace out(_logger, _traceLevels->networkCat); - out << Ice::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString(); - } - - buf.i += ret; - - if(packetSize > buf.b.end() - buf.i) - { - packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i); - } + throw; } +#endif } string @@ -301,16 +334,25 @@ IceInternal::Transceiver::toString() const return _desc; } -IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) : +IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd, int timeout) : _traceLevels(instance->traceLevels()), _logger(instance->logger()), _fd(fd), +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + _timeout(timeout), +#endif _desc(fdToString(fd)) #ifdef _WIN32 , _isPeerLocal(isPeerLocal(fd)) #endif { -#ifndef ICEE_USE_SOCKET_TIMEOUT +#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS + if(_timeout > 0) + { + setTimeout(_fd, false, _timeout); + setTimeout(_fd, true, _timeout); + } +#else #ifdef _WIN32 _event = WSACreateEvent(); _readEvent = WSACreateEvent(); @@ -363,7 +405,7 @@ IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) : IceInternal::Transceiver::~Transceiver() { assert(_fd == INVALID_SOCKET); -#ifndef ICEE_USE_SOCKET_TIMEOUT +#ifdef ICEE_USE_SOCKET_TIMEOUT #ifdef _WIN32 assert(_event == 0); assert(_readEvent == 0); @@ -372,6 +414,7 @@ IceInternal::Transceiver::~Transceiver() #endif } +#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS void IceInternal::Transceiver::doSelect(bool read, int timeout) { @@ -534,3 +577,4 @@ IceInternal::Transceiver::doSelect(bool read, int timeout) #endif } } +#endif |