summaryrefslogtreecommitdiff
path: root/cppe/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-02-20 10:31:57 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-02-20 10:31:57 +0000
commite1d3d16e0fb475d771fbc476a3f1e88c9bcdc10f (patch)
tree03017261521ec7c308e3e092b15a74edcd6ca8e2 /cppe/src
parentporting new proxy methods for connection mgmt (diff)
downloadice-e1d3d16e0fb475d771fbc476a3f1e88c9bcdc10f.tar.bz2
ice-e1d3d16e0fb475d771fbc476a3f1e88c9bcdc10f.tar.xz
ice-e1d3d16e0fb475d771fbc476a3f1e88c9bcdc10f.zip
Use socket timeouts by default.
Diffstat (limited to 'cppe/src')
-rw-r--r--cppe/src/IceE/Acceptor.h3
-rwxr-xr-xcppe/src/IceE/Connection.cpp9
-rwxr-xr-xcppe/src/IceE/Connector.h3
-rw-r--r--cppe/src/IceE/Network.cpp2
-rw-r--r--cppe/src/IceE/Transceiver.h8
-rw-r--r--cppe/src/TcpTransport/Acceptor.cpp16
-rw-r--r--cppe/src/TcpTransport/Connector.cpp14
-rw-r--r--cppe/src/TcpTransport/TcpEndpoint.cpp4
-rw-r--r--cppe/src/TcpTransport/Transceiver.cpp318
9 files changed, 214 insertions, 163 deletions
diff --git a/cppe/src/IceE/Acceptor.h b/cppe/src/IceE/Acceptor.h
index 70f18c57020..e42f15effe3 100644
--- a/cppe/src/IceE/Acceptor.h
+++ b/cppe/src/IceE/Acceptor.h
@@ -47,7 +47,7 @@ public:
private:
- Acceptor(const InstancePtr&, const std::string&, int);
+ Acceptor(const InstancePtr&, const std::string&, int, int);
virtual ~Acceptor();
friend class TcpEndpoint;
@@ -57,6 +57,7 @@ private:
SOCKET _fd;
int _backlog;
struct sockaddr_in _addr;
+ int _timeout;
};
}
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp
index 33446535389..e32fdbe84b8 100755
--- a/cppe/src/IceE/Connection.cpp
+++ b/cppe/src/IceE/Connection.cpp
@@ -1861,7 +1861,14 @@ Ice::Connection::run()
//
BasicStream stream(_instance.get());
- readStream(stream);
+ try
+ {
+ readStream(stream);
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ continue;
+ }
Int requestId = 0;
#ifndef ICEE_PURE_CLIENT
diff --git a/cppe/src/IceE/Connector.h b/cppe/src/IceE/Connector.h
index 0f291e0e202..7ceb7e8ceb9 100755
--- a/cppe/src/IceE/Connector.h
+++ b/cppe/src/IceE/Connector.h
@@ -37,7 +37,7 @@ public:
private:
- Connector(const InstancePtr&, const std::string&, int);
+ Connector(const InstancePtr&, const std::string&, int, int);
virtual ~Connector();
friend class TcpEndpoint;
@@ -45,6 +45,7 @@ private:
TraceLevelsPtr _traceLevels;
::Ice::LoggerPtr _logger;
struct sockaddr_in _addr;
+ int _timeout;
};
}
diff --git a/cppe/src/IceE/Network.cpp b/cppe/src/IceE/Network.cpp
index 30e38f80adf..5f042514867 100644
--- a/cppe/src/IceE/Network.cpp
+++ b/cppe/src/IceE/Network.cpp
@@ -334,7 +334,7 @@ IceInternal::setBlock(SOCKET fd, bool block)
}
}
-#ifdef ICEE_USE_SOCKET_TIMEOUT
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
void
IceInternal::setTimeout(SOCKET fd, bool recv, int timeout)
{
diff --git a/cppe/src/IceE/Transceiver.h b/cppe/src/IceE/Transceiver.h
index 7e59326e021..fad156219a8 100644
--- a/cppe/src/IceE/Transceiver.h
+++ b/cppe/src/IceE/Transceiver.h
@@ -45,17 +45,20 @@ public:
private:
- Transceiver(const InstancePtr&, SOCKET);
+ Transceiver(const InstancePtr&, SOCKET, int);
virtual ~Transceiver();
friend class Connector;
friend class Acceptor;
+#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS
void doSelect(bool, int);
+#endif
const TraceLevelsPtr _traceLevels;
const Ice::LoggerPtr _logger;
SOCKET _fd;
+#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS
#ifdef _WIN32
WSAEVENT _event;
WSAEVENT _readEvent;
@@ -64,6 +67,9 @@ private:
fd_set _wFdSet;
fd_set _rFdSet;
#endif
+#else
+ const int _timeout;
+#endif
const std::string _desc;
#ifdef _WIN32
diff --git a/cppe/src/TcpTransport/Acceptor.cpp b/cppe/src/TcpTransport/Acceptor.cpp
index a2d0f4b365f..dd9a8c027eb 100644
--- a/cppe/src/TcpTransport/Acceptor.cpp
+++ b/cppe/src/TcpTransport/Acceptor.cpp
@@ -66,12 +66,7 @@ TransceiverPtr
IceInternal::Acceptor::accept()
{
SOCKET fd = doAccept(_fd);
-#if !defined(_WIN32) || defined(ICEE_USE_SOCKET_TIMEOUT)
- //
- // TODO: We can't use blocking sockets on Windows yet because
- // the transceiver is using WSAEventSelect (which doesn't play
- // well with blocking sockets).
- //
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
setBlock(fd, true);
#endif
@@ -81,14 +76,14 @@ IceInternal::Acceptor::accept()
out << "accepted tcp connection\n" << fdToString(fd);
}
- return new Transceiver(_instance, fd);
+ return new Transceiver(_instance, fd, _timeout);
}
void
IceInternal::Acceptor::connectToSelf()
{
SOCKET fd = createSocket();
-// setBlock(fd, false); // No need to use a non blocking socket here.
+ setBlock(fd, false);
doConnect(fd, _addr, -1);
closeSocket(fd);
}
@@ -113,11 +108,12 @@ IceInternal::Acceptor::effectivePort()
return ntohs(_addr.sin_port);
}
-IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host, int port) :
+IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host, int port, int timeout) :
_instance(instance),
_traceLevels(instance->traceLevels()),
_logger(instance->logger()),
- _backlog(0)
+ _backlog(0),
+ _timeout(timeout)
{
if(_backlog <= 0)
{
diff --git a/cppe/src/TcpTransport/Connector.cpp b/cppe/src/TcpTransport/Connector.cpp
index 33f5876b34c..de2ea96f2d9 100644
--- a/cppe/src/TcpTransport/Connector.cpp
+++ b/cppe/src/TcpTransport/Connector.cpp
@@ -34,12 +34,7 @@ Connector::connect(int timeout)
SOCKET fd = createSocket();
setBlock(fd, false);
doConnect(fd, _addr, timeout);
-#if !defined(_WIN32) || defined(ICEE_USE_SOCKET_TIMEOUT)
- //
- // TODO: We can't use blocking sockets on Windows yet because
- // the transceiver is using WSAEventSelect (which doesn't play
- // well with blocking sockets).
- //
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
setBlock(fd, true);
#endif
@@ -49,7 +44,7 @@ Connector::connect(int timeout)
out << "tcp connection established\n" << fdToString(fd);
}
- return new Transceiver(_instance, fd);
+ return new Transceiver(_instance, fd, _timeout);
}
string
@@ -58,10 +53,11 @@ Connector::toString() const
return addrToString(_addr);
}
-Connector::Connector(const InstancePtr& instance, const string& host, int port) :
+Connector::Connector(const InstancePtr& instance, const string& host, int port, int timeout) :
_instance(instance),
_traceLevels(instance->traceLevels()),
- _logger(instance->logger())
+ _logger(instance->logger()),
+ _timeout(timeout)
{
getAddress(host, port, _addr);
}
diff --git a/cppe/src/TcpTransport/TcpEndpoint.cpp b/cppe/src/TcpTransport/TcpEndpoint.cpp
index 85f6ef4d556..62bd3b70676 100644
--- a/cppe/src/TcpTransport/TcpEndpoint.cpp
+++ b/cppe/src/TcpTransport/TcpEndpoint.cpp
@@ -224,7 +224,7 @@ IceInternal::TcpEndpoint::unknown() const
ConnectorPtr
IceInternal::TcpEndpoint::connector() const
{
- return new Connector(_instance, _host, _port);
+ return new Connector(_instance, _host, _port, _timeout);
}
bool
@@ -376,7 +376,7 @@ IceInternal::TcpEndpoint::expand(bool includeLoopback) const
AcceptorPtr
IceInternal::TcpEndpoint::acceptor(EndpointPtr& endp) const
{
- Acceptor* p = new Acceptor(_instance, _host, _port);
+ Acceptor* p = new Acceptor(_instance, _host, _port, _timeout);
endp = new TcpEndpoint(_instance, _host, p->effectivePort(), _timeout, _publish);
return p;
}
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp
index ee8a61f4744..3481f20239b 100644
--- a/cppe/src/TcpTransport/Transceiver.cpp
+++ b/cppe/src/TcpTransport/Transceiver.cpp
@@ -39,7 +39,7 @@ IceInternal::Transceiver::close()
out << "closing tcp connection\n" << toString();
}
-#ifndef ICEE_USE_SOCKET_TIMEOUT
+#ifdef ICEE_USE_SOCKET_TIMEOUT
#ifdef _WIN32
assert(_event != 0);
WSACloseEvent(_event);
@@ -93,8 +93,6 @@ IceInternal::Transceiver::shutdownReadWrite()
void
IceInternal::Transceiver::write(Buffer& buf, int timeout)
{
- assert(timeout != 0);
-
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
@@ -108,81 +106,99 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
}
#endif
- while(buf.i != buf.b.end())
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ if(timeout > 0 && timeout != _timeout)
{
-#if defined(ICEE_USE_SOCKET_TIMEOUT)
setTimeout(_fd, false, timeout);
-#elif !defined(_WIN32)
- if(timeout > 0)
- {
- doSelect(false, timeout);
- }
-#endif
-
- repeatSend:
- assert(_fd != INVALID_SOCKET);
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
+ }
- if(ret == 0)
+ try
+ {
+#endif
+ while(buf.i != buf.b.end())
{
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
+ repeatSend:
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
+ if(ret == 0)
{
- goto repeatSend;
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
}
- if(noBuffers() && packetSize > 1024)
+ if(ret == SOCKET_ERROR)
{
- packetSize /= 2;
- goto repeatSend;
- }
+ if(interrupted())
+ {
+ goto repeatSend;
+ }
+
+ if(noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ goto repeatSend;
+ }
-#if defined(ICEE_USE_SOCKET_TIMEOUT)
- if(wouldBlock())
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-#elif defined(_WIN32)
- if(wouldBlock())
- {
- doSelect(false, timeout);
- continue;
- }
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ if(wouldBlock())
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+#else
+ if(wouldBlock())
+ {
+ doSelect(false, timeout > 0 ? timeout : _timeout);
+ continue;
+ }
#endif
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
}
- else
+
+ if(_traceLevels->network >= 3)
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ Trace out(_logger, _traceLevels->networkCat);
+ out << Ice::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
}
- }
- if(_traceLevels->network >= 3)
- {
- Trace out(_logger, _traceLevels->networkCat);
- out << Ice::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
- }
+ buf.i += ret;
- buf.i += ret;
-
- if(packetSize > buf.b.end() - buf.i)
+ if(packetSize > buf.b.end() - buf.i)
+ {
+ packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+ }
+ }
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ }
+ catch(const Ice::LocalException&)
+ {
+ if(timeout > 0 && timeout != _timeout)
{
- packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+ try
+ {
+ setTimeout(_fd, false, _timeout);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // IGNORE
+ }
}
+ throw;
}
+#endif
}
void
@@ -191,102 +207,119 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout)
assert(timeout != 0);
Buffer::Container::difference_type packetSize =
- static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
-
- while(buf.i != buf.b.end())
+ static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ if(timeout > 0 && timeout != _timeout)
{
-#if defined(ICEE_USE_SOCKET_TIMEOUT)
setTimeout(_fd, true, timeout);
-#elif !defined(_WIN32)
- if(timeout > 0)
- {
- doSelect(true, timeout);
- }
+ }
+ try
+ {
#endif
-
- repeatRead:
- assert(_fd != INVALID_SOCKET);
- ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
-
- if(ret == 0)
+ while(buf.i != buf.b.end())
{
- //
- // If the connection is lost when reading data, we shut
- // down the write end of the socket. This helps to unblock
- // threads that are stuck in send() or select() while
- // sending data. Note: I don't really understand why
- // send() or select() sometimes don't detect a connection
- // loss. Therefore this helper to make them detect it.
- //
- //assert(_fd != INVALID_SOCKET);
- //shutdownSocketReadWrite(_fd);
+ repeatRead:
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
-
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
+ if(ret == 0)
{
- goto repeatRead;
+ //
+ // If the connection is lost when reading data, we shut
+ // down the write end of the socket. This helps to unblock
+ // threads that are stuck in send() or select() while
+ // sending data. Note: I don't really understand why
+ // send() or select() sometimes don't detect a connection
+ // loss. Therefore this helper to make them detect it.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
}
- if(noBuffers() && packetSize > 1024)
+ if(ret == SOCKET_ERROR)
{
- packetSize /= 2;
- goto repeatRead;
+ if(interrupted())
+ {
+ goto repeatRead;
+ }
+
+ if(noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ goto repeatRead;
+ }
+
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ if(wouldBlock())
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+#else
+ if(wouldBlock())
+ {
+ doSelect(true, timeout > 0 ? timeout : _timeout);
+ continue;
+ }
+#endif
+
+ if(connectionLost())
+ {
+ //
+ // See the commment above about shutting down the
+ // socket if the connection is lost while reading
+ // data.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
}
-
-#if defined(ICEE_USE_SOCKET_TIMEOUT)
- if(wouldBlock())
+
+ if(_traceLevels->network >= 3)
{
- throw TimeoutException(__FILE__, __LINE__);
+ Trace out(_logger, _traceLevels->networkCat);
+ out << Ice::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
}
-#elif defined(_WIN32)
- if(wouldBlock())
+
+ buf.i += ret;
+
+ if(packetSize > buf.b.end() - buf.i)
{
- doSelect(true, timeout);
- continue;
+ packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
}
-#endif
-
- if(connectionLost())
+ }
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ }
+ catch(const Ice::LocalException&)
+ {
+ if(timeout > 0 && timeout != _timeout)
+ {
+ try
{
- //
- // See the commment above about shutting down the
- // socket if the connection is lost while reading
- // data.
- //
- //assert(_fd != INVALID_SOCKET);
- //shutdownSocketReadWrite(_fd);
-
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ setTimeout(_fd, true, _timeout);
}
- else
+ catch(const Ice::LocalException&)
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ // IGNORE
}
}
-
- if(_traceLevels->network >= 3)
- {
- Trace out(_logger, _traceLevels->networkCat);
- out << Ice::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
- }
-
- buf.i += ret;
-
- if(packetSize > buf.b.end() - buf.i)
- {
- packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
- }
+ throw;
}
+#endif
}
string
@@ -301,16 +334,25 @@ IceInternal::Transceiver::toString() const
return _desc;
}
-IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) :
+IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd, int timeout) :
_traceLevels(instance->traceLevels()),
_logger(instance->logger()),
_fd(fd),
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ _timeout(timeout),
+#endif
_desc(fdToString(fd))
#ifdef _WIN32
, _isPeerLocal(isPeerLocal(fd))
#endif
{
-#ifndef ICEE_USE_SOCKET_TIMEOUT
+#ifndef ICEE_USE_SELECT_FOR_TIMEOUTS
+ if(_timeout > 0)
+ {
+ setTimeout(_fd, false, _timeout);
+ setTimeout(_fd, true, _timeout);
+ }
+#else
#ifdef _WIN32
_event = WSACreateEvent();
_readEvent = WSACreateEvent();
@@ -363,7 +405,7 @@ IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) :
IceInternal::Transceiver::~Transceiver()
{
assert(_fd == INVALID_SOCKET);
-#ifndef ICEE_USE_SOCKET_TIMEOUT
+#ifdef ICEE_USE_SOCKET_TIMEOUT
#ifdef _WIN32
assert(_event == 0);
assert(_readEvent == 0);
@@ -372,6 +414,7 @@ IceInternal::Transceiver::~Transceiver()
#endif
}
+#ifdef ICEE_USE_SELECT_FOR_TIMEOUTS
void
IceInternal::Transceiver::doSelect(bool read, int timeout)
{
@@ -534,3 +577,4 @@ IceInternal::Transceiver::doSelect(bool read, int timeout)
#endif
}
}
+#endif