diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rwxr-xr-x | cpp/src/IceStorm/Subscriber.cpp | 150 |
1 files changed, 126 insertions, 24 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 74b15583cc1..8e9a8c74c38 100755 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -99,6 +99,7 @@ public: } void doFlush(); + void sent(bool); private: @@ -207,27 +208,24 @@ SubscriberBatch::flush() void SubscriberBatch::doFlush() { - EventDataSeq v; + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // + // If the subscriber isn't online we're done. + // + if(_state != SubscriberStateOnline) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - --_outstanding; - assert(_outstanding == 0); - - if(_shutdown) - { - _lock.notify(); - } + return; + } - // - // If the subscriber isn't online we're done. - // - if(_state != SubscriberStateOnline) - { - return; - } - v.swap(_events); - assert(!v.empty()); + EventDataSeq v; + v.swap(_events); + assert(!v.empty()); + + if(_observer) + { + _outstandingCount = v.size(); + _observer->outstanding(v.size()); } try @@ -237,6 +235,20 @@ SubscriberBatch::doFlush() { _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); } + + Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests( + Ice::newCallback_Object_ice_flushBatchRequests(this, + &SubscriberBatch::exception, + &SubscriberBatch::sent)); + if(result->sentSynchronously()) + { + --_outstanding; + assert(_outstanding == 0); + if(_observer) + { + _observer->delivered(_outstandingCount); + } + } } catch(const Ice::Exception& ex) { @@ -244,15 +256,46 @@ SubscriberBatch::doFlush() return; } - _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this, - &SubscriberBatch::exception)); - + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + // This is significantly faster than the async version, but it can // block the calling thread. Bad news! //_obj->ice_flushBatchRequests(); } +void +SubscriberBatch::sent(bool sentSynchronously) +{ + if(sentSynchronously) + { + return; + } + + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding == 0); + if(_observer) + { + _observer->delivered(_outstandingCount); + } + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + else if(!_events.empty()) + { + flush(); + } + +} + SubscriberOneway::SubscriberOneway( const InstancePtr& instance, const SubscriberRecord& rec, @@ -291,6 +334,11 @@ SubscriberOneway::flush() // EventDataPtr e = _events.front(); _events.erase(_events.begin()); + if(_observer) + { + _observer->outstanding(1); + } + try { Ice::AsyncResultPtr result = _obj->begin_ice_invoke( @@ -301,6 +349,10 @@ SubscriberOneway::flush() { ++_outstanding; } + else if(_observer) + { + _observer->delivered(1); + } } catch(const Ice::Exception& ex) { @@ -328,6 +380,10 @@ SubscriberOneway::sent(bool sentSynchronously) // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_observer) + { + _observer->delivered(1); + } if(_events.empty() && _outstanding == 0 && _shutdown) { @@ -374,7 +430,11 @@ SubscriberTwoway::flush() EventDataPtr e = _events.front(); _events.erase(_events.begin()); ++_outstanding; - + if(_observer) + { + _observer->outstanding(1); + } + try { _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context, @@ -437,6 +497,11 @@ SubscriberLink::flush() try { ++_outstanding; + if(_observer) + { + _outstandingCount = v.size(); + _observer->outstanding(_outstandingCount); + } _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) @@ -600,6 +665,10 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) case SubscriberStateOnline: copy(events.begin(), events.end(), back_inserter(_events)); + if(_observer) + { + _observer->queued(events.size()); + } flush(); break; @@ -664,6 +733,8 @@ Subscriber::destroy() // Ignore } } + + _observer.detach(); } void @@ -773,10 +844,14 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) result->throwLocalException(); IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_observer) + { + _observer->delivered(_outstandingCount); + } // // A successful response means we're no longer retrying, we're @@ -810,6 +885,22 @@ Subscriber::shutdown() { _lock.wait(); } + + _observer.detach(); +} + +void +Subscriber::updateObserver() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), + _rec.topicName, + _rec.obj, + _rec.theQoS, + _rec.theTopic, _observer.get())); + } } Subscriber::Subscriber( @@ -827,6 +918,7 @@ Subscriber::Subscriber( _shutdown(false), _state(SubscriberStateOnline), _outstanding(0), + _outstandingCount(1), _currentRetry(0) { if(_proxy && _instance->publisherReplicaProxy()) @@ -834,6 +926,16 @@ Subscriber::Subscriber( const_cast<Ice::ObjectPrx&>(_proxyReplica) = _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity()); } + + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), + rec.topicName, + rec.obj, + rec.theQoS, + rec.theTopic, + 0)); + } } namespace |