diff options
Diffstat (limited to 'cpp/test/Ice/background/Transceiver.cpp')
-rw-r--r-- | cpp/test/Ice/background/Transceiver.cpp | 135 |
1 files changed, 118 insertions, 17 deletions
diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp index 03e99934561..47b27c843d9 100644 --- a/cpp/test/Ice/background/Transceiver.cpp +++ b/cpp/test/Ice/background/Transceiver.cpp @@ -1,6 +1,6 @@ // ********************************************************************** // -// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved. +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -18,7 +18,7 @@ Transceiver::getNativeInfo() } IceInternal::SocketOperation -Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) +Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData) { #ifndef ICE_USE_IOCP IceInternal::SocketOperation status = _configuration->initializeSocketOperation(); @@ -30,7 +30,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr { if(!_initialized) { - status = _transceiver->initialize(readBuffer, writeBuffer); + status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); if(status != IceInternal::SocketOperationNone) { return status; @@ -48,7 +48,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr _configuration->checkInitializeException(); if(!_initialized) { - IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer); + IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); if(status != IceInternal::SocketOperationNone) { return status; @@ -58,34 +58,74 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr return IceInternal::SocketOperationNone; } +IceInternal::SocketOperation +Transceiver::closing(bool initiator, const Ice::LocalException& ex) +{ + return _transceiver->closing(initiator, ex); +} + void Transceiver::close() { _transceiver->close(); } -bool +IceInternal::SocketOperation Transceiver::write(IceInternal::Buffer& buf) { - if(!_configuration->writeReady()) + if(!_configuration->writeReady() && (!buf.b.empty() && buf.i < buf.b.end())) { - return false; + return IceInternal::SocketOperationWrite; } _configuration->checkWriteException(); return _transceiver->write(buf); } -bool -Transceiver::read(IceInternal::Buffer& buf) +IceInternal::SocketOperation +Transceiver::read(IceInternal::Buffer& buf, bool& moreData) { - if(!_configuration->readReady()) + if(!_configuration->readReady() && (!buf.b.empty() && buf.i < buf.b.end())) { - return false; + return IceInternal::SocketOperationRead; } _configuration->checkReadException(); - return _transceiver->read(buf); + + if(_buffered) + { + while(buf.i != buf.b.end()) + { + if(_readBufferPos == _readBuffer.i) + { + _readBufferPos = _readBuffer.i = _readBuffer.b.begin(); + _transceiver->read(_readBuffer, moreData); + if(_readBufferPos == _readBuffer.i) + { + moreData = false; + return IceInternal::SocketOperationRead; + } + } + assert(_readBuffer.i > _readBufferPos); + size_t requested = buf.b.end() - buf.i; + size_t available = _readBuffer.i - _readBufferPos; + assert(available > 0); + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + moreData = _readBufferPos < _readBuffer.i; + return IceInternal::SocketOperationNone; + } + else + { + return _transceiver->read(buf, moreData); + } } #ifdef ICE_USE_IOCP @@ -107,21 +147,76 @@ void Transceiver::startRead(IceInternal::Buffer& buf) { _configuration->checkReadException(); - _transceiver->startRead(buf); + if(_buffered) + { + size_t available = _readBuffer.i - _readBufferPos; + if(available > 0) + { + size_t requested = buf.b.end() - buf.i; + assert(available > 0); + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + + if(_readBufferPos == _readBuffer.i && buf.i != buf.b.end()) + { + _readBufferPos = _readBuffer.i = _readBuffer.b.begin(); + _transceiver->startRead(_readBuffer); + } + else + { + assert(buf.i == buf.b.end()); + _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead); + } + } + else + { + _transceiver->startRead(buf); + } } void Transceiver::finishRead(IceInternal::Buffer& buf) { _configuration->checkReadException(); - _transceiver->finishRead(buf); + if(_buffered) + { + if(buf.i != buf.b.end()) + { + _transceiver->finishRead(_readBuffer); + + size_t requested = buf.b.end() - buf.i; + size_t available = _readBuffer.i - _readBufferPos; + if(available > 0) + { + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + } + } + else + { + _transceiver->finishRead(buf); + } } #endif string -Transceiver::type() const +Transceiver::protocol() const { - return "test-" + _transceiver->type(); + return "test-" + _transceiver->protocol(); } string @@ -148,6 +243,12 @@ Transceiver::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax Transceiver::Transceiver(const IceInternal::TransceiverPtr& transceiver) : _transceiver(transceiver), _configuration(Configuration::getInstance()), - _initialized(false) + _initialized(false), + _readBuffer(0), + _buffered(_configuration->buffered()) { + _readBuffer.b.resize(1024 * 8); // 8KB buffer + _readBufferPos = _readBuffer.b.begin(); + _readBuffer.i = _readBuffer.b.begin(); } + |