diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 11 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.cpp | 1 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Network.cpp | 18 | ||||
-rw-r--r-- | cpp/src/Ice/Network.h | 7 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/TcpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 108 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Transceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.cpp | 6 | ||||
-rw-r--r-- | cpp/src/Ice/UdpTransceiver.h | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceSSL/TransceiverI.h | 4 |
15 files changed, 152 insertions, 30 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 8091d2f2233..9abb600eae2 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1366,7 +1366,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { if(_readHeader) // Read header if necessary. { - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) { return; } @@ -1437,7 +1437,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _observer.startRead(_readStream.i); } - if(!_transceiver->read(_readStream)) + if(!_transceiver->read(_readStream, _hasMoreData)) { assert(!_readStream.b.empty()); scheduleTimeout(SocketOperationRead, _endpoint->timeout()); @@ -2135,7 +2135,8 @@ Ice::ConnectionI::setState(State state) } if(_state == StateHolding) { - _threadPool->_register(this, SocketOperationRead); // We need to continue to read in closing state. + // We need to continue to read in closing state. + _threadPool->_register(this, SocketOperationRead); } break; } @@ -2280,7 +2281,7 @@ Ice::ConnectionI::initiateShutdown() bool Ice::ConnectionI::initialize(SocketOperation operation) { - SocketOperation s = _transceiver->initialize(_readStream, _writeStream); + SocketOperation s = _transceiver->initialize(_readStream, _writeStream, _hasMoreData); if(s != SocketOperationNone) { scheduleTimeout(s, connectTimeout()); @@ -2349,7 +2350,7 @@ Ice::ConnectionI::validate(SocketOperation operation) _observer.startRead(_readStream.i); } - if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream)) + if(_readStream.i != _readStream.b.end() && !_transceiver->read(_readStream, _hasMoreData)) { scheduleTimeout(SocketOperationRead, connectTimeout()); _threadPool->update(this, operation, SocketOperationRead); diff --git a/cpp/src/Ice/EventHandler.cpp b/cpp/src/Ice/EventHandler.cpp index f0505a8b7db..413d44c2e32 100644 --- a/cpp/src/Ice/EventHandler.cpp +++ b/cpp/src/Ice/EventHandler.cpp @@ -25,6 +25,7 @@ IceInternal::EventHandler::EventHandler() : #else _disabled(SocketOperationNone), #endif + _hasMoreData(false), _registered(SocketOperationNone) { } diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index 7e51f0e9159..565e963a11f 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -65,6 +65,7 @@ protected: #else SocketOperation _disabled; #endif + bool _hasMoreData; SocketOperation _registered; friend class ThreadPool; diff --git a/cpp/src/Ice/Network.cpp b/cpp/src/Ice/Network.cpp index fb4189d7e6e..7e76a350862 100644 --- a/cpp/src/Ice/Network.cpp +++ b/cpp/src/Ice/Network.cpp @@ -543,6 +543,24 @@ IceInternal::AsyncInfo::AsyncInfo(SocketOperation s) ZeroMemory(this, sizeof(AsyncInfo)); status = s; } + +void +IceInternal::NativeInfo::initialize(HANDLE handle, ULONG_PTR key) +{ + _handle = handle; + _key = key; +} + +void +IceInternal::NativeInfo::completed(SocketOperation operation) +{ + if(!PostQueuedCompletionStatus(_handle, 0, _key, getAsyncInfo(operation))) + { + Ice::SocketException ex(__FILE__, __LINE__); + ex.error = GetLastError(); + throw ex; + } +} #endif IceUtil::Shared* IceInternal::upCast(NetworkProxy* p) { return p; } diff --git a/cpp/src/Ice/Network.h b/cpp/src/Ice/Network.h index fe606f8fc2d..46b01689ec0 100644 --- a/cpp/src/Ice/Network.h +++ b/cpp/src/Ice/Network.h @@ -178,6 +178,8 @@ public: // #if defined(ICE_USE_IOCP) virtual AsyncInfo* getAsyncInfo(SocketOperation) = 0; + void initialize(HANDLE, ULONG_PTR); + void completed(SocketOperation operation); #elif defined(ICE_OS_WINRT) virtual void setCompletedHandler(SocketOperationCompletedHandler^) = 0; #endif @@ -185,6 +187,11 @@ public: protected: SOCKET _fd; + +#if defined(ICE_USE_IOCP) + HANDLE _handle; + ULONG_PTR _key; +#endif }; typedef IceUtil::Handle<NativeInfo> NativeInfoPtr; diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index c9763d6e2a9..48de01df197 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -140,6 +140,7 @@ Selector::initialize(EventHandler* handler) throw ex; } handler->__incRef(); + handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler)); } void @@ -482,7 +483,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); - handlers.clear(); for(int i = 0; i < ret; ++i) { pair<EventHandler*, SocketOperation> p; @@ -701,7 +701,6 @@ Selector::select(vector<pair<EventHandler*, SocketOperation> >& handlers, int ti } assert(ret > 0); - handlers.clear(); #if defined(ICE_USE_SELECT) if(_selectedReadFdSet.fd_count == 0 && _selectedWriteFdSet.fd_count == 0 && _selectedErrorFdSet.fd_count == 0) diff --git a/cpp/src/Ice/TcpTransceiver.cpp b/cpp/src/Ice/TcpTransceiver.cpp index d82fafa17ce..c29391fa9da 100644 --- a/cpp/src/Ice/TcpTransceiver.cpp +++ b/cpp/src/Ice/TcpTransceiver.cpp @@ -46,7 +46,7 @@ IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status) #endif SocketOperation -IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) +IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer, bool& hasMoreData) { try { @@ -93,7 +93,7 @@ IceInternal::TcpTransceiver::initialize(Buffer& readBuffer, Buffer& writeBuffer) // // Try to read the response. // - if(read(readBuffer)) + if(read(readBuffer, hasMoreData)) { // // Read completed without blocking - fall through. @@ -268,7 +268,7 @@ IceInternal::TcpTransceiver::write(Buffer& buf) } bool -IceInternal::TcpTransceiver::read(Buffer& buf) +IceInternal::TcpTransceiver::read(Buffer& buf, bool&) { // // It's impossible for packetSize to be more than an Int. diff --git a/cpp/src/Ice/TcpTransceiver.h b/cpp/src/Ice/TcpTransceiver.h index a2afe4bb514..5faad7b9a46 100644 --- a/cpp/src/Ice/TcpTransceiver.h +++ b/cpp/src/Ice/TcpTransceiver.h @@ -41,10 +41,10 @@ public: virtual AsyncInfo* getAsyncInfo(SocketOperation); #endif - virtual SocketOperation initialize(Buffer&, Buffer&); + virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&); + virtual bool read(Buffer&, bool&); #ifdef ICE_USE_IOCP virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 62652c786d2..32b7bbddde6 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -108,6 +108,29 @@ private: IceUtil::ThreadPtr _thread; }; +class InterruptWorkItem : public ThreadPoolWorkItem +{ +public: + + virtual void + execute(ThreadPoolCurrent& current) + { + // Nothing to do, this is just used to interrupt the thread pool selector. + } +}; +ThreadPoolWorkItemPtr interruptWorkItem; + +class InterruptWorkItemInit +{ +public: + + InterruptWorkItemInit() + { + interruptWorkItem = new InterruptWorkItem; + } +}; +InterruptWorkItemInit init; + // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -562,6 +585,20 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation Lock sync(*this); assert(!_destroyed); _selector.update(handler.get(), remove, add); +#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) + if(add & SocketOperationRead && handler->_hasMoreData) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(handler.get()); + } + else if(remove & SocketOperationRead) + { + _pendingHandlers.erase(handler.get()); + } +#endif } void @@ -577,8 +614,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) // Clear the current ready handlers. The handlers from this vector can't be // reference counted and a handler might get destroyed once it's finished. // - _handlers.clear(); - _nextHandler = _handlers.end(); + //_handlers.clear(); + //_nextHandler = _handlers.end(); #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) @@ -682,6 +719,19 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) if(select) { _handlers.swap(handlers); + if(!_pendingHandlers.empty()) + { + for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler) + { + _pendingHandlers.erase(_nextHandler->first); + } + set<EventHandler*>::const_iterator p; + for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p) + { + _handlers.push_back(make_pair(*p, SocketOperationRead)); + } + _pendingHandlers.clear(); + } _nextHandler = _handlers.begin(); _selector.finishSelect(); select = false; @@ -700,6 +750,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // the IO thread count now. // --_inUseIO; + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } } else { @@ -707,7 +765,18 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - _selector.enable(current._handler.get(), current.operation); + if(_serialize) + { + _selector.enable(current._handler.get(), current.operation); + if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } assert(_inUse > 0); --_inUse; } @@ -717,10 +786,22 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) return; // Wait timed-out. } } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } // // Get the next ready handler. // + while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered)) + { + ++_nextHandler; + } if(_nextHandler != _handlers.end()) { current._ioCompleted = false; @@ -748,6 +829,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } else { + _handlers.clear(); _selector.startSelect(); select = true; thread->setState(ThreadStateIdle); @@ -889,13 +971,25 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) if(_sizeMax > 1) { + #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; - - if(_serialize && !_destroyed) + + if(!_destroyed) { - _selector.disable(current._handler.get(), current.operation); - } + if(_serialize) + { + _selector.disable(current._handler.get(), current.operation); + } + else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead) + { + if(_pendingHandlers.empty()) + { + _workQueue->queue(interruptWorkItem); // Interrupt select() + } + _pendingHandlers.insert(current._handler.get()); + } + } if(current._leader) { diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 43a8f2fb361..cf8a6d926b4 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -127,6 +127,7 @@ private: std::vector<std::pair<EventHandler*, SocketOperation> > _handlers; std::vector<std::pair<EventHandler*, SocketOperation> >::const_iterator _nextHandler; #endif + std::set<EventHandler*> _pendingHandlers; bool _promote; }; diff --git a/cpp/src/Ice/Transceiver.h b/cpp/src/Ice/Transceiver.h index c2c00cf151d..c1f6aa64b03 100644 --- a/cpp/src/Ice/Transceiver.h +++ b/cpp/src/Ice/Transceiver.h @@ -25,10 +25,10 @@ class ICE_API Transceiver : virtual public ::IceUtil::Shared public: virtual NativeInfoPtr getNativeInfo() = 0; - virtual SocketOperation initialize(Buffer&, Buffer&) = 0; + virtual SocketOperation initialize(Buffer&, Buffer&, bool&) = 0; virtual void close() = 0; virtual bool write(Buffer&) = 0; - virtual bool read(Buffer&) = 0; + virtual bool read(Buffer&, bool&) = 0; #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&) = 0; virtual void finishWrite(Buffer&) = 0; diff --git a/cpp/src/Ice/UdpTransceiver.cpp b/cpp/src/Ice/UdpTransceiver.cpp index ded61171571..53d0901228c 100644 --- a/cpp/src/Ice/UdpTransceiver.cpp +++ b/cpp/src/Ice/UdpTransceiver.cpp @@ -80,7 +80,7 @@ IceInternal::UdpTransceiver::setCompletedHandler(SocketOperationCompletedHandler #endif SocketOperation -IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/) +IceInternal::UdpTransceiver::initialize(Buffer& /*readBuffer*/, Buffer& /*writeBuffer*/, bool& /*hasMoreData*/) { if(_state == StateNeedConnect) { @@ -243,13 +243,13 @@ repeat: #ifdef ICE_OS_WINRT bool -IceInternal::UdpTransceiver::read(Buffer&) +IceInternal::UdpTransceiver::read(Buffer&, bool&) { return false; } #else bool -IceInternal::UdpTransceiver::read(Buffer& buf) +IceInternal::UdpTransceiver::read(Buffer& buf, bool&) { assert(buf.i == buf.b.begin()); assert(_fd != INVALID_SOCKET); diff --git a/cpp/src/Ice/UdpTransceiver.h b/cpp/src/Ice/UdpTransceiver.h index ad6bf128619..9768ff79a2d 100644 --- a/cpp/src/Ice/UdpTransceiver.h +++ b/cpp/src/Ice/UdpTransceiver.h @@ -47,10 +47,10 @@ public: virtual void setCompletedHandler(SocketOperationCompletedHandler^); #endif - virtual SocketOperation initialize(Buffer&, Buffer&); + virtual SocketOperation initialize(Buffer&, Buffer&, bool&); virtual void close(); virtual bool write(Buffer&); - virtual bool read(Buffer&); + virtual bool read(Buffer&, bool&); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) virtual bool startWrite(Buffer&); virtual void finishWrite(Buffer&); diff --git a/cpp/src/IceSSL/TransceiverI.cpp b/cpp/src/IceSSL/TransceiverI.cpp index fbf683d947d..f48395deda9 100644 --- a/cpp/src/IceSSL/TransceiverI.cpp +++ b/cpp/src/IceSSL/TransceiverI.cpp @@ -50,7 +50,7 @@ IceSSL::TransceiverI::getAsyncInfo(IceInternal::SocketOperation status) #endif IceInternal::SocketOperation -IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer) +IceSSL::TransceiverI::initialize(IceInternal::Buffer& readBuffer, IceInternal::Buffer& writeBuffer, bool&) { try { @@ -557,7 +557,7 @@ IceSSL::TransceiverI::write(IceInternal::Buffer& buf) } bool -IceSSL::TransceiverI::read(IceInternal::Buffer& buf) +IceSSL::TransceiverI::read(IceInternal::Buffer& buf, bool&) { if(_state == StateProxyConnectRequestPending) { diff --git a/cpp/src/IceSSL/TransceiverI.h b/cpp/src/IceSSL/TransceiverI.h index 2c475c8638e..3b2b3a73937 100644 --- a/cpp/src/IceSSL/TransceiverI.h +++ b/cpp/src/IceSSL/TransceiverI.h @@ -46,10 +46,10 @@ public: virtual IceInternal::AsyncInfo* getAsyncInfo(IceInternal::SocketOperation); #endif - virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&); + virtual IceInternal::SocketOperation initialize(IceInternal::Buffer&, IceInternal::Buffer&, bool&); virtual void close(); virtual bool write(IceInternal::Buffer&); - virtual bool read(IceInternal::Buffer&); + virtual bool read(IceInternal::Buffer&, bool&); #ifdef ICE_USE_IOCP virtual bool startWrite(IceInternal::Buffer&); virtual void finishWrite(IceInternal::Buffer&); |