summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-07-12 14:07:08 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-07-12 14:07:08 +0200
commite0064a1ce41067e40eb1495745e3499e836f1a61 (patch)
treeca41e7d77ff2da9d02775a4209c2e0009868053d /cpp
parentICE-5377 - clang optimized build failure (diff)
downloadice-e0064a1ce41067e40eb1495745e3499e836f1a61.tar.bz2
ice-e0064a1ce41067e40eb1495745e3499e836f1a61.tar.xz
ice-e0064a1ce41067e40eb1495745e3499e836f1a61.zip
Fix to allow transceivers to read more data than requested.
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp11
-rw-r--r--cpp/src/Ice/EventHandler.cpp1
-rw-r--r--cpp/src/Ice/EventHandler.h1
-rw-r--r--cpp/src/Ice/Network.cpp18
-rw-r--r--cpp/src/Ice/Network.h7
-rw-r--r--cpp/src/Ice/Selector.cpp3
-rw-r--r--cpp/src/Ice/TcpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/TcpTransceiver.h4
-rw-r--r--cpp/src/Ice/ThreadPool.cpp108
-rw-r--r--cpp/src/Ice/ThreadPool.h1
-rw-r--r--cpp/src/Ice/Transceiver.h4
-rw-r--r--cpp/src/Ice/UdpTransceiver.cpp6
-rw-r--r--cpp/src/Ice/UdpTransceiver.h4
-rw-r--r--cpp/src/IceSSL/TransceiverI.cpp4
-rw-r--r--cpp/src/IceSSL/TransceiverI.h4
-rw-r--r--cpp/test/Ice/background/AllTests.cpp25
-rw-r--r--cpp/test/Ice/background/Configuration.cpp17
-rw-r--r--cpp/test/Ice/background/Configuration.h46
-rw-r--r--cpp/test/Ice/background/Test.ice2
-rw-r--r--cpp/test/Ice/background/TestI.cpp6
-rw-r--r--cpp/test/Ice/background/TestI.h2
-rw-r--r--cpp/test/Ice/background/Transceiver.cpp108
-rw-r--r--cpp/test/Ice/background/Transceiver.h7
23 files changed, 331 insertions, 64 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 8091d2f2233..9abb600eae2 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1366,7 +1366,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
if(_readHeader) // Read header if necessary.
{
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData))
{
return;
}
@@ -1437,7 +1437,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
_observer.startRead(_readStream.i);
}
- if(!_transceiver->read(_readStream))
+ if(!_transceiver->read(_readStream, _hasMoreData))
{
assert(!_readStream.b.empty());
scheduleTimeout(SocketOperationRead, _endpoint->timeout());
@@ -2135,7 +2135,8 @@ Ice::ConnectionI::setState(State state)
}
if(_state == StateHolding)
{
- _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state.
+ // We need to continue to read in closing state.
+ _threadPool->_register(this, SocketOperationRead);
}
break;
}
@@ -2280,7 +2281,7 @@ Ice::ConnectionI::initiateShutdown()
bool
Ice::ConnectionI::initialize(SocketOperation operation)
{
- SocketOperation s = _transceiver->initialize(_readStream, _writeStream);
+ SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData);
if(s != SocketOperationNone)
{
scheduleTimeout(s, connectTimeout());
@@ -2349,7 +2350,7 @@ Ice::ConnectionI::validate(SocketOperation operation)
_observer.startRead(_readStream.i);
}
- if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream))
+ if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData))
{
scheduleTimeout(SocketOperationRead, connectTimeout());
_threadPool->update(this, operation, SocketOperationRead);
diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp
index f0505a8b7db..413d44c2e32 100644
--- a/cpp/src/Ice/EventHandler.cpp
+++ b/cpp/src/Ice/EventHandler.cpp
@@ -25,6 +25,7 @@ IceInternal::EventHandler::EventHandler() :
#else
_disabled(SocketOperationNone),
#endif
+ _hasMoreData(false),
_registered(SocketOperationNone)
{
}
diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h
index 7e51f0e9159..565e963a11f 100644
--- a/cpp/src/Ice/EventHandler.h
+++ b/cpp/src/Ice/EventHandler.h
@@ -65,6 +65,7 @@ protected:
#else
SocketOperation _disabled;
#endif
+ bool _hasMoreData;
SocketOperation _registered;
friend class ThreadPool;
diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp
index fb4189d7e6e..7e76a350862 100644
--- a/cpp/src/Ice/Network.cpp
+++ b/cpp/src/Ice/Network.cpp
@@ -543,6 +543,24 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s)
ZeroMemory(this, sizeof(AsyncInfo));
status = s;
}
+
+void
+IceInternal::NativeInfo::initialize(HANDLE handle, ULONG_PTR key)
+{
+ _handle = handle;
+ _key = key;
+}
+
+void
+IceInternal::NativeInfo::completed(SocketOperation operation)
+{
+ if(!PostQueuedCompletionStatus(_handle, 0, _key, getAsyncInfo(operation)))
+ {
+ Ice::SocketException ex(__FILE__, __LINE__);
+ ex.error = GetLastError();
+ throw ex;
+ }
+}
#endif
IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; }
diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h
index fe606f8fc2d..46b01689ec0 100644
--- a/cpp/src/Ice/Network.h
+++ b/cpp/src/Ice/Network.h
@@ -178,6 +178,8 @@ public:
//
#if defined(ICE_USE_IOCP)
virtual AsyncInfo* getAsyncInfo(SocketOperation) = 0;
+ void initialize(HANDLE, ULONG_PTR);
+ void completed(SocketOperation operation);
#elif defined(ICE_OS_WINRT)
virtual void setCompletedHandler(SocketOperationCompletedHandler^) = 0;
#endif
@@ -185,6 +187,11 @@ public:
protected:
SOCKET _fd;
+
+#if defined(ICE_USE_IOCP)
+ HANDLE _handle;
+ ULONG_PTR _key;
+#endif
};
typedef IceUtil::Handle<NativeInfo> NativeInfoPtr;
diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp
index c9763d6e2a9..48de01df197 100644
--- a/cpp/src/Ice/Selector.cpp
+++ b/cpp/src/Ice/Selector.cpp
@@ -140,6 +140,7 @@ Selector::initialize(EventHandler* handler)
throw ex;
}
handler->__incRef();
+ handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler));
}
void
@@ -482,7 +483,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
assert(ret > 0);
- handlers.clear();
for(int i = 0; i < ret; ++i)
{
pair<EventHandler*, SocketOperation> p;
@@ -701,7 +701,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti
}
assert(ret > 0);
- handlers.clear();
#if defined(ICE_USE_SELECT)
if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0)
diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp
index d82fafa17ce..c29391fa9da 100644
--- a/cpp/src/Ice/TcpTransceiver.cpp
+++ b/cpp/src/Ice/TcpTransceiver.cpp
@@ -46,7 +46,7 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
#endif
SocketOperation
-IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
+IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData)
{
try
{
@@ -93,7 +93,7 @@ IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer)
//
// Try to read the response.
//
- if(read(readBuffer))
+ if(read(readBuffer, hasMoreData))
{
//
// Read completed without blocking - fall through.
@@ -268,7 +268,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf)
}
bool
-IceInternal::TcpTransceiver::read(Buffer& buf)
+IceInternal::TcpTransceiver::read(Buffer& buf, bool&)
{
//
// It's impossible for packetSize to be more than an Int.
diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h
index a2afe4bb514..5faad7b9a46 100644
--- a/cpp/src/Ice/TcpTransceiver.h
+++ b/cpp/src/Ice/TcpTransceiver.h
@@ -41,10 +41,10 @@ public:
virtual AsyncInfo* getAsyncInfo(SocketOperation);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&);
+ virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
virtual void close();
virtual bool write(Buffer&);
- virtual bool read(Buffer&);
+ virtual bool read(Buffer&, bool&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 62652c786d2..32b7bbddde6 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -108,6 +108,29 @@ private:
IceUtil::ThreadPtr _thread;
};
+class InterruptWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ // Nothing to do, this is just used to interrupt the thread pool selector.
+ }
+};
+ThreadPoolWorkItemPtr interruptWorkItem;
+
+class InterruptWorkItemInit
+{
+public:
+
+ InterruptWorkItemInit()
+ {
+ interruptWorkItem = new InterruptWorkItem;
+ }
+};
+InterruptWorkItemInit init;
+
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -562,6 +585,20 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
Lock sync(*this);
assert(!_destroyed);
_selector.update(handler.get(), remove, add);
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+ if(add & SocketOperationRead && handler->_hasMoreData)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(handler.get());
+ }
+ else if(remove & SocketOperationRead)
+ {
+ _pendingHandlers.erase(handler.get());
+ }
+#endif
}
void
@@ -577,8 +614,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
// Clear the current ready handlers. The handlers from this vector can't be
// reference counted and a handler might get destroyed once it's finished.
//
- _handlers.clear();
- _nextHandler = _handlers.end();
+ //_handlers.clear();
+ //_nextHandler = _handlers.end();
#else
// If there are no pending asynchronous operations, we can call finish on the handler now.
if(!handler->_pending)
@@ -682,6 +719,19 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(select)
{
_handlers.swap(handlers);
+ if(!_pendingHandlers.empty())
+ {
+ for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler)
+ {
+ _pendingHandlers.erase(_nextHandler->first);
+ }
+ set<EventHandler*>::const_iterator p;
+ for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p)
+ {
+ _handlers.push_back(make_pair(*p, SocketOperationRead));
+ }
+ _pendingHandlers.clear();
+ }
_nextHandler = _handlers.begin();
_selector.finishSelect();
select = false;
@@ -700,6 +750,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// the IO thread count now.
//
--_inUseIO;
+ if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
}
else
{
@@ -707,7 +765,18 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// If the handler called ioCompleted(), we re-enable the handler in
// case it was disabled and we decrease the number of thread in use.
//
- _selector.enable(current._handler.get(), current.operation);
+ if(_serialize)
+ {
+ _selector.enable(current._handler.get(), current.operation);
+ if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
+ }
assert(_inUse > 0);
--_inUse;
}
@@ -717,10 +786,22 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
return; // Wait timed-out.
}
}
+ else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
//
// Get the next ready handler.
//
+ while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered))
+ {
+ ++_nextHandler;
+ }
if(_nextHandler != _handlers.end())
{
current._ioCompleted = false;
@@ -748,6 +829,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
}
else
{
+ _handlers.clear();
_selector.startSelect();
select = true;
thread->setState(ThreadStateIdle);
@@ -889,13 +971,25 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_sizeMax > 1)
{
+
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
-
- if(_serialize && !_destroyed)
+
+ if(!_destroyed)
{
- _selector.disable(current._handler.get(), current.operation);
- }
+ if(_serialize)
+ {
+ _selector.disable(current._handler.get(), current.operation);
+ }
+ else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
+ }
if(current._leader)
{
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 43a8f2fb361..cf8a6d926b4 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -127,6 +127,7 @@ private:
std::vector<std::pair<EventHandler*, SocketOperation> > _handlers;
std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler;
#endif
+ std::set<EventHandler*> _pendingHandlers;
bool _promote;
};
diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h
index c2c00cf151d..c1f6aa64b03 100644
--- a/cpp/src/Ice/Transceiver.h
+++ b/cpp/src/Ice/Transceiver.h
@@ -25,10 +25,10 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared
public:
virtual NativeInfoPtr getNativeInfo() = 0;
- virtual SocketOperation initialize(Buffer&, Buffer&) = 0;
+ virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0;
virtual void close() = 0;
virtual bool write(Buffer&) = 0;
- virtual bool read(Buffer&) = 0;
+ virtual bool read(Buffer&, bool&) = 0;
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&) = 0;
virtual void finishWrite(Buffer&) = 0;
diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp
index ded61171571..53d0901228c 100644
--- a/cpp/src/Ice/UdpTransceiver.cpp
+++ b/cpp/src/Ice/UdpTransceiver.cpp
@@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler
#endif
SocketOperation
-IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/)
+IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/)
{
if(_state == StateNeedConnect)
{
@@ -243,13 +243,13 @@ repeat:
#ifdef ICE_OS_WINRT
bool
-IceInternal::UdpTransceiver::read(Buffer&)
+IceInternal::UdpTransceiver::read(Buffer&, bool&)
{
return false;
}
#else
bool
-IceInternal::UdpTransceiver::read(Buffer& buf)
+IceInternal::UdpTransceiver::read(Buffer& buf, bool&)
{
assert(buf.i == buf.b.begin());
assert(_fd != INVALID_SOCKET);
diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h
index ad6bf128619..9768ff79a2d 100644
--- a/cpp/src/Ice/UdpTransceiver.h
+++ b/cpp/src/Ice/UdpTransceiver.h
@@ -47,10 +47,10 @@ public:
virtual void setCompletedHandler(SocketOperationCompletedHandler^);
#endif
- virtual SocketOperation initialize(Buffer&, Buffer&);
+ virtual SocketOperation initialize(Buffer&, Buffer&, bool&);
virtual void close();
virtual bool write(Buffer&);
- virtual bool read(Buffer&);
+ virtual bool read(Buffer&, bool&);
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
virtual bool startWrite(Buffer&);
virtual void finishWrite(Buffer&);
diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp
index fbf683d947d..f48395deda9 100644
--- a/cpp/src/IceSSL/TransceiverI.cpp
+++ b/cpp/src/IceSSL/TransceiverI.cpp
@@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status)
#endif
IceInternal::SocketOperation
-IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
+IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&)
{
try
{
@@ -557,7 +557,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf)
}
bool
-IceSSL::TransceiverI::read(IceInternal::Buffer& buf)
+IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&)
{
if(_state == StateProxyConnectRequestPending)
{
diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h
index 2c475c8638e..3b2b3a73937 100644
--- a/cpp/src/IceSSL/TransceiverI.h
+++ b/cpp/src/IceSSL/TransceiverI.h
@@ -46,10 +46,10 @@ public:
virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation);
#endif
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
virtual void close();
virtual bool write(IceInternal::Buffer&);
- virtual bool read(IceInternal::Buffer&);
+ virtual bool read(IceInternal::Buffer&, bool&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(IceInternal::Buffer&);
virtual void finishWrite(IceInternal::Buffer&);
diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp
index 1d21b70379c..b6db7300986 100644
--- a/cpp/test/Ice/background/AllTests.cpp
+++ b/cpp/test/Ice/background/AllTests.cpp
@@ -313,6 +313,28 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
cout << "ok" << endl;
+ cout << "testing buffered transport... " << flush;
+ {
+ configuration->buffered(true);
+ backgroundController->buffered(true);
+ Ice::AsyncResultPtr r;
+ for(int i = 0; i < 10000; ++i)
+ {
+ r = background->begin_op();
+ if(i % 50 == 0)
+ {
+ backgroundController->holdAdapter();
+ backgroundController->resumeAdapter();
+ }
+ if(i % 100 == 0)
+ {
+ r->waitForCompleted();
+ }
+ }
+ r->waitForCompleted();
+ }
+ cout << "ok" << endl;
+
return background;
}
@@ -422,9 +444,8 @@ initializeTests(const ConfigurationPtr& configuration,
{
background->op();
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::LocalException&)
{
- cerr << ex << endl;
test(false);
}
background->ice_getConnection()->close(false);
diff --git a/cpp/test/Ice/background/Configuration.cpp b/cpp/test/Ice/background/Configuration.cpp
index 4a3e9498d04..45bc7872a6f 100644
--- a/cpp/test/Ice/background/Configuration.cpp
+++ b/cpp/test/Ice/background/Configuration.cpp
@@ -15,7 +15,8 @@ Configuration::Configuration() :
_initializeSocketOperation(IceInternal::SocketOperationNone),
_initializeResetCount(0),
_readReadyCount(0),
- _writeReadyCount(0)
+ _writeReadyCount(0),
+ _buffered(false)
{
assert(!_instance);
_instance = this;
@@ -174,6 +175,20 @@ Configuration::checkWriteException()
}
}
+void
+Configuration::buffered(bool buffered)
+{
+ Lock sync(*this);
+ _buffered = buffered;
+}
+
+bool
+Configuration::buffered()
+{
+ Lock sync(*this);
+ return _buffered;
+}
+
Configuration*
Configuration::getInstance()
{
diff --git a/cpp/test/Ice/background/Configuration.h b/cpp/test/Ice/background/Configuration.h
index 9f0ff1ab634..ddcedc2d3f9 100644
--- a/cpp/test/Ice/background/Configuration.h
+++ b/cpp/test/Ice/background/Configuration.h
@@ -25,27 +25,30 @@ public:
Configuration();
virtual ~Configuration();
- virtual void connectorsException(Ice::LocalException*);
- virtual void checkConnectorsException();
-
- virtual void connectException(Ice::LocalException*);
- virtual void checkConnectException();
-
- virtual void initializeSocketOperation(IceInternal::SocketOperation);
- virtual void initializeException(Ice::LocalException*);
- virtual IceInternal::SocketOperation initializeSocketOperation();
- virtual void checkInitializeException();
-
- virtual void readReady(bool);
- virtual void readException(Ice::LocalException*);
- virtual bool readReady();
- virtual void checkReadException();
-
- virtual void writeReady(bool);
- virtual void writeException(Ice::LocalException*);
- virtual bool writeReady();
- virtual void checkWriteException();
-
+ void connectorsException(Ice::LocalException*);
+ void checkConnectorsException();
+
+ void connectException(Ice::LocalException*);
+ void checkConnectException();
+
+ void initializeSocketOperation(IceInternal::SocketOperation);
+ void initializeException(Ice::LocalException*);
+ IceInternal::SocketOperation initializeSocketOperation();
+ void checkInitializeException();
+
+ void readReady(bool);
+ void readException(Ice::LocalException*);
+ bool readReady();
+ void checkReadException();
+
+ void writeReady(bool);
+ void writeException(Ice::LocalException*);
+ bool writeReady();
+ void checkWriteException();
+
+ void buffered(bool);
+ bool buffered();
+
static Configuration* getInstance();
private:
@@ -59,6 +62,7 @@ private:
IceUtil::UniquePtr<Ice::LocalException> _readException;
int _writeReadyCount;
IceUtil::UniquePtr<Ice::LocalException> _writeException;
+ bool _buffered;
static Configuration* _instance;
};
diff --git a/cpp/test/Ice/background/Test.ice b/cpp/test/Ice/background/Test.ice
index 6fff0d79750..2c2ece09398 100644
--- a/cpp/test/Ice/background/Test.ice
+++ b/cpp/test/Ice/background/Test.ice
@@ -39,6 +39,8 @@ interface BackgroundController
void writeReady(bool enable);
void writeException(bool enable);
+
+ void buffered(bool enable);
};
};
diff --git a/cpp/test/Ice/background/TestI.cpp b/cpp/test/Ice/background/TestI.cpp
index 26cb121abb4..0e6a44cc6cd 100644
--- a/cpp/test/Ice/background/TestI.cpp
+++ b/cpp/test/Ice/background/TestI.cpp
@@ -109,6 +109,12 @@ BackgroundControllerI::writeException(bool enable, const Ice::Current&)
_configuration->writeException(enable ? new Ice::SocketException(__FILE__, __LINE__) : 0);
}
+void
+BackgroundControllerI::buffered(bool enable, const Ice::Current&)
+{
+ _configuration->buffered(enable);
+}
+
BackgroundControllerI::BackgroundControllerI(const Ice::ObjectAdapterPtr& adapter,
const ConfigurationPtr& configuration) :
_adapter(adapter),
diff --git a/cpp/test/Ice/background/TestI.h b/cpp/test/Ice/background/TestI.h
index 7dfe67d2fed..34bc097808e 100644
--- a/cpp/test/Ice/background/TestI.h
+++ b/cpp/test/Ice/background/TestI.h
@@ -53,6 +53,8 @@ public:
virtual void writeReady(bool, const Ice::Current&);
virtual void writeException(bool, const Ice::Current&);
+
+ virtual void buffered(bool, const Ice::Current&);
BackgroundControllerI(const Ice::ObjectAdapterPtr&, const ConfigurationPtr&);
diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp
index 03e99934561..5260c7b024d 100644
--- a/cpp/test/Ice/background/Transceiver.cpp
+++ b/cpp/test/Ice/background/Transceiver.cpp
@@ -18,7 +18,7 @@ Transceiver::getNativeInfo()
}
IceInternal::SocketOperation
-Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer)
+Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData)
{
#ifndef ICE_USE_IOCP
IceInternal::SocketOperation status = _configuration->initializeSocketOperation();
@@ -30,7 +30,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr
{
if(!_initialized)
{
- status = _transceiver->initialize(readBuffer, writeBuffer);
+ status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData);
if(status != IceInternal::SocketOperationNone)
{
return status;
@@ -48,7 +48,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr
_configuration->checkInitializeException();
if(!_initialized)
{
- IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer);
+ IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData);
if(status != IceInternal::SocketOperationNone)
{
return status;
@@ -77,7 +77,7 @@ Transceiver::write(IceInternal::Buffer& buf)
}
bool
-Transceiver::read(IceInternal::Buffer& buf)
+Transceiver::read(IceInternal::Buffer& buf, bool& moreData)
{
if(!_configuration->readReady())
{
@@ -85,7 +85,41 @@ Transceiver::read(IceInternal::Buffer& buf)
}
_configuration->checkReadException();
- return _transceiver->read(buf);
+
+ if(_configuration->buffered())
+ {
+ while(buf.i != buf.b.end())
+ {
+ if(_readBufferPos == _readBuffer.i)
+ {
+ _readBufferPos = _readBuffer.i = _readBuffer.b.begin();
+ _transceiver->read(_readBuffer, moreData);
+ if(_readBufferPos == _readBuffer.i)
+ {
+ moreData = false;
+ return false;
+ }
+ }
+ assert(_readBuffer.i > _readBufferPos);
+ size_t requested = buf.b.end() - buf.i;
+ size_t available = _readBuffer.i - _readBufferPos;
+ assert(available > 0);
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+ moreData = _readBufferPos < _readBuffer.i;
+ return true;
+ }
+ else
+ {
+ return _transceiver->read(buf, moreData);
+ }
}
#ifdef ICE_USE_IOCP
@@ -107,14 +141,67 @@ void
Transceiver::startRead(IceInternal::Buffer& buf)
{
_configuration->checkReadException();
- _transceiver->startRead(buf);
+ if(_configuration->buffered())
+ {
+ size_t available = _readBuffer.i - _readBufferPos;
+ if(available > 0)
+ {
+ size_t requested = buf.b.end() - buf.i;
+ assert(available > 0);
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+
+ if(_readBufferPos == _readBuffer.i && buf.i != buf.b.end())
+ {
+ _readBufferPos = _readBuffer.i = _readBuffer.b.begin();
+ _transceiver->startRead(_readBuffer);
+ }
+ else
+ {
+ _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead);
+ }
+ }
+ else
+ {
+ _transceiver->startRead(buf);
+ }
}
void
Transceiver::finishRead(IceInternal::Buffer& buf)
{
_configuration->checkReadException();
- _transceiver->finishRead(buf);
+ if(_configuration->buffered())
+ {
+ if(buf.i != buf.b.end())
+ {
+ _transceiver->finishRead(_readBuffer);
+
+ assert(_readBuffer.i > _readBufferPos);
+ size_t requested = buf.b.end() - buf.i;
+ size_t available = _readBuffer.i - _readBufferPos;
+ assert(available > 0);
+ if(available >= requested)
+ {
+ available = requested;
+ }
+
+ memcpy(buf.i, _readBufferPos, available);
+ _readBufferPos += available;
+ buf.i += available;
+ }
+ }
+ else
+ {
+ _transceiver->finishRead(buf);
+ }
}
#endif
@@ -148,6 +235,11 @@ Transceiver::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax
Transceiver::Transceiver(const IceInternal::TransceiverPtr& transceiver) :
_transceiver(transceiver),
_configuration(Configuration::getInstance()),
- _initialized(false)
+ _initialized(false),
+ _readBuffer(0)
{
+ _readBuffer.b.resize(1024 * 8); // 8KB buffer
+ _readBufferPos = _readBuffer.b.begin();
+ _readBuffer.i = _readBuffer.b.begin();
}
+
diff --git a/cpp/test/Ice/background/Transceiver.h b/cpp/test/Ice/background/Transceiver.h
index 770272557b2..b6e06d40510 100644
--- a/cpp/test/Ice/background/Transceiver.h
+++ b/cpp/test/Ice/background/Transceiver.h
@@ -21,7 +21,7 @@ public:
virtual void close();
virtual bool write(IceInternal::Buffer&);
- virtual bool read(IceInternal::Buffer&);
+ virtual bool read(IceInternal::Buffer&, bool&);
#ifdef ICE_USE_IOCP
virtual bool startWrite(IceInternal::Buffer&);
virtual void finishWrite(IceInternal::Buffer&);
@@ -31,7 +31,7 @@ public:
virtual std::string type() const;
virtual std::string toString() const;
virtual Ice::ConnectionInfoPtr getInfo() const;
- virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&);
+ virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&);
virtual void checkSendSize(const IceInternal::Buffer&, size_t);
private:
@@ -44,6 +44,9 @@ private:
const IceInternal::TransceiverPtr _transceiver;
const ConfigurationPtr _configuration;
bool _initialized;
+
+ IceInternal::Buffer _readBuffer;
+ IceInternal::Buffer::Container::const_iterator _readBufferPos;
};
#endif