summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rwxr-xr-xcpp/src/IceStorm/Subscriber.cpp150
1 files changed, 126 insertions, 24 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 74b15583cc1..8e9a8c74c38 100755
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -99,6 +99,7 @@ public:
}
void doFlush();
+ void sent(bool);
private:
@@ -207,27 +208,24 @@ SubscriberBatch::flush()
void
SubscriberBatch::doFlush()
{
- EventDataSeq v;
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ //
+ // If the subscriber isn't online we're done.
+ //
+ if(_state != SubscriberStateOnline)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
- --_outstanding;
- assert(_outstanding == 0);
-
- if(_shutdown)
- {
- _lock.notify();
- }
+ return;
+ }
- //
- // If the subscriber isn't online we're done.
- //
- if(_state != SubscriberStateOnline)
- {
- return;
- }
- v.swap(_events);
- assert(!v.empty());
+ EventDataSeq v;
+ v.swap(_events);
+ assert(!v.empty());
+
+ if(_observer)
+ {
+ _outstandingCount = v.size();
+ _observer->outstanding(v.size());
}
try
@@ -237,6 +235,20 @@ SubscriberBatch::doFlush()
{
_obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
}
+
+ Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
+ Ice::newCallback_Object_ice_flushBatchRequests(this,
+ &SubscriberBatch::exception,
+ &SubscriberBatch::sent));
+ if(result->sentSynchronously())
+ {
+ --_outstanding;
+ assert(_outstanding == 0);
+ if(_observer)
+ {
+ _observer->delivered(_outstandingCount);
+ }
+ }
}
catch(const Ice::Exception& ex)
{
@@ -244,15 +256,46 @@ SubscriberBatch::doFlush()
return;
}
- _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this,
- &SubscriberBatch::exception));
-
+ if(_events.empty() && _outstanding == 0 && _shutdown)
+ {
+ _lock.notify();
+ }
+
// This is significantly faster than the async version, but it can
// block the calling thread. Bad news!
//_obj->ice_flushBatchRequests();
}
+void
+SubscriberBatch::sent(bool sentSynchronously)
+{
+ if(sentSynchronously)
+ {
+ return;
+ }
+
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding == 0);
+ if(_observer)
+ {
+ _observer->delivered(_outstandingCount);
+ }
+
+ if(_events.empty() && _outstanding == 0 && _shutdown)
+ {
+ _lock.notify();
+ }
+ else if(!_events.empty())
+ {
+ flush();
+ }
+
+}
+
SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
const SubscriberRecord& rec,
@@ -291,6 +334,11 @@ SubscriberOneway::flush()
//
EventDataPtr e = _events.front();
_events.erase(_events.begin());
+ if(_observer)
+ {
+ _observer->outstanding(1);
+ }
+
try
{
Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
@@ -301,6 +349,10 @@ SubscriberOneway::flush()
{
++_outstanding;
}
+ else if(_observer)
+ {
+ _observer->delivered(1);
+ }
}
catch(const Ice::Exception& ex)
{
@@ -328,6 +380,10 @@ SubscriberOneway::sent(bool sentSynchronously)
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+ if(_observer)
+ {
+ _observer->delivered(1);
+ }
if(_events.empty() && _outstanding == 0 && _shutdown)
{
@@ -374,7 +430,11 @@ SubscriberTwoway::flush()
EventDataPtr e = _events.front();
_events.erase(_events.begin());
++_outstanding;
-
+ if(_observer)
+ {
+ _observer->outstanding(1);
+ }
+
try
{
_obj->begin_ice_invoke(e->op, e->mode, e->data, e->context,
@@ -437,6 +497,11 @@ SubscriberLink::flush()
try
{
++_outstanding;
+ if(_observer)
+ {
+ _outstandingCount = v.size();
+ _observer->outstanding(_outstandingCount);
+ }
_obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
}
catch(const Ice::Exception& ex)
@@ -600,6 +665,10 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
case SubscriberStateOnline:
copy(events.begin(), events.end(), back_inserter(_events));
+ if(_observer)
+ {
+ _observer->queued(events.size());
+ }
flush();
break;
@@ -664,6 +733,8 @@ Subscriber::destroy()
// Ignore
}
}
+
+ _observer.detach();
}
void
@@ -773,10 +844,14 @@ Subscriber::completed(const Ice::AsyncResultPtr& result)
result->throwLocalException();
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
+
// Decrement the _outstanding count.
--_outstanding;
assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+ if(_observer)
+ {
+ _observer->delivered(_outstandingCount);
+ }
//
// A successful response means we're no longer retrying, we're
@@ -810,6 +885,22 @@ Subscriber::shutdown()
{
_lock.wait();
}
+
+ _observer.detach();
+}
+
+void
+Subscriber::updateObserver()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ if(_instance->observer())
+ {
+ _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
+ _rec.topicName,
+ _rec.obj,
+ _rec.theQoS,
+ _rec.theTopic, _observer.get()));
+ }
}
Subscriber::Subscriber(
@@ -827,6 +918,7 @@ Subscriber::Subscriber(
_shutdown(false),
_state(SubscriberStateOnline),
_outstanding(0),
+ _outstandingCount(1),
_currentRetry(0)
{
if(_proxy && _instance->publisherReplicaProxy())
@@ -834,6 +926,16 @@ Subscriber::Subscriber(
const_cast<Ice::ObjectPrx&>(_proxyReplica) =
_instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
}
+
+ if(_instance->observer())
+ {
+ _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
+ rec.topicName,
+ rec.obj,
+ rec.theQoS,
+ rec.theTopic,
+ 0));
+ }
}
namespace