summaryrefslogtreecommitdiff
path: root/cppe/src/TcpTransport/Transceiver.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-02-17 16:40:01 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-02-17 16:40:01 +0000
commit85b846e8066528c3c43374e4f98316a3eae7ed07 (patch)
tree9cec48d47e7e6269ba3a8cc16c5223c832ee3e52 /cppe/src/TcpTransport/Transceiver.cpp
parentRefactored TAO tests. (diff)
downloadice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.bz2
ice-85b846e8066528c3c43374e4f98316a3eae7ed07.tar.xz
ice-85b846e8066528c3c43374e4f98316a3eae7ed07.zip
- Changes to use blocking sockets instead of non-blocking sockets.
- Removed timeout in accept() method.
Diffstat (limited to 'cppe/src/TcpTransport/Transceiver.cpp')
-rw-r--r--cppe/src/TcpTransport/Transceiver.cpp421
1 files changed, 200 insertions, 221 deletions
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp
index 65c9bc7da30..465dc4d7c29 100644
--- a/cppe/src/TcpTransport/Transceiver.cpp
+++ b/cppe/src/TcpTransport/Transceiver.cpp
@@ -23,7 +23,6 @@ using namespace IceInternal;
void IceInternal::incRef(Transceiver* p) { p->__incRef(); }
void IceInternal::decRef(Transceiver* p) { p->__decRef(); }
-
SOCKET
IceInternal::Transceiver::fd()
{
@@ -92,6 +91,8 @@ IceInternal::Transceiver::shutdownReadWrite()
void
IceInternal::Transceiver::write(Buffer& buf, int timeout)
{
+ assert(timeout != 0);
+
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
@@ -107,6 +108,16 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
while(buf.i != buf.b.end())
{
+#ifndef ICEE_USE_SOCKET_TIMEOUT
+ if(timeout > 0)
+ {
+ doSelect(false, timeout);
+ }
+#else
+ setTimeout(_fd, false, timeout);
+#endif
+
+ repeatSend:
assert(_fd != INVALID_SOCKET);
ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
@@ -119,128 +130,24 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
if(ret == SOCKET_ERROR)
{
-#ifdef _WIN32
- repeatError:
-#endif
if(interrupted())
{
- continue;
+ goto repeatSend;
}
if(noBuffers() && packetSize > 1024)
{
packetSize /= 2;
- continue;
+ goto repeatSend;
}
-
+
+#ifdef ICEE_USE_SOCKET_TIMEOUT
if(wouldBlock())
{
-#ifdef _WIN32
- WSAEVENT events[2];
- events[0] = _event;
- events[1] = _writeEvent;
- long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
- DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE);
- if(rc == WSA_WAIT_FAILED)
- {
- //
- // This an error from WSAWaitForMultipleEvents
- // itself (similar to an error from select). None
- // of these errors are recoverable (such as
- // EINTR).
- //
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- if(rc == WSA_WAIT_TIMEOUT)
- {
- assert(timeout >= 0);
- throw TimeoutException(__FILE__, __LINE__);
- }
-
- if(rc == WSA_WAIT_EVENT_0)
- {
- WSANETWORKEVENTS nevents;
- if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- //
- // If we have consumed a READ event, set the
- // _readEvent event.
- //
- if(nevents.lNetworkEvents & FD_READ)
- {
- WSASetEvent(_readEvent);
- }
-
- //
- // This checks for an error on the fd (this would
- // be same as recv itself returning an error). In
- // the event of an error we set the error code,
- // and repeat the error handling.
- //
- if(nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]);
- goto repeatError;
- }
- if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
- goto repeatError;
- }
- }
- else
- {
- //
- // Otherwise the _writeEvent is set, reset it.
- //
- WSAResetEvent(_writeEvent);
- }
-#else
- repeatSelect:
- int rs;
- assert(_fd != INVALID_SOCKET);
- FD_SET(_fd, &_wFdSet);
-
- if(timeout >= 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
- rs = ::select(_fd + 1, 0, &_wFdSet, 0, &tv);
- }
- else
- {
- rs = ::select(_fd + 1, 0, &_wFdSet, 0, 0);
- }
-
- if(rs == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeatSelect;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- if(rs == 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-#endif
- continue;
+ throw TimeoutException(__FILE__, __LINE__);
}
-
+#endif
+
if(connectionLost())
{
ConnectionLostException ex(__FILE__, __LINE__);
@@ -273,14 +180,26 @@ IceInternal::Transceiver::write(Buffer& buf, int timeout)
void
IceInternal::Transceiver::read(Buffer& buf, int timeout)
{
+ assert(timeout != 0);
+
Buffer::Container::difference_type packetSize =
static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
+#ifndef ICEE_USE_SOCKET_TIMEOUT
+ if(timeout > 0)
+ {
+ doSelect(true, timeout);
+ }
+#else
+ setTimeout(_fd, true, timeout);
+#endif
+
+ repeatRead:
assert(_fd != INVALID_SOCKET);
ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
-
+
if(ret == 0)
{
//
@@ -301,126 +220,23 @@ IceInternal::Transceiver::read(Buffer& buf, int timeout)
if(ret == SOCKET_ERROR)
{
-#ifdef _WIN32
- repeatError:
-#endif
if(interrupted())
{
- continue;
+ goto repeatRead;
}
if(noBuffers() && packetSize > 1024)
{
packetSize /= 2;
- continue;
+ goto repeatRead;
}
+#ifdef ICEE_USE_SOCKET_TIMEOUT
if(wouldBlock())
{
-#ifdef _WIN32
- //
- // This code is basically the same as the code in
- // ::send above. Check that for detailed comments.
- //
- WSAEVENT events[2];
- events[0] = _event;
- events[1] = _readEvent;
- long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
- DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, tout, FALSE);
- if(rc == WSA_WAIT_FAILED)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
- if(rc == WSA_WAIT_TIMEOUT)
- {
- assert(timeout >= 0);
- throw TimeoutException(__FILE__, __LINE__);
- }
-
- if(rc == WSA_WAIT_EVENT_0)
- {
- WSANETWORKEVENTS nevents;
- if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = WSAGetLastError();
- throw ex;
- }
-
- //
- // If we have consumed a WRITE event, set the
- // _writeEvent event.
- //
- if(nevents.lNetworkEvents & FD_WRITE)
- {
- WSASetEvent(_writeEvent);
- }
-
- //
- // This checks for an error on the fd (this would
- // be same as recv itself returning an error). In
- // the event of an error we set the error code,
- // and repeat the error handling.
- //
- if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_READ_BIT]);
- goto repeatError;
- }
- if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
- {
- WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
- goto repeatError;
- }
- }
- else
- {
- //
- // Otherwise the _readEvent is set, reset it.
- //
- WSAResetEvent(_readEvent);
- }
-#else
- repeatSelect:
-
- int rs;
- assert(_fd != INVALID_SOCKET);
- FD_SET(_fd, &_rFdSet);
-
- if(timeout >= 0)
- {
- struct timeval tv;
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
- rs = ::select(_fd + 1, &_rFdSet, 0, 0, &tv);
- }
- else
- {
- rs = ::select(_fd + 1, &_rFdSet, 0, 0, 0);
- }
-
- if(rs == SOCKET_ERROR)
- {
- if(interrupted())
- {
- goto repeatSelect;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
-
- if(rs == 0)
- {
- throw TimeoutException(__FILE__, __LINE__);
- }
-#endif
- continue;
+ throw TimeoutException(__FILE__, __LINE__);
}
-
+#endif
if(connectionLost())
{
//
@@ -536,3 +352,166 @@ IceInternal::Transceiver::~Transceiver()
assert(_writeEvent == 0);
#endif
}
+
+void
+IceInternal::Transceiver::doSelect(bool read, int timeout)
+{
+ assert(timeout >= 0);
+ while(true)
+ {
+#ifdef _WIN32
+ //
+ // This code is basically the same as the code in
+ // ::send above. Check that for detailed comments.
+ //
+ WSAEVENT events[2];
+ events[0] = _event;
+ events[1] = read ? _readEvent : _writeEvent;
+ DWORD rc = WSAWaitForMultipleEvents(2, events, FALSE, timeout, FALSE);
+ if(rc == WSA_WAIT_FAILED)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+ if(rc == WSA_WAIT_TIMEOUT)
+ {
+ assert(timeout >= 0);
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+
+ if(rc == WSA_WAIT_EVENT_0)
+ {
+ WSANETWORKEVENTS nevents;
+ if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+
+ //
+ // If we're selecting for reading and have consumed a WRITE
+ // event, set the _writeEvent event. Otherwise, if we're
+ // selecting for writing have consumed a READ event, set the
+ // _readEvent event.
+ //
+ if(read && nevents.lNetworkEvents & FD_WRITE)
+ {
+ WSASetEvent(_writeEvent);
+ }
+ else if(!read && nevents.lNetworkEvents & FD_READ)
+ {
+ WSASetEvent(_readEvent);
+ }
+
+
+ //
+ // This checks for an error on the fd (this would
+ // be same as recv itself returning an error). In
+ // the event of an error we set the error code,
+ // and repeat the error handling.
+ //
+ if(read && nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ }
+ else if(!read && nevents.lNetworkEvents & FD_WRITE && nevents.iErrorCode[FD_WRITE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_WRITE_BIT]);
+ }
+ else if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ }
+ else
+ {
+ return; // No errors: we're done.
+ }
+
+ if(interrupted())
+ {
+ continue;
+ }
+
+ if(connectionLost())
+ {
+ //
+ // See the commment above about shutting down the
+ // socket if the connection is lost while reading
+ // data.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+ else
+ {
+ //
+ // Otherwise the _readEvent or _writeEvent is set, reset it.
+ //
+ if(read)
+ {
+ WSAResetEvent(_readEvent);
+ }
+ else
+ {
+ WSAResetEvent(_writeEvent);
+ }
+ return;
+ }
+#else
+ int rs;
+ assert(_fd != INVALID_SOCKET);
+ if(read)
+ {
+ FD_SET(_fd, &_rFdSet);
+ }
+ else
+ {
+ FD_SET(_fd, &_wFdSet);
+ }
+
+ if(timeout >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
+ rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, &tv);
+ }
+ else
+ {
+ rs = ::select(_fd + 1, read ? &_rFdSet : 0, read ? 0 : &_wFdSet, 0, 0);
+ }
+
+ if(rs == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ if(rs == 0)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+
+ return;
+#endif
+ }
+}