summaryrefslogtreecommitdiff
path: root/cppe/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-02-17 16:40:01 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-02-17 16:40:01 +0000
commit85b846e8066528c3c43374e4f98316a3eae7ed07 (patch)
tree9cec48d47e7e6269ba3a8cc16c5223c832ee3e52 /cppe/src
parentRefactored TAO tests. (diff)
downloadice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.bz2
ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.xz
ice-85b846e8066528c3c43374e4f98316a3eae7ed07.zip
- Changes to use blocking sockets instead of non-blocking sockets.
- Removed timeout in accept() method.
Diffstat (limited to 'cppe/src')
-rw-r--r--cppe/src/IceE/Acceptor.h2
-rw-r--r--cppe/src/IceE/IncomingConnectionFactory.cpp2
-rw-r--r--cppe/src/IceE/Network.cpp59
-rw-r--r--cppe/src/IceE/Network.h3
-rw-r--r--cppe/src/IceE/Transceiver.h2
-rw-r--r--cppe/src/TcpTransport/Acceptor.cpp9
-rw-r--r--cppe/src/TcpTransport/Connector.cpp1
-rw-r--r--cppe/src/TcpTransport/Transceiver.cpp421
8 files changed, 230 insertions, 269 deletions
diff --git a/cppe/src/IceE/Acceptor.h b/cppe/src/IceE/Acceptor.h
index 08f5c874f74..70f18c57020 100644
--- a/cppe/src/IceE/Acceptor.h
+++ b/cppe/src/IceE/Acceptor.h
@@ -38,7 +38,7 @@ public:
SOCKET fd();
void close();
void listen();
- TransceiverPtr accept(int);
+ TransceiverPtr accept();
void connectToSelf();
std::string toString() const;
diff --git a/cppe/src/IceE/IncomingConnectionFactory.cpp b/cppe/src/IceE/IncomingConnectionFactory.cpp
index e2e400a37d6..2bc094f03be 100644
--- a/cppe/src/IceE/IncomingConnectionFactory.cpp
+++ b/cppe/src/IceE/IncomingConnectionFactory.cpp
@@ -318,7 +318,7 @@ IceInternal::IncomingConnectionFactory::run()
TransceiverPtr transceiver;
try
{
- transceiver = _acceptor->accept(-1);
+ transceiver = _acceptor->accept();
}
catch(const SocketException&)
{
diff --git a/cppe/src/IceE/Network.cpp b/cppe/src/IceE/Network.cpp
index 1c9b430d1cf..b7448fa4843 100644
--- a/cppe/src/IceE/Network.cpp
+++ b/cppe/src/IceE/Network.cpp
@@ -334,6 +334,24 @@ IceInternal::setBlock(SOCKET fd, bool block)
}
}
+#ifdef ICEE_USE_SOCKET_TIMEOUT
+void
+IceInternal::setTimeout(SOCKET fd, bool recv, int timeout)
+{
+ assert(timeout != 0);
+ struct timeval tv;
+ tv.tv_sec = timeout > 0 ? timeout / 1000 : 0;
+ tv.tv_usec = timeout > 0 ? (timeout - tv.tv_sec * 1000) * 1000 : 0;
+ if(setsockopt(fd, SOL_SOCKET, recv ? SO_RCVTIMEO : SO_SNDTIMEO, (char*)&tv, (int)sizeof(timeval)) == SOCKET_ERROR)
+ {
+ closeSocketNoThrow(fd);
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+}
+#endif
+
void
IceInternal::setTcpNoDelay(SOCKET fd)
{
@@ -924,7 +942,7 @@ IceInternal::acceptInterrupted()
}
SOCKET
-IceInternal::doAccept(SOCKET fd, int timeout)
+IceInternal::doAccept(SOCKET fd)
{
int ret;
@@ -936,45 +954,6 @@ repeatAccept:
goto repeatAccept;
}
- if(wouldBlock())
- {
- repeatSelect:
- int rs;
- fd_set fdSet;
- FD_ZERO(&fdSet);
- FD_SET(fd, &fdSet);
- if(timeout >= 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
- rs = ::select(fd + 1, &fdSet, 0, 0, &tv);
- }
- else
- {
- rs = ::select(fd + 1, &fdSet, 0, 0, 0);
- }
-
- if(rs == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeatSelect;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- if(rs == 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-
- goto repeatAccept;
- }
-
SocketException ex(__FILE__, __LINE__);
ex.error = getSocketErrno();
throw ex;
diff --git a/cppe/src/IceE/Network.h b/cppe/src/IceE/Network.h
index cc6ab9fd432..7f421e98cd5 100644
--- a/cppe/src/IceE/Network.h
+++ b/cppe/src/IceE/Network.h
@@ -85,6 +85,7 @@ void shutdownSocketWrite(SOCKET);
void shutdownSocketReadWrite(SOCKET);
void setBlock(SOCKET, bool);
+void setTimeout(SOCKET, bool, int);
void setTcpNoDelay(SOCKET);
void setKeepAlive(SOCKET);
void setSendBufferSize(SOCKET, int);
@@ -92,7 +93,7 @@ void setSendBufferSize(SOCKET, int);
void doBind(SOCKET, struct sockaddr_in&);
void doListen(SOCKET, int);
void doConnect(SOCKET, struct sockaddr_in&, int);
-SOCKET doAccept(SOCKET, int);
+SOCKET doAccept(SOCKET);
void getAddress(const std::string&, int, struct sockaddr_in&);
std::string getLocalHost(bool);
diff --git a/cppe/src/IceE/Transceiver.h b/cppe/src/IceE/Transceiver.h
index 34b0f89f079..7e59326e021 100644
--- a/cppe/src/IceE/Transceiver.h
+++ b/cppe/src/IceE/Transceiver.h
@@ -50,6 +50,8 @@ private:
friend class Connector;
friend class Acceptor;
+ void doSelect(bool, int);
+
const TraceLevelsPtr _traceLevels;
const Ice::LoggerPtr _logger;
diff --git a/cppe/src/TcpTransport/Acceptor.cpp b/cppe/src/TcpTransport/Acceptor.cpp
index 5056cfebab0..eae64c0a4aa 100644
--- a/cppe/src/TcpTransport/Acceptor.cpp
+++ b/cppe/src/TcpTransport/Acceptor.cpp
@@ -63,10 +63,10 @@ IceInternal::Acceptor::listen()
}
TransceiverPtr
-IceInternal::Acceptor::accept(int timeout)
+IceInternal::Acceptor::accept()
{
- SOCKET fd = doAccept(_fd, timeout);
- setBlock(fd, false);
+ SOCKET fd = doAccept(_fd);
+ setBlock(fd, true);
if(_traceLevels->network >= 1)
{
@@ -81,7 +81,7 @@ void
IceInternal::Acceptor::connectToSelf()
{
SOCKET fd = createSocket();
- setBlock(fd, false);
+// setBlock(fd, false); // No need to use a non blocking socket here.
doConnect(fd, _addr, -1);
closeSocket(fd);
}
@@ -120,7 +120,6 @@ IceInternal::Acceptor::Acceptor(const InstancePtr& instance, const string& host,
try
{
_fd = createSocket();
- setBlock(_fd, false);
getAddress(host, port, _addr);
if(_traceLevels->network >= 2)
{
diff --git a/cppe/src/TcpTransport/Connector.cpp b/cppe/src/TcpTransport/Connector.cpp
index 1750d7fe117..1825301e8fe 100644
--- a/cppe/src/TcpTransport/Connector.cpp
+++ b/cppe/src/TcpTransport/Connector.cpp
@@ -34,6 +34,7 @@ Connector::connect(int timeout)
SOCKET fd = createSocket();
setBlock(fd, false);
doConnect(fd, _addr, timeout);
+ setBlock(fd, true);
if(_traceLevels->network >= 1)
{
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp
index 65c9bc7da30..465dc4d7c29 100644
--- a/cppe/src/TcpTransport/Transceiver.cpp
+++ b/cppe/src/TcpTransport/Transceiver.cpp
@@ -23,7 +23,6 @@ using namespace IceInternal;
void IceInternal::incRef(Transceiver* p) { p->__incRef(); }
void IceInternal::decRef(Transceiver* p) { p->__decRef(); }
-
SOCKET
IceInternal::Transceiver::fd()
{
@@ -92,6 +91,8 @@ IceInternal::Transceiver::shutdownReadWrite()
void
IceInternal::Transceiver::write(Buffer& buf, int timeout)
{
+ assert(timeout != 0);
+
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
@@ -107,6 +108,16 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
while(buf.i != buf.b.end())
{
+#ifndef ICEE_USE_SOCKET_TIMEOUT
+ if(timeout > 0)
+ {
+ doSelect(false, timeout);
+ }
+#else
+ setTimeout(_fd, false, timeout);
+#endif
+
+ repeatSend:
assert(_fd != INVALID_SOCKET);
ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
@@ -119,128 +130,24 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
if(ret == SOCKET_ERROR)
{
-#ifdef _WIN32
- repeatError:
-#endif
if(interrupted())
{
- continue;
+ goto repeatSend;
}
if(noBuffers() && packetSize > 1024)
{
packetSize /= 2;
- continue;
+ goto repeatSend;
}
-
+
+#ifdef ICEE_USE_SOCKET_TIMEOUT
if(wouldBlock())
{
-#ifdef _WIN32
- WSAEVENT events[2];
- events[0] = _event;
- events[1] = _writeEvent;
- long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
- DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE);
- if(rc == WSA_WAIT_FAILED)
- {
- //
- // This an error from WSAWaitForMultipleEvents
- // itself (similar to an error from select). None
- // of these errors are recoverable (such as
- // EINTR).
- //
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- if(rc == WSA_WAIT_TIMEOUT)
- {
- assert(timeout >= 0);
- throw TimeoutException(__FILE__, __LINE__);
- }
-
- if(rc == WSA_WAIT_EVENT_0)
- {
- WSANETWORKEVENTS nevents;
- if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- //
- // If we have consumed a READ event, set the
- // _readEvent event.
- //
- if(nevents.lNetworkEvents & FD_READ)
- {
- WSASetEvent(_readEvent);
- }
-
- //
- // This checks for an error on the fd (this would
- // be same as recv itself returning an error). In
- // the event of an error we set the error code,
- // and repeat the error handling.
- //
- if(nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]);
- goto repeatError;
- }
- if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
- goto repeatError;
- }
- }
- else
- {
- //
- // Otherwise the _writeEvent is set, reset it.
- //
- WSAResetEvent(_writeEvent);
- }
-#else
- repeatSelect:
- int rs;
- assert(_fd != INVALID_SOCKET);
- FD_SET(_fd, &_wFdSet);
-
- if(timeout >= 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
- rs = ::select(_fd + 1, 0, &_wFdSet, 0, &tv);
- }
- else
- {
- rs = ::select(_fd + 1, 0, &_wFdSet, 0, 0);
- }
-
- if(rs == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeatSelect;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- if(rs == 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-#endif
- continue;
+ throw TimeoutException(__FILE__, __LINE__);
}
-
+#endif
+
if(connectionLost())
{
ConnectionLostException ex(__FILE__, __LINE__);
@@ -273,14 +180,26 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
void
IceInternal::Transceiver::read(Buffer& buf, int timeout)
{
+ assert(timeout != 0);
+
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
+#ifndef ICEE_USE_SOCKET_TIMEOUT
+ if(timeout > 0)
+ {
+ doSelect(true, timeout);
+ }
+#else
+ setTimeout(_fd, true, timeout);
+#endif
+
+ repeatRead:
assert(_fd != INVALID_SOCKET);
ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
-
+
if(ret == 0)
{
//
@@ -301,126 +220,23 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout)
if(ret == SOCKET_ERROR)
{
-#ifdef _WIN32
- repeatError:
-#endif
if(interrupted())
{
- continue;
+ goto repeatRead;
}
if(noBuffers() && packetSize > 1024)
{
packetSize /= 2;
- continue;
+ goto repeatRead;
}
+#ifdef ICEE_USE_SOCKET_TIMEOUT
if(wouldBlock())
{
-#ifdef _WIN32
- //
- // This code is basically the same as the code in
- // ::send above. Check that for detailed comments.
- //
- WSAEVENT events[2];
- events[0] = _event;
- events[1] = _readEvent;
- long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
- DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE);
- if(rc == WSA_WAIT_FAILED)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
- if(rc == WSA_WAIT_TIMEOUT)
- {
- assert(timeout >= 0);
- throw TimeoutException(__FILE__, __LINE__);
- }
-
- if(rc == WSA_WAIT_EVENT_0)
- {
- WSANETWORKEVENTS nevents;
- if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- //
- // If we have consumed a WRITE event, set the
- // _writeEvent event.
- //
- if(nevents.lNetworkEvents & FD_WRITE)
- {
- WSASetEvent(_writeEvent);
- }
-
- //
- // This checks for an error on the fd (this would
- // be same as recv itself returning an error). In
- // the event of an error we set the error code,
- // and repeat the error handling.
- //
- if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_READ_BIT]);
- goto repeatError;
- }
- if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
- goto repeatError;
- }
- }
- else
- {
- //
- // Otherwise the _readEvent is set, reset it.
- //
- WSAResetEvent(_readEvent);
- }
-#else
- repeatSelect:
-
- int rs;
- assert(_fd != INVALID_SOCKET);
- FD_SET(_fd, &_rFdSet);
-
- if(timeout >= 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
- rs = ::select(_fd + 1, &_rFdSet, 0, 0, &tv);
- }
- else
- {
- rs = ::select(_fd + 1, &_rFdSet, 0, 0, 0);
- }
-
- if(rs == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeatSelect;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- if(rs == 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-#endif
- continue;
+ throw TimeoutException(__FILE__, __LINE__);
}
-
+#endif
if(connectionLost())
{
//
@@ -536,3 +352,166 @@ IceInternal::Transceiver::~Transceiver()
assert(_writeEvent == 0);
#endif
}
+
+void
+IceInternal::Transceiver::doSelect(bool read, int timeout)
+{
+ assert(timeout >= 0);
+ while(true)
+ {
+#ifdef _WIN32
+ //
+ // This code is basically the same as the code in
+ // ::send above. Check that for detailed comments.
+ //
+ WSAEVENT events[2];
+ events[0] = _event;
+ events[1] = read ? _readEvent : _writeEvent;
+ DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, timeout, FALSE);
+ if(rc == WSA_WAIT_FAILED)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+ if(rc == WSA_WAIT_TIMEOUT)
+ {
+ assert(timeout >= 0);
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+
+ if(rc == WSA_WAIT_EVENT_0)
+ {
+ WSANETWORKEVENTS nevents;
+ if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+
+ //
+ // If we're selecting for reading and have consumed a WRITE
+ // event, set the _writeEvent event. Otherwise, if we're
+ // selecting for writing have consumed a READ event, set the
+ // _readEvent event.
+ //
+ if(read && nevents.lNetworkEvents & FD_WRITE)
+ {
+ WSASetEvent(_writeEvent);
+ }
+ else if(!read && nevents.lNetworkEvents & FD_READ)
+ {
+ WSASetEvent(_readEvent);
+ }
+
+
+ //
+ // This checks for an error on the fd (this would
+ // be same as recv itself returning an error). In
+ // the event of an error we set the error code,
+ // and repeat the error handling.
+ //
+ if(read && nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ }
+ else if(!read && nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]);
+ }
+ else if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ }
+ else
+ {
+ return; // No errors: we're done.
+ }
+
+ if(interrupted())
+ {
+ continue;
+ }
+
+ if(connectionLost())
+ {
+ //
+ // See the commment above about shutting down the
+ // socket if the connection is lost while reading
+ // data.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+ else
+ {
+ //
+ // Otherwise the _readEvent or _writeEvent is set, reset it.
+ //
+ if(read)
+ {
+ WSAResetEvent(_readEvent);
+ }
+ else
+ {
+ WSAResetEvent(_writeEvent);
+ }
+ return;
+ }
+#else
+ int rs;
+ assert(_fd != INVALID_SOCKET);
+ if(read)
+ {
+ FD_SET(_fd, &_rFdSet);
+ }
+ else
+ {
+ FD_SET(_fd, &_wFdSet);
+ }
+
+ if(timeout >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
+ rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, &tv);
+ }
+ else
+ {
+ rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, 0);
+ }
+
+ if(rs == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ if(rs == 0)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+
+ return;
+#endif
+ }
+}