diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 71 |
1 files changed, 51 insertions, 20 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index a4b89d09e1b..529461f0f03 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -227,7 +227,7 @@ void SubscriberBatch::doFlush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // // If the subscriber isn't online we're done. // @@ -239,7 +239,7 @@ SubscriberBatch::doFlush() EventDataSeq v; v.swap(_events); assert(!v.empty()); - + if(_observer) { _outstandingCount = static_cast<Ice::Int>(v.size()); @@ -255,7 +255,7 @@ SubscriberBatch::doFlush() } Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests( - Ice::newCallback_Object_ice_flushBatchRequests(this, + Ice::newCallback_Object_ice_flushBatchRequests(this, &SubscriberBatch::exception, &SubscriberBatch::sent)); if(result->sentSynchronously()) @@ -278,7 +278,7 @@ SubscriberBatch::doFlush() { _lock.notify(); } - + // This is significantly faster than the async version, but it can // block the calling thread. Bad news! @@ -294,7 +294,7 @@ SubscriberBatch::sent(bool sentSynchronously) } IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding == 0); @@ -334,7 +334,7 @@ void SubscriberOneway::flush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // // If the subscriber isn't online we're done. // @@ -360,7 +360,7 @@ SubscriberOneway::flush() try { Ice::AsyncResultPtr result = _obj->begin_ice_invoke( - e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, + e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, &SubscriberOneway::exception, &SubscriberOneway::sent)); if(!result->sentSynchronously()) @@ -394,7 +394,7 @@ SubscriberOneway::sent(bool sentSynchronously) } IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); @@ -481,7 +481,7 @@ void SubscriberLink::flush() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + if(_state != SubscriberStateOnline || _outstanding > 0) { return; @@ -673,23 +673,40 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) { break; } - + // // State transition to online. // setState(SubscriberStateOnline); // fall through } - + case SubscriberStateOnline: - copy(events.begin(), events.end(), back_inserter(_events)); + { + for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p) + { + if(static_cast<int>(_events.size()) == _instance->sendQueueSizeMax()) + { + if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber) + { + error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__)); + return false; + } + else // DropEvents + { + _events.pop_front(); + } + } + _events.push_back(*p); + } + if(_observer) { _observer->queued(static_cast<Ice::Int>(events.size())); } flush(); break; - + } case SubscriberStateError: return false; @@ -768,10 +785,24 @@ Subscriber::error(bool dec, const Ice::Exception& e) assert(_outstanding >= 0 && _outstanding < _maxOutstanding); } + // + // It's possible to be already in the error state if the queue maximum size + // has been reached or if an ObjectNotExistException occured before. + // + if(_state >= SubscriberStateError) + { + if(_shutdown) + { + _lock.notify(); + } + return; + } + // A hard error is an ObjectNotExistException or // NotRegisteredException. bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) || - dynamic_cast<const Ice::NotRegisteredException*>(&e); + dynamic_cast<const Ice::NotRegisteredException*>(&e) || + dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e); // // A twoway subscriber can queue multiple send events and @@ -834,7 +865,7 @@ Subscriber::error(bool dec, const Ice::Exception& e) { _events.clear(); setState(SubscriberStateError); - + TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 0) { @@ -871,13 +902,13 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) { _observer->delivered(_outstandingCount); } - + // // A successful response means we're no longer retrying, we're // back active. // _currentRetry = 0; - + if(_events.empty() && _outstanding == 0 && _shutdown) { _lock.notify(); @@ -893,7 +924,7 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) } } - + void Subscriber::shutdown() { @@ -954,7 +985,7 @@ Subscriber::Subscriber( rec.topicName, rec.obj, rec.theQoS, - rec.theTopic, + rec.theTopic, toSubscriberState(_state), 0)); } @@ -992,7 +1023,7 @@ Subscriber::setState(Subscriber::SubscriberState state) if(traceLevels->subscriber > 1) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj) + out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj) << " transition from: " << stateToString(_state) << " to: " << stateToString(state); } _state = state; |