diff options
Diffstat (limited to 'cpp/src/Ice/TcpTransceiver.cpp')
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 475 |
1 files changed, 15 insertions, 460 deletions
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index 76440f5f935..9f7a04c9496 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -23,121 +23,13 @@ using namespace IceInternal; NativeInfoPtr IceInternal::TcpTransceiver::getNativeInfo() { - return this; + return _stream; } -#ifdef ICE_USE_IOCP -AsyncInfo* -IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) -{ - switch(status) - { - case SocketOperationRead: - return &_read; - case SocketOperationWrite: - return &_write; - default: - assert(false); - return 0; - } -} -#endif - SocketOperation -IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool&) { - if(_state == StateNeedConnect) - { - _state = StateConnectPending; - return SocketOperationConnect; - } - else if(_state <= StateConnectPending) - { -#ifdef ICE_USE_IOCP - doFinishConnectAsync(_fd, _write); -#else - doFinishConnect(_fd); -#endif - - _desc = fdToString(_fd, _proxy, _addr, true); - - if(_proxy) - { - // - // Prepare the read & write buffers in advance. - // - _proxy->beginWriteConnectRequest(_addr, writeBuffer); - _proxy->beginReadConnectRequestResponse(readBuffer); - -#ifdef ICE_USE_IOCP - // - // Return SocketOperationWrite to indicate we need to start a write. - // - _state = StateProxyConnectRequest; // Send proxy connect request - return IceInternal::SocketOperationWrite; -#else - // - // Write the proxy connection message. - // - if(write(writeBuffer)) - { - // - // Write completed without blocking. - // - _proxy->endWriteConnectRequest(writeBuffer); - - // - // Try to read the response. - // - if(read(readBuffer, hasMoreData)) - { - // - // Read completed without blocking - fall through. - // - _proxy->endReadConnectRequestResponse(readBuffer); - } - else - { - // - // Return SocketOperationRead to indicate we need to complete the read. - // - _state = StateProxyConnectRequestPending; // Wait for proxy response - return SocketOperationRead; - } - } - else - { - // - // Return SocketOperationWrite to indicate we need to complete the write. - // - _state = StateProxyConnectRequest; // Send proxy connect request - return SocketOperationWrite; - } -#endif - } - - _state = StateConnected; - } - else if(_state == StateProxyConnectRequest) - { - // - // Write completed. - // - _proxy->endWriteConnectRequest(writeBuffer); - _state = StateProxyConnectRequestPending; // Wait for proxy response - return SocketOperationRead; - } - else if(_state == StateProxyConnectRequestPending) - { - // - // Read completed. - // - _proxy->endReadConnectRequestResponse(readBuffer); - _state = StateConnected; - } - - assert(_state == StateConnected); - return SocketOperationNone; + return _stream->connect(readBuffer, writeBuffer); } SocketOperation @@ -151,290 +43,44 @@ IceInternal::TcpTransceiver::closing(bool initiator, const Ice::LocalException&) void IceInternal::TcpTransceiver::close() { - assert(_fd != INVALID_SOCKET); - try - { - closeSocket(_fd); - _fd = INVALID_SOCKET; - } - catch(const SocketException&) - { - _fd = INVALID_SOCKET; - throw; - } + _stream->close(); } SocketOperation IceInternal::TcpTransceiver::write(Buffer& buf) { - if(buf.i == buf.b.end()) - { - return SocketOperationNone; - } - - // - // It's impossible for packetSize to be more than an Int. - // - int packetSize = static_cast<int>(buf.b.end() - buf.i); -#ifdef ICE_USE_IOCP - // - // Limit packet size to avoid performance problems on WIN32 - // - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } -#endif - while(buf.i != buf.b.end()) - { - assert(_fd != INVALID_SOCKET); - - ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); - if(ret == 0) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } - - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - if(noBuffers() && packetSize > 1024) - { - packetSize /= 2; - continue; - } - - if(wouldBlock()) - { - return SocketOperationWrite; - } - - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - - buf.i += ret; - - if(packetSize > buf.b.end() - buf.i) - { - packetSize = static_cast<int>(buf.b.end() - buf.i); - } - } - - return SocketOperationNone; + return _stream->write(buf); } SocketOperation IceInternal::TcpTransceiver::read(Buffer& buf, bool&) { - if(buf.i == buf.b.end()) - { - return SocketOperationNone; - } - - // - // It's impossible for packetSize to be more than an Int. - // - int packetSize = static_cast<int>(buf.b.end() - buf.i); - while(buf.i != buf.b.end()) - { - assert(_fd != INVALID_SOCKET); - ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); - - if(ret == 0) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } - - if(ret == SOCKET_ERROR) - { - if(interrupted()) - { - continue; - } - - if(noBuffers() && packetSize > 1024) - { - packetSize /= 2; - continue; - } - - if(wouldBlock()) - { - return SocketOperationRead; - } - - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - - buf.i += ret; - - packetSize = static_cast<int>(buf.b.end() - buf.i); - } - return SocketOperationNone; + return _stream->read(buf); } #ifdef ICE_USE_IOCP bool IceInternal::TcpTransceiver::startWrite(Buffer& buf) { - if(_state == StateConnectPending) - { - Address addr = _proxy ? _proxy->getAddress() : _addr; - doConnectAsync(_fd, addr, _sourceAddr, _write); - return false; - } - - assert(!buf.b.empty()); - assert(buf.i != buf.b.end()); - - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } - assert(packetSize > 0); - _write.buf.len = packetSize; - _write.buf.buf = reinterpret_cast<char*>(&*buf.i); - int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); - if(err == SOCKET_ERROR) - { - if(!wouldBlock()) - { - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - } - return packetSize == static_cast<int>(buf.b.end() - buf.i); + return _stream->startWrite(buf); } void IceInternal::TcpTransceiver::finishWrite(Buffer& buf) { - if(_state < StateConnected && _state != StateProxyConnectRequest) - { - return; - } - - if(static_cast<int>(_write.count) == SOCKET_ERROR) - { - WSASetLastError(_write.error); - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - - buf.i += _write.count; + _stream->finishWrite(buf); } void IceInternal::TcpTransceiver::startRead(Buffer& buf) { - int packetSize = static_cast<int>(buf.b.end() - buf.i); - if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) - { - packetSize = _maxReceivePacketSize; - } - assert(!buf.b.empty() && buf.i != buf.b.end()); - - _read.buf.len = packetSize; - _read.buf.buf = reinterpret_cast<char*>(&*buf.i); - int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); - if(err == SOCKET_ERROR) - { - if(!wouldBlock()) - { - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - } + _stream->startRead(buf); } void IceInternal::TcpTransceiver::finishRead(Buffer& buf, bool&) { - if(static_cast<int>(_read.count) == SOCKET_ERROR) - { - WSASetLastError(_read.error); - if(connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = getSocketErrno(); - throw ex; - } - } - else if(_read.count == 0) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = 0; - throw ex; - } - - buf.i += _read.count; + _stream->finishRead(buf); } #endif @@ -447,7 +93,7 @@ IceInternal::TcpTransceiver::protocol() const string IceInternal::TcpTransceiver::toString() const { - return _desc; + return _stream->toString(); } string @@ -460,7 +106,7 @@ Ice::ConnectionInfoPtr IceInternal::TcpTransceiver::getInfo() const { Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo(); - fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort); + fdToAddressAndPort(_stream->fd(), info->localAddress, info->localPort, info->remoteAddress, info->remotePort); return info; } @@ -473,104 +119,13 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize } } -IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, SOCKET fd, - const NetworkProxyPtr& proxy, const Address& addr, - const Address& sourceAddr) : - NativeInfo(fd), - _instance(instance), - _proxy(proxy), - _addr(addr), - _sourceAddr(sourceAddr), - _state(StateNeedConnect) -#ifdef ICE_USE_IOCP - , _read(SocketOperationRead), - _write(SocketOperationWrite) -#endif +IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, const StreamSocketPtr& stream) : + _instance(instance), + _stream(stream) { - setBlock(_fd, false); - - setTcpBufSize(_fd, _instance->properties(), _instance->logger()); - -#ifdef ICE_USE_IOCP - // - // On Windows, limiting the buffer size is important to prevent - // poor throughput performances when transfering large amount of - // data. See Microsoft KB article KB823764. - // - _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2; - if(_maxSendPacketSize < 512) - { - _maxSendPacketSize = 0; - } - - _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd); - if(_maxReceivePacketSize < 512) - { - _maxReceivePacketSize = 0; - } -#endif -} - -IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, SOCKET fd) : - NativeInfo(fd), - _instance(instance), - _state(StateConnected), - _desc(fdToString(_fd)) -#ifdef ICE_USE_IOCP - , _read(SocketOperationRead), - _write(SocketOperationWrite) -#endif -{ - setBlock(_fd, false); - - setTcpBufSize(_fd, _instance->properties(), _instance->logger()); - -#ifdef ICE_USE_IOCP - // - // On Windows, limiting the buffer size is important to prevent - // poor throughput performances when transfering large amount of - // data. See Microsoft KB article KB823764. - // - _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2; - if(_maxSendPacketSize < 512) - { - _maxSendPacketSize = 0; - } - - _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd); - if(_maxReceivePacketSize < 512) - { - _maxReceivePacketSize = 0; - } -#endif } IceInternal::TcpTransceiver::~TcpTransceiver() { - assert(_fd == INVALID_SOCKET); } -void -IceInternal::TcpTransceiver::connect() -{ -#ifndef ICE_USE_IOCP - try - { - Address addr = _proxy ? _proxy->getAddress() : _addr; - if(doConnect(_fd, addr, _sourceAddr)) - { - _state = StateConnected; - _desc = fdToString(_fd, _proxy, _addr, true); - } - else - { - _desc = fdToString(_fd, _proxy, _addr, true); - } - } - catch(...) - { - _fd = INVALID_SOCKET; - throw; - } -#endif -} |