diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-08-21 15:55:01 +0200 |
commit | b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a (patch) | |
tree | 183215e2dbeadfbc871b800ce09726e58af38b91 /cpp/src/Ice/TcpTransceiver.cpp | |
parent | adding compression cookbook demo (diff) | |
download | ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.bz2 ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.tar.xz ice-b9f2fa14fb3f222a6ec5e0a93bf25fe5ad12b56a.zip |
IOCP changes, bug 3501, 4200, 4156, 3101
Diffstat (limited to 'cpp/src/Ice/TcpTransceiver.cpp')
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 321 |
1 files changed, 274 insertions, 47 deletions
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index 8089d5e4a9b..88efe8c0dc4 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -20,17 +20,73 @@ using namespace std; using namespace Ice; using namespace IceInternal; -SOCKET -IceInternal::TcpTransceiver::fd() +NativeInfoPtr +IceInternal::TcpTransceiver::getNativeInfo() { - assert(_fd != INVALID_SOCKET); - return _fd; + return this; +} + +#ifdef ICE_USE_IOCP +AsyncInfo* +IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) +{ + switch(status) + { + case SocketOperationRead: + return &_read; + case SocketOperationWrite: + return &_write; + default: + assert(false); + return 0; + } +} +#endif + +SocketOperation +IceInternal::TcpTransceiver::initialize() +{ + if(_state == StateNeedConnect) + { + _state = StateConnectPending; + return SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { + try + { +#ifndef ICE_USE_IOCP + doFinishConnect(_fd); +#else + doFinishConnectAsync(_fd, _write); +#endif + _state = StateConnected; + _desc = fdToString(_fd); + } + catch(const Ice::LocalException& ex) + { + if(_traceLevels->network >= 2) + { + Trace out(_logger, _traceLevels->networkCat); + out << "failed to establish tcp connection\n" << _desc << "\n" << ex; + } + throw; + } + + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "tcp connection established\n" << _desc; + } + } + assert(_state == StateConnected); + return SocketOperationNone; } void IceInternal::TcpTransceiver::close() { - if(_traceLevels->network >= 1) + if(_state == StateConnected && _traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); out << "closing tcp connection\n" << toString(); @@ -55,21 +111,21 @@ IceInternal::TcpTransceiver::write(Buffer& buf) // Its impossible for the packetSize to be more than an Int. int packetSize = static_cast<int>(buf.b.end() - buf.i); -#ifdef _WIN32 +#ifdef ICE_USE_IOCP // // Limit packet size to avoid performance problems on WIN32 // - if(packetSize > _maxPacketSize) + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) { - packetSize = _maxPacketSize; + packetSize = _maxSendPacketSize; } #endif while(buf.i != buf.b.end()) { assert(_fd != INVALID_SOCKET); - ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); + ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); if(ret == 0) { ConnectionLostException ex(__FILE__, __LINE__); @@ -213,61 +269,188 @@ IceInternal::TcpTransceiver::read(Buffer& buf) buf.i += ret; - if(packetSize > buf.b.end() - buf.i) - { - packetSize = static_cast<int>(buf.b.end() - buf.i); - } + packetSize = static_cast<int>(buf.b.end() - buf.i); } return true; } -string -IceInternal::TcpTransceiver::type() const +#ifdef ICE_USE_IOCP +void +IceInternal::TcpTransceiver::startWrite(Buffer& buf) { - return "tcp"; -} + if(_state < StateConnected) + { + doConnectAsync(_fd, _connectAddr, _write); + _desc = fdToString(_fd); + return; + } -string -IceInternal::TcpTransceiver::toString() const -{ - return _desc; + assert(!buf.b.empty() && buf.i != buf.b.end()); + + int packetSize = static_cast<int>(buf.b.end() - buf.i); + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } + + _write.buf.len = packetSize; + _write.buf.buf = reinterpret_cast<char*>(&*buf.i); + int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); + if(err == SOCKET_ERROR) + { + if(!wouldBlock()) + { + if(connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + } + } } -SocketStatus -IceInternal::TcpTransceiver::initialize() +void +IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { - if(_state == StateNeedConnect) + if(_state < StateConnected) { - _state = StateConnectPending; - return NeedConnect; + return; } - else if(_state <= StateConnectPending) + + if(_write.count == SOCKET_ERROR) { - try + WSASetLastError(_write.error); + if(connectionLost()) { - doFinishConnect(_fd); - _state = StateConnected; - _desc = fdToString(_fd); + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } - catch(const Ice::LocalException& ex) + else { - if(_traceLevels->network >= 2) + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + } + + if(_traceLevels->network >= 3) + { + int packetSize = static_cast<int>(buf.b.end() - buf.i); + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } + Trace out(_logger, _traceLevels->networkCat); + out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesSent(type(), _write.count); + } + + buf.i += _write.count; +} + +void +IceInternal::TcpTransceiver::startRead(Buffer& buf) +{ + int packetSize = static_cast<int>(buf.b.end() - buf.i); + if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) + { + packetSize = _maxReceivePacketSize; + } + + assert(!buf.b.empty() && buf.i != buf.b.end()); + + _read.buf.len = packetSize; + _read.buf.buf = reinterpret_cast<char*>(&*buf.i); + int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); + if(err == SOCKET_ERROR) + { + if(!wouldBlock()) + { + if(connectionLost()) { - Trace out(_logger, _traceLevels->networkCat); - out << "failed to establish tcp connection\n" << _desc << "\n" << ex; + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } - throw; } + } +} - if(_traceLevels->network >= 1) +void +IceInternal::TcpTransceiver::finishRead(Buffer& buf) +{ + if(_read.count == SOCKET_ERROR) + { + WSASetLastError(_read.error); + if(connectionLost()) { - Trace out(_logger, _traceLevels->networkCat); - out << "tcp connection established\n" << _desc; + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = getSocketErrno(); + throw ex; } } - assert(_state == StateConnected); - return Finished; + else if(_read.count == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + + if(_traceLevels->network >= 3) + { + int packetSize = static_cast<int>(buf.b.end() - buf.i); + if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) + { + packetSize = _maxReceivePacketSize; + } + Trace out(_logger, _traceLevels->networkCat); + out << "received " << _read.count << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesReceived(type(), static_cast<Int>(_read.count)); + } + + buf.i += _read.count; +} +#endif + +string +IceInternal::TcpTransceiver::type() const +{ + return "tcp"; +} + +string +IceInternal::TcpTransceiver::toString() const +{ + return _desc; } void @@ -280,23 +463,36 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize } IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) : + NativeInfo(fd), _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), _stats(instance->initializationData().stats), - _fd(fd), _state(connected ? StateConnected : StateNeedConnect), - _desc(fdToString(_fd)) + _desc(connected ? fdToString(_fd) : string()) +#ifdef ICE_USE_IOCP + , _read(SocketOperationRead), + _write(SocketOperationWrite) +#endif { -#ifdef _WIN32 + setBlock(_fd, false); + setTcpBufSize(_fd, instance->initializationData().properties, _logger); + +#ifdef ICE_USE_IOCP // // On Windows, limiting the buffer size is important to prevent // poor throughput performances when transfering large amount of // data. See Microsoft KB article KB823764. // - _maxPacketSize = IceInternal::getSendBufferSize(_fd) / 2; - if(_maxPacketSize < 512) + _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2; + if(_maxSendPacketSize < 512) + { + _maxSendPacketSize = 0; + } + + _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd); + if(_maxReceivePacketSize < 512) { - _maxPacketSize = 0; + _maxReceivePacketSize = 0; } #endif } @@ -305,3 +501,34 @@ IceInternal::TcpTransceiver::~TcpTransceiver() { assert(_fd == INVALID_SOCKET); } + +void +IceInternal::TcpTransceiver::connect(const struct sockaddr_storage& addr) +{ +#ifndef ICE_USE_IOCP + try + { + if(doConnect(_fd, addr)) + { + _state = StateConnected; + _desc = fdToString(_fd); + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "tcp connection established\n" << _desc; + } + } + else + { + _desc = fdToString(_fd); + } + } + catch(...) + { + _fd = INVALID_SOCKET; + throw; + } +#else + _connectAddr = addr; +#endif +} |