summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp304
1 files changed, 182 insertions, 122 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 1e93b7d250e..000f7cbfd5e 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -107,9 +107,10 @@ public:
virtual void destroy();
private:
+
const bool _batch;
const Ice::ObjectPrx _obj;
- /*const*/ Ice::ObjectPrx _objBatch;
+ const Ice::ObjectPrx _objBatch;
};
class SubscriberTwoway : public Subscriber
@@ -159,16 +160,21 @@ public:
virtual bool flush();
void response();
+ void offline(const Ice::Exception&);
+
private:
const TopicLinkPrx _obj;
const int _cost;
+
+ // The next to try sending a new event if we're offline.
+ IceUtil::Time _next;
+ bool _warn;
};
typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr;
}
-
SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
const Ice::ObjectPrx& proxy,
@@ -176,9 +182,9 @@ SubscriberOneway::SubscriberOneway(
bool batch) :
Subscriber(instance, proxy, false, obj->ice_getIdentity()),
_batch(batch),
- _obj(obj)
+ _obj(obj),
+ _objBatch(obj->ice_isDatagram() ? obj->ice_batchDatagram() : obj->ice_batchOneway())
{
- _objBatch = obj->ice_isDatagram() ? _obj->ice_batchDatagram() : _obj->ice_batchOneway();
if(batch)
{
@@ -194,13 +200,12 @@ SubscriberOneway::flush()
//
// If the subscriber errored out then we're done.
//
- if(_state != StateActive)
+ if(_state == SubscriberStateError)
{
- _busy = false;
return false;
}
+ assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
- assert(_busy);
try
{
@@ -215,12 +220,12 @@ SubscriberOneway::flush()
sync.release();
// XXX:
- /*
+/*
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(_obj->ice_getIdentity().name.substr(0, 4) == "slow")
{
- //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- //out << "deliberately stalling";
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << "deliberately stalling";
sleep(2);
}
if(_obj->ice_getIdentity().name.substr(0, 5) == "block")
@@ -235,7 +240,7 @@ SubscriberOneway::flush()
out << "<- stall for 100s";
}
}
- */
+*/
//
// Deliver the events without holding the lock.
@@ -267,22 +272,22 @@ SubscriberOneway::flush()
// Reacquire the lock before we check the queue again.
//
sync.acquire();
-
- //
- // If there have been more events queued in the meantime then
- // we are still busy.
- //
- _busy = !_events.empty();
}
catch(const Ice::LocalException& ex)
{
assert(!sync.acquired());
- // setError will re-acquire and release the lock.
- setError(ex);
+ // error will re-acquire and release the lock.
+ error(ex);
return false;
}
- return _busy;
+ if(!_events.empty())
+ {
+ _state = SubscriberStateFlushPending;
+ return true;
+ }
+ _state = SubscriberStateOnline;
+ return false;
}
void
@@ -315,7 +320,7 @@ public:
virtual void
ice_exception(const Ice::Exception& e)
{
- _subscriber->setError(e);
+ _subscriber->error(e);
}
private:
@@ -342,13 +347,12 @@ SubscriberTwoway::flush()
//
// If the subscriber errored out then we're done.
//
- if(_state != StateActive)
+ if(_state == SubscriberStateError)
{
- _busy = false;
return false;
}
+ assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
- assert(_busy);
//
// Get the current set of events, but release the lock before
@@ -375,11 +379,15 @@ SubscriberTwoway::flush()
//
// If there have been more events queued in the meantime then
- // we are still busy.
+ // we have a pending flush.
//
- _busy = !_events.empty();
-
- return _busy;
+ if(!_events.empty())
+ {
+ _state = SubscriberStateFlushPending;
+ return true;
+ }
+ _state = SubscriberStateOnline;
+ return false;
}
namespace
@@ -403,7 +411,7 @@ public:
virtual void
ice_exception(const Ice::Exception& ex)
{
- _subscriber->setError(ex);
+ _subscriber->error(ex);
}
private:
@@ -432,13 +440,12 @@ SubscriberTwowayOrdered::flush()
//
// If the subscriber errored out then we're done.
//
- if(_state != StateActive)
+ if(_state == SubscriberStateError)
{
- _busy = false;
return false;
}
+ assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
- assert(_busy);
e = _events.front();
_events.erase(_events.begin());
@@ -455,11 +462,10 @@ SubscriberTwowayOrdered::response()
{
IceUtil::Mutex::Lock sync(_mutex);
- assert(_state == StateActive && _busy);
-
+ assert(_state != SubscriberStateError);
if(_events.empty())
{
- _busy = false;
+ _state = SubscriberStateOnline;
return;
}
}
@@ -492,11 +498,11 @@ public:
}
catch(const Ice::ObjectNotExistException& ex)
{
- _subscriber->setError(ex);
+ _subscriber->error(ex);
}
catch(const Ice::LocalException& ex)
{
- _subscriber->setUnreachable(ex);
+ _subscriber->offline(ex);
}
}
@@ -513,7 +519,8 @@ SubscriberLink::SubscriberLink(
int cost) :
Subscriber(instance, 0, true, obj->ice_getIdentity()),
_obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))),
- _cost(cost)
+ _cost(cost),
+ _warn(true)
{
}
@@ -527,29 +534,51 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events)
//
// Don't propagate a message that has already been forwarded.
- // Also, if this link has a non-zero cost, then don't propagate
- // a message whose cost exceeds the link cost.
+ // Also, if this link has a non-zero cost, then don't propagate a
+ // message whose cost exceeds the link cost.
//
IceUtil::Mutex::Lock sync(_mutex);
-
- if(_state != StateActive)
+
+ if(_state == SubscriberStateError)
{
+ return QueueStateError;
+ }
+
+ //
+ // If the proxy is offline and its time to send another event then
+ // put us into retry state.
+ //
+ if(_state == SubscriberStateOffline)
+ {
+ //
+ // If there are alot of subscribers offline then we will call
+ // Time::now() alot, which could be costly. This could be
+ // optimized to only one per event-batch by making the
+ // forwarded argument an EventInfo thing where the queue-time
+ // is lazy initialized.
//
- // Either the state is error here or the link is inactive.
+ if(IceUtil::Time::now() < _next)
+ {
+ return QueueStateNoFlush;
+ }
+
//
- return (_state == StateError) ? QueueStateError : QueueStateNoFlush;
+ // State transition to online.
+ //
+ _state = SubscriberStateOnline;
}
- size_t s = _events.size();
+ int queued = 0;
for(EventSeq::const_iterator p = events.begin(); p != events.end(); ++p)
{
if(_cost != 0)
{
//
- // Note that we could calculate this cost once and
- // cache it in a private form of the event to avoid
- // this if this really is a performance problem.
+ // Note that we could calculate this cost once and cache
+ // it in a private form of the event to avoid this if this
+ // really is a performance problem (this could use the
+ // EventInfo thing discussed above).
//
int cost = 0;
Ice::Context::const_iterator q = (*p)->context.find("cost");
@@ -562,19 +591,15 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events)
continue;
}
}
+ ++queued;
_events.push_back(*p);
}
- //
- // If no event was queued, or we're busy then the subscriber
- // doesn't need to be flushed, otherwise it must be.
- //
- if(_busy || s == _events.size())
+ if(_state == SubscriberStateFlushPending || queued == 0)
{
return QueueStateNoFlush;
}
-
- _busy = true;
+ _state = SubscriberStateFlushPending;
return QueueStateFlush;
}
@@ -588,13 +613,13 @@ SubscriberLink::flush()
//
// If the subscriber errored out then we're done.
//
- if(_state != StateActive)
+ if(_state == SubscriberStateError)
{
- _busy = false;
return false;
}
+
+ assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
- assert(_busy);
v.swap(_events);
}
@@ -609,18 +634,62 @@ SubscriberLink::response()
{
{
IceUtil::Mutex::Lock sync(_mutex);
+
+ assert(_state == SubscriberStateFlushPending);
- assert(_state == StateActive && _busy);
-
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _warn = true;
+
+ //
+ // No more events, no need to requeue this subscriber.
+ //
if(_events.empty())
{
- _busy = false;
+ _state = SubscriberStateOnline;
return;
}
}
_instance->subscriberPool()->flush(this);
}
+void
+SubscriberLink::offline(const Ice::Exception& e)
+{
+ IceUtil::Mutex::Lock sync(_mutex);
+ if(_state != SubscriberStateOffline)
+ {
+ _next = IceUtil::Time::now() + _instance->discardInterval();
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(_warn)
+ {
+ Ice::Warning warn(traceLevels->logger);
+ warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
+ << ": link offline: " << e;
+ }
+ else
+ {
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
+ << " discarding events: " << _instance->discardInterval() << "s";
+ }
+ }
+
+ _state = SubscriberStateOffline;
+ _warn = false;
+
+ //
+ // Clear all queued events.
+ //
+ _events.clear();
+ }
+}
+
SubscriberPtr
Subscriber::create(
const InstancePtr& instance,
@@ -632,6 +701,7 @@ Subscriber::create(
TraceLevelsPtr traceLevels = instance->traceLevels();
SubscriberPtr subscriber;
+
try
{
string reliability = "oneway";
@@ -640,28 +710,45 @@ Subscriber::create(
{
reliability = p->second;
}
+
+ //
+ // Override the timeout.
+ //
Ice::ObjectPrx newObj;
+ try
+ {
+ newObj = obj->ice_timeout(instance->sendTimeout());
+ }
+ catch(const Ice::FixedProxyException&)
+ {
+ //
+ // In the event IceStorm is collocated this could be a
+ // fixed proxy in which case its not possible to set the
+ // timeout.
+ //
+ newObj = obj;
+ }
if(reliability == "batch")
{
- if(obj->ice_isDatagram())
+ if(newObj->ice_isDatagram())
{
- newObj = obj->ice_batchDatagram();
+ newObj = newObj->ice_batchDatagram();
}
else
{
- newObj = obj->ice_batchOneway();
+ newObj = newObj->ice_batchOneway();
}
subscriber = new SubscriberOneway(instance, proxy, newObj, true);
}
else if(reliability == "twoway")
{
- newObj = obj->ice_twoway();
+ newObj = newObj->ice_twoway();
subscriber = new SubscriberTwoway(instance, proxy, newObj);
}
else if(reliability == "twoway ordered")
{
- newObj = obj->ice_twoway();
+ newObj = newObj->ice_twoway();
subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
}
else // reliability == "oneway"
@@ -671,13 +758,9 @@ Subscriber::create(
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
out << reliability <<" mode not understood.";
}
- if(obj->ice_isDatagram())
- {
- newObj = obj;
- }
- else
+ if(!newObj->ice_isDatagram())
{
- newObj = obj->ice_oneway();
+ newObj = newObj->ice_oneway();
}
subscriber = new SubscriberOneway(instance, proxy, newObj, false);
}
@@ -705,23 +788,6 @@ Subscriber::~Subscriber()
{
}
-void
-Subscriber::reachable()
-{
- IceUtil::Mutex::Lock sync(_mutex);
- if(_state == StateUnreachable)
- {
- _state = StateActive;
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << "Reachable " << _instance->communicator()->identityToString(id());
- }
- }
-}
-
Ice::ObjectPrx
Subscriber::proxy() const
{
@@ -745,22 +811,17 @@ Subscriber::queue(bool, const EventSeq& events)
{
IceUtil::Mutex::Lock sync(_mutex);
- if(_state != StateActive)
+ if(_state == SubscriberStateError)
{
return QueueStateError;
}
-
copy(events.begin(), events.end(), back_inserter(_events));
-
- //
- // If another thread is busy delivering events then the subscriber
- // does not need to be flushed.
- //
- if(_busy)
+ if(_state == SubscriberStateFlushPending)
{
return QueueStateNoFlush;
}
- _busy = true;
+
+ _state = SubscriberStateFlushPending;
return QueueStateFlush;
}
@@ -807,13 +868,13 @@ Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
}
void
-Subscriber::setError(const Ice::Exception& e)
+Subscriber::error(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
- if(_state != StateError)
+ if(_state != SubscriberStateError)
{
- _state = StateError;
- _busy = false;
+ _state = SubscriberStateError;
+ _events.clear();
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 0)
@@ -824,24 +885,6 @@ Subscriber::setError(const Ice::Exception& e)
}
}
-void
-Subscriber::setUnreachable(const Ice::Exception& e)
-{
- IceUtil::Mutex::Lock sync(_mutex);
- if(_state != StateUnreachable)
- {
- _state = StateUnreachable;
- _busy = false;
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": link publish unreachable: " << e;
- }
- }
-}
-
Subscriber::Subscriber(
const InstancePtr& instance,
const Ice::ObjectPrx& proxy,
@@ -851,8 +894,7 @@ Subscriber::Subscriber(
_id(id),
_persistent(persistent),
_proxy(proxy),
- _state(StateActive),
- _busy(false),
+ _state(SubscriberStateOnline),
_resetMax(true),
_maxSend(IceUtil::Time::seconds(60*24)) // A long time
{
@@ -864,3 +906,21 @@ IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id)
return subscriber->id() == id;
}
+bool
+IceStorm::operator==(const Subscriber& s1, const Subscriber& s2)
+{
+ return &s1 == &s2;
+}
+
+bool
+IceStorm::operator!=(const Subscriber& s1, const Subscriber& s2)
+{
+ return &s1 != &s2;
+}
+
+bool
+IceStorm::operator<(const Subscriber& s1, const Subscriber& s2)
+{
+ return &s1 < &s2;
+}
+