diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 71 |
1 files changed, 32 insertions, 39 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index afbd0309a2d..c4ce19d4bdf 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -319,52 +319,45 @@ SubscriberTwoway::SubscriberTwoway( bool SubscriberTwoway::flush() { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - // - // Get the current set of events, but release the lock before - // attempting to deliver the events. This allows other threads - // to add events in case we block (such as during connection - // establishment). - // - EventDataSeq v; - v.swap(_events); - sync.release(); - - // - // Deliver the events without holding the lock. - // - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + EventDataPtr e; { - _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context); + IceUtil::Mutex::Lock sync(_mutex); + + // + // If the subscriber errored out then we're done. + // + if(_state == SubscriberStateError) + { + return false; + } + assert(_state == SubscriberStateFlushPending); + assert(!_events.empty()); + + e = _events.front(); + _events.erase(_events.begin()); } + + _obj->ice_invoke_async(new TwowayInvokeI(this), e->op, e->mode, e->data, e->context); // - // Reacquire the lock before we check the queue again. - // - sync.acquire(); - - // - // If there have been more events queued in the meantime then - // we have a pending flush. + // We process the subscriber state after the event send and not + // before to prevent the subscriber from being requeued + // concurrently. // - if(!_events.empty()) { - assert(_state == SubscriberStateFlushPending); - return true; + IceUtil::Mutex::Lock sync(_mutex); + // + // If there have been more events queued in the meantime then + // we have a pending flush. + // + if(!_events.empty()) + { + assert(_state == SubscriberStateFlushPending); + return true; + } + _state = SubscriberStateOnline; + return false; } - _state = SubscriberStateOnline; - return false; } namespace |