diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 08:20:15 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-15 08:20:15 +0200 |
commit | 4c6a4a6e77eeb513e2031912698a1fa839d9065a (patch) | |
tree | fdfad1d1dceb1fcc614da334b7058efc99925703 /cpp | |
parent | ICE-5739 missing AMD test in c++ optional (diff) | |
download | ice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.tar.bz2 ice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.tar.xz ice-4c6a4a6e77eeb513e2031912698a1fa839d9065a.zip |
Fixed ICE-5454: close acceptor on adapter deactivation
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.cpp | 40 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionFactory.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 63 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 4 | ||||
-rw-r--r-- | cpp/src/Ice/EventHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.cpp | 33 | ||||
-rw-r--r-- | cpp/src/Ice/Selector.h | 21 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 36 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 4 |
9 files changed, 113 insertions, 92 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index f1fc0380727..154e1ef193b 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -1526,11 +1526,18 @@ IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) } void -IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&) +IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(_state == StateClosed); setState(StateFinished); + + assert(_acceptor); + + if(close) + { + closeAcceptor(true); + } } string @@ -1752,26 +1759,23 @@ IceInternal::IncomingConnectionFactory::setState(State state) { if(_acceptor) { - dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this); + // + // If possible, close the acceptor now to prevent new connections from + // being accepted while we are deactivating. This is especially useful + // if there are no more threads in the thread pool available to dispatch + // the finish() call. Not all selector implementations do support this + // however. + // + if(dynamic_cast<ObjectAdapterI*>(_adapter.get())->getThreadPool()->finish(this, true)) + { + closeAcceptor(true); + } } else { state = StateFinished; } -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - // - // With IOCP and WinRT, we close the acceptor now to cancel all the pending - // asynchronous operations. It's important to wait for the pending asynchronous - // operations to return before ConnectionI::finished(). Otherwise, if there was - // a pending message waiting to be sent, the connection wouldn't know whether - // or not the send failed or succeeded, potentially breaking at-most-once semantics. - // - if(_acceptor) - { - closeAcceptor(true); - } -#endif for_each(_connections.begin(), _connections.end(), bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); break; @@ -1780,12 +1784,6 @@ IceInternal::IncomingConnectionFactory::setState(State state) case StateFinished: { assert(_state == StateClosed); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - if(_acceptor) - { - closeAcceptor(true); - } -#endif break; } } diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index 92603a55b04..c6b688225b5 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -190,7 +190,7 @@ public: #endif virtual void message(ThreadPoolCurrent&); - virtual void finished(ThreadPoolCurrent&); + virtual void finished(ThreadPoolCurrent&, bool); virtual std::string toString() const; virtual NativeInfoPtr getNativeInfo(); diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 92dc4a1693d..01df99084be 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -97,16 +97,16 @@ public: private: - ConnectionIPtr _connection; - ConnectionI::StartCallbackPtr _startCB; - vector<ConnectionI::OutgoingMessage> _sentCBs; - Byte _compress; - Int _requestId; - Int _invokeNum; - ServantManagerPtr _servantManager; - ObjectAdapterPtr _adapter; - OutgoingAsyncPtr _outAsync; - ConnectionCallbackPtr _heartbeatCallback; + const ConnectionIPtr _connection; + const ConnectionI::StartCallbackPtr _startCB; + const vector<ConnectionI::OutgoingMessage> _sentCBs; + const Byte _compress; + const Int _requestId; + const Int _invokeNum; + const ServantManagerPtr _servantManager; + const ObjectAdapterPtr _adapter; + const OutgoingAsyncPtr _outAsync; + const ConnectionCallbackPtr _heartbeatCallback; BasicStream _stream; }; @@ -114,19 +114,21 @@ class FinishCall : public DispatchWorkItem { public: - FinishCall(const Ice::ConnectionIPtr& connection) : DispatchWorkItem(connection), _connection(connection) + FinishCall(const Ice::ConnectionIPtr& connection, bool close) : + DispatchWorkItem(connection), _connection(connection), _close(close) { } virtual void run() { - _connection->finish(); + _connection->finish(_close); } private: - - ConnectionIPtr _connection; + + const ConnectionIPtr _connection; + const bool _close; }; ConnectionState connectionStateMap[] = { @@ -2092,7 +2094,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } void -Ice::ConnectionI::finished(ThreadPoolCurrent& current) +Ice::ConnectionI::finished(ThreadPoolCurrent& current, bool close) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -2107,23 +2109,23 @@ Ice::ConnectionI::finished(ThreadPoolCurrent& current) // if(!_startCallback && _sendStreams.empty() && _asyncRequests.empty() && !_callback) { - finish(); + finish(close); return; } current.ioCompleted(); if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher. { - finish(); + finish(close); } else { - _threadPool->dispatchFromThisThread(new FinishCall(this)); + _threadPool->dispatchFromThisThread(new FinishCall(this, close)); } } void -Ice::ConnectionI::finish() +Ice::ConnectionI::finish(bool close) { if(!_initialized) { @@ -2131,7 +2133,8 @@ Ice::ConnectionI::finish() { string verb = _connector ? "establish" : "accept"; Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); - out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << *_exception.get(); + out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() + << "\n" << *_exception.get(); } } else @@ -2143,6 +2146,11 @@ Ice::ConnectionI::finish() } } + if(close) + { + _transceiver->close(); + } + if(_startCallback) { _startCallback->connectionStartFailed(this, *_exception.get()); @@ -2574,19 +2582,20 @@ Ice::ConnectionI::setState(State state) return; } - _threadPool->finish(this); -#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - _transceiver->close(); -#endif + // + // Don't need to close now for connections so only close the transceiver + // if the selector request it. + // + if(_threadPool->finish(this, false)) + { + _transceiver->close(); + } break; } case StateFinished: { assert(_state == StateClosed); -#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - _transceiver->close(); -#endif _communicator = 0; break; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 3ecec79a247..6ed4618eed7 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -212,7 +212,7 @@ public: #endif virtual void message(IceInternal::ThreadPoolCurrent&); - virtual void finished(IceInternal::ThreadPoolCurrent&); + virtual void finished(IceInternal::ThreadPoolCurrent&, bool); virtual std::string toString() const; // From Connection and EvantHandler. virtual IceInternal::NativeInfoPtr getNativeInfo(); @@ -228,7 +228,7 @@ public: void dispatch(const StartCallbackPtr&, const std::vector<OutgoingMessage>&, Byte, Int, Int, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&, const IceInternal::OutgoingAsyncPtr&, const ConnectionCallbackPtr&, IceInternal::BasicStream&); - void finish(); + void finish(bool); void closeCallback(const ConnectionCallbackPtr&); diff --git a/cpp/src/Ice/EventHandler.h b/cpp/src/Ice/EventHandler.h index 705efd208d9..2ec0a6841b1 100644 --- a/cpp/src/Ice/EventHandler.h +++ b/cpp/src/Ice/EventHandler.h @@ -40,7 +40,7 @@ public: // // Called when the event handler is unregistered. // - virtual void finished(ThreadPoolCurrent&) = 0; + virtual void finished(ThreadPoolCurrent&, bool) = 0; // // Get a textual representation of the event handler. diff --git a/cpp/src/Ice/Selector.cpp b/cpp/src/Ice/Selector.cpp index e7cc3e1bfc7..ef36a25c71f 100644 --- a/cpp/src/Ice/Selector.cpp +++ b/cpp/src/Ice/Selector.cpp @@ -173,7 +173,7 @@ Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation } void -Selector::finish(EventHandler* handler) +Selector::finish(IceInternal::EventHandler* handler) { handler->_registered = SocketOperationNone; handler->__decRef(); @@ -414,13 +414,30 @@ Selector::disable(EventHandler* handler, SocketOperation status) } } -void -Selector::finish(EventHandler* handler) +bool +Selector::finish(EventHandler* handler, bool closeNow) { if(handler->_registered) { - update(handler, handler->_registered, SocketOperationNone); + if(closeNow) + { + // + // Don't bother to un-register if the call wants to close + // the FD now, kqueue/epoll will automatically unregister + // the FD when it's closed. + // + handler->_registered = SocketOperationNone; + } + else + { + // + // If close on finish is requested, we can safely + // unregister the FD from the selector. + // + update(handler, handler->_registered, SocketOperationNone); + } } + return closeNow; } #if defined(ICE_USE_KQUEUE) @@ -603,13 +620,15 @@ Selector::disable(EventHandler* handler, SocketOperation status) } } -void -Selector::finish(EventHandler* handler) +bool +Selector::finish(EventHandler* handler, bool closeNow) { if(handler->_registered) { update(handler, handler->_registered, SocketOperationNone); + return false; // Don't close now if selecting. } + return closeNow; } void @@ -798,7 +817,7 @@ Selector::updateImpl(EventHandler* handler) if(interrupted()) { continue; - } + } Ice::SocketException ex(__FILE__, __LINE__); ex.error = IceInternal::getSocketErrno(); diff --git a/cpp/src/Ice/Selector.h b/cpp/src/Ice/Selector.h index 331814b16f7..196d96ccf54 100644 --- a/cpp/src/Ice/Selector.h +++ b/cpp/src/Ice/Selector.h @@ -46,12 +46,11 @@ class SelectorTimeoutException struct SelectEvent { - SelectEvent(IceInternal::EventHandler* handler, SocketOperation status) : - handler(handler), status(status) + SelectEvent(EventHandler* handler, SocketOperation status) : handler(handler), status(status) { } - IceInternal::EventHandler* handler; + EventHandler* handler; SocketOperation status; }; @@ -63,13 +62,13 @@ public: void destroy(); - void initialize(IceInternal::EventHandler*); - void update(IceInternal::EventHandler*, SocketOperation, SocketOperation); - void finish(IceInternal::EventHandler*); + void initialize(EventHandler*); + void update(EventHandler*, SocketOperation, SocketOperation); + void finish(EventHandler*); - IceInternal::EventHandler* getNextHandler(SocketOperation&, int); + EventHandler* getNextHandler(SocketOperation&, int); - void completed(IceInternal::EventHandler*, SocketOperation); + void completed(EventHandler*, SocketOperation); private: @@ -121,11 +120,11 @@ public: void update(EventHandler*, SocketOperation, SocketOperation); void enable(EventHandler*, SocketOperation); void disable(EventHandler*, SocketOperation); - void finish(EventHandler*); + bool finish(EventHandler*, bool); #if defined(ICE_USE_KQUEUE) void updateSelector(); -#endif +#endif void startSelect() @@ -180,7 +179,7 @@ public: void update(EventHandler*, SocketOperation, SocketOperation); void enable(EventHandler*, SocketOperation); void disable(EventHandler*, SocketOperation); - void finish(EventHandler*); + bool finish(EventHandler*, bool); void startSelect(); void finishSelect(); diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 1db49555d9e..d748ec70a7e 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -63,19 +63,20 @@ class FinishedWorkItem : public ThreadPoolWorkItem { public: - FinishedWorkItem(const EventHandlerPtr& handler) : _handler(handler) + FinishedWorkItem(const EventHandlerPtr& handler, bool close) : _handler(handler), _close(close) { } virtual void execute(ThreadPoolCurrent& current) { - _handler->finished(current); + _handler->finished(current, _close); } private: const EventHandlerPtr _handler; + const bool _close; }; class JoinThreadWorkItem : public ThreadPoolWorkItem @@ -297,7 +298,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current) } void -IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&) +IceInternal::ThreadPoolWorkQueue::finished(ThreadPoolCurrent&, bool) { assert(false); } @@ -585,32 +586,27 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation #endif } -void -IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) +bool +IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow) { Lock sync(*this); assert(!_destroyed); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - _selector.finish(handler.get()); // This must be called before! - _workQueue->queue(new FinishedWorkItem(handler)); - - // - // Clear the current ready handlers. The handlers from this vector can't be - // reference counted and a handler might get destroyed once it's finished. - // - //_handlers.clear(); - //_nextHandler = _handlers.end(); + closeNow = _selector.finish(handler.get(), closeNow); // This must be called before! + _workQueue->queue(new FinishedWorkItem(handler, !closeNow)); + return closeNow; #else // If there are no pending asynchronous operations, we can call finish on the handler now. if(!handler->_pending) { - _workQueue->queue(new FinishedWorkItem(handler)); + _workQueue->queue(new FinishedWorkItem(handler, false)); _selector.finish(handler.get()); } else { handler->_finish = true; } + return true; // Always close now to interrupt the pending call. #endif } @@ -669,7 +665,7 @@ IceInternal::ThreadPool::joinWithAllThreads() } #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) - _selector.finish(_workQueue.get()); + _selector.finish(_workQueue.get(), true); #endif _selector.destroy(); } @@ -1109,7 +1105,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { - _workQueue->queue(new FinishedWorkItem(current._handler)); + _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } return false; @@ -1123,7 +1119,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { - _workQueue->queue(new FinishedWorkItem(current._handler)); + _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } return false; @@ -1146,7 +1142,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current) current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation); if(!current._handler->_pending && current._handler->_finish) { - _workQueue->queue(new FinishedWorkItem(current._handler)); + _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } return false; @@ -1177,7 +1173,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current) if(!current._handler->_pending && current._handler->_finish) { // There are no more pending async operations, it's time to call finish. - _workQueue->queue(new FinishedWorkItem(current._handler)); + _workQueue->queue(new FinishedWorkItem(current._handler, false)); _selector.finish(current._handler.get()); } } diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 15c913c6c07..b59a2cab1c8 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -105,7 +105,7 @@ public: { update(handler, status, SocketOperationNone); } - void finish(const EventHandlerPtr&); + bool finish(const EventHandlerPtr&, bool); void dispatchFromThisThread(const DispatchWorkItemPtr&); void dispatch(const DispatchWorkItemPtr&); @@ -231,7 +231,7 @@ public: #endif virtual void message(ThreadPoolCurrent&); - virtual void finished(ThreadPoolCurrent&); + virtual void finished(ThreadPoolCurrent&, bool); virtual std::string toString() const; virtual NativeInfoPtr getNativeInfo(); virtual void postMessage(); |