summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp1421
1 files changed, 1126 insertions, 295 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 1f3ea907310..a391ab306de 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -12,20 +12,33 @@
#include <IceStorm/Instance.h>
#include <IceStorm/Subscriber.h>
#include <IceStorm/TraceLevels.h>
-#include <IceStorm/SubscriberPool.h>
+#include <IceStorm/NodeI.h>
+#include <IceStorm/Observers.h>
#include <Ice/LoggerUtil.h>
-#include <Freeze/Initialize.h>
+#include <Freeze/Freeze.h>
#include <algorithm>
-using namespace IceStorm;
using namespace std;
+using namespace IceStorm;
+using namespace IceStormElection;
namespace
{
+void
+halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex)
+{
+ {
+ Ice::Error error(com->getLogger());
+ error << "fatal exception: " << ex << "\n*** Aborting application ***";
+ }
+
+ abort();
+}
+
//
// The servant has a 1-1 association with a topic. It is used to
// receive events from Publishers.
@@ -34,16 +47,22 @@ class PublisherI : public Ice::BlobjectArray
{
public:
- PublisherI(const TopicIPtr& topic) :
- _topic(topic)
+ PublisherI(const TopicImplPtr& topic, const InstancePtr& instance) :
+ _topic(topic), _instance(instance)
{
}
+ ~PublisherI()
+ {
+ //cout << "~PublisherI" << endl;
+ }
+
virtual bool
ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
Ice::ByteSeq&,
const Ice::Current& current)
{
+ // The publish call does a cached read.
EventDataPtr event = new EventData(
current.operation,
current.mode,
@@ -66,7 +85,8 @@ public:
private:
- const TopicIPtr _topic;
+ const TopicImplPtr _topic;
+ const InstancePtr _instance;
};
//
@@ -77,118 +97,440 @@ class TopicLinkI : public TopicLink
{
public:
- TopicLinkI(const TopicIPtr& topic) :
- _topic(topic)
+ TopicLinkI(const TopicImplPtr& impl, const InstancePtr& instance) :
+ _impl(impl), _instance(instance)
{
}
+ ~TopicLinkI()
+ {
+ //cout << "~TopicLinkI" << endl;
+ }
+
virtual void
forward(const EventDataSeq& v, const Ice::Current& current)
{
- _topic->publish(true, v);
+ // The publish call does a cached read.
+ _impl->publish(true, v);
+ }
+
+private:
+
+ const TopicImplPtr _impl;
+ const InstancePtr _instance;
+};
+
+class TopicI : public TopicInternal
+{
+public:
+
+ TopicI(const TopicImplPtr& impl, const InstancePtr& instance) :
+ _impl(impl), _instance(instance)
+ {
+ }
+
+ ~TopicI()
+ {
+ //cout << "~TopicI" << endl;
+ }
+
+ virtual string getName(const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->getName();
+ }
+
+ virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->getPublisher();
+ }
+
+ virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->getNonReplicatedPublisher();
+ }
+
+ virtual void subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ master->subscribe(qos, obj);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ _impl->subscribe(qos, obj);
+ }
+ break;
+ }
+ }
+
+ virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj,
+ const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ return master->subscribeAndGetPublisher(qos, obj);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ return _impl->subscribeAndGetPublisher(qos, obj);
+ }
+ }
+ }
+
+ virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ master->unsubscribe(subscriber);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ _impl->unsubscribe(subscriber);
+ }
+ break;
+ }
+ }
+
+ virtual TopicLinkPrx getLinkProxy(const Ice::Current&)
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->getLinkProxy();
+ }
+
+ virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& current)
+ {
+ NodeIPtr node = _instance->node();
+ if(!node->updateMaster(__FILE__, __LINE__))
+ {
+ throw ReapWouldBlock();
+ }
+ FinishUpdateHelper unlock(node);
+ _impl->reap(ids);
+ }
+
+ virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ master->link(topic, cost);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ _impl->link(topic, cost);
+ }
+ break;
+ }
+ }
+
+ virtual void unlink(const TopicPrx& topic, const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ master->unlink(topic);
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ _impl->unlink(topic);
+ }
+ break;
+ }
+ }
+
+ virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const
+ {
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+ return _impl->getLinkInfoSeq();
+ }
+
+ virtual void destroy(const Ice::Current& current)
+ {
+ while(true)
+ {
+ Ice::Long generation = -1;
+ TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
+ if(master)
+ {
+ try
+ {
+ master->destroy();
+ }
+ catch(const Ice::ConnectFailedException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ catch(const Ice::TimeoutException&)
+ {
+ _instance->node()->recovery(generation);
+ continue;
+ }
+ }
+ else
+ {
+ FinishUpdateHelper unlock(_instance->node());
+ _impl->destroy();
+ }
+ break;
+ }
}
private:
- const TopicIPtr _topic;
+ TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const
+ {
+ NodeIPtr node = _instance->node();
+ Ice::ObjectPrx master;
+ if(node)
+ {
+ master = _instance->node()->startUpdate(generation, file, line);
+ }
+ return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
+ }
+
+ const TopicImplPtr _impl;
+ const InstancePtr _instance;
};
}
+
namespace IceStorm
{
extern string identityToTopicName(const Ice::Identity& id);
}
-TopicI::TopicI(
+TopicImpl::TopicImpl(
const InstancePtr& instance,
const string& name,
const Ice::Identity& id,
- const LinkRecordSeq& topicRecord,
- const string& envName,
- const string& dbName) :
+ const SubscriberRecordSeq& subscribers) :
_instance(instance),
_name(name),
_id(id),
- _connection(Freeze::createConnection(instance->communicator(), envName)),
- _topics(_connection, dbName, false),
- _topicRecord(topicRecord),
+ _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
+ _subscriberMap(_connection, "subscribers"),
+ _llumap(_connection, "llu"),
_destroyed(false)
{
- //
- // Create a servant per topic to receive event data. If the
- // category is empty then we are in backwards compatibility
- // mode. In this case the servant's identity is
- // category=<topicname>, name=publish, otherwise the name is
- // <instancename>/publisher.<topicname>. The same applies to the
- // link proxy.
- //
- // Activate the object and save a reference to give to publishers.
- //
- Ice::Identity pubid;
- Ice::Identity linkid;
- if(id.category.empty())
- {
- pubid.category = _name;
- pubid.name = "publish";
- linkid.category = _name;
- linkid.name = "link";
- }
- else
+ try
{
- pubid.category = id.category;
- pubid.name = _name + ".publish";
- linkid.category = id.category;
- linkid.name = _name + ".link";
- }
-
- _publisherPrx = _instance->objectAdapter()->add(new PublisherI(this), pubid);
- _linkPrx = TopicLinkPrx::uncheckedCast(_instance->objectAdapter()->add(new TopicLinkI(this), linkid));
+ __setNoDelete(true);
+ // TODO: If we want to improve the performance of the
+ // non-replicated case we could allocate a null-topic impl here.
+ _servant = new TopicI(this, instance);
- //
- // Re-establish linked subscribers.
- //
- for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
- {
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
+ //
+ // Create a servant per topic to receive event data. If the
+ // category is empty then we are in backwards compatibility
+ // mode. In this case the servant's identity is
+ // category=<topicname>, name=publish, otherwise the name is
+ // <instancename>/<topicname>.publish. The same applies to the
+ // link proxy.
+ //
+ // Activate the object and save a reference to give to publishers.
+ //
+ Ice::Identity pubid;
+ Ice::Identity linkid;
+ if(id.category.empty())
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " relink " << _instance->communicator()->identityToString(p->theTopic->ice_getIdentity());
+ pubid.category = _name;
+ pubid.name = "publish";
+ linkid.category = _name;
+ linkid.name = "link";
+ }
+ else
+ {
+ pubid.category = id.category;
+ pubid.name = _name + ".publish";
+ linkid.category = id.category;
+ linkid.name = _name + ".link";
}
+ _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
+ _linkPrx = TopicLinkPrx::uncheckedCast(
+ _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
+
//
- // Create the subscriber object add it to the set of
- // subscribers.
+ // Re-establish subscribers.
//
- SubscriberPtr subscriber = Subscriber::create(_instance, p->obj, p->cost);
- _subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+ for(SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
+ {
+ Ice::Identity id = p->obj->ice_getIdentity();
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " recreate " << _instance->communicator()->identityToString(id);
+ }
+
+ try
+ {
+ //
+ // Create the subscriber object add it to the set of
+ // subscribers.
+ //
+ SubscriberPtr subscriber = Subscriber::create(_instance, *p);
+ _subscribers.push_back(subscriber);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ Ice::Warning out(traceLevels->logger);
+ out << _name << " recreate " << _instance->communicator()->identityToString(id) << " failed: " << ex;
+ continue;
+ }
+ }
}
+ catch(...)
+ {
+ shutdown();
+ __setNoDelete(false);
+ throw;
+ }
+ __setNoDelete(false);
}
-TopicI::~TopicI()
+TopicImpl::~TopicImpl()
{
+ //cout << "~TopicImpl" << endl;
}
string
-TopicI::getName(const Ice::Current&) const
+TopicImpl::getName() const
{
// Immutable
return _name;
}
Ice::ObjectPrx
-TopicI::getPublisher(const Ice::Current&) const
+TopicImpl::getPublisher() const
{
// Immutable
+ if(_instance->publisherReplicaProxy())
+ {
+ return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
+ }
return _publisherPrx;
}
+Ice::ObjectPrx
+TopicImpl::getNonReplicatedPublisher() const
+{
+ // If there is an adapter id configured then we're using icegrid
+ // so create an indirect proxy, otherwise create a direct proxy.
+ if(!_publisherPrx->ice_getAdapterId().empty())
+ {
+ return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
+ }
+ else
+ {
+ return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
+ }
+}
+
//
// COMPILERFIX: For some reason with VC6 find reports an error.
//
#if defined(_MSC_VER) && (_MSC_VER < 1300)
-vector<SubscriberPtr>::iterator
+namespace
+{
+static vector<SubscriberPtr>::iterator
find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident)
{
while(start != end)
@@ -201,10 +543,29 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
}
return end;
}
+}
#endif
+namespace
+{
+static void
+trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s)
+{
+ out << '[';
+ for(vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
+ {
+ if(p != s.begin())
+ {
+ out << ",";
+ }
+ out << instance->communicator()->identityToString((*p)->id());
+ }
+ out << "]";
+}
+}
+
void
-TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
+TopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj)
{
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
@@ -212,7 +573,7 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
if(traceLevels->topic > 1)
{
out << " QoS: ";
@@ -224,10 +585,11 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr
}
out << '[' << p->first << "," << p->second << ']';
}
+ out << " subscriptions: ";
+ trace(out, _instance, _subscribers);
}
}
-
string reliability = "oneway";
{
QoS::iterator p = qos.find("reliability");
@@ -273,28 +635,103 @@ TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Curr
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
+ SubscriberRecord record;
+ record.id = id;
+ record.obj = newObj;
+ record.theQoS = qos;
+ record.topicName = _name;
+ record.link = false;
+ record.cost = 0;
+
+ LogUpdate llu;
+
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
if(p != _subscribers.end())
{
+ // If we already have this subscriber remove it from our
+ // subscriber list and remove it from the database.
(*p)->destroy();
- _instance->subscriberPool()->remove(*p);
_subscribers.erase(p);
+
+ for(;;)
+ {
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = record.id;
+ SubscriberMap::iterator e = _subscriberMap.find(key);
+ if(e != _subscriberMap.end())
+ {
+ _subscriberMap.erase(e);
+ }
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+ Ice::IdentitySeq ids;
+ ids.push_back(id);
+ _instance->observers()->removeSubscriber(llu, _name, ids);
+ }
+
+ SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ for(;;)
+ {
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = subscriber->id();
+ _subscriberMap.put(SubscriberMap::value_type(key, record));
+ // Update the LLU.
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
}
- SubscriberPtr subscriber = Subscriber::create(_instance, _name, newObj, qos);
_subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+
+ _instance->observers()->addSubscriber(llu, _name, record);
}
Ice::ObjectPrx
-TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
+TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
{
Ice::Identity id = obj->ice_getIdentity();
+
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Subscribe: " << _instance->communicator()->identityToString(id);
+ out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
if(traceLevels->topic > 1)
{
out << " QoS: ";
@@ -304,27 +741,70 @@ TopicI::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, cons
{
out << ',';
}
- out << '[' << p->first << "," << p->second << ']';
+
}
+ out << " subscriptions: ";
+ trace(out, _instance, _subscribers);
}
}
IceUtil::Mutex::Lock sync(_subscribersMutex);
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
+
+ SubscriberRecord record;
+ record.id = id;
+ record.obj = obj;
+ record.theQoS = qos;
+ record.topicName = _name;
+ record.link = false;
+ record.cost = 0;
+
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
if(p != _subscribers.end())
{
throw AlreadySubscribed();
}
- SubscriberPtr subscriber = Subscriber::create(_instance, _name, obj, qos);
+ LogUpdate llu;
+
+ SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ for(;;)
+ {
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = subscriber->id();
+ _subscriberMap.put(SubscriberMap::value_type(key, record));
+
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+
_subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+
+ _instance->observers()->addSubscriber(llu, _name, record);
return subscriber->proxy();
}
void
-TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
+TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
{
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(!subscriber)
@@ -342,110 +822,124 @@ TopicI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "Unsubscribe: " << _instance->communicator()->identityToString(id);
+ out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
+ if(traceLevels->topic > 1)
+ {
+ trace(out, _instance, _subscribers);
+ }
}
- //
- // Unsubscribe the subscriber with this identity.
- //
- removeSubscriber(subscriber);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ Ice::IdentitySeq ids;
+ ids.push_back(id);
+ removeSubscribers(ids);
}
TopicLinkPrx
-TopicI::getLinkProxy(const Ice::Current&)
+TopicImpl::getLinkProxy()
{
// immutable
+ if(_instance->publisherReplicaProxy())
+ {
+ return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity(
+ _linkPrx->ice_getIdentity()));
+ }
return _linkPrx;
}
void
-TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
+TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
{
TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
TopicLinkPrx link = internal->getLinkProxy();
- IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
- if(_destroyed)
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
{
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
+ << " cost " << cost;
}
- reap();
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
Ice::Identity id = topic->ice_getIdentity();
- string name = identityToTopicName(id);
- // Validate that this topic doesn't already have an established
- // link.
- for(LinkRecordSeq::const_iterator p = _topicRecord.begin(); p != _topicRecord.end(); ++p)
+ SubscriberRecord record;
+ record.id = id;
+ record.obj = link;
+ record.theTopic = topic;
+ record.topicName = _name;
+ record.link = true;
+ record.cost = cost;
+
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
+ if(p != _subscribers.end())
{
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- LinkExists ex;
- ex.name = name;
- throw ex;
- }
+ string name = identityToTopicName(id);
+ LinkExists ex;
+ ex.name = name;
+ throw ex;
}
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
+
+ LogUpdate llu;
+
+ SubscriberPtr subscriber = Subscriber::create(_instance, record);
+
+ for(;;)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " link " << _instance->communicator()->identityToString(id)
- << " cost " << cost;
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = id;
+ _subscriberMap.put(SubscriberMap::value_type(key, record));
+
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
}
- SubscriberPtr subscriber = Subscriber::create(_instance, link, cost);
-
- //
- // Create the LinkRecord
- //
- LinkRecord record;
- record.obj = link;
- record.cost = cost;
- record.theTopic = topic;
-
- //
- // Save
- //
- _topicRecord.push_back(record);
- _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
-
- IceUtil::Mutex::Lock subscriberSync(_subscribersMutex);
_subscribers.push_back(subscriber);
- _instance->subscriberPool()->add(subscriber);
+
+ _instance->observers()->addSubscriber(llu, _name, record);
}
void
-TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
+TopicImpl::unlink(const TopicPrx& topic)
{
- IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
if(_destroyed)
{
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
- reap();
-
Ice::Identity id = topic->ice_getIdentity();
- string name = identityToTopicName(id);
- LinkRecordSeq::iterator p = _topicRecord.begin();
- while(p != _topicRecord.end())
- {
- if(p->theTopic->ice_getIdentity() == topic->ice_getIdentity())
- {
- break;
- }
- ++p;
- }
- if(p == _topicRecord.end())
+ vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
+ if(p == _subscribers.end())
{
+ string name = identityToTopicName(id);
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << name << " failed - not linked";
+ out << _name << ": unlink " << name << " failed - not linked";
}
NoSuchLink ex;
@@ -453,48 +947,79 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
throw ex;
}
- Ice::ObjectPrx subscriber = p->obj;
- _topicRecord.erase(p);
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << " unlink " << _instance->communicator()->identityToString(id);
+ }
+
+ Ice::IdentitySeq ids;
+ ids.push_back(id);
+ removeSubscribers(ids);
+}
- //
- // Save
- //
- _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
+void
+TopicImpl::reap(const Ice::IdentitySeq& ids)
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlink " << _instance->communicator()->identityToString(id);
+ out << _name << ": reap ";
+ for(Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end() ; ++p)
+ {
+ if(p != ids.begin())
+ {
+ out << ",";
+ }
+ out << _instance->communicator()->identityToString(*p);
+ }
+ }
+
+ removeSubscribers(ids);
+}
+
+void
+TopicImpl::shutdown()
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ _servant = 0;
+
+ // Shutdown each subscriber. This waits for the event queues to drain.
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ (*p)->shutdown();
}
- removeSubscriber(subscriber);
}
LinkInfoSeq
-TopicI::getLinkInfoSeq(const Ice::Current&) const
+TopicImpl::getLinkInfoSeq() const
{
- IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
- TopicI* This = const_cast<TopicI*>(this);
- This->reap();
-
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
LinkInfoSeq seq;
-
- for(LinkRecordSeq::const_iterator q = _topicRecord.begin(); q != _topicRecord.end(); ++q)
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
{
- LinkInfo info;
- info.name = identityToTopicName(q->theTopic->ice_getIdentity());
- info.cost = q->cost;
- info.theTopic = q->theTopic;
- seq.push_back(info);
+ SubscriberRecord record = (*p)->record();
+ if(record.link && !(*p)->errored())
+ {
+ LinkInfo info;
+ info.name = identityToTopicName(record.theTopic->ice_getIdentity());
+ info.cost = record.cost;
+ info.theTopic = record.theTopic;
+ seq.push_back(info);
+ }
}
-
return seq;
}
void
-TopicI::destroy(const Ice::Current&)
+TopicImpl::destroy()
{
- IceUtil::RecMutex::Lock sync(_topicRecordMutex);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
if(_destroyed)
{
@@ -502,208 +1027,514 @@ TopicI::destroy(const Ice::Current&)
}
_destroyed = true;
- try
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << ": destroy";
+ }
+
+ // destroyInternal clears out the topic content.
+ LogUpdate llu = {0,0};
+ _instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
+}
+
+TopicContent
+TopicImpl::getContent() const
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ TopicContent content;
+ content.id = _id;
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ // Don't return errored subscribers (subscribers that have
+ // errored out, but not reaped due to a failure with the
+ // master). This means we can avoid the reaping step later.
+ if(!(*p)->errored())
+ {
+ content.records.push_back((*p)->record());
+ }
+ }
+ return content;
+}
+
+void
+TopicImpl::update(const SubscriberRecordSeq& records)
+{
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ // We do this with two scans. The first runs through the subscribers
+ // that we have and removes those not in the init list. The second
+ // runs through the init list and add the ones that don't
+ // exist.
+
{
- _instance->objectAdapter()->remove(_linkPrx->ice_getIdentity());
- _instance->objectAdapter()->remove(_publisherPrx->ice_getIdentity());
+ vector<SubscriberPtr>::iterator p = _subscribers.begin();
+ while(p != _subscribers.end())
+ {
+ SubscriberRecordSeq::const_iterator q;
+ for(q = records.begin(); q != records.end(); ++q)
+ {
+ if((*p)->id() == q->id)
+ {
+ break;
+ }
+ }
+ // The subscriber doesn't exist in the incoming subscriber
+ // set so destroy it.
+ if(q == records.end())
+ {
+ (*p)->destroy();
+ p = _subscribers.erase(p);
+ }
+ else
+ {
+ // Otherwise reset the reaped status if necessary.
+ (*p)->resetIfReaped();
+ ++p;
+ }
+ }
}
- catch(const Ice::ObjectAdapterDeactivatedException&)
+
+ for(SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
{
- // Ignore -- this could occur on shutdown.
+ vector<SubscriberPtr>::iterator q;
+ for(q = _subscribers.begin(); q != _subscribers.end(); ++q)
+ {
+ if((*q)->id() == p->id)
+ {
+ break;
+ }
+ }
+ if(q == _subscribers.end())
+ {
+ SubscriberPtr subscriber = Subscriber::create(_instance, *p);
+ _subscribers.push_back(subscriber);
+ }
}
}
bool
-TopicI::destroyed() const
+TopicImpl::destroyed() const
{
- IceUtil::RecMutex::Lock sync(_topicRecordMutex);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
return _destroyed;
}
Ice::Identity
-TopicI::id() const
+TopicImpl::id() const
{
// immutable
return _id;
}
-void
-TopicI::reap()
+TopicPrx
+TopicImpl::proxy() const
{
- IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
- if(_destroyed)
+ // immutable
+ Ice::ObjectPrx prx;
+ if(_instance->topicReplicaProxy())
{
- return;
+ prx = _instance->topicReplicaProxy()->ice_identity(_id);
+ }
+ else
+ {
+ prx = _instance->topicAdapter()->createProxy(_id);
}
- bool updated = false;
+ return TopicPrx::uncheckedCast(prx);
+}
- //
- // Run through all invalid subscribers and remove them from the
- // database.
- //
- list<SubscriberPtr> error;
+namespace
+{
+
+class TopicInternal_reapI : public AMI_TopicInternal_reap
+{
+public:
+
+ TopicInternal_reapI(const InstancePtr& instance, Ice::Long generation) :
+ _instance(instance), _generation(generation)
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.swap(error);
}
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- for(list<SubscriberPtr>::const_iterator p = error.begin(); p != error.end(); ++p)
+ virtual void ice_response()
+ {
+ }
+
+ virtual void ice_exception(const Ice::Exception& ex)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << "exception when calling `reap' on the master replica: " << ex;
+ }
+ _instance->node()->recovery(_generation);
+ }
+
+private:
+
+ const InstancePtr _instance;
+ const Ice::Long _generation;
+};
+
+}
+
+void
+TopicImpl::publish(bool forwarded, const EventDataSeq& events)
+{
+ TopicInternalPrx masterInternal;
+ Ice::Long generation = -1;
+ Ice::IdentitySeq reap;
{
- SubscriberPtr subscriber = *p;
- assert(subscriber->persistent()); // Only persistent subscribers need to be reaped.
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
- bool found = false;
//
- // If this turns out to be a performance problem then we
- // can create an in memory map cache.
+ // Copy of the subscriber list so that event publishing can occur
+ // in parallel.
//
- LinkRecordSeq::iterator q = _topicRecord.begin();
- while(q != _topicRecord.end())
+ vector<SubscriberPtr> copy;
{
- if(q->obj->ice_getIdentity() == subscriber->id())
- {
- _topicRecord.erase(q);
- updated = true;
- found = true;
- break;
- }
- ++q;
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ copy = _subscribers;
}
- if(traceLevels->topic > 0)
+
+ //
+ // Queue each event, gathering a list of those subscribers that
+ // must be reaped.
+ //
+ for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
{
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << "reaping " << _instance->communicator()->identityToString(subscriber->id());
- if(!found)
+ if(!(*p)->queue(forwarded, events) && (*p)->reap())
{
- out << ": failed - not in database";
+ reap.push_back((*p)->id());
}
}
+
+ // If there are no subscribers in error then we're done.
+ if(reap.empty())
+ {
+ return;
+ }
+ if(!unlock.getMaster())
+ {
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+ removeSubscribers(reap);
+ return;
+ }
+ masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
+ generation = unlock.generation();
}
- if(updated)
- {
- _topics.put(PersistentTopicMap::value_type(_id, _topicRecord));
- }
+
+
+ // Tell the master to reap this set of subscribers. This is an
+ // AMI invocation so it shouldn't block the caller (in the
+ // typical case) we do it outside of the mutex lock for
+ // performance reasons.
+ //
+ // We must release the cached lock before calling this as the AMI
+ // call may raise an exception in the caller (that is directly
+ // call ice_exception) which calls recover() on the node which
+ // would result in a deadlock since the node is locked.
+ masterInternal->reap_async(new TopicInternal_reapI(_instance, generation), reap);
}
void
-TopicI::publish(bool forwarded, const EventDataSeq& events)
+TopicImpl::observerAddSubscriber(const LogUpdate& llu, const SubscriberRecord& record)
{
- //
- // Copy of the subscriber list so that event publishing can occur
- // in parallel.
- //
- vector<SubscriberPtr> copy;
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- copy = _subscribers;
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
+ if(traceLevels->topic > 1)
+ {
+ out << " QoS: ";
+ for(QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end() ; ++p)
+ {
+ if(p != record.theQoS.begin())
+ {
+ out << ',';
+ }
+ out << '[' << p->first << "," << p->second << ']';
+ }
+ }
+ out << " llu: " << llu.generation << "/" << llu.iteration;
}
- //
- // Queue each event. This results in two lists -- one the list of
- // subscribers in error and the second a list of subscribers that
- // need to be flushed.
- //
- vector<Ice::Identity> e;
- list<SubscriberPtr> flush;
- for(vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
+ if(p != _subscribers.end())
{
- Subscriber::QueueState state = (*p)->queue(forwarded, events);
- switch(state)
+ // If the subscriber is already in the database display a
+ // diagnostic.
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
{
- case Subscriber::QueueStateError:
- e.push_back((*p)->id());
- break;
- case Subscriber::QueueStateFlush:
- flush.push_back(*p);
- break;
- case Subscriber::QueueStateNoFlush:
- break;
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _instance->communicator()->identityToString(record.id) << ": already subscribed";
}
+ return;
}
- //
- // Now we add each subscriber to be flushed to the flush manager.
- //
- if(!flush.empty())
+ SubscriberPtr subscriber = Subscriber::create(_instance, record);
+ for(;;)
{
- _instance->subscriberPool()->flush(flush);
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = subscriber->id();
+ _subscriberMap.put(SubscriberMap::value_type(key, record));
+
+ // Update the LLU.
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
}
- //
- // Run through the error list removing those subscribers that are
- // in error from the subscriber list.
- //
- list<SubscriberPtr> reap;
- if(!e.empty())
- {
- IceUtil::Mutex::Lock sync(_subscribersMutex);
- for(vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
- {
- //
- // Its possible for the subscriber to already have been
- // removed since the copy is iterated over outside of
- // mutex protection.
- //
- // Note that although this could be quicker if we used a
- // map, the most optimal case should be pushing around
- // events not searching for a particular subscriber.
- //
- // The subscriber is immediately destroyed & removed from
- // the _subscribers list. If the subscriber is persistent
- // its added to an list of error'd subscribers and removed
- // from the database on the next reap.
- //
- vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
- if(q != _subscribers.end())
+ _subscribers.push_back(subscriber);
+}
+
+void
+TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
+{
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->topic > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
+ out << _name << ": remove replica observer: ";
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ if(id != ids.begin())
{
- //
- // Destroy the subscriber in any case.
- //
- (*q)->destroy();
- if((*q)->persistent())
- {
- reap.push_back(*q);
- }
- _instance->subscriberPool()->remove(*q);
- _subscribers.erase(q);
+ out << ",";
}
+ out << _instance->communicator()->identityToString(*id);
+ }
+ out << " llu: " << llu.generation << "/" << llu.iteration;
+ }
+
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ // Remove the subscriber from the subscribers list. If the
+ // subscriber had a local failure and was removed from the
+ // subscriber list it could already be gone. That's not a problem.
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
+ if(p != _subscribers.end())
+ {
+ (*p)->destroy();
+ _subscribers.erase(p);
}
}
- if(!reap.empty())
+ // Next remove from the database.
+ for(;;)
{
- //
- // This is why _error is a list, so we can splice on the
- // reaped subscribers.
- //
- IceUtil::Mutex::Lock errorSync(_errorMutex);
- _error.splice(_error.begin(), reap);
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = *id;
+ SubscriberMap::iterator e = _subscriberMap.find(key);
+ if(e != _subscriberMap.end())
+ {
+ _subscriberMap.erase(e);
+ }
+ }
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
}
}
void
-TopicI::removeSubscriber(const Ice::ObjectPrx& obj)
+TopicImpl::observerDestroyTopic(const LogUpdate& llu)
{
- Ice::Identity id = obj->ice_getIdentity();
-
IceUtil::Mutex::Lock sync(_subscribersMutex);
- vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
- if(p != _subscribers.end())
+
+ if(_destroyed)
{
- (*p)->destroy();
- _instance->subscriberPool()->remove(*p);
- _subscribers.erase(p);
return;
}
-
- //
- // If the subscriber was not found then display a diagnostic.
- //
+ _destroyed = true;
+
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _instance->communicator()->identityToString(id) << ": not subscribed.";
+ out << _name << ": destroyed";
+ out << " llu: " << llu.generation << "/" << llu.iteration;
+ }
+ destroyInternal(llu, false);
+}
+
+Ice::ObjectPtr
+TopicImpl::getServant() const
+{
+ return _servant;
+}
+
+LogUpdate
+TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
+{
+ _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
+ _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
+
+ // Destroy each of the subscribers.
+ for(vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
+ {
+ (*p)->destroy();
}
+ _subscribers.clear();
+
+ // Clear out the database records related to this topic.
+ LogUpdate llu;
+ for(;;)
+ {
+ try
+ {
+ SubscriberRecordKey key;
+ key.topic = _id;
+
+ Freeze::TransactionHolder txn(_connection);
+ // Erase all subscriber records and the topic record.
+ SubscriberMap::iterator p = _subscriberMap.find(key);
+ while(p != _subscriberMap.end() && p->first.topic == key.topic)
+ {
+ _subscriberMap.erase(p++);
+ }
+
+ // Update the LLU.
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ if(master)
+ {
+ llu = ci->second;
+ llu.iteration++;
+ }
+ else
+ {
+ llu = origLLU;
+ }
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+
+ _instance->topicAdapter()->remove(_id);
+
+ _servant = 0;
+
+ return llu;
+}
+
+void
+TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
+{
+ Ice::IdentitySeq removed;
+
+ // First remove the subscriber from the subscribers list. Its
+ // possible that some of these subscribers have already been
+ // removed (consider, for example, a concurrent reap call from two
+ // replicas on the same subscriber). To avoid sending unnecessary
+ // observer updates keep track of the observers that are actually
+ // removed.
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
+ if(p != _subscribers.end())
+ {
+ (*p)->destroy();
+ _subscribers.erase(p);
+ removed.push_back(*id);
+ }
+ }
+
+ // If there is no further work to do we are done.
+ if(removed.empty())
+ {
+ return;
+ }
+
+ // Next update the database and send the notification to any
+ // slaves.
+ LogUpdate llu;
+ for(;;)
+ {
+ try
+ {
+ Freeze::TransactionHolder txn(_connection);
+
+ for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
+ {
+ SubscriberRecordKey key;
+ key.topic = _id;
+ key.id = *id;
+ SubscriberMap::iterator e = _subscriberMap.find(key);
+ if(e != _subscriberMap.end())
+ {
+ _subscriberMap.erase(e);
+ }
+ }
+
+ LLUMap::iterator ci = _llumap.find("_manager");
+ assert(ci != _llumap.end());
+ llu = ci->second;
+ llu.iteration++;
+ ci.set(llu);
+ txn.commit();
+ break;
+ }
+ catch(const Freeze::DeadlockException&)
+ {
+ continue;
+ }
+ catch(const Freeze::DatabaseException& ex)
+ {
+ halt(_instance->communicator(), ex);
+ }
+ }
+
+ _instance->observers()->removeSubscriber(llu, _name, ids);
}