summaryrefslogtreecommitdiff
path: root/cpp/test/Ice/background/Transceiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/Ice/background/Transceiver.cpp')
-rw-r--r--cpp/test/Ice/background/Transceiver.cpp135
1 files changed, 118 insertions, 17 deletions
diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp
index 03e99934561..47b27c843d9 100644
--- a/cpp/test/Ice/background/Transceiver.cpp
+++ b/cpp/test/Ice/background/Transceiver.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -18,7 +18,7 @@ Transceiver::getNativeInfo()
}
IceInternal::SocketOperation
-Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
+Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData)
{
#ifndef ICE_USE_IOCP
IceInternal::SocketOperation status = _configuration->initializeSocketOperation();
@@ -30,7 +30,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr
{
if(!_initialized)
{
- status = _transceiver->initialize(readBuffer, writeBuffer);
+ status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData);
if(status != IceInternal::SocketOperationNone)
{
return status;
@@ -48,7 +48,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr
_configuration->checkInitializeException();
if(!_initialized)
{
- IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer);
+ IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData);
if(status != IceInternal::SocketOperationNone)
{
return status;
@@ -58,34 +58,74 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr
return IceInternal::SocketOperationNone;
}
+IceInternal::SocketOperation
+Transceiver::closing(bool initiator, const Ice::LocalException& ex)
+{
+ return _transceiver->closing(initiator, ex);
+}
+
void
Transceiver::close()
{
_transceiver->close();
}
-bool
+IceInternal::SocketOperation
Transceiver::write(IceInternal::Buffer& buf)
{
- if(!_configuration->writeReady())
+ if(!_configuration->writeReady() && (!buf.b.empty() && buf.i < buf.b.end()))
{
- return false;
+ return IceInternal::SocketOperationWrite;
}
_configuration->checkWriteException();
return _transceiver->write(buf);
}
-bool
-Transceiver::read(IceInternal::Buffer& buf)
+IceInternal::SocketOperation
+Transceiver::read(IceInternal::Buffer& buf, bool& moreData)
{
- if(!_configuration->readReady())
+ if(!_configuration->readReady() && (!buf.b.empty() && buf.i < buf.b.end()))
{
- return false;
+ return IceInternal::SocketOperationRead;
}
_configuration->checkReadException();
- return _transceiver->read(buf);
+
+ if(_buffered)
+ {
+ while(buf.i != buf.b.end())
+ {
+ if(_readBufferPos == _readBuffer.i)
+ {
+ _readBufferPos = _readBuffer.i = _readBuffer.b.begin();
+ _transceiver->read(_readBuffer, moreData);
+ if(_readBufferPos == _readBuffer.i)
+ {
+ moreData = false;
+ return IceInternal::SocketOperationRead;
+ }
+ }
+ assert(_readBuffer.i > _readBufferPos);
+ size_t requested = buf.b.end() - buf.i;
+ size_t available = _readBuffer.i - _readBufferPos;
+ assert(available > 0);
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+ moreData = _readBufferPos < _readBuffer.i;
+ return IceInternal::SocketOperationNone;
+ }
+ else
+ {
+ return _transceiver->read(buf, moreData);
+ }
}
#ifdef ICE_USE_IOCP
@@ -107,21 +147,76 @@ void
Transceiver::startRead(IceInternal::Buffer& buf)
{
_configuration->checkReadException();
- _transceiver->startRead(buf);
+ if(_buffered)
+ {
+ size_t available = _readBuffer.i - _readBufferPos;
+ if(available > 0)
+ {
+ size_t requested = buf.b.end() - buf.i;
+ assert(available > 0);
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+
+ if(_readBufferPos == _readBuffer.i && buf.i != buf.b.end())
+ {
+ _readBufferPos = _readBuffer.i = _readBuffer.b.begin();
+ _transceiver->startRead(_readBuffer);
+ }
+ else
+ {
+ assert(buf.i == buf.b.end());
+ _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead);
+ }
+ }
+ else
+ {
+ _transceiver->startRead(buf);
+ }
}
void
Transceiver::finishRead(IceInternal::Buffer& buf)
{
_configuration->checkReadException();
- _transceiver->finishRead(buf);
+ if(_buffered)
+ {
+ if(buf.i != buf.b.end())
+ {
+ _transceiver->finishRead(_readBuffer);
+
+ size_t requested = buf.b.end() - buf.i;
+ size_t available = _readBuffer.i - _readBufferPos;
+ if(available > 0)
+ {
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+ }
+ }
+ else
+ {
+ _transceiver->finishRead(buf);
+ }
}
#endif
string
-Transceiver::type() const
+Transceiver::protocol() const
{
- return "test-" + _transceiver->type();
+ return "test-" + _transceiver->protocol();
}
string
@@ -148,6 +243,12 @@ Transceiver::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax
Transceiver::Transceiver(const IceInternal::TransceiverPtr& transceiver) :
_transceiver(transceiver),
_configuration(Configuration::getInstance()),
- _initialized(false)
+ _initialized(false),
+ _readBuffer(0),
+ _buffered(_configuration->buffered())
{
+ _readBuffer.b.resize(1024 * 8); // 8KB buffer
+ _readBufferPos = _readBuffer.b.begin();
+ _readBuffer.i = _readBuffer.b.begin();
}
+