summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp213
1 files changed, 179 insertions, 34 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 844f9902b10..1089b4f3e01 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -167,6 +167,11 @@ private:
const TopicLinkPrx _obj;
const int _cost;
+ // The first event that is sent we check the whether the object
+ // supports the "V2" facet.
+ bool _checkV2;
+ TopicLinkV2Prx _objV2;
+
// The next to try sending a new event if we're offline.
IceUtil::Time _next;
bool _warn;
@@ -204,6 +209,10 @@ SubscriberOneway::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -281,6 +290,10 @@ SubscriberOneway::flush()
return false;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return false;
+ }
if(!_events.empty())
{
assert(_state == SubscriberStateFlushPending);
@@ -351,6 +364,10 @@ SubscriberTwoway::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -377,6 +394,11 @@ SubscriberTwoway::flush()
//
sync.acquire();
+ if(_state == SubscriberStateShutdown)
+ {
+ return false;
+ }
+
//
// If there have been more events queued in the meantime then
// we have a pending flush.
@@ -444,6 +466,10 @@ SubscriberTwowayOrdered::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -459,15 +485,17 @@ SubscriberTwowayOrdered::flush()
void
SubscriberTwowayOrdered::response()
{
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ assert(_state != SubscriberStateError);
+ if(_state == SubscriberStateShutdown)
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state != SubscriberStateError);
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
- }
+ return;
+ }
+ if(_events.empty())
+ {
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -479,7 +507,45 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
{
public:
- Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : _subscriber(subscriber)
+ Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
+ _subscriber(subscriber)
+ {
+ }
+
+ virtual void
+ ice_response()
+ {
+ _subscriber->response();
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception& ex)
+ {
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ _subscriber->error(ex);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _subscriber->offline(ex);
+ }
+ }
+
+private:
+
+ const SubscriberLinkPtr _subscriber;
+};
+
+class TopiclinkV2_forwardI : public IceStorm::AMI_TopicLinkV2_forward
+{
+public:
+
+ TopiclinkV2_forwardI(const SubscriberLinkPtr& subscriber) :
+ _subscriber(subscriber)
{
}
@@ -520,6 +586,7 @@ SubscriberLink::SubscriberLink(
Subscriber(instance, 0, true, obj->ice_getIdentity()),
_obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))),
_cost(cost),
+ _checkV2(false),
_warn(true)
{
}
@@ -544,6 +611,10 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events)
{
return QueueStateError;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return QueueStateNoFlush;
+ }
//
// If the proxy is offline and its time to send another event then
@@ -618,13 +689,66 @@ SubscriberLink::flush()
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
v.swap(_events);
}
- _obj->forward_async(new Topiclink_forwardI(this), v);
+ //
+ // Only one thread at a time can be active at this point.
+ //
+ if(!_checkV2)
+ {
+ try
+ {
+ _objV2 = TopicLinkV2Prx::checkedCast(_obj, "V2");
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ error(ex);
+ return false;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ offline(ex);
+ return false;
+ }
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(!_objV2 && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": link is v1 IceStorm.";
+ }
+ _checkV2 = true;
+ }
+
+ if(_objV2)
+ {
+ _objV2->forward_async(new TopiclinkV2_forwardI(this), v);
+ }
+ else
+ {
+ //
+ // Transform the event sequence into a EventData.
+ //
+ EventDataSeq events;
+ events.reserve(v.size());
+ for(EventSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ EventData data;
+ data.op = (*p)->op;
+ data.mode = (*p)->mode;
+ data.data = (*p)->data;
+ data.context = (*p)->context;
+ events.push_back(data);
+ }
+ _obj->forward_async(new Topiclink_forwardI(this), events);
+ }
return false;
}
@@ -632,25 +756,27 @@ SubscriberLink::flush()
void
SubscriberLink::response()
{
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ assert(_state != SubscriberStateError);
+ if(_state == SubscriberStateShutdown)
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state == SubscriberStateFlushPending);
-
- //
- // A successful response means we're no longer retrying, we're
- // back active.
- //
- _warn = true;
-
- //
- // No more events, no need to requeue this subscriber.
- //
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
- }
+ return;
+ }
+
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _warn = true;
+
+ //
+ // No more events, no need to requeue this subscriber.
+ //
+ if(_events.empty())
+ {
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -659,6 +785,10 @@ void
SubscriberLink::offline(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
+ if(_state == SubscriberStateShutdown)
+ {
+ return;
+ }
assert(_state != SubscriberStateOffline);
_next = IceUtil::Time::now() + _instance->discardInterval();
@@ -780,7 +910,10 @@ Subscriber::create(
const TopicLinkPrx& link,
int cost)
{
- return new SubscriberLink(instance, link, cost);
+ return new SubscriberLink(
+ instance,
+ TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
+ cost);
}
Subscriber::~Subscriber()
@@ -814,6 +947,11 @@ Subscriber::queue(bool, const EventSeq& events)
{
return QueueStateError;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return QueueStateNoFlush;
+ }
+
copy(events.begin(), events.end(), back_inserter(_events));
if(_state == SubscriberStateFlushPending)
{
@@ -840,10 +978,21 @@ Subscriber::destroy()
{
// Ignore
}
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // Ignore
+ }
}
}
void
+Subscriber::shutdown()
+{
+ IceUtil::Mutex::Lock sync(_mutex);
+ _state = SubscriberStateShutdown;
+}
+
+void
Subscriber::flushTime(const IceUtil::Time& interval)
{
if(_resetMax || interval > _maxSend)
@@ -857,10 +1006,6 @@ Subscriber::flushTime(const IceUtil::Time& interval)
IceUtil::Time
Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
{
- //IceUtil::Time max = _maxSend;
- //_maxSend = _maxSend * 0.95;
- //return max;
-
// The next call to flushTime can reset the max time.
_resetMax = true;
return _maxSend;
@@ -870,7 +1015,7 @@ void
Subscriber::error(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
- if(_state != SubscriberStateError)
+ if(_state != SubscriberStateError && _state != SubscriberStateShutdown)
{
_state = SubscriberStateError;
_events.clear();