summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp71
1 files changed, 32 insertions, 39 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index afbd0309a2d..c4ce19d4bdf 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -319,52 +319,45 @@ SubscriberTwoway::SubscriberTwoway(
bool
SubscriberTwoway::flush()
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- //
- // Get the current set of events, but release the lock before
- // attempting to deliver the events. This allows other threads
- // to add events in case we block (such as during connection
- // establishment).
- //
- EventDataSeq v;
- v.swap(_events);
- sync.release();
-
- //
- // Deliver the events without holding the lock.
- //
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ EventDataPtr e;
{
- _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ //
+ // If the subscriber errored out then we're done.
+ //
+ if(_state == SubscriberStateError)
+ {
+ return false;
+ }
+ assert(_state == SubscriberStateFlushPending);
+ assert(!_events.empty());
+
+ e = _events.front();
+ _events.erase(_events.begin());
}
+
+ _obj->ice_invoke_async(new TwowayInvokeI(this), e->op, e->mode, e->data, e->context);
//
- // Reacquire the lock before we check the queue again.
- //
- sync.acquire();
-
- //
- // If there have been more events queued in the meantime then
- // we have a pending flush.
+ // We process the subscriber state after the event send and not
+ // before to prevent the subscriber from being requeued
+ // concurrently.
//
- if(!_events.empty())
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ IceUtil::Mutex::Lock sync(_mutex);
+ //
+ // If there have been more events queued in the meantime then
+ // we have a pending flush.
+ //
+ if(!_events.empty())
+ {
+ assert(_state == SubscriberStateFlushPending);
+ return true;
+ }
+ _state = SubscriberStateOnline;
+ return false;
}
- _state = SubscriberStateOnline;
- return false;
}
namespace