diff options
author | Benoit Foucher <benoit@zeroc.com> | 2013-07-26 09:09:03 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2013-07-26 09:09:03 +0200 |
commit | 1e6a694714985125f37bf67dc5d35fd76fc7a33f (patch) | |
tree | 92dbf46286fcd8d02010b00093907511be8ab08d /cpp/src | |
parent | ICE-5313 - more fixes for checksums (diff) | |
download | ice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.tar.bz2 ice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.tar.xz ice-1e6a694714985125f37bf67dc5d35fd76fc7a33f.zip |
Revert "Fix to allow transceivers to read more data than requested."
This reverts commit 9c4e79ce6760badf047568fd300fcbe3455f31b7.
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 11 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Network.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Network.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 108 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Transceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.h | 4 |
15 files changed, 30 insertions, 152 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 9abb600eae2..8091d2f2233 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1366,7 +1366,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { if(_readHeader) // Read header if necessary. { - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) { return; } @@ -1437,7 +1437,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _observer.startRead(_readStream.i); } - if(!_transceiver->read(_readStream, _hasMoreData)) + if(!_transceiver->read(_readStream)) { assert(!_readStream.b.empty()); scheduleTimeout(SocketOperationRead, _endpoint->timeout()); @@ -2135,8 +2135,7 @@ Ice::ConnectionI::setState(State state) } if(_state == StateHolding) { - // We need to continue to read in closing state. - _threadPool->_register(this, SocketOperationRead); + _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state. } break; } @@ -2281,7 +2280,7 @@ Ice::ConnectionI::initiateShutdown() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream); if(s != SocketOperationNone) { scheduleTimeout(s, connectTimeout()); @@ -2350,7 +2349,7 @@ Ice::ConnectionI::validate(SocketOperation operation) _observer.startRead(_readStream.i); } - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) { scheduleTimeout(SocketOperationRead, connectTimeout()); _threadPool->update(this, operation, SocketOperationRead); diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp index 413d44c2e32..f0505a8b7db 100644 --- a/cpp/src/Ice/EventHandler.cpp +++ b/cpp/src/Ice/EventHandler.cpp @@ -25,7 +25,6 @@ IceInternal::EventHandler::EventHandler() : #else _disabled(SocketOperationNone), #endif - _hasMoreData(false), _registered(SocketOperationNone) { } diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index 565e963a11f..7e51f0e9159 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -65,7 +65,6 @@ protected: #else SocketOperation _disabled; #endif - bool _hasMoreData; SocketOperation _registered; friend class ThreadPool; diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index 0b280166a37..987ba372326 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -543,24 +543,6 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s) ZeroMemory(this, sizeof(AsyncInfo)); status = s; } - -void -IceInternal::NativeInfo::initialize(HANDLE handle, ULONG_PTR key) -{ - _handle = handle; - _key = key; -} - -void -IceInternal::NativeInfo::completed(SocketOperation operation) -{ - if(!PostQueuedCompletionStatus(_handle, 0, _key, getAsyncInfo(operation))) - { - Ice::SocketException ex(__FILE__, __LINE__); - ex.error = GetLastError(); - throw ex; - } -} #endif IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; } diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index c9416ee4ef6..4f045031dc5 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -178,8 +178,6 @@ public: // #if defined(ICE_USE_IOCP) virtual AsyncInfo* getAsyncInfo(SocketOperation) = 0; - void initialize(HANDLE, ULONG_PTR); - void completed(SocketOperation operation); #elif defined(ICE_OS_WINRT) virtual void setCompletedHandler(SocketOperationCompletedHandler^) = 0; #endif @@ -187,11 +185,6 @@ public: protected: SOCKET _fd; - -#if defined(ICE_USE_IOCP) - HANDLE _handle; - ULONG_PTR _key; -#endif }; typedef IceUtil::Handle<NativeInfo> NativeInfoPtr; diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index 48de01df197..c9763d6e2a9 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -140,7 +140,6 @@ Selector::initialize(EventHandler* handler) throw ex; } handler->__incRef(); - handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler)); } void @@ -483,6 +482,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); + handlers.clear(); for(int i = 0; i < ret; ++i) { pair<EventHandler*, SocketOperation> p; @@ -701,6 +701,7 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); + handlers.clear(); #if defined(ICE_USE_SELECT) if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0) diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index c29391fa9da..d82fafa17ce 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,7 +46,7 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) #endif SocketOperation -IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) { try { @@ -93,7 +93,7 @@ IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, // // Try to read the response. // - if(read(readBuffer, hasMoreData)) + if(read(readBuffer)) { // // Read completed without blocking - fall through. @@ -268,7 +268,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) } bool -IceInternal::TcpTransceiver::read(Buffer& buf, bool&) +IceInternal::TcpTransceiver::read(Buffer& buf) { // // It's impossible for packetSize to be more than an Int. diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index 5faad7b9a46..a2afe4bb514 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -41,10 +41,10 @@ public: virtual AsyncInfo* getAsyncInfo(SocketOperation); #endif - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&, bool&); + virtual bool read(Buffer&); #ifdef ICE_USE_IOCP virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 32b7bbddde6..62652c786d2 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -108,29 +108,6 @@ private: IceUtil::ThreadPtr _thread; }; -class InterruptWorkItem : public ThreadPoolWorkItem -{ -public: - - virtual void - execute(ThreadPoolCurrent& current) - { - // Nothing to do, this is just used to interrupt the thread pool selector. - } -}; -ThreadPoolWorkItemPtr interruptWorkItem; - -class InterruptWorkItemInit -{ -public: - - InterruptWorkItemInit() - { - interruptWorkItem = new InterruptWorkItem; - } -}; -InterruptWorkItemInit init; - // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -585,20 +562,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation Lock sync(*this); assert(!_destroyed); _selector.update(handler.get(), remove, add); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(add & SocketOperationRead && handler->_hasMoreData) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(handler.get()); - } - else if(remove & SocketOperationRead) - { - _pendingHandlers.erase(handler.get()); - } -#endif } void @@ -614,8 +577,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) // Clear the current ready handlers. The handlers from this vector can't be // reference counted and a handler might get destroyed once it's finished. // - //_handlers.clear(); - //_nextHandler = _handlers.end(); + _handlers.clear(); + _nextHandler = _handlers.end(); #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) @@ -719,19 +682,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(select) { _handlers.swap(handlers); - if(!_pendingHandlers.empty()) - { - for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) - { - _pendingHandlers.erase(_nextHandler->first); - } - set<EventHandler*>::const_iterator p; - for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) - { - _handlers.push_back(make_pair(*p, SocketOperationRead)); - } - _pendingHandlers.clear(); - } _nextHandler = _handlers.begin(); _selector.finishSelect(); select = false; @@ -750,14 +700,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } } else { @@ -765,18 +707,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - if(_serialize) - { - _selector.enable(current._handler.get(), current.operation); - if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } - } + _selector.enable(current._handler.get(), current.operation); assert(_inUse > 0); --_inUse; } @@ -786,22 +717,10 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } // // Get the next ready handler. // - while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) - { - ++_nextHandler; - } if(_nextHandler != _handlers.end()) { current._ioCompleted = false; @@ -829,7 +748,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } else { - _handlers.clear(); _selector.startSelect(); select = true; thread->setState(ThreadStateIdle); @@ -971,25 +889,13 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { - #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; - - if(!_destroyed) + + if(_serialize && !_destroyed) { - if(_serialize) - { - _selector.disable(current._handler.get(), current.operation); - } - else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) - { - if(_pendingHandlers.empty()) - { - _workQueue->queue(interruptWorkItem); // Interrupt select() - } - _pendingHandlers.insert(current._handler.get()); - } - } + _selector.disable(current._handler.get(), current.operation); + } if(current._leader) { diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index cf8a6d926b4..43a8f2fb361 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -127,7 +127,6 @@ private: std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; #endif - std::set<EventHandler*> _pendingHandlers; bool _promote; }; diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index c1f6aa64b03..c2c00cf151d 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -25,10 +25,10 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared public: virtual NativeInfoPtr getNativeInfo() = 0; - virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0; + virtual SocketOperation initialize(Buffer&, Buffer&) = 0; virtual void close() = 0; virtual bool write(Buffer&) = 0; - virtual bool read(Buffer&, bool&) = 0; + virtual bool read(Buffer&) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&) = 0; virtual void finishWrite(Buffer&) = 0; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index 53d0901228c..ded61171571 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler #endif SocketOperation -IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/) +IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/) { if(_state == StateNeedConnect) { @@ -243,13 +243,13 @@ repeat: #ifdef ICE_OS_WINRT bool -IceInternal::UdpTransceiver::read(Buffer&, bool&) +IceInternal::UdpTransceiver::read(Buffer&) { return false; } #else bool -IceInternal::UdpTransceiver::read(Buffer& buf, bool&) +IceInternal::UdpTransceiver::read(Buffer& buf) { assert(buf.i == buf.b.begin()); assert(_fd != INVALID_SOCKET); diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index 9768ff79a2d..ad6bf128619 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -47,10 +47,10 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(Buffer&, Buffer&, bool&); + virtual SocketOperation initialize(Buffer&, Buffer&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&, bool&); + virtual bool read(Buffer&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index f48395deda9..fbf683d947d 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) #endif IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) { try { @@ -557,7 +557,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } bool -IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf) { if(_state == StateProxyConnectRequestPending) { diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h index 3b2b3a73937..2c475c8638e 100644 --- a/cpp/src/IceSSL/TransceiverI.h +++ b/cpp/src/IceSSL/TransceiverI.h @@ -46,10 +46,10 @@ public: virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation); #endif - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); virtual void close(); virtual bool write(IceInternal::Buffer&); - virtual bool read(IceInternal::Buffer&, bool&); + virtual bool read(IceInternal::Buffer&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); |