diff options
Diffstat (limited to 'cpp/src/IceSSL/TransceiverI.cpp')
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.cpp | 519 |
1 files changed, 413 insertions, 106 deletions
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index fdb6fe229ae..ec862faf22b 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -32,7 +32,7 @@ IceSSL::TransceiverI::getNativeInfo() return this; } -#if defined(ICE_USE_IOCP) +#ifdef ICE_USE_IOCP IceInternal::AsyncInfo* IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) { @@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) #endif IceInternal::SocketOperation -IceSSL::TransceiverI::initialize() +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { try { @@ -61,14 +61,89 @@ IceSSL::TransceiverI::initialize() } else if(_state <= StateConnectPending) { -#ifndef ICE_USE_IOCP +#ifdef ICE_USE_IOCP + IceInternal::doFinishConnectAsync(_fd, _write); +#else IceInternal::doFinishConnect(_fd); +#endif + + _desc = IceInternal::fdToString(_fd, _proxy, _addr, true); + + if(_proxy) + { + // + // Prepare the read & write buffers in advance. + // + _proxy->beginWriteConnectRequest(_addr, writeBuffer); + _proxy->beginReadConnectRequestResponse(readBuffer); + +#ifdef ICE_USE_IOCP + // + // Return SocketOperationWrite to indicate we need to start a write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; #else - IceInternal::doFinishConnectAsync(_fd, _write); + // + // Write the proxy connection message using TCP. + // + if(writeRaw(writeBuffer)) + { + // + // Write completed without blocking. + // + _proxy->endWriteConnectRequest(writeBuffer); + + // + // Try to read the response using TCP. + // + if(readRaw(readBuffer)) + { + // + // Read completed without blocking - fall through. + // + _proxy->endReadConnectRequestResponse(readBuffer); + } + else + { + // + // Return SocketOperationRead to indicate we need to complete the read. + // + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + } + else + { + // + // Return SocketOperationWrite to indicate we need to complete the write. + // + _state = StateProxyConnectRequest; // Send proxy connect request + return IceInternal::SocketOperationWrite; + } #endif + } + _state = StateConnected; - _desc = IceInternal::fdToString(_fd); } + else if(_state == StateProxyConnectRequest) + { + // + // Write completed. + // + _proxy->endWriteConnectRequest(writeBuffer); + _state = StateProxyConnectRequestPending; // Wait for proxy response + return IceInternal::SocketOperationRead; + } + else if(_state == StateProxyConnectRequestPending) + { + // + // Read completed. + // + _proxy->endReadConnectRequestResponse(readBuffer); + _state = StateConnected; + } + assert(_state == StateConnected); if(!_ssl) @@ -96,16 +171,18 @@ IceSSL::TransceiverI::initialize() _sentBytes = 0; #endif -#ifndef ICE_USE_IOCP - // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. - BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); -#else +#ifdef ICE_USE_IOCP BIO* bio; if(!BIO_new_bio_pair(&bio, _maxSendPacketSize, &_iocpBio, _maxReceivePacketSize)) { bio = 0; _iocpBio = 0; } +#else + // + // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type. + // + BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0); #endif if(!bio) { @@ -255,17 +332,7 @@ IceSSL::TransceiverI::initialize() } else { -#ifndef _WIN32 - // - // The local address is only accessible with connected sockets on Windows. - // - IceInternal::Address localAddr; - IceInternal::fdToLocalAddress(_fd, localAddr); - out << "local address: " << IceInternal::addrToString(localAddr) << "\n"; -#else - out << "local address: <not available>\n"; -#endif - out << "remote address: " << IceInternal::addrToString(_connectAddr) << "\n" << ex; + out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex; } } throw; @@ -341,6 +408,14 @@ IceSSL::TransceiverI::close() bool IceSSL::TransceiverI::write(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequest) + { + // + // We need to write the proxy message, but we have to use TCP and not SSL. + // + return writeRaw(buf); + } + #ifdef ICE_USE_IOCP if(_writeI != _writeBuffer.end()) { @@ -351,15 +426,15 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } #endif - // Its impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { ERR_clear_error(); // Clear any spurious errors. assert(_fd != INVALID_SOCKET); -#ifndef ICE_USE_IOCP - int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize); -#else +#ifdef ICE_USE_IOCP int ret; if(_sentBytes) { @@ -378,6 +453,8 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } } } +#else + int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize); #endif if(ret <= 0) { @@ -482,6 +559,14 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) bool IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequestPending) + { + // + // We need to read the proxy reply, but we have to use TCP and not SSL. + // + return readRaw(buf); + } + #ifdef ICE_USE_IOCP if(_readI != _readBuffer.end()) { @@ -492,7 +577,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf) } #endif - // It's impossible for the packetSize to be more than an Int. + // + // It's impossible for packetSize to be more than an Int. + // int packetSize = static_cast<int>(buf.b.end() - buf.i); while(buf.i != buf.b.end()) { @@ -644,51 +731,40 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf) } #ifdef ICE_USE_IOCP + bool -IceSSL::TransceiverI::startWrite(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::startWrite(IceInternal::Buffer& buf) { - if(_state < StateConnected) + if(_state == StateConnectPending) { - IceInternal::doConnectAsync(_fd, _connectAddr, _write); + IceInternal::Address addr = _proxy ? _proxy->getAddress() : _addr; + IceInternal::doConnectAsync(_fd, addr, _write); return false; } + else if(_state == StateProxyConnectRequest) + { + // + // We need to write the proxy message, but we have to use TCP and not SSL. + // + assert(!buf.b.empty() && buf.i != buf.b.end()); + + const int packetSize = static_cast<int>(buf.b.end() - buf.i); + const int actualSize = writeAsync(reinterpret_cast<char*>(&*buf.i), packetSize); + return packetSize == actualSize; + } assert(!_writeBuffer.empty() && _writeI != _writeBuffer.end()); - int packetSize = static_cast<int>(_writeBuffer.end() - _writeI); - if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) - { - packetSize = _maxSendPacketSize; - } + const int packetSize = static_cast<int>(_writeBuffer.end() - _writeI); + const int actualSize = writeAsync(reinterpret_cast<char*>(&*_writeI), packetSize); - _write.buf.len = packetSize; - _write.buf.buf = reinterpret_cast<char*>(&*_writeI); - int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); - if(err == SOCKET_ERROR) - { - if(!IceInternal::wouldBlock()) - { - if(IceInternal::connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - } - } - return packetSize == static_cast<int>(_writeBuffer.end() - _writeI); + return packetSize == actualSize; } void -IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& buf) { - if(_state < StateConnected) + if(_state < StateConnected && _state != StateProxyConnectRequest) { return; } @@ -710,12 +786,30 @@ IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/) } } - _writeI += _write.count; + if(_state == StateProxyConnectRequest) + { + buf.i += _write.count; + } + else + { + _writeI += _write.count; + } } void IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) { + if(_state == StateProxyConnectRequestPending) + { + // + // We need to read the proxy reply, but we have to use TCP and not SSL. + // + assert(!buf.b.empty() && buf.i != buf.b.end()); + const int packetSize = static_cast<int>(buf.b.end() - buf.i); + readAsync(reinterpret_cast<char*>(&*buf.i), packetSize); + return; + } + if(_readI == _readBuffer.end()) { assert(!buf.b.empty() && buf.i != buf.b.end()); @@ -723,7 +817,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) ERR_clear_error(); // Clear any spurious errors. #ifndef NDEBUG - int ret = + int ret = #endif SSL_read(_ssl, reinterpret_cast<void*>(&*buf.i), static_cast<int>(buf.b.end() - buf.i)); assert(ret <= 0 && SSL_get_error(_ssl, ret) == SSL_ERROR_WANT_READ); @@ -735,37 +829,12 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf) assert(!_readBuffer.empty() && _readI != _readBuffer.end()); - int packetSize = static_cast<int>(_readBuffer.end() - _readI); - if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) - { - packetSize = _maxReceivePacketSize; - } - - _read.buf.len = packetSize; - _read.buf.buf = reinterpret_cast<char*>(&*_readI); - int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); - if(err == SOCKET_ERROR) - { - if(!IceInternal::wouldBlock()) - { - if(IceInternal::connectionLost()) - { - ConnectionLostException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - else - { - SocketException ex(__FILE__, __LINE__); - ex.error = IceInternal::getSocketErrno(); - throw ex; - } - } - } + const int packetSize = static_cast<int>(_readBuffer.end() - _readI); + readAsync(reinterpret_cast<char*>(&*_readI), packetSize); } void -IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/) +IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf) { if(static_cast<int>(_read.count) == SOCKET_ERROR) { @@ -783,22 +852,36 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/) throw ex; } } + else if(_read.count == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } - _readI += _read.count; - - if(_iocpBio && _readI == _readBuffer.end()) + if(_state == StateProxyConnectRequestPending) { - assert(_readI == _readBuffer.end()); - int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); - if(n < 0) // Expected if the transceiver was closed. + buf.i += _read.count; + } + else + { + _readI += _read.count; + + if(_iocpBio && _readI == _readBuffer.end()) { - SecurityException ex(__FILE__, __LINE__); - ex.reason = "SSL bio write failed"; - throw ex; + assert(_readI == _readBuffer.end()); + int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); + if(n < 0) // Expected if the transceiver was closed. + { + SecurityException ex(__FILE__, __LINE__); + ex.reason = "SSL bio write failed"; + throw ex; + } + assert(n == static_cast<int>(_readBuffer.size())); } - assert(n == static_cast<int>(_readBuffer.size())); } } + #endif string @@ -828,15 +911,17 @@ IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messa } } -IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& host, - const IceInternal::Address& addr) : +IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const IceInternal::NetworkProxyPtr& proxy, + const string& host, const IceInternal::Address& addr) : IceInternal::NativeInfo(fd), _instance(instance), _logger(instance->communicator()->getLogger()), _stats(instance->communicator()->getStats()), - _ssl(0), + _proxy(proxy), _host(host), + _addr(addr), _incoming(false), + _ssl(0), _state(StateNeedConnect) #ifdef ICE_USE_IOCP , _iocpBio(0), @@ -863,7 +948,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const _desc = IceInternal::fdToString(_fd); } #endif - _connectAddr = addr; } IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& adapterName) : @@ -871,9 +955,9 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const _instance(instance), _logger(instance->communicator()->getLogger()), _stats(instance->communicator()->getStats()), - _ssl(0), - _incoming(true), _adapterName(adapterName), + _incoming(true), + _ssl(0), _state(StateConnected), _desc(IceInternal::fdToString(fd)) #ifdef ICE_USE_IOCP @@ -919,7 +1003,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const { X509_free(cert); } - + if(chain != 0) { for(int i = 0; i < sk_X509_num(chain); ++i) @@ -942,6 +1026,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const } #ifdef ICE_USE_IOCP + bool IceSSL::TransceiverI::receive() { @@ -1005,7 +1090,7 @@ IceSSL::TransceiverI::receive() assert(_readI == _readBuffer.end()); #ifndef NDEBUG - int n = + int n = #endif BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size())); @@ -1021,7 +1106,7 @@ IceSSL::TransceiverI::send() assert(BIO_ctrl_pending(_iocpBio)); _writeBuffer.resize(BIO_ctrl_pending(_iocpBio)); #ifndef NDEBUG - int n = + int n = #endif BIO_read(_iocpBio, &_writeBuffer[0], static_cast<int>(_writeBuffer.size())); assert(n == static_cast<int>(_writeBuffer.size())); @@ -1085,4 +1170,226 @@ IceSSL::TransceiverI::send() return true; } +int +IceSSL::TransceiverI::writeAsync(char* buf, int packetSize) +{ + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } + + _write.buf.len = packetSize; + _write.buf.buf = buf; + + int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL); + + if(err == SOCKET_ERROR) + { + if(!IceInternal::wouldBlock()) + { + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + } + + return packetSize; +} + +int +IceSSL::TransceiverI::readAsync(char* buf, int packetSize) +{ + if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize) + { + packetSize = _maxReceivePacketSize; + } + + _read.buf.len = packetSize; + _read.buf.buf = buf; + + int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL); + + if(err == SOCKET_ERROR) + { + if(!IceInternal::wouldBlock()) + { + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + } + + return packetSize; +} + +#endif + +bool +IceSSL::TransceiverI::writeRaw(IceInternal::Buffer& buf) +{ + // + // It's impossible for packetSize to be more than an Int. + // + int packetSize = static_cast<int>(buf.b.end() - buf.i); +#ifdef ICE_USE_IOCP + // + // Limit packet size to avoid performance problems on WIN32 + // + if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize) + { + packetSize = _maxSendPacketSize; + } #endif + while(buf.i != buf.b.end()) + { + assert(_fd != INVALID_SOCKET); + + ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0); + if(ret == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + + if(ret == SOCKET_ERROR) + { + if(IceInternal::interrupted()) + { + continue; + } + + if(IceInternal::noBuffers() && packetSize > 1024) + { + packetSize /= 2; + continue; + } + + if(IceInternal::wouldBlock()) + { + return false; + } + + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + + if(_instance->networkTraceLevel() >= 3) + { + Trace out(_logger, _instance->networkTraceCategory()); + out << "sent " << ret << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesSent("tcp", static_cast<Int>(ret)); + } + + buf.i += ret; + + if(packetSize > buf.b.end() - buf.i) + { + packetSize = static_cast<int>(buf.b.end() - buf.i); + } + } + + return true; +} + +bool +IceSSL::TransceiverI::readRaw(IceInternal::Buffer& buf) +{ + // + // It's impossible for packetSize to be more than an Int. + // + int packetSize = static_cast<int>(buf.b.end() - buf.i); + while(buf.i != buf.b.end()) + { + assert(_fd != INVALID_SOCKET); + ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0); + + if(ret == 0) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = 0; + throw ex; + } + + if(ret == SOCKET_ERROR) + { + if(IceInternal::interrupted()) + { + continue; + } + + if(IceInternal::noBuffers() && packetSize > 1024) + { + packetSize /= 2; + continue; + } + + if(IceInternal::wouldBlock()) + { + return false; + } + + if(IceInternal::connectionLost()) + { + ConnectionLostException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + else + { + SocketException ex(__FILE__, __LINE__); + ex.error = IceInternal::getSocketErrno(); + throw ex; + } + } + + if(_instance->networkTraceLevel() >= 3) + { + Trace out(_logger, _instance->networkTraceCategory()); + out << "received " << ret << " of " << packetSize << " bytes via tcp\n" << toString(); + } + + if(_stats) + { + _stats->bytesReceived("tcp", static_cast<Int>(ret)); + } + + buf.i += ret; + + packetSize = static_cast<int>(buf.b.end() - buf.i); + } + + return true; +} |