summaryrefslogtreecommitdiff
path: root/cpp/src/IceSSL/TransceiverI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceSSL/TransceiverI.cpp')
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp519
1 files changed, 413 insertions, 106 deletions
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index fdb6fe229ae..ec862faf22b 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -32,7 +32,7 @@ IceSSL::TransceiverI::getNativeInfo()
return this;
}
-#if defined(ICE_USE_IOCP)
+#ifdef ICE_USE_IOCP
IceInternal::AsyncInfo*
IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
{
@@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
#endif
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize()
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
{
try
{
@@ -61,14 +61,89 @@ IceSSL::TransceiverI::initialize()
}
else if(_state <= StateConnectPending)
{
-#ifndef ICE_USE_IOCP
+#ifdef ICE_USE_IOCP
+ IceInternal::doFinishConnectAsync(_fd, _write);
+#else
IceInternal::doFinishConnect(_fd);
+#endif
+
+ _desc = IceInternal::fdToString(_fd, _proxy, _addr, true);
+
+ if(_proxy)
+ {
+ //
+ // Prepare the read & write buffers in advance.
+ //
+ _proxy->beginWriteConnectRequest(_addr, writeBuffer);
+ _proxy->beginReadConnectRequestResponse(readBuffer);
+
+#ifdef ICE_USE_IOCP
+ //
+ // Return SocketOperationWrite to indicate we need to start a write.
+ //
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return IceInternal::SocketOperationWrite;
#else
- IceInternal::doFinishConnectAsync(_fd, _write);
+ //
+ // Write the proxy connection message using TCP.
+ //
+ if(writeRaw(writeBuffer))
+ {
+ //
+ // Write completed without blocking.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+
+ //
+ // Try to read the response using TCP.
+ //
+ if(readRaw(readBuffer))
+ {
+ //
+ // Read completed without blocking - fall through.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ }
+ else
+ {
+ //
+ // Return SocketOperationRead to indicate we need to complete the read.
+ //
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return IceInternal::SocketOperationRead;
+ }
+ }
+ else
+ {
+ //
+ // Return SocketOperationWrite to indicate we need to complete the write.
+ //
+ _state = StateProxyConnectRequest; // Send proxy connect request
+ return IceInternal::SocketOperationWrite;
+ }
#endif
+ }
+
_state = StateConnected;
- _desc = IceInternal::fdToString(_fd);
}
+ else if(_state == StateProxyConnectRequest)
+ {
+ //
+ // Write completed.
+ //
+ _proxy->endWriteConnectRequest(writeBuffer);
+ _state = StateProxyConnectRequestPending; // Wait for proxy response
+ return IceInternal::SocketOperationRead;
+ }
+ else if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // Read completed.
+ //
+ _proxy->endReadConnectRequestResponse(readBuffer);
+ _state = StateConnected;
+ }
+
assert(_state == StateConnected);
if(!_ssl)
@@ -96,16 +171,18 @@ IceSSL::TransceiverI::initialize()
_sentBytes = 0;
#endif
-#ifndef ICE_USE_IOCP
- // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type.
- BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0);
-#else
+#ifdef ICE_USE_IOCP
BIO* bio;
if(!BIO_new_bio_pair(&bio, _maxSendPacketSize, &_iocpBio, _maxReceivePacketSize))
{
bio = 0;
_iocpBio = 0;
}
+#else
+ //
+ // This static_cast is necessary due to 64bit windows. There SOCKET is a non-int type.
+ //
+ BIO* bio = BIO_new_socket(static_cast<int>(_fd), 0);
#endif
if(!bio)
{
@@ -255,17 +332,7 @@ IceSSL::TransceiverI::initialize()
}
else
{
-#ifndef _WIN32
- //
- // The local address is only accessible with connected sockets on Windows.
- //
- IceInternal::Address localAddr;
- IceInternal::fdToLocalAddress(_fd, localAddr);
- out << "local address: " << IceInternal::addrToString(localAddr) << "\n";
-#else
- out << "local address: <not available>\n";
-#endif
- out << "remote address: " << IceInternal::addrToString(_connectAddr) << "\n" << ex;
+ out << IceInternal::fdToString(_fd, _proxy, _addr, false) << "\n" << ex;
}
}
throw;
@@ -341,6 +408,14 @@ IceSSL::TransceiverI::close()
bool
IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequest)
+ {
+ //
+ // We need to write the proxy message, but we have to use TCP and not SSL.
+ //
+ return writeRaw(buf);
+ }
+
#ifdef ICE_USE_IOCP
if(_writeI != _writeBuffer.end())
{
@@ -351,15 +426,15 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
#endif
- // Its impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
ERR_clear_error(); // Clear any spurious errors.
assert(_fd != INVALID_SOCKET);
-#ifndef ICE_USE_IOCP
- int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize);
-#else
+#ifdef ICE_USE_IOCP
int ret;
if(_sentBytes)
{
@@ -378,6 +453,8 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
}
}
+#else
+ int ret = SSL_write(_ssl, reinterpret_cast<const void*>(&*buf.i), packetSize);
#endif
if(ret <= 0)
{
@@ -482,6 +559,14 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
bool
IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // We need to read the proxy reply, but we have to use TCP and not SSL.
+ //
+ return readRaw(buf);
+ }
+
#ifdef ICE_USE_IOCP
if(_readI != _readBuffer.end())
{
@@ -492,7 +577,9 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
}
#endif
- // It's impossible for the packetSize to be more than an Int.
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
int packetSize = static_cast<int>(buf.b.end() - buf.i);
while(buf.i != buf.b.end())
{
@@ -644,51 +731,40 @@ IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
}
#ifdef ICE_USE_IOCP
+
bool
-IceSSL::TransceiverI::startWrite(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::startWrite(IceInternal::Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state == StateConnectPending)
{
- IceInternal::doConnectAsync(_fd, _connectAddr, _write);
+ IceInternal::Address addr = _proxy ? _proxy->getAddress() : _addr;
+ IceInternal::doConnectAsync(_fd, addr, _write);
return false;
}
+ else if(_state == StateProxyConnectRequest)
+ {
+ //
+ // We need to write the proxy message, but we have to use TCP and not SSL.
+ //
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+
+ const int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ const int actualSize = writeAsync(reinterpret_cast<char*>(&*buf.i), packetSize);
+ return packetSize == actualSize;
+ }
assert(!_writeBuffer.empty() && _writeI != _writeBuffer.end());
- int packetSize = static_cast<int>(_writeBuffer.end() - _writeI);
- if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
- packetSize = _maxSendPacketSize;
- }
+ const int packetSize = static_cast<int>(_writeBuffer.end() - _writeI);
+ const int actualSize = writeAsync(reinterpret_cast<char*>(&*_writeI), packetSize);
- _write.buf.len = packetSize;
- _write.buf.buf = reinterpret_cast<char*>(&*_writeI);
- int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!IceInternal::wouldBlock())
- {
- if(IceInternal::connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- }
- }
- return packetSize == static_cast<int>(_writeBuffer.end() - _writeI);
+ return packetSize == actualSize;
}
void
-IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& buf)
{
- if(_state < StateConnected)
+ if(_state < StateConnected && _state != StateProxyConnectRequest)
{
return;
}
@@ -710,12 +786,30 @@ IceSSL::TransceiverI::finishWrite(IceInternal::Buffer& /*buf*/)
}
}
- _writeI += _write.count;
+ if(_state == StateProxyConnectRequest)
+ {
+ buf.i += _write.count;
+ }
+ else
+ {
+ _writeI += _write.count;
+ }
}
void
IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
{
+ if(_state == StateProxyConnectRequestPending)
+ {
+ //
+ // We need to read the proxy reply, but we have to use TCP and not SSL.
+ //
+ assert(!buf.b.empty() && buf.i != buf.b.end());
+ const int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ readAsync(reinterpret_cast<char*>(&*buf.i), packetSize);
+ return;
+ }
+
if(_readI == _readBuffer.end())
{
assert(!buf.b.empty() && buf.i != buf.b.end());
@@ -723,7 +817,7 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
ERR_clear_error(); // Clear any spurious errors.
#ifndef NDEBUG
- int ret =
+ int ret =
#endif
SSL_read(_ssl, reinterpret_cast<void*>(&*buf.i), static_cast<int>(buf.b.end() - buf.i));
assert(ret <= 0 && SSL_get_error(_ssl, ret) == SSL_ERROR_WANT_READ);
@@ -735,37 +829,12 @@ IceSSL::TransceiverI::startRead(IceInternal::Buffer& buf)
assert(!_readBuffer.empty() && _readI != _readBuffer.end());
- int packetSize = static_cast<int>(_readBuffer.end() - _readI);
- if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
- {
- packetSize = _maxReceivePacketSize;
- }
-
- _read.buf.len = packetSize;
- _read.buf.buf = reinterpret_cast<char*>(&*_readI);
- int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!IceInternal::wouldBlock())
- {
- if(IceInternal::connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = IceInternal::getSocketErrno();
- throw ex;
- }
- }
- }
+ const int packetSize = static_cast<int>(_readBuffer.end() - _readI);
+ readAsync(reinterpret_cast<char*>(&*_readI), packetSize);
}
void
-IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/)
+IceSSL::TransceiverI::finishRead(IceInternal::Buffer& buf)
{
if(static_cast<int>(_read.count) == SOCKET_ERROR)
{
@@ -783,22 +852,36 @@ IceSSL::TransceiverI::finishRead(IceInternal::Buffer& /*buf*/)
throw ex;
}
}
+ else if(_read.count == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
- _readI += _read.count;
-
- if(_iocpBio && _readI == _readBuffer.end())
+ if(_state == StateProxyConnectRequestPending)
{
- assert(_readI == _readBuffer.end());
- int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
- if(n < 0) // Expected if the transceiver was closed.
+ buf.i += _read.count;
+ }
+ else
+ {
+ _readI += _read.count;
+
+ if(_iocpBio && _readI == _readBuffer.end())
{
- SecurityException ex(__FILE__, __LINE__);
- ex.reason = "SSL bio write failed";
- throw ex;
+ assert(_readI == _readBuffer.end());
+ int n = BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
+ if(n < 0) // Expected if the transceiver was closed.
+ {
+ SecurityException ex(__FILE__, __LINE__);
+ ex.reason = "SSL bio write failed";
+ throw ex;
+ }
+ assert(n == static_cast<int>(_readBuffer.size()));
}
- assert(n == static_cast<int>(_readBuffer.size()));
}
}
+
#endif
string
@@ -828,15 +911,17 @@ IceSSL::TransceiverI::checkSendSize(const IceInternal::Buffer& buf, size_t messa
}
}
-IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& host,
- const IceInternal::Address& addr) :
+IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const IceInternal::NetworkProxyPtr& proxy,
+ const string& host, const IceInternal::Address& addr) :
IceInternal::NativeInfo(fd),
_instance(instance),
_logger(instance->communicator()->getLogger()),
_stats(instance->communicator()->getStats()),
- _ssl(0),
+ _proxy(proxy),
_host(host),
+ _addr(addr),
_incoming(false),
+ _ssl(0),
_state(StateNeedConnect)
#ifdef ICE_USE_IOCP
, _iocpBio(0),
@@ -863,7 +948,6 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const
_desc = IceInternal::fdToString(_fd);
}
#endif
- _connectAddr = addr;
}
IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const string& adapterName) :
@@ -871,9 +955,9 @@ IceSSL::TransceiverI::TransceiverI(const InstancePtr& instance, SOCKET fd, const
_instance(instance),
_logger(instance->communicator()->getLogger()),
_stats(instance->communicator()->getStats()),
- _ssl(0),
- _incoming(true),
_adapterName(adapterName),
+ _incoming(true),
+ _ssl(0),
_state(StateConnected),
_desc(IceInternal::fdToString(fd))
#ifdef ICE_USE_IOCP
@@ -919,7 +1003,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const
{
X509_free(cert);
}
-
+
if(chain != 0)
{
for(int i = 0; i < sk_X509_num(chain); ++i)
@@ -942,6 +1026,7 @@ IceSSL::TransceiverI::getNativeConnectionInfo() const
}
#ifdef ICE_USE_IOCP
+
bool
IceSSL::TransceiverI::receive()
{
@@ -1005,7 +1090,7 @@ IceSSL::TransceiverI::receive()
assert(_readI == _readBuffer.end());
#ifndef NDEBUG
- int n =
+ int n =
#endif
BIO_write(_iocpBio, &_readBuffer[0], static_cast<int>(_readBuffer.size()));
@@ -1021,7 +1106,7 @@ IceSSL::TransceiverI::send()
assert(BIO_ctrl_pending(_iocpBio));
_writeBuffer.resize(BIO_ctrl_pending(_iocpBio));
#ifndef NDEBUG
- int n =
+ int n =
#endif
BIO_read(_iocpBio, &_writeBuffer[0], static_cast<int>(_writeBuffer.size()));
assert(n == static_cast<int>(_writeBuffer.size()));
@@ -1085,4 +1170,226 @@ IceSSL::TransceiverI::send()
return true;
}
+int
+IceSSL::TransceiverI::writeAsync(char* buf, int packetSize)
+{
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
+
+ _write.buf.len = packetSize;
+ _write.buf.buf = buf;
+
+ int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
+
+ if(err == SOCKET_ERROR)
+ {
+ if(!IceInternal::wouldBlock())
+ {
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+ }
+
+ return packetSize;
+}
+
+int
+IceSSL::TransceiverI::readAsync(char* buf, int packetSize)
+{
+ if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
+ {
+ packetSize = _maxReceivePacketSize;
+ }
+
+ _read.buf.len = packetSize;
+ _read.buf.buf = buf;
+
+ int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
+
+ if(err == SOCKET_ERROR)
+ {
+ if(!IceInternal::wouldBlock())
+ {
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+ }
+
+ return packetSize;
+}
+
+#endif
+
+bool
+IceSSL::TransceiverI::writeRaw(IceInternal::Buffer& buf)
+{
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+#ifdef ICE_USE_IOCP
+ //
+ // Limit packet size to avoid performance problems on WIN32
+ //
+ if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
+ {
+ packetSize = _maxSendPacketSize;
+ }
#endif
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+
+ ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
+ if(ret == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+ if(IceInternal::interrupted())
+ {
+ continue;
+ }
+
+ if(IceInternal::noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(IceInternal::wouldBlock())
+ {
+ return false;
+ }
+
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_instance->networkTraceLevel() >= 3)
+ {
+ Trace out(_logger, _instance->networkTraceCategory());
+ out << "sent " << ret << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesSent("tcp", static_cast<Int>(ret));
+ }
+
+ buf.i += ret;
+
+ if(packetSize > buf.b.end() - buf.i)
+ {
+ packetSize = static_cast<int>(buf.b.end() - buf.i);
+ }
+ }
+
+ return true;
+}
+
+bool
+IceSSL::TransceiverI::readRaw(IceInternal::Buffer& buf)
+{
+ //
+ // It's impossible for packetSize to be more than an Int.
+ //
+ int packetSize = static_cast<int>(buf.b.end() - buf.i);
+ while(buf.i != buf.b.end())
+ {
+ assert(_fd != INVALID_SOCKET);
+ ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
+
+ if(ret == 0)
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
+ }
+
+ if(ret == SOCKET_ERROR)
+ {
+ if(IceInternal::interrupted())
+ {
+ continue;
+ }
+
+ if(IceInternal::noBuffers() && packetSize > 1024)
+ {
+ packetSize /= 2;
+ continue;
+ }
+
+ if(IceInternal::wouldBlock())
+ {
+ return false;
+ }
+
+ if(IceInternal::connectionLost())
+ {
+ ConnectionLostException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ else
+ {
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = IceInternal::getSocketErrno();
+ throw ex;
+ }
+ }
+
+ if(_instance->networkTraceLevel() >= 3)
+ {
+ Trace out(_logger, _instance->networkTraceCategory());
+ out << "received " << ret << " of " << packetSize << " bytes via tcp\n" << toString();
+ }
+
+ if(_stats)
+ {
+ _stats->bytesReceived("tcp", static_cast<Int>(ret));
+ }
+
+ buf.i += ret;
+
+ packetSize = static_cast<int>(buf.b.end() - buf.i);
+ }
+
+ return true;
+}