summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/TcpTransceiver.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
committerBenoit Foucher <benoit@zeroc.com>2009-08-21 15:55:01 +0200
commitb9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch)
tree183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/TcpTransceiver.cpp
parentadding compression cookbook demo (diff)
downloadice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz
ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/TcpTransceiver.cpp')
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp321
1 files changed, 274 insertions, 47 deletions
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index 8089d5e4a9b..88efe8c0dc4 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -20,17 +20,73 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
-SOCKET
-IceInternal::TcpTransceiver::fd()
+NativeInfoPtr
+IceInternal::TcpTransceiver::getNativeInfo()
{
- assert(_fd != INVALID_SOCKET);
- return _fd;
+ return this;
+}
+
+#ifdef ICE_USE_IOCP
+AsyncInfo*
+IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
+{
+ switch(status)
+ {
+ case SocketOperationRead:
+ return &_read;
+ case SocketOperationWrite:
+ return &_write;
+ default:
+ assert(false);
+ return 0;
+ }
+}
+#endif
+
+SocketOperation
+IceInternal::TcpTransceiver::initialize()
+{
+ if(_state == StateNeedConnect)
+ {
+ _state = StateConnectPending;
+ return SocketOperationConnect;
+ }
+ else if(_state <= StateConnectPending)
+ {
+ try
+ {
+#ifndef ICE_USE_IOCP
+ doFinishConnect(_fd);
+#else
+ doFinishConnectAsync(_fd, _write);
+#endif
+ _state = StateConnected;
+ _desc = fdToString(_fd);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ if(_traceLevels->network >= 2)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "failed to establish tcp connection\n" << _desc << "\n" << ex;
+ }
+ throw;
+ }
+
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "tcp connection established\n" << _desc;
+ }
+ }
+ assert(_state == StateConnected);
+ return SocketOperationNone;
}
void
IceInternal::TcpTransceiver::close()
{
- if(_traceLevels->network >= 1)
+ if(_state == StateConnected && _traceLevels->network >= 1)
{
Trace out(_logger, _traceLevels->networkCat);
out << "closing tcp connection\n" << toString();
@@ -55,21 +111,21 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
// Its impossible for the packetSize to be more than an Int.
int packetSize = static_cast<int>(buf.b.end() - buf.i);
-#ifdef _WIN32
+#ifdef ICE_USE_IOCP
//
// Limit packet size to avoid performance problems on WIN32
//
- if(packetSize > _maxPacketSize)
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
{
- packetSize = _maxPacketSize;
+ packetSize = _maxSendPacketSize;
}
#endif
while(buf.i != buf.b.end())
{
assert(_fd != INVALID_SOCKET);
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
+ ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
if(ret == 0)
{
ConnectionLostException ex(__FILE__, __LINE__);
@@ -213,61 +269,188 @@ IceInternal::TcpTransceiver::read(Buffer& buf)
buf.i += ret;
- if(packetSize > buf.b.end() - buf.i)
- {
- packetSize = static_cast<int>(buf.b.end() - buf.i);
- }
+ packetSize = static_cast<int>(buf.b.end() - buf.i);
}
return true;
}
-string
-IceInternal::TcpTransceiver::type() const
+#ifdef ICE_USE_IOCP
+void
+IceInternal::TcpTransceiver::startWrite(Buffer& buf)
{
- return "tcp";
-}
+ if(_state < StateConnected)
+ {
+ doConnectAsync(_fd, _connectAddr, _write);
+ _desc = fdToString(_fd);
+ return;
+ }
-string
-IceInternal::TcpTransceiver::toString() const
-{
- return _desc;
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
+
+ _write.buf.len = packetSize;
+ _write.buf.buf = reinterpret_cast<char*>(&*buf.i);
+ int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+ if(err == SOCKET_ERROR)
+ {
+ if(!wouldBlock())
+ {
+ if(connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+ }
}
-SocketStatus
-IceInternal::TcpTransceiver::initialize()
+void
+IceInternal::TcpTransceiver::finishWrite(Buffer& buf)
{
- if(_state == StateNeedConnect)
+ if(_state < StateConnected)
{
- _state = StateConnectPending;
- return NeedConnect;
+ return;
}
- else if(_state <= StateConnectPending)
+
+ if(_write.count == SOCKET_ERROR)
{
- try
+ WSASetLastError(_write.error);
+ if(connectionLost())
{
- doFinishConnect(_fd);
- _state = StateConnected;
- _desc = fdToString(_fd);
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
- catch(const Ice::LocalException& ex)
+ else
{
- if(_traceLevels->network >= 2)
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_traceLevels->network >= 3)
+ {
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesSent(type(), _write.count);
+ }
+
+ buf.i += _write.count;
+}
+
+void
+IceInternal::TcpTransceiver::startRead(Buffer& buf)
+{
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
+ {
+ packetSize = _maxReceivePacketSize;
+ }
+
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+
+ _read.buf.len = packetSize;
+ _read.buf.buf = reinterpret_cast<char*>(&*buf.i);
+ int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+ if(err == SOCKET_ERROR)
+ {
+ if(!wouldBlock())
+ {
+ if(connectionLost())
{
- Trace out(_logger, _traceLevels->networkCat);
- out << "failed to establish tcp connection\n" << _desc << "\n" << ex;
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
- throw;
}
+ }
+}
- if(_traceLevels->network >= 1)
+void
+IceInternal::TcpTransceiver::finishRead(Buffer& buf)
+{
+ if(_read.count == SOCKET_ERROR)
+ {
+ WSASetLastError(_read.error);
+ if(connectionLost())
{
- Trace out(_logger, _traceLevels->networkCat);
- out << "tcp connection established\n" << _desc;
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
}
- assert(_state == StateConnected);
- return Finished;
+ else if(_read.count == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(_traceLevels->network >= 3)
+ {
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
+ {
+ packetSize = _maxReceivePacketSize;
+ }
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "received " << _read.count << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesReceived(type(), static_cast<Int>(_read.count));
+ }
+
+ buf.i += _read.count;
+}
+#endif
+
+string
+IceInternal::TcpTransceiver::type() const
+{
+ return "tcp";
+}
+
+string
+IceInternal::TcpTransceiver::toString() const
+{
+ return _desc;
}
void
@@ -280,23 +463,36 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize
}
IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) :
+ NativeInfo(fd),
_traceLevels(instance->traceLevels()),
_logger(instance->initializationData().logger),
_stats(instance->initializationData().stats),
- _fd(fd),
_state(connected ? StateConnected : StateNeedConnect),
- _desc(fdToString(_fd))
+ _desc(connected ? fdToString(_fd) : string())
+#ifdef ICE_USE_IOCP
+ , _read(SocketOperationRead),
+ _write(SocketOperationWrite)
+#endif
{
-#ifdef _WIN32
+ setBlock(_fd, false);
+ setTcpBufSize(_fd, instance->initializationData().properties, _logger);
+
+#ifdef ICE_USE_IOCP
//
// On Windows, limiting the buffer size is important to prevent
// poor throughput performances when transfering large amount of
// data. See Microsoft KB article KB823764.
//
- _maxPacketSize = IceInternal::getSendBufferSize(_fd) / 2;
- if(_maxPacketSize < 512)
+ _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2;
+ if(_maxSendPacketSize < 512)
+ {
+ _maxSendPacketSize = 0;
+ }
+
+ _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd);
+ if(_maxReceivePacketSize < 512)
{
- _maxPacketSize = 0;
+ _maxReceivePacketSize = 0;
}
#endif
}
@@ -305,3 +501,34 @@ IceInternal::TcpTransceiver::~TcpTransceiver()
{
assert(_fd == INVALID_SOCKET);
}
+
+void
+IceInternal::TcpTransceiver::connect(const struct sockaddr_storage& addr)
+{
+#ifndef ICE_USE_IOCP
+ try
+ {
+ if(doConnect(_fd, addr))
+ {
+ _state = StateConnected;
+ _desc = fdToString(_fd);
+ if(_traceLevels->network >= 1)
+ {
+ Trace out(_logger, _traceLevels->networkCat);
+ out << "tcp connection established\n" << _desc;
+ }
+ }
+ else
+ {
+ _desc = fdToString(_fd);
+ }
+ }
+ catch(...)
+ {
+ _fd = INVALID_SOCKET;
+ throw;
+ }
+#else
+ _connectAddr = addr;
+#endif
+}