summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/TcpTransceiver.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-09-10 08:47:19 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-09-10 08:47:19 +0200
commitb6c9d9a880f6f1a6908a3c62dfccdce3e68dad80 (patch)
treed3e9e9340064538a8dc7a645260d0eb3cdf55d63 /cpp/src/Ice/TcpTransceiver.cpp
parentUndo bogus change from an earlier commit. (diff)
downloadice-b6c9d9a880f6f1a6908a3c62dfccdce3e68dad80.tar.bz2
ice-b6c9d9a880f6f1a6908a3c62dfccdce3e68dad80.tar.xz
ice-b6c9d9a880f6f1a6908a3c62dfccdce3e68dad80.zip
ICE-5582 (SOCKs test), ICE-5314 (HTTP proxies), major refactoring of networking code (addition of StreamSocket class abstraction)
Diffstat (limited to 'cpp/src/Ice/TcpTransceiver.cpp')
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp475
1 files changed, 15 insertions, 460 deletions
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index 76440f5f935..9f7a04c9496 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -23,121 +23,13 @@ using namespace IceInternal;
NativeInfoPtr
IceInternal::TcpTransceiver::getNativeInfo()
{
- return this;
+ return _stream;
}
-#ifdef ICE_USE_IOCP
-AsyncInfo*
-IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
-{
- switch(status)
- {
- case SocketOperationRead:
- return &_read;
- case SocketOperationWrite:
- return &_write;
- default:
- assert(false);
- return 0;
- }
-}
-#endif
-
SocketOperation
-IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData)
+IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool&)
{
- if(_state == StateNeedConnect)
- {
- _state = StateConnectPending;
- return SocketOperationConnect;
- }
- else if(_state <= StateConnectPending)
- {
-#ifdef ICE_USE_IOCP
- doFinishConnectAsync(_fd, _write);
-#else
- doFinishConnect(_fd);
-#endif
-
- _desc = fdToString(_fd, _proxy, _addr, true);
-
- if(_proxy)
- {
- //
- // Prepare the read & write buffers in advance.
- //
- _proxy->beginWriteConnectRequest(_addr, writeBuffer);
- _proxy->beginReadConnectRequestResponse(readBuffer);
-
-#ifdef ICE_USE_IOCP
- //
- // Return SocketOperationWrite to indicate we need to start a write.
- //
- _state = StateProxyConnectRequest; // Send proxy connect request
- return IceInternal::SocketOperationWrite;
-#else
- //
- // Write the proxy connection message.
- //
- if(write(writeBuffer))
- {
- //
- // Write completed without blocking.
- //
- _proxy->endWriteConnectRequest(writeBuffer);
-
- //
- // Try to read the response.
- //
- if(read(readBuffer, hasMoreData))
- {
- //
- // Read completed without blocking - fall through.
- //
- _proxy->endReadConnectRequestResponse(readBuffer);
- }
- else
- {
- //
- // Return SocketOperationRead to indicate we need to complete the read.
- //
- _state = StateProxyConnectRequestPending; // Wait for proxy response
- return SocketOperationRead;
- }
- }
- else
- {
- //
- // Return SocketOperationWrite to indicate we need to complete the write.
- //
- _state = StateProxyConnectRequest; // Send proxy connect request
- return SocketOperationWrite;
- }
-#endif
- }
-
- _state = StateConnected;
- }
- else if(_state == StateProxyConnectRequest)
- {
- //
- // Write completed.
- //
- _proxy->endWriteConnectRequest(writeBuffer);
- _state = StateProxyConnectRequestPending; // Wait for proxy response
- return SocketOperationRead;
- }
- else if(_state == StateProxyConnectRequestPending)
- {
- //
- // Read completed.
- //
- _proxy->endReadConnectRequestResponse(readBuffer);
- _state = StateConnected;
- }
-
- assert(_state == StateConnected);
- return SocketOperationNone;
+ return _stream->connect(readBuffer, writeBuffer);
}
SocketOperation
@@ -151,290 +43,44 @@ IceInternal::TcpTransceiver::closing(bool initiator, const Ice::LocalException&)
void
IceInternal::TcpTransceiver::close()
{
- assert(_fd != INVALID_SOCKET);
- try
- {
- closeSocket(_fd);
- _fd = INVALID_SOCKET;
- }
- catch(const SocketException&)
- {
- _fd = INVALID_SOCKET;
- throw;
- }
+ _stream->close();
}
SocketOperation
IceInternal::TcpTransceiver::write(Buffer& buf)
{
- if(buf.i == buf.b.end())
- {
- return SocketOperationNone;
- }
-
- //
- // It's impossible for packetSize to be more than an Int.
- //
- int packetSize = static_cast<int>(buf.b.end() - buf.i);
-#ifdef ICE_USE_IOCP
- //
- // Limit packet size to avoid performance problems on WIN32
- //
- if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
- packetSize = _maxSendPacketSize;
- }
-#endif
- while(buf.i != buf.b.end())
- {
- assert(_fd != INVALID_SOCKET);
-
- ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
- if(ret == 0)
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
-
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- if(noBuffers() && packetSize > 1024)
- {
- packetSize /= 2;
- continue;
- }
-
- if(wouldBlock())
- {
- return SocketOperationWrite;
- }
-
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
-
- buf.i += ret;
-
- if(packetSize > buf.b.end() - buf.i)
- {
- packetSize = static_cast<int>(buf.b.end() - buf.i);
- }
- }
-
- return SocketOperationNone;
+ return _stream->write(buf);
}
SocketOperation
IceInternal::TcpTransceiver::read(Buffer& buf, bool&)
{
- if(buf.i == buf.b.end())
- {
- return SocketOperationNone;
- }
-
- //
- // It's impossible for packetSize to be more than an Int.
- //
- int packetSize = static_cast<int>(buf.b.end() - buf.i);
- while(buf.i != buf.b.end())
- {
- assert(_fd != INVALID_SOCKET);
- ssize_t ret = ::recv(_fd, reinterpret_cast<char*>(&*buf.i), packetSize, 0);
-
- if(ret == 0)
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
-
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- if(noBuffers() && packetSize > 1024)
- {
- packetSize /= 2;
- continue;
- }
-
- if(wouldBlock())
- {
- return SocketOperationRead;
- }
-
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
-
- buf.i += ret;
-
- packetSize = static_cast<int>(buf.b.end() - buf.i);
- }
- return SocketOperationNone;
+ return _stream->read(buf);
}
#ifdef ICE_USE_IOCP
bool
IceInternal::TcpTransceiver::startWrite(Buffer& buf)
{
- if(_state == StateConnectPending)
- {
- Address addr = _proxy ? _proxy->getAddress() : _addr;
- doConnectAsync(_fd, addr, _sourceAddr, _write);
- return false;
- }
-
- assert(!buf.b.empty());
- assert(buf.i != buf.b.end());
-
- int packetSize = static_cast<int>(buf.b.end() - buf.i);
- if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
- {
- packetSize = _maxSendPacketSize;
- }
- assert(packetSize > 0);
- _write.buf.len = packetSize;
- _write.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!wouldBlock())
- {
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
- }
- return packetSize == static_cast<int>(buf.b.end() - buf.i);
+ return _stream->startWrite(buf);
}
void
IceInternal::TcpTransceiver::finishWrite(Buffer& buf)
{
- if(_state < StateConnected && _state != StateProxyConnectRequest)
- {
- return;
- }
-
- if(static_cast<int>(_write.count) == SOCKET_ERROR)
- {
- WSASetLastError(_write.error);
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
-
- buf.i += _write.count;
+ _stream->finishWrite(buf);
}
void
IceInternal::TcpTransceiver::startRead(Buffer& buf)
{
- int packetSize = static_cast<int>(buf.b.end() - buf.i);
- if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
- {
- packetSize = _maxReceivePacketSize;
- }
- assert(!buf.b.empty() && buf.i != buf.b.end());
-
- _read.buf.len = packetSize;
- _read.buf.buf = reinterpret_cast<char*>(&*buf.i);
- int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
- if(err == SOCKET_ERROR)
- {
- if(!wouldBlock())
- {
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
- }
+ _stream->startRead(buf);
}
void
IceInternal::TcpTransceiver::finishRead(Buffer& buf, bool&)
{
- if(static_cast<int>(_read.count) == SOCKET_ERROR)
- {
- WSASetLastError(_read.error);
- if(connectionLost())
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- else
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- }
- else if(_read.count == 0)
- {
- ConnectionLostException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
- }
-
- buf.i += _read.count;
+ _stream->finishRead(buf);
}
#endif
@@ -447,7 +93,7 @@ IceInternal::TcpTransceiver::protocol() const
string
IceInternal::TcpTransceiver::toString() const
{
- return _desc;
+ return _stream->toString();
}
string
@@ -460,7 +106,7 @@ Ice::ConnectionInfoPtr
IceInternal::TcpTransceiver::getInfo() const
{
Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo();
- fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
+ fdToAddressAndPort(_stream->fd(), info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
return info;
}
@@ -473,104 +119,13 @@ IceInternal::TcpTransceiver::checkSendSize(const Buffer& buf, size_t messageSize
}
}
-IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, SOCKET fd,
- const NetworkProxyPtr& proxy, const Address& addr,
- const Address& sourceAddr) :
- NativeInfo(fd),
- _instance(instance),
- _proxy(proxy),
- _addr(addr),
- _sourceAddr(sourceAddr),
- _state(StateNeedConnect)
-#ifdef ICE_USE_IOCP
- , _read(SocketOperationRead),
- _write(SocketOperationWrite)
-#endif
+IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, const StreamSocketPtr& stream) :
+ _instance(instance),
+ _stream(stream)
{
- setBlock(_fd, false);
-
- setTcpBufSize(_fd, _instance->properties(), _instance->logger());
-
-#ifdef ICE_USE_IOCP
- //
- // On Windows, limiting the buffer size is important to prevent
- // poor throughput performances when transfering large amount of
- // data. See Microsoft KB article KB823764.
- //
- _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2;
- if(_maxSendPacketSize < 512)
- {
- _maxSendPacketSize = 0;
- }
-
- _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd);
- if(_maxReceivePacketSize < 512)
- {
- _maxReceivePacketSize = 0;
- }
-#endif
-}
-
-IceInternal::TcpTransceiver::TcpTransceiver(const ProtocolInstancePtr& instance, SOCKET fd) :
- NativeInfo(fd),
- _instance(instance),
- _state(StateConnected),
- _desc(fdToString(_fd))
-#ifdef ICE_USE_IOCP
- , _read(SocketOperationRead),
- _write(SocketOperationWrite)
-#endif
-{
- setBlock(_fd, false);
-
- setTcpBufSize(_fd, _instance->properties(), _instance->logger());
-
-#ifdef ICE_USE_IOCP
- //
- // On Windows, limiting the buffer size is important to prevent
- // poor throughput performances when transfering large amount of
- // data. See Microsoft KB article KB823764.
- //
- _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2;
- if(_maxSendPacketSize < 512)
- {
- _maxSendPacketSize = 0;
- }
-
- _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd);
- if(_maxReceivePacketSize < 512)
- {
- _maxReceivePacketSize = 0;
- }
-#endif
}
IceInternal::TcpTransceiver::~TcpTransceiver()
{
- assert(_fd == INVALID_SOCKET);
}
-void
-IceInternal::TcpTransceiver::connect()
-{
-#ifndef ICE_USE_IOCP
- try
- {
- Address addr = _proxy ? _proxy->getAddress() : _addr;
- if(doConnect(_fd, addr, _sourceAddr))
- {
- _state = StateConnected;
- _desc = fdToString(_fd, _proxy, _addr, true);
- }
- else
- {
- _desc = fdToString(_fd, _proxy, _addr, true);
- }
- }
- catch(...)
- {
- _fd = INVALID_SOCKET;
- throw;
- }
-#endif
-}