summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp71
1 files changed, 51 insertions, 20 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index a4b89d09e1b..529461f0f03 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -227,7 +227,7 @@ void
SubscriberBatch::doFlush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
//
// If the subscriber isn't online we're done.
//
@@ -239,7 +239,7 @@ SubscriberBatch::doFlush()
EventDataSeq v;
v.swap(_events);
assert(!v.empty());
-
+
if(_observer)
{
_outstandingCount = static_cast<Ice::Int>(v.size());
@@ -255,7 +255,7 @@ SubscriberBatch::doFlush()
}
Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
- Ice::newCallback_Object_ice_flushBatchRequests(this,
+ Ice::newCallback_Object_ice_flushBatchRequests(this,
&SubscriberBatch::exception,
&SubscriberBatch::sent));
if(result->sentSynchronously())
@@ -278,7 +278,7 @@ SubscriberBatch::doFlush()
{
_lock.notify();
}
-
+
// This is significantly faster than the async version, but it can
// block the calling thread. Bad news!
@@ -294,7 +294,7 @@ SubscriberBatch::sent(bool sentSynchronously)
}
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding == 0);
@@ -334,7 +334,7 @@ void
SubscriberOneway::flush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
//
// If the subscriber isn't online we're done.
//
@@ -360,7 +360,7 @@ SubscriberOneway::flush()
try
{
Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
- e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
+ e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
&SubscriberOneway::exception,
&SubscriberOneway::sent));
if(!result->sentSynchronously())
@@ -394,7 +394,7 @@ SubscriberOneway::sent(bool sentSynchronously)
}
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
@@ -481,7 +481,7 @@ void
SubscriberLink::flush()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
if(_state != SubscriberStateOnline || _outstanding > 0)
{
return;
@@ -673,23 +673,40 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
break;
}
-
+
//
// State transition to online.
//
setState(SubscriberStateOnline);
// fall through
}
-
+
case SubscriberStateOnline:
- copy(events.begin(), events.end(), back_inserter(_events));
+ {
+ for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
+ {
+ if(static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
+ {
+ if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
+ {
+ error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
+ return false;
+ }
+ else // DropEvents
+ {
+ _events.pop_front();
+ }
+ }
+ _events.push_back(*p);
+ }
+
if(_observer)
{
_observer->queued(static_cast<Ice::Int>(events.size()));
}
flush();
break;
-
+ }
case SubscriberStateError:
return false;
@@ -768,10 +785,24 @@ Subscriber::error(bool dec, const Ice::Exception& e)
assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
}
+ //
+ // It's possible to be already in the error state if the queue maximum size
+ // has been reached or if an ObjectNotExistException occured before.
+ //
+ if(_state >= SubscriberStateError)
+ {
+ if(_shutdown)
+ {
+ _lock.notify();
+ }
+ return;
+ }
+
// A hard error is an ObjectNotExistException or
// NotRegisteredException.
bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
- dynamic_cast<const Ice::NotRegisteredException*>(&e);
+ dynamic_cast<const Ice::NotRegisteredException*>(&e) ||
+ dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e);
//
// A twoway subscriber can queue multiple send events and
@@ -834,7 +865,7 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
_events.clear();
setState(SubscriberStateError);
-
+
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 0)
{
@@ -871,13 +902,13 @@ Subscriber::completed(const Ice::AsyncResultPtr& result)
{
_observer->delivered(_outstandingCount);
}
-
+
//
// A successful response means we're no longer retrying, we're
// back active.
//
_currentRetry = 0;
-
+
if(_events.empty() && _outstanding == 0 && _shutdown)
{
_lock.notify();
@@ -893,7 +924,7 @@ Subscriber::completed(const Ice::AsyncResultPtr& result)
}
}
-
+
void
Subscriber::shutdown()
{
@@ -954,7 +985,7 @@ Subscriber::Subscriber(
rec.topicName,
rec.obj,
rec.theQoS,
- rec.theTopic,
+ rec.theTopic,
toSubscriberState(_state),
0));
}
@@ -992,7 +1023,7 @@ Subscriber::setState(Subscriber::SubscriberState state)
if(traceLevels->subscriber > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
+ out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
<< " transition from: " << stateToString(_state) << " to: " << stateToString(state);
}
_state = state;