diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-07-12 14:07:08 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-07-12 14:07:08 +0200 |
commit | e0064a1ce41067e40eb1495745e3499e836f1a61 (patch) | |
tree | ca41e7d77ff2da9d02775a4209c2e0009868053d /cpp | |
parent | ICE-5377 - clang optimized build failure (diff) | |
download | ice-e0064a1ce41067e40eb1495745e3499e836f1a61.tar.bz2 ice-e0064a1ce41067e40eb1495745e3499e836f1a61.tar.xz ice-e0064a1ce41067e40eb1495745e3499e836f1a61.zip |
Fix to allow transceivers to read more data than requested.
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 11 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Network.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Network.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 108 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Transceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.h | 4 | ||||
-rw-r--r-- | cpp/test/Ice/background/AllTests.cpp | 25 | ||||
-rw-r--r-- | cpp/test/Ice/background/Configuration.cpp | 17 | ||||
-rw-r--r-- | cpp/test/Ice/background/Configuration.h | 46 | ||||
-rw-r--r-- | cpp/test/Ice/background/Test.ice | 2 | ||||
-rw-r--r-- | cpp/test/Ice/background/TestI.cpp | 6 | ||||
-rw-r--r-- | cpp/test/Ice/background/TestI.h | 2 | ||||
-rw-r--r-- | cpp/test/Ice/background/Transceiver.cpp | 108 | ||||
-rw-r--r-- | cpp/test/Ice/background/Transceiver.h | 7 |
23 files changed, 331 insertions, 64 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 8091d2f2233..9abb600eae2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1366,7 +1366,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { if(_readHeader) // Read header if necessary. { - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) { return; } @@ -1437,7 +1437,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _observer.startRead(_readStream.i); } - if(!_transceiver->read(_readStream)) + if(!_transceiver->read(_readStream, _hasMoreData)) { assert(!_readStream.b.empty()); scheduleTimeout(SocketOperationRead, _endpoint->timeout()); @@ -2135,7 +2135,8 @@ Ice::ConnectionI::setState(State state) } if(_state == StateHolding) { - _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state. + // We need to continue to read in closing state. + _threadPool->_register(this, SocketOperationRead); } break; } @@ -2280,7 +2281,7 @@ Ice::ConnectionI::initiateShutdown() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); if(s != SocketOperationNone) { scheduleTimeout(s, connectTimeout()); @@ -2349,7 +2350,7 @@ Ice::ConnectionI::validate(SocketOperation operation) _observer.startRead(_readStream.i); } - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) { scheduleTimeout(SocketOperationRead, connectTimeout()); _threadPool->update(this, operation, SocketOperationRead); diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp index f0505a8b7db..413d44c2e32 100644 --- a/cpp/src/Ice/EventHandler.cpp +++ b/cpp/src/Ice/EventHandler.cpp @@ -25,6 +25,7 @@ IceInternal::EventHandler::EventHandler() : #else _disabled(SocketOperationNone), #endif + _hasMoreData(false), _registered(SocketOperationNone) { } diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index 7e51f0e9159..565e963a11f 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -65,6 +65,7 @@ protected: #else SocketOperation _disabled; #endif + bool _hasMoreData; SocketOperation _registered; friend class ThreadPool; diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index fb4189d7e6e..7e76a350862 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -543,6 +543,24 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s) ZeroMemory(this, sizeof(AsyncInfo)); status = s; } + +void +IceInternal::NativeInfo::initialize(HANDLE handle, ULONG_PTR key) +{ + _handle = handle; + _key = key; +} + +void +IceInternal::NativeInfo::completed(SocketOperation operation) +{ + if(!PostQueuedCompletionStatus(_handle, 0, _key, getAsyncInfo(operation))) + { + Ice::SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +} #endif IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; } diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index fe606f8fc2d..46b01689ec0 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -178,6 +178,8 @@ public: // #if defined(ICE_USE_IOCP) virtual AsyncInfo* getAsyncInfo(SocketOperation) = 0; + void initialize(HANDLE, ULONG_PTR); + void completed(SocketOperation operation); #elif defined(ICE_OS_WINRT) virtual void setCompletedHandler(SocketOperationCompletedHandler^) = 0; #endif @@ -185,6 +187,11 @@ public: protected: SOCKET _fd; + +#if defined(ICE_USE_IOCP) + HANDLE _handle; + ULONG_PTR _key; +#endif }; typedef IceUtil::Handle<NativeInfo> NativeInfoPtr; diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index c9763d6e2a9..48de01df197 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -140,6 +140,7 @@ Selector::initialize(EventHandler* handler) throw ex; } handler->__incRef(); + handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler)); } void @@ -482,7 +483,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); - handlers.clear(); for(int i = 0; i < ret; ++i) { pair<EventHandler*, SocketOperation> p; @@ -701,7 +701,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); - handlers.clear(); #if defined(ICE_USE_SELECT) if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0) diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index d82fafa17ce..c29391fa9da 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,7 +46,7 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) #endif SocketOperation -IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) { try { @@ -93,7 +93,7 @@ IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) // // Try to read the response. // - if(read(readBuffer)) + if(read(readBuffer, hasMoreData)) { // // Read completed without blocking - fall through. @@ -268,7 +268,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) } bool -IceInternal::TcpTransceiver::read(Buffer& buf) +IceInternal::TcpTransceiver::read(Buffer& buf, bool&) { // // It's impossible for packetSize to be more than an Int. diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index a2afe4bb514..5faad7b9a46 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -41,10 +41,10 @@ public: virtual AsyncInfo* getAsyncInfo(SocketOperation); #endif - virtual SocketOperation initialize(Buffer&, Buffer&); + virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&); + virtual bool read(Buffer&, bool&); #ifdef ICE_USE_IOCP virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 62652c786d2..32b7bbddde6 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -108,6 +108,29 @@ private: IceUtil::ThreadPtr _thread; }; +class InterruptWorkItem : public ThreadPoolWorkItem +{ +public: + + virtual void + execute(ThreadPoolCurrent& current) + { + // Nothing to do, this is just used to interrupt the thread pool selector. + } +}; +ThreadPoolWorkItemPtr interruptWorkItem; + +class InterruptWorkItemInit +{ +public: + + InterruptWorkItemInit() + { + interruptWorkItem = new InterruptWorkItem; + } +}; +InterruptWorkItemInit init; + // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -562,6 +585,20 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation Lock sync(*this); assert(!_destroyed); _selector.update(handler.get(), remove, add); +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) + if(add & SocketOperationRead && handler->_hasMoreData) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(handler.get()); + } + else if(remove & SocketOperationRead) + { + _pendingHandlers.erase(handler.get()); + } +#endif } void @@ -577,8 +614,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) // Clear the current ready handlers. The handlers from this vector can't be // reference counted and a handler might get destroyed once it's finished. // - _handlers.clear(); - _nextHandler = _handlers.end(); + //_handlers.clear(); + //_nextHandler = _handlers.end(); #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) @@ -682,6 +719,19 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(select) { _handlers.swap(handlers); + if(!_pendingHandlers.empty()) + { + for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) + { + _pendingHandlers.erase(_nextHandler->first); + } + set<EventHandler*>::const_iterator p; + for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) + { + _handlers.push_back(make_pair(*p, SocketOperationRead)); + } + _pendingHandlers.clear(); + } _nextHandler = _handlers.begin(); _selector.finishSelect(); select = false; @@ -700,6 +750,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } } else { @@ -707,7 +765,18 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - _selector.enable(current._handler.get(), current.operation); + if(_serialize) + { + _selector.enable(current._handler.get(), current.operation); + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } assert(_inUse > 0); --_inUse; } @@ -717,10 +786,22 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } // // Get the next ready handler. // + while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) + { + ++_nextHandler; + } if(_nextHandler != _handlers.end()) { current._ioCompleted = false; @@ -748,6 +829,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } else { + _handlers.clear(); _selector.startSelect(); select = true; thread->setState(ThreadStateIdle); @@ -889,13 +971,25 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { + #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; - - if(_serialize && !_destroyed) + + if(!_destroyed) { - _selector.disable(current._handler.get(), current.operation); - } + if(_serialize) + { + _selector.disable(current._handler.get(), current.operation); + } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } if(current._leader) { diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 43a8f2fb361..cf8a6d926b4 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -127,6 +127,7 @@ private: std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; #endif + std::set<EventHandler*> _pendingHandlers; bool _promote; }; diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index c2c00cf151d..c1f6aa64b03 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -25,10 +25,10 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared public: virtual NativeInfoPtr getNativeInfo() = 0; - virtual SocketOperation initialize(Buffer&, Buffer&) = 0; + virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0; virtual void close() = 0; virtual bool write(Buffer&) = 0; - virtual bool read(Buffer&) = 0; + virtual bool read(Buffer&, bool&) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&) = 0; virtual void finishWrite(Buffer&) = 0; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index ded61171571..53d0901228c 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler #endif SocketOperation -IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/) +IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/) { if(_state == StateNeedConnect) { @@ -243,13 +243,13 @@ repeat: #ifdef ICE_OS_WINRT bool -IceInternal::UdpTransceiver::read(Buffer&) +IceInternal::UdpTransceiver::read(Buffer&, bool&) { return false; } #else bool -IceInternal::UdpTransceiver::read(Buffer& buf) +IceInternal::UdpTransceiver::read(Buffer& buf, bool&) { assert(buf.i == buf.b.begin()); assert(_fd != INVALID_SOCKET); diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index ad6bf128619..9768ff79a2d 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -47,10 +47,10 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(Buffer&, Buffer&); + virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&); + virtual bool read(Buffer&, bool&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index fbf683d947d..f48395deda9 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) #endif IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) { try { @@ -557,7 +557,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } bool -IceSSL::TransceiverI::read(IceInternal::Buffer& buf) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) { if(_state == StateProxyConnectRequestPending) { diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h index 2c475c8638e..3b2b3a73937 100644 --- a/cpp/src/IceSSL/TransceiverI.h +++ b/cpp/src/IceSSL/TransceiverI.h @@ -46,10 +46,10 @@ public: virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation); #endif - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); virtual void close(); virtual bool write(IceInternal::Buffer&); - virtual bool read(IceInternal::Buffer&); + virtual bool read(IceInternal::Buffer&, bool&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp index 1d21b70379c..b6db7300986 100644 --- a/cpp/test/Ice/background/AllTests.cpp +++ b/cpp/test/Ice/background/AllTests.cpp @@ -313,6 +313,28 @@ allTests(const Ice::CommunicatorPtr& communicator) } cout << "ok" << endl; + cout << "testing buffered transport... " << flush; + { + configuration->buffered(true); + backgroundController->buffered(true); + Ice::AsyncResultPtr r; + for(int i = 0; i < 10000; ++i) + { + r = background->begin_op(); + if(i % 50 == 0) + { + backgroundController->holdAdapter(); + backgroundController->resumeAdapter(); + } + if(i % 100 == 0) + { + r->waitForCompleted(); + } + } + r->waitForCompleted(); + } + cout << "ok" << endl; + return background; } @@ -422,9 +444,8 @@ initializeTests(const ConfigurationPtr& configuration, { background->op(); } - catch(const Ice::LocalException& ex) + catch(const Ice::LocalException&) { - cerr << ex << endl; test(false); } background->ice_getConnection()->close(false); diff --git a/cpp/test/Ice/background/Configuration.cpp b/cpp/test/Ice/background/Configuration.cpp index 4a3e9498d04..45bc7872a6f 100644 --- a/cpp/test/Ice/background/Configuration.cpp +++ b/cpp/test/Ice/background/Configuration.cpp @@ -15,7 +15,8 @@ Configuration::Configuration() : _initializeSocketOperation(IceInternal::SocketOperationNone), _initializeResetCount(0), _readReadyCount(0), - _writeReadyCount(0) + _writeReadyCount(0), + _buffered(false) { assert(!_instance); _instance = this; @@ -174,6 +175,20 @@ Configuration::checkWriteException() } } +void +Configuration::buffered(bool buffered) +{ + Lock sync(*this); + _buffered = buffered; +} + +bool +Configuration::buffered() +{ + Lock sync(*this); + return _buffered; +} + Configuration* Configuration::getInstance() { diff --git a/cpp/test/Ice/background/Configuration.h b/cpp/test/Ice/background/Configuration.h index 9f0ff1ab634..ddcedc2d3f9 100644 --- a/cpp/test/Ice/background/Configuration.h +++ b/cpp/test/Ice/background/Configuration.h @@ -25,27 +25,30 @@ public: Configuration(); virtual ~Configuration(); - virtual void connectorsException(Ice::LocalException*); - virtual void checkConnectorsException(); - - virtual void connectException(Ice::LocalException*); - virtual void checkConnectException(); - - virtual void initializeSocketOperation(IceInternal::SocketOperation); - virtual void initializeException(Ice::LocalException*); - virtual IceInternal::SocketOperation initializeSocketOperation(); - virtual void checkInitializeException(); - - virtual void readReady(bool); - virtual void readException(Ice::LocalException*); - virtual bool readReady(); - virtual void checkReadException(); - - virtual void writeReady(bool); - virtual void writeException(Ice::LocalException*); - virtual bool writeReady(); - virtual void checkWriteException(); - + void connectorsException(Ice::LocalException*); + void checkConnectorsException(); + + void connectException(Ice::LocalException*); + void checkConnectException(); + + void initializeSocketOperation(IceInternal::SocketOperation); + void initializeException(Ice::LocalException*); + IceInternal::SocketOperation initializeSocketOperation(); + void checkInitializeException(); + + void readReady(bool); + void readException(Ice::LocalException*); + bool readReady(); + void checkReadException(); + + void writeReady(bool); + void writeException(Ice::LocalException*); + bool writeReady(); + void checkWriteException(); + + void buffered(bool); + bool buffered(); + static Configuration* getInstance(); private: @@ -59,6 +62,7 @@ private: IceUtil::UniquePtr<Ice::LocalException> _readException; int _writeReadyCount; IceUtil::UniquePtr<Ice::LocalException> _writeException; + bool _buffered; static Configuration* _instance; }; diff --git a/cpp/test/Ice/background/Test.ice b/cpp/test/Ice/background/Test.ice index 6fff0d79750..2c2ece09398 100644 --- a/cpp/test/Ice/background/Test.ice +++ b/cpp/test/Ice/background/Test.ice @@ -39,6 +39,8 @@ interface BackgroundController void writeReady(bool enable); void writeException(bool enable); + + void buffered(bool enable); }; }; diff --git a/cpp/test/Ice/background/TestI.cpp b/cpp/test/Ice/background/TestI.cpp index 26cb121abb4..0e6a44cc6cd 100644 --- a/cpp/test/Ice/background/TestI.cpp +++ b/cpp/test/Ice/background/TestI.cpp @@ -109,6 +109,12 @@ BackgroundControllerI::writeException(bool enable, const Ice::Current&) _configuration->writeException(enable ? new Ice::SocketException(__FILE__, __LINE__) : 0); } +void +BackgroundControllerI::buffered(bool enable, const Ice::Current&) +{ + _configuration->buffered(enable); +} + BackgroundControllerI::BackgroundControllerI(const Ice::ObjectAdapterPtr& adapter, const ConfigurationPtr& configuration) : _adapter(adapter), diff --git a/cpp/test/Ice/background/TestI.h b/cpp/test/Ice/background/TestI.h index 7dfe67d2fed..34bc097808e 100644 --- a/cpp/test/Ice/background/TestI.h +++ b/cpp/test/Ice/background/TestI.h @@ -53,6 +53,8 @@ public: virtual void writeReady(bool, const Ice::Current&); virtual void writeException(bool, const Ice::Current&); + + virtual void buffered(bool, const Ice::Current&); BackgroundControllerI(const Ice::ObjectAdapterPtr&, const ConfigurationPtr&); diff --git a/cpp/test/Ice/background/Transceiver.cpp b/cpp/test/Ice/background/Transceiver.cpp index 03e99934561..5260c7b024d 100644 --- a/cpp/test/Ice/background/Transceiver.cpp +++ b/cpp/test/Ice/background/Transceiver.cpp @@ -18,7 +18,7 @@ Transceiver::getNativeInfo() } IceInternal::SocketOperation -Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) +Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool& hasMoreData) { #ifndef ICE_USE_IOCP IceInternal::SocketOperation status = _configuration->initializeSocketOperation(); @@ -30,7 +30,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr { if(!_initialized) { - status = _transceiver->initialize(readBuffer, writeBuffer); + status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); if(status != IceInternal::SocketOperationNone) { return status; @@ -48,7 +48,7 @@ Transceiver::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& wr _configuration->checkInitializeException(); if(!_initialized) { - IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer); + IceInternal::SocketOperation status = _transceiver->initialize(readBuffer, writeBuffer, hasMoreData); if(status != IceInternal::SocketOperationNone) { return status; @@ -77,7 +77,7 @@ Transceiver::write(IceInternal::Buffer& buf) } bool -Transceiver::read(IceInternal::Buffer& buf) +Transceiver::read(IceInternal::Buffer& buf, bool& moreData) { if(!_configuration->readReady()) { @@ -85,7 +85,41 @@ Transceiver::read(IceInternal::Buffer& buf) } _configuration->checkReadException(); - return _transceiver->read(buf); + + if(_configuration->buffered()) + { + while(buf.i != buf.b.end()) + { + if(_readBufferPos == _readBuffer.i) + { + _readBufferPos = _readBuffer.i = _readBuffer.b.begin(); + _transceiver->read(_readBuffer, moreData); + if(_readBufferPos == _readBuffer.i) + { + moreData = false; + return false; + } + } + assert(_readBuffer.i > _readBufferPos); + size_t requested = buf.b.end() - buf.i; + size_t available = _readBuffer.i - _readBufferPos; + assert(available > 0); + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + moreData = _readBufferPos < _readBuffer.i; + return true; + } + else + { + return _transceiver->read(buf, moreData); + } } #ifdef ICE_USE_IOCP @@ -107,14 +141,67 @@ void Transceiver::startRead(IceInternal::Buffer& buf) { _configuration->checkReadException(); - _transceiver->startRead(buf); + if(_configuration->buffered()) + { + size_t available = _readBuffer.i - _readBufferPos; + if(available > 0) + { + size_t requested = buf.b.end() - buf.i; + assert(available > 0); + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + + if(_readBufferPos == _readBuffer.i && buf.i != buf.b.end()) + { + _readBufferPos = _readBuffer.i = _readBuffer.b.begin(); + _transceiver->startRead(_readBuffer); + } + else + { + _transceiver->getNativeInfo()->completed(IceInternal::SocketOperationRead); + } + } + else + { + _transceiver->startRead(buf); + } } void Transceiver::finishRead(IceInternal::Buffer& buf) { _configuration->checkReadException(); - _transceiver->finishRead(buf); + if(_configuration->buffered()) + { + if(buf.i != buf.b.end()) + { + _transceiver->finishRead(_readBuffer); + + assert(_readBuffer.i > _readBufferPos); + size_t requested = buf.b.end() - buf.i; + size_t available = _readBuffer.i - _readBufferPos; + assert(available > 0); + if(available >= requested) + { + available = requested; + } + + memcpy(buf.i, _readBufferPos, available); + _readBufferPos += available; + buf.i += available; + } + } + else + { + _transceiver->finishRead(buf); + } } #endif @@ -148,6 +235,11 @@ Transceiver::checkSendSize(const IceInternal::Buffer& buf, size_t messageSizeMax Transceiver::Transceiver(const IceInternal::TransceiverPtr& transceiver) : _transceiver(transceiver), _configuration(Configuration::getInstance()), - _initialized(false) + _initialized(false), + _readBuffer(0) { + _readBuffer.b.resize(1024 * 8); // 8KB buffer + _readBufferPos = _readBuffer.b.begin(); + _readBuffer.i = _readBuffer.b.begin(); } + diff --git a/cpp/test/Ice/background/Transceiver.h b/cpp/test/Ice/background/Transceiver.h index 770272557b2..b6e06d40510 100644 --- a/cpp/test/Ice/background/Transceiver.h +++ b/cpp/test/Ice/background/Transceiver.h @@ -21,7 +21,7 @@ public: virtual void close(); virtual bool write(IceInternal::Buffer&); - virtual bool read(IceInternal::Buffer&); + virtual bool read(IceInternal::Buffer&, bool&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); @@ -31,7 +31,7 @@ public: virtual std::string type() const; virtual std::string toString() const; virtual Ice::ConnectionInfoPtr getInfo() const; - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); virtual void checkSendSize(const IceInternal::Buffer&, size_t); private: @@ -44,6 +44,9 @@ private: const IceInternal::TransceiverPtr _transceiver; const ConfigurationPtr _configuration; bool _initialized; + + IceInternal::Buffer _readBuffer; + IceInternal::Buffer::Container::const_iterator _readBufferPos; }; #endif |