diff options
Diffstat (limited to 'cpp/src/Ice/TcpTransceiver.cpp')
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 219 |
1 files changed, 165 insertions, 54 deletions
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index ee49f1f7e74..c577367c5fb 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,59 +46,126 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) #endif SocketOperation -IceInternal::TcpTransceiver::initialize() +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return SocketOperationConnect; - } - else if(_state <= StateConnectPending) + try { - try + if(_state == StateNeedConnect) { -#if defined(ICE_USE_IOCP) + _state = StateConnectPending; + return SocketOperationConnect; + } + else if(_state <= StateConnectPending) + { +#ifdef ICE_USE_IOCP doFinishConnectAsync(_fd, _write); #else doFinishConnect(_fd); #endif - _state = StateConnected; - _desc = fdToString(_fd); - } - catch(const Ice::LocalException& ex) - { - if(_traceLevels->network >= 2) + + _desc = fdToString(_fd, _proxy, _addr, true); + + if(_proxy) { - Trace out(_logger, _traceLevels->networkCat); - out << "failed to establish tcp connection\n"; -#if !defined(_WIN32) // - // The local address is only accessible with connected sockets on Windows. + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); + +#ifdef ICE_USE_IOCP + // + // Return SocketOperationWrite to indicate we need to start a write. // - Address localAddr; - fdToLocalAddress(_fd, localAddr); - out << "local address: " << addrToString(localAddr) << "\n"; + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; #else - out << "local address: <not available>\n"; + // + // Write the proxy connection message. + // + if(write(writeBuffer)) + { + // + // Write completed without blocking. + // + _proxy->endWriteConnectRequest(writeBuffer); + + // + // Try to read the response. + // + if(read(readBuffer)) + { + // + // Read completed without blocking - fall through. + // + _proxy->endReadConnectRequestResponse(readBuffer); + } + else + { + // + // Return SocketOperationRead to indicate we need to complete the read. + // + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; + } + } + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return SocketOperationWrite; + } #endif - out << "remote address: " << addrToString(_connectAddr) << "\n" << ex; } - throw; - } - if(_traceLevels->network >= 1) + _state = StateConnected; + } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + } + catch(const Ice::LocalException& ex) + { + if(_traceLevels->network >= 2) { Trace out(_logger, _traceLevels->networkCat); - out << "tcp connection established\n" << _desc; + out << "failed to establish tcp connection\n" << fdToString(_fd, _proxy, _addr, false) << "\n" << ex; } + throw; } + assert(_state == StateConnected); + if(_traceLevels->network >= 1) + { + Trace out(_logger, _traceLevels->networkCat); + out << "tcp connection established\n" << _desc; + } return SocketOperationNone; } void IceInternal::TcpTransceiver::close() { + // + // If the transceiver is not connected, its description is simply "not connected", + // which isn't very helpful. + // if(_state == StateConnected && _traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); @@ -121,17 +188,19 @@ IceInternal::TcpTransceiver::close() bool IceInternal::TcpTransceiver::write(Buffer& buf) { - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); -# ifdef ICE_USE_IOCP +#ifdef ICE_USE_IOCP // // Limit packet size to avoid performance problems on WIN32 // if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } -# endif +#endif while(buf.i != buf.b.end()) { assert(_fd != INVALID_SOCKET); @@ -161,7 +230,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) { return false; } - + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -194,13 +263,16 @@ IceInternal::TcpTransceiver::write(Buffer& buf) packetSize = static_cast<int>(buf.b.end() - buf.i); } } + return true; } bool IceInternal::TcpTransceiver::read(Buffer& buf) { - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { @@ -220,7 +292,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf) { continue; } - + if(noBuffers() && packetSize > 1024) { packetSize /= 2; @@ -231,7 +303,7 @@ IceInternal::TcpTransceiver::read(Buffer& buf) { return false; } - + if(connectionLost()) { ConnectionLostException ex(__FILE__, __LINE__); @@ -264,13 +336,14 @@ IceInternal::TcpTransceiver::read(Buffer& buf) return true; } -#if defined(ICE_USE_IOCP) +#ifdef ICE_USE_IOCP bool IceInternal::TcpTransceiver::startWrite(Buffer& buf) { - if(_state < StateConnected) + if(_state == StateConnectPending) { - doConnectAsync(_fd, _connectAddr, _write); + Address addr = _proxy ? _proxy->getAddress() : _addr; + doConnectAsync(_fd, addr, _write); return false; } @@ -279,7 +352,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf) int packetSize = static_cast<int>(buf.b.end() - buf.i); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } assert(packetSize > 0); @@ -310,7 +383,7 @@ IceInternal::TcpTransceiver::startWrite(Buffer& buf) void IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { - if(_state < StateConnected) + if(_state < StateConnected && _state != StateProxyConnectRequest) { return; } @@ -336,18 +409,19 @@ IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { int packetSize = static_cast<int>(buf.b.end() - buf.i); if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { + { packetSize = _maxSendPacketSize; } Trace out(_logger, _traceLevels->networkCat); out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString(); } - + if(_stats) { _stats->bytesSent(type(), _write.count); } + buf.i += _write.count; } @@ -409,7 +483,7 @@ IceInternal::TcpTransceiver::finishRead(Buffer& buf) ex.error = 0; throw ex; } - + if(_traceLevels->network >= 3) { int packetSize = static_cast<int>(buf.b.end() - buf.i); @@ -442,7 +516,7 @@ IceInternal::TcpTransceiver::toString() const return _desc; } -Ice::ConnectionInfoPtr +Ice::ConnectionInfoPtr IceInternal::TcpTransceiver::getInfo() const { Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo(); @@ -459,15 +533,53 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize } } -IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) : +IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, const NetworkProxyPtr& proxy, + const Address& addr) : + NativeInfo(fd), + _proxy(proxy), + _addr(addr), + _traceLevels(instance->traceLevels()), + _logger(instance->initializationData().logger), + _stats(instance->initializationData().stats), + _state(StateNeedConnect) +#ifdef ICE_USE_IOCP + , _read(SocketOperationRead), + _write(SocketOperationWrite) +#endif +{ + setBlock(_fd, false); + + setTcpBufSize(_fd, instance->initializationData().properties, _logger); + +#ifdef ICE_USE_IOCP + // + // On Windows, limiting the buffer size is important to prevent + // poor throughput performances when transfering large amount of + // data. See Microsoft KB article KB823764. + // + _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2; + if(_maxSendPacketSize < 512) + { + _maxSendPacketSize = 0; + } + + _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd); + if(_maxReceivePacketSize < 512) + { + _maxReceivePacketSize = 0; + } +#endif +} + +IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd) : NativeInfo(fd), _traceLevels(instance->traceLevels()), _logger(instance->initializationData().logger), _stats(instance->initializationData().stats), - _state(connected ? StateConnected : StateNeedConnect), - _desc(connected ? fdToString(_fd) : string()) + _state(StateConnected), + _desc(fdToString(_fd)) #ifdef ICE_USE_IOCP - , _read(SocketOperationRead), + , _read(SocketOperationRead), _write(SocketOperationWrite) #endif { @@ -501,15 +613,15 @@ IceInternal::TcpTransceiver::~TcpTransceiver() } void -IceInternal::TcpTransceiver::connect(const Address& addr) +IceInternal::TcpTransceiver::connect() { #if !defined(ICE_USE_IOCP) try { - if(doConnect(_fd, addr)) + if(doConnect(_fd, _addr)) { _state = StateConnected; - _desc = fdToString(_fd); + _desc = fdToString(_fd, _proxy, _addr); if(_traceLevels->network >= 1) { Trace out(_logger, _traceLevels->networkCat); @@ -518,7 +630,7 @@ IceInternal::TcpTransceiver::connect(const Address& addr) } else { - _desc = fdToString(_fd); + _desc = fdToString(_fd, _proxy, _addr); } } catch(...) @@ -527,5 +639,4 @@ IceInternal::TcpTransceiver::connect(const Address& addr) throw; } #endif - _connectAddr = addr; } |