summaryrefslogtreecommitdiff
path: root/cppe/src/TcpTransport/Transceiver.cpp
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2005-06-20 16:43:38 +0000
committerDwayne Boone <dwayne@zeroc.com>2005-06-20 16:43:38 +0000
commit8b0d67e9ad16a3b6bdfff6d9900ea009442249a0 (patch)
treed1cfce0274d58c377392d56f64d57b10d8a1c8fb /cppe/src/TcpTransport/Transceiver.cpp
parentImproved fix for bug #377 (diff)
downloadice-8b0d67e9ad16a3b6bdfff6d9900ea009442249a0.tar.bz2
ice-8b0d67e9ad16a3b6bdfff6d9900ea009442249a0.tar.xz
ice-8b0d67e9ad16a3b6bdfff6d9900ea009442249a0.zip
Separated TCP into separate transport library
Diffstat (limited to 'cppe/src/TcpTransport/Transceiver.cpp')
-rw-r--r--cppe/src/TcpTransport/Transceiver.cpp469
1 files changed, 469 insertions, 0 deletions
diff --git a/cppe/src/TcpTransport/Transceiver.cpp b/cppe/src/TcpTransport/Transceiver.cpp
new file mode 100644
index 00000000000..7709777fc87
--- /dev/null
+++ b/cppe/src/TcpTransport/Transceiver.cpp
@@ -0,0 +1,469 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Transceiver.h>
+#include <Ice/Instance.h>
+#include <Ice/TraceLevels.h>
+#include <Ice/LoggerUtil.h>
+#include <Ice/Buffer.h>
+#include <Ice/Network.h>
+#include <Ice/LocalException.h>
+#include <IceUtil/SafeStdio.h>
+
+using namespace std;
+using namespace Ice;
+using namespace IceInternal;
+
+void IceInternal::incRef(Transceiver* p) { p->__incRef(); }
+void IceInternal::decRef(Transceiver* p) { p->__decRef(); }
+
+
+SOCKET
+IceInternal::Transceiver::fd()
+{
+ assert(_fd != INVALID_SOCKET);
+ return _fd;
+}
+
+void
+IceInternal::Transceiver::close()
+{
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "closing tcp connection\n" << toString();
+ }
+
+#ifdef _WIN32_WCE
+ assert(_event != 0);
+ WSACloseEvent(_event);
+ _event = 0;
+#endif
+
+ assert(_fd != INVALID_SOCKET);
+ try
+ {
+ closeSocket(_fd);
+ _fd = INVALID_SOCKET;
+ }
+ catch(const SocketException&)
+ {
+ _fd = INVALID_SOCKET;
+ throw;
+ }
+}
+
+void
+IceInternal::Transceiver::shutdownWrite()
+{
+ if(_traceLevels->network >= 2)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "shutting down tcp connection for writing\n" << toString();
+ }
+
+ assert(_fd != INVALID_SOCKET);
+ shutdownSocketWrite(_fd);
+}
+
+void
+IceInternal::Transceiver::shutdownReadWrite()
+{
+ if(_traceLevels->network >= 2)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "shutting down tcp connection for reading and writing\n" << toString();
+ }
+
+ assert(_fd != INVALID_SOCKET);
+ shutdownSocketReadWrite(_fd);
+}
+
+void
+IceInternal::Transceiver::write(Buffer& buf, int timeout)
+{
+ Buffer::Container::difference_type packetSize =
+ static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+
+#ifdef _WIN32
+ //
+ // Limit packet size to avoid performance problems on WIN32
+ //
+ if(packetSize > 64 * 1024)
+ {
+ packetSize = 64 * 1024;
+ }
+#endif
+
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
+
+ if(ret == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+#ifdef _WIN32_WCE
+ repeatError:
+#endif
+ if(interrupted())
+ {
+ continue;
+ }
+
+ if(noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(wouldBlock())
+ {
+#ifdef _WIN32_WCE
+ WSAEVENT events[1];
+ events[0] = _event;
+ long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
+ DWORD rc = WSAWaitForMultipleEvents(1, events, FALSE, tout, FALSE);
+ if(rc == WSA_WAIT_FAILED)
+ {
+ //
+ // This an error from WSAWaitForMultipleEvents
+ // itself (similar to an error from select). None
+ // of these errors are recoverable (such as
+ // EINTR).
+ //
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+
+ if(rc == WSA_WAIT_TIMEOUT)
+ {
+ assert(timeout >= 0);
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+ assert(rc == WSA_WAIT_EVENT_0);
+
+ WSANETWORKEVENTS nevents;
+ if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+ //
+ // This checks for an error on the fd (this would be
+ // same as recv itself returning an error).
+ //
+ // In the event of an error we set the error code, and
+ // // repeat the error handling.
+ //
+ if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_READ_BIT]);
+ goto repeatError;
+ }
+ if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ goto repeatError;
+ }
+#else
+ repeatSelect:
+ int rs;
+ assert(_fd != INVALID_SOCKET);
+ FD_SET(_fd, &_wFdSet);
+
+ if(timeout >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
+ rs = ::select(_fd + 1, 0, &_wFdSet, 0, &tv);
+ }
+ else
+ {
+ rs = ::select(_fd + 1, 0, &_wFdSet, 0, 0);
+ }
+
+ if(rs == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ goto repeatSelect;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ if(rs == 0)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+#endif
+ continue;
+ }
+
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_traceLevels->network >= 3)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << IceUtil::printfToString("sent %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
+ }
+
+ buf.i += ret;
+
+ if(packetSize > buf.b.end() - buf.i)
+ {
+ packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+ }
+ }
+}
+
+void
+IceInternal::Transceiver::read(Buffer& buf, int timeout)
+{
+ Buffer::Container::difference_type packetSize =
+ static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
+
+ if(ret == 0)
+ {
+ //
+ // If the connection is lost when reading data, we shut
+ // down the write end of the socket. This helps to unblock
+ // threads that are stuck in send() or select() while
+ // sending data. Note: I don't really understand why
+ // send() or select() sometimes don't detect a connection
+ // loss. Therefore this helper to make them detect it.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+#ifdef _WIN32_WCE
+ repeatError:
+#endif
+ if(interrupted())
+ {
+ continue;
+ }
+
+ if(noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(wouldBlock())
+ {
+#ifdef _WIN32_WCE
+ //
+ // This code is basically the same as the code in
+ // ::send above. Check that for detailed comments.
+ //
+ WSAEVENT events[1];
+ events[0] = _event;
+ long tout = (timeout >= 0) ? timeout : WSA_INFINITE;
+ DWORD rc = WSAWaitForMultipleEvents(1, events, FALSE, tout, FALSE);
+ if(rc == WSA_WAIT_FAILED)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+ if(rc == WSA_WAIT_TIMEOUT)
+ {
+ assert(timeout >= 0);
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+ assert(rc == WSA_WAIT_EVENT_0);
+
+ WSANETWORKEVENTS nevents;
+ if(WSAEnumNetworkEvents(_fd, _event, &nevents) == SOCKET_ERROR)
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = WSAGetLastError();
+ throw ex;
+ }
+
+ if(nevents.lNetworkEvents & FD_READ && nevents.iErrorCode[FD_READ_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_READ_BIT]);
+ goto repeatError;
+ }
+ if(nevents.lNetworkEvents & FD_CLOSE && nevents.iErrorCode[FD_CLOSE_BIT] != 0)
+ {
+ WSASetLastError(nevents.iErrorCode[FD_CLOSE_BIT]);
+ goto repeatError;
+ }
+#else
+ repeatSelect:
+
+ int rs;
+ assert(_fd != INVALID_SOCKET);
+ FD_SET(_fd, &_rFdSet);
+
+ if(timeout >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout - tv.tv_sec * 1000) * 1000;
+ rs = ::select(_fd + 1, &_rFdSet, 0, 0, &tv);
+ }
+ else
+ {
+ rs = ::select(_fd + 1, &_rFdSet, 0, 0, 0);
+ }
+
+ if(rs == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ goto repeatSelect;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ if(rs == 0)
+ {
+ throw TimeoutException(__FILE__, __LINE__);
+ }
+#endif
+ continue;
+ }
+
+ if(connectionLost())
+ {
+ //
+ // See the commment above about shutting down the
+ // socket if the connection is lost while reading
+ // data.
+ //
+ //assert(_fd != INVALID_SOCKET);
+ //shutdownSocketReadWrite(_fd);
+
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_traceLevels->network >= 3)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << IceUtil::printfToString("received %d of %d", ret, packetSize) << " bytes via tcp\n" << toString();
+ }
+
+ buf.i += ret;
+
+ if(packetSize > buf.b.end() - buf.i)
+ {
+ packetSize = static_cast<Buffer::Container::difference_type>(buf.b.end() - buf.i);
+ }
+ }
+}
+
+string
+IceInternal::Transceiver::type() const
+{
+ return "tcp";
+}
+
+string
+IceInternal::Transceiver::toString() const
+{
+ return _desc;
+}
+
+IceInternal::Transceiver::Transceiver(const InstancePtr& instance, SOCKET fd) :
+ _traceLevels(instance->traceLevels()),
+ _logger(instance->logger()),
+ _fd(fd),
+ _desc(fdToString(fd))
+{
+#ifdef _WIN32_WCE
+ _event = WSACreateEvent();
+ if(_event == 0)
+ {
+ closeSocket(_fd);
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+
+ //
+ // Create a WSAEVENT which selects read/write and close for
+ // trigging.
+ //
+ if(WSAEventSelect(_fd, _event, FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR)
+ {
+ int error = WSAGetLastError();
+
+ WSACloseEvent(_event);
+ closeSocket(_fd);
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = error;
+ throw ex;
+ }
+#else
+ FD_ZERO(&_wFdSet);
+ FD_ZERO(&_rFdSet);
+#endif
+}
+
+IceInternal::Transceiver::~Transceiver()
+{
+ assert(_fd == INVALID_SOCKET);
+#ifdef _WIN32_WCE
+ assert(_event == 0);
+#endif
+}