author     Matthew Newhook <matthew@zeroc.com>    2006-11-20 02:09:09 +0000
committer  Matthew Newhook <matthew@zeroc.com>    2006-11-20 02:09:09 +0000
commit     d994d0d5549aa90a50559888a2c29f6b709ce70d (patch)
tree       09a78e7f6bc71af7d2d99d3d9f0abaed0f991853
parent     minor fix (diff)
Added shutdown state.
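
The gist of the change: a terminal SubscriberStateShutdown state is added to Subscriber. SubscriberPool::destroy() marks the pool destroyed, joins its worker threads, and only then calls shutdown() on each remaining subscriber, so queue(), flush(), response(), offline() and error() can simply return early once the state is set. Below is a minimal sketch of that pattern, assuming std::mutex locking and a simplified event type in place of IceUtil::Mutex and EventSeq; the names otherwise mirror the diff.

    #include <mutex>
    #include <string>
    #include <vector>

    enum SubscriberState
    {
        SubscriberStateOnline,
        SubscriberStateFlushPending,
        SubscriberStateOffline,
        SubscriberStateError,
        SubscriberStateShutdown // terminal state introduced by this commit
    };

    enum QueueState { QueueStateError, QueueStateNoFlush, QueueStateFlush };

    class Subscriber
    {
    public:

        // Called by SubscriberPool::destroy() after the pool workers have
        // been joined; every other entry point becomes a no-op afterwards.
        void shutdown()
        {
            std::lock_guard<std::mutex> sync(_mutex);
            _state = SubscriberStateShutdown;
        }

        QueueState queue(const std::vector<std::string>& events)
        {
            std::lock_guard<std::mutex> sync(_mutex);
            if(_state == SubscriberStateError)
            {
                return QueueStateError;
            }
            if(_state == SubscriberStateShutdown)
            {
                return QueueStateNoFlush; // accept silently; nothing will be flushed
            }
            _events.insert(_events.end(), events.begin(), events.end());
            _state = SubscriberStateFlushPending;
            return QueueStateFlush;
        }

        // AMI response callback: late replies arriving after shutdown are ignored.
        void response()
        {
            std::lock_guard<std::mutex> sync(_mutex);
            if(_state == SubscriberStateShutdown)
            {
                return;
            }
            _state = _events.empty() ? SubscriberStateOnline : SubscriberStateFlushPending;
        }

    private:

        std::mutex _mutex;
        SubscriberState _state = SubscriberStateOnline;
        std::vector<std::string> _events;
    };
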
-rw-r--r--  cpp/src/IceStorm/IceStormInternal.ice   44
-rw-r--r--  cpp/src/IceStorm/Service.cpp            12
-rw-r--r--  cpp/src/IceStorm/Subscriber.cpp        213
-rw-r--r--  cpp/src/IceStorm/Subscriber.h            4
-rw-r--r--  cpp/src/IceStorm/SubscriberPool.cpp     52
-rw-r--r--  cpp/src/IceStorm/SubscriberPool.h        4
-rw-r--r--  cpp/src/IceStorm/TopicI.cpp             27
7 files changed, 299 insertions, 57 deletions
diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice
index 96d1f0e3b00..eb889338e61 100644
--- a/cpp/src/IceStorm/IceStormInternal.ice
+++ b/cpp/src/IceStorm/IceStormInternal.ice
@@ -18,6 +18,26 @@ module IceStorm
/**
*
+ * The event data.
+ *
+ **/
+struct EventData
+{
+ /** The operation name. */
+ string op;
+ /** The operation mode. */
+ Ice::OperationMode mode;
+ /** The encoded data for the operation's input parameters. */
+ Ice::ByteSeq data;
+ /** The Ice::Current::Context data from the originating request. */
+ Ice::Context context;
+};
+
+/** A sequence of EventData. */
+sequence<EventData> EventDataSeq;
+
+/**
+ *
* The TopicLink interface. This is used to forward events between
* federated Topic instances.
*
@@ -33,7 +53,7 @@ interface TopicLink
* @param events The events to forward.
*
**/
- ["ami"] void forward(EventSeq events);
+ ["ami"] void forward(EventDataSeq events);
};
/**
@@ -55,6 +75,28 @@ interface TopicInternal extends Topic
idempotent TopicLink* getLinkProxy();
};
+/**
+ *
+ * This is version 2 of the TopicLink interface. This is used to
+ * forward events between federated Topic instances for Ice 3.2 and
+ * later.
+ *
+ * @see TopicInternal
+ *
+ **/
+interface TopicLinkV2
+{
+ /**
+ *
+ * Forward a sequence of events.
+ *
+ * @param events The events to forward.
+ *
+ **/
+ ["ami"] void forward(EventSeq events);
+};
+
+
}; // End module IceStorm
#endif
diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp
index f17ab500ace..e81e0370a4c 100644
--- a/cpp/src/IceStorm/Service.cpp
+++ b/cpp/src/IceStorm/Service.cpp
@@ -123,13 +123,6 @@ IceStorm::ServiceI::start(
_topicAdapter->activate();
_publishAdapter->activate();
-
- //
- // The keep alive thread must be started after all topics are
- // installed so that any upstream topics are notified immediately
- // after startup.
- //
- //_instance->keepAlive()->startPinging();
}
void
@@ -181,9 +174,8 @@ IceStorm::ServiceI::stop()
// does not clear the references. This is because the shutdown has
// to be in two stages. First we destroy & join with the threads
// so that no further activity can take place. Then we reap()
- // which has to call on various instance objects (such as the keep
- // alive thread), then we clear the instance which breaks any
- // cycles.
+ // which has to call on various instance objects, then we clear
+ // the instance which breaks any cycles.
//
//
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 844f9902b10..1089b4f3e01 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -167,6 +167,11 @@ private:
const TopicLinkPrx _obj;
const int _cost;
+ // When the first event is sent we check whether the object
+ // supports the "V2" facet.
+ bool _checkV2;
+ TopicLinkV2Prx _objV2;
+
// The next time to try sending a new event if we're offline.
IceUtil::Time _next;
bool _warn;
@@ -204,6 +209,10 @@ SubscriberOneway::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -281,6 +290,10 @@ SubscriberOneway::flush()
return false;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return false;
+ }
if(!_events.empty())
{
assert(_state == SubscriberStateFlushPending);
@@ -351,6 +364,10 @@ SubscriberTwoway::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -377,6 +394,11 @@ SubscriberTwoway::flush()
//
sync.acquire();
+ if(_state == SubscriberStateShutdown)
+ {
+ return false;
+ }
+
//
// If there have been more events queued in the meantime then
// we have a pending flush.
@@ -444,6 +466,10 @@ SubscriberTwowayOrdered::flush()
{
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -459,15 +485,17 @@ SubscriberTwowayOrdered::flush()
void
SubscriberTwowayOrdered::response()
{
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ assert(_state != SubscriberStateError);
+ if(_state == SubscriberStateShutdown)
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state != SubscriberStateError);
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
- }
+ return;
+ }
+ if(_events.empty())
+ {
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -479,7 +507,45 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
{
public:
- Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : _subscriber(subscriber)
+ Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
+ _subscriber(subscriber)
+ {
+ }
+
+ virtual void
+ ice_response()
+ {
+ _subscriber->response();
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception& ex)
+ {
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ _subscriber->error(ex);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _subscriber->offline(ex);
+ }
+ }
+
+private:
+
+ const SubscriberLinkPtr _subscriber;
+};
+
+class TopiclinkV2_forwardI : public IceStorm::AMI_TopicLinkV2_forward
+{
+public:
+
+ TopiclinkV2_forwardI(const SubscriberLinkPtr& subscriber) :
+ _subscriber(subscriber)
{
}
@@ -520,6 +586,7 @@ SubscriberLink::SubscriberLink(
Subscriber(instance, 0, true, obj->ice_getIdentity()),
_obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))),
_cost(cost),
+ _checkV2(false),
_warn(true)
{
}
@@ -544,6 +611,10 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events)
{
return QueueStateError;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return QueueStateNoFlush;
+ }
//
// If the proxy is offline and its time to send another event then
@@ -618,13 +689,66 @@ SubscriberLink::flush()
return false;
}
+ //
+ // Flush cannot be called with SubscriberStateShutdown because
+ // the pool workers are joined with before flushing.
+ //
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
v.swap(_events);
}
- _obj->forward_async(new Topiclink_forwardI(this), v);
+ //
+ // Only one thread at a time can be active at this point.
+ //
+ if(!_checkV2)
+ {
+ try
+ {
+ _objV2 = TopicLinkV2Prx::checkedCast(_obj, "V2");
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ error(ex);
+ return false;
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ offline(ex);
+ return false;
+ }
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(!_objV2 && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": link is v1 IceStorm.";
+ }
+ _checkV2 = true;
+ }
+
+ if(_objV2)
+ {
+ _objV2->forward_async(new TopiclinkV2_forwardI(this), v);
+ }
+ else
+ {
+ //
+ // Transform the event sequence into an EventDataSeq.
+ //
+ EventDataSeq events;
+ events.reserve(v.size());
+ for(EventSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ EventData data;
+ data.op = (*p)->op;
+ data.mode = (*p)->mode;
+ data.data = (*p)->data;
+ data.context = (*p)->context;
+ events.push_back(data);
+ }
+ _obj->forward_async(new Topiclink_forwardI(this), events);
+ }
return false;
}
@@ -632,25 +756,27 @@ SubscriberLink::flush()
void
SubscriberLink::response()
{
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ assert(_state != SubscriberStateError);
+ if(_state == SubscriberStateShutdown)
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state == SubscriberStateFlushPending);
-
- //
- // A successful response means we're no longer retrying, we're
- // back active.
- //
- _warn = true;
-
- //
- // No more events, no need to requeue this subscriber.
- //
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
- }
+ return;
+ }
+
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _warn = true;
+
+ //
+ // No more events, no need to requeue this subscriber.
+ //
+ if(_events.empty())
+ {
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -659,6 +785,10 @@ void
SubscriberLink::offline(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
+ if(_state == SubscriberStateShutdown)
+ {
+ return;
+ }
assert(_state != SubscriberStateOffline);
_next = IceUtil::Time::now() + _instance->discardInterval();
@@ -780,7 +910,10 @@ Subscriber::create(
const TopicLinkPrx& link,
int cost)
{
- return new SubscriberLink(instance, link, cost);
+ return new SubscriberLink(
+ instance,
+ TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
+ cost);
}
Subscriber::~Subscriber()
@@ -814,6 +947,11 @@ Subscriber::queue(bool, const EventSeq& events)
{
return QueueStateError;
}
+ if(_state == SubscriberStateShutdown)
+ {
+ return QueueStateNoFlush;
+ }
+
copy(events.begin(), events.end(), back_inserter(_events));
if(_state == SubscriberStateFlushPending)
{
@@ -840,10 +978,21 @@ Subscriber::destroy()
{
// Ignore
}
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // Ignore
+ }
}
}
void
+Subscriber::shutdown()
+{
+ IceUtil::Mutex::Lock sync(_mutex);
+ _state = SubscriberStateShutdown;
+}
+
+void
Subscriber::flushTime(const IceUtil::Time& interval)
{
if(_resetMax || interval > _maxSend)
@@ -857,10 +1006,6 @@ Subscriber::flushTime(const IceUtil::Time& interval)
IceUtil::Time
Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
{
- //IceUtil::Time max = _maxSend;
- //_maxSend = _maxSend * 0.95;
- //return max;
-
// The next call to flushTime can reset the max time.
_resetMax = true;
return _maxSend;
@@ -870,7 +1015,7 @@ void
Subscriber::error(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
- if(_state != SubscriberStateError)
+ if(_state != SubscriberStateError && _state != SubscriberStateShutdown)
{
_state = SubscriberStateError;
_events.clear();
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
index 580bef03cbb..ba0502b43dc 100644
--- a/cpp/src/IceStorm/Subscriber.h
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -47,6 +47,7 @@ public:
//
virtual bool flush() = 0;
virtual void destroy();
+ void shutdown();
//
// These methods must only be called by the SubscriberPool they
@@ -74,7 +75,8 @@ protected:
SubscriberStateOnline,
SubscriberStateFlushPending,
SubscriberStateOffline,
- SubscriberStateError
+ SubscriberStateError,
+ SubscriberStateShutdown
};
SubscriberState _state; // The subscriber state.
EventSeq _events; // The queue of events to send.
diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp
index ee3865bc74d..f118874295f 100644
--- a/cpp/src/IceStorm/SubscriberPool.cpp
+++ b/cpp/src/IceStorm/SubscriberPool.cpp
@@ -81,7 +81,7 @@ SubscriberPoolMonitor::SubscriberPoolMonitor(const SubscriberPoolPtr& manager, c
_manager(manager),
_timeout(timeout),
_needCheck(false),
- _destroy(false)
+ _destroyed(false)
{
start();
}
@@ -97,7 +97,7 @@ SubscriberPoolMonitor::run()
{
{
Lock sync(*this);
- if(_destroy)
+ if(_destroyed)
{
return;
}
@@ -148,7 +148,7 @@ void
SubscriberPoolMonitor::destroy()
{
Lock sync(*this);
- _destroy = true;
+ _destroyed = true;
notify();
}
@@ -162,7 +162,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) :
"IceStorm.SubscriberPool.Timeout", 1000), 50))),
// 10 * the stall timeout.
_stallCheck(_timeout * 10),
- _destroy(false),
+ _destroyed(false),
_reap(0)
{
try
@@ -197,6 +197,10 @@ void
SubscriberPool::flush(list<SubscriberPtr>& subscribers)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
//
// Splice on the new set of subscribers to SubscriberPool.
//
@@ -209,6 +213,10 @@ void
SubscriberPool::flush(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
_pending.push_back(subscriber);
assert(invariants());
notify();
@@ -218,6 +226,10 @@ void
SubscriberPool::add(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
_subscribers.push_back(subscriber);
assert(invariants());
}
@@ -226,6 +238,10 @@ void
SubscriberPool::remove(const SubscriberPtr& subscriber)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
//
// Note that this cannot remove based on the subscriber id because
// the pool is TopicManager scoped and not topic scoped therefore
@@ -245,6 +261,12 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ subscriber = 0;
+ return;
+ }
+
if(subscriber)
{
if(requeue)
@@ -353,7 +375,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
}
}
- while(_pending.empty() && !_destroy)
+ while(_pending.empty() && !_destroyed)
{
//
// If we wait then there is no need to monitor anymore.
@@ -362,7 +384,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil::
wait();
}
- if(_destroy)
+ if(_destroyed)
{
return;
}
@@ -398,18 +420,22 @@ SubscriberPool::destroy()
{
//
// First mark the pool as destroyed. This causes all of the worker
- // threads to unblock and terminate.
+ // threads to unblock and terminate. We also copy the list of subscribers
+ // for shutdown. No new subscribers can be added once _destroyed is set.
//
+ std::list<SubscriberPtr> subscribers;
+
{
Lock sync(*this);
- _destroy = true;
+ _destroyed = true;
notifyAll();
if(_subscriberPoolMonitor)
{
_subscriberPoolMonitor->destroy();
}
+ subscribers = _subscribers;
+ _subscribers.clear();
}
-
//
// Next join with each worker.
//
@@ -420,6 +446,14 @@ SubscriberPool::destroy()
_workers.clear();
//
+ // Shutdown each subscriber.
+ //
+ for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
+ {
+ (*q)->shutdown();
+ }
+
+ //
// Once all of the workers have gone then we'll no longer have
// concurrent access to the pool monitor, so we can join with it
// and then clear to remove the circular reference count.
diff --git a/cpp/src/IceStorm/SubscriberPool.h b/cpp/src/IceStorm/SubscriberPool.h
index 0fd2855d548..76289fe3429 100644
--- a/cpp/src/IceStorm/SubscriberPool.h
+++ b/cpp/src/IceStorm/SubscriberPool.h
@@ -50,7 +50,7 @@ private:
const SubscriberPoolPtr _manager;
const IceUtil::Time _timeout;
bool _needCheck;
- bool _destroy;
+ bool _destroyed;
};
typedef IceUtil::Handle<SubscriberPoolMonitor> SubscriberPoolMonitorPtr;
@@ -91,7 +91,7 @@ private:
std::list<SubscriberPtr> _pending;
std::list<SubscriberPtr> _subscribers;
- bool _destroy;
+ bool _destroyed;
std::list<IceUtil::ThreadPtr> _workers;
int _reap;
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index b00bbc2ab45..98b11165cd6 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -83,6 +83,32 @@ public:
}
virtual void
+ forward(const EventDataSeq& v, const Ice::Current& current)
+ {
+ EventSeq events;
+ events.reserve(v.size());
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ events.push_back(new Event(p->op, p->mode, p->data, p->context));
+ }
+ _topic->publish(true, events);
+ }
+
+private:
+
+ const TopicIPtr _topic;
+};
+
+class TopicLinkV2I : public TopicLinkV2
+{
+public:
+
+ TopicLinkV2I(const TopicIPtr& topic) :
+ _topic(topic)
+ {
+ }
+
+ virtual void
forward(const EventSeq& v, const Ice::Current& current)
{
_topic->publish(true, v);
@@ -125,6 +151,7 @@ TopicI::TopicI(
//
id.name = "link";
_linkPrx = TopicLinkPrx::uncheckedCast(_instance->objectAdapter()->add(new TopicLinkI(this), id));
+ _instance->objectAdapter()->addFacet(new TopicLinkV2I(this), id, "V2");
//
// Re-establish linked subscribers.