summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp215
1 files changed, 9 insertions, 206 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 422cafc5615..b00bbc2ab45 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -12,7 +12,6 @@
#include <IceStorm/Subscriber.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/Event.h>
-#include <IceStorm/KeepAliveThread.h>
#include <IceStorm/SubscriberPool.h>
#include <Ice/LoggerUtil.h>
@@ -94,26 +93,6 @@ private:
const TopicIPtr _topic;
};
-class TopicUpstreamLinkI : public TopicUpstreamLink
-{
-public:
-
- TopicUpstreamLinkI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
-
- virtual void
- keepAlive(const Ice::Current&)
- {
- _subscriber->reachable();
- }
-
-private:
-
- const SubscriberPtr _subscriber;
-};
-
}
TopicI::TopicI(
@@ -127,7 +106,6 @@ TopicI::TopicI(
_connection(Freeze::createConnection(instance->communicator(), envName)),
_topics(_connection, dbName, false),
_topicRecord(topicRecord),
- _upstream(_connection, "upstream", false),
_destroyed(false)
{
//
@@ -161,38 +139,13 @@ TopicI::TopicI(
}
//
- // Create the subscriber object and the upstream servant and
- // add it to the set of subscribers.
+ // Create the subscriber object add it to the set of
+ // subscribers.
//
SubscriberPtr subscriber = Subscriber::create(_instance, p->second.obj, p->second.cost);
- TopicUpstreamLinkPrx upstream = TopicUpstreamLinkPrx::uncheckedCast(
- _instance->objectAdapter()->add(
- new TopicUpstreamLinkI(subscriber), p->second.upstream->ice_getIdentity()));
_subscribers.push_back(subscriber);
_instance->subscriberPool()->add(subscriber);
}
-
- PersistentUpstreamMap::const_iterator upI = _upstream.find(_name);
- if(upI != _upstream.end())
- {
- //
- // This record should really be there, but its possible for it
- // not to be in the event of a crash in between the add of the
- // topic record and the add of the upstream record.
- //
- _upstreamRecord = upI->second;
- }
-
- for(TopicUpstreamLinkPrxSeq::const_iterator q = _upstreamRecord.begin(); q != _upstreamRecord.end(); ++q)
- {
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " upstream " << _instance->communicator()->identityToString((*q)->ice_getIdentity());
- }
- _instance->keepAlive()->add(*q);
- }
}
string
@@ -340,8 +293,7 @@ TopicI::getLinkProxy(const Ice::Current&)
void
TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
{
- string name = topic->getName();
- TopicInternalPrx internal = TopicInternalPrx::checkedCast(topic);
+ TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
TopicLinkPrx link = internal->getLinkProxy();
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
@@ -352,6 +304,8 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
reap();
+ string name = topic->ice_getIdentity().name;
+
if(_topicRecord.find(name) != _topicRecord.end())
{
LinkExists ex;
@@ -367,47 +321,14 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
}
SubscriberPtr subscriber = Subscriber::create(_instance, link, cost);
- TopicUpstreamLinkPrx upstream;
- try
- {
- upstream = TopicUpstreamLinkPrx::uncheckedCast(
- _instance->objectAdapter()->addWithUUID(new TopicUpstreamLinkI(subscriber)));
- }
- catch(const Ice::ObjectAdapterDeactivatedException&)
- {
- subscriber->destroy();
- throw;
- }
//
- // Notify the downstream topic that it is now linked. For linking
- // we use the "notify & save" strategy. This is important because
- // if there is a failure after the notify and before the save then
- // the downstream service will detect that the topic upstream link
- // does not exist and correctly clean up. If we saved and then
- // notified we could have a downstream linked client that does not
- // know it is in fact linked -- and this is not easily detectable.
- //
- try
- {
- internal->linkNotification(_name, upstream);
- }
- catch(const Ice::Exception&)
- {
- // Cleanup.
- _instance->objectAdapter()->remove(upstream->ice_getIdentity());
- subscriber->destroy();
- throw;
- }
-
- //
// Create the LinkRecord
//
LinkRecord record;
record.obj = link;
record.cost = cost;
record.theTopic = topic;
- record.upstream = upstream;
//
// Save
@@ -423,12 +344,6 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
void
TopicI::unlink(const TopicPrx& topic, const Ice::Current& current)
{
- unlinkByName(topic->getName(), current);
-}
-
-void
-TopicI::unlinkByName(const string& name, const Ice::Current&)
-{
IceUtil::RecMutex::Lock topicSync(_topicRecordMutex);
if(_destroyed)
{
@@ -436,6 +351,8 @@ TopicI::unlinkByName(const string& name, const Ice::Current&)
}
reap();
+
+ string name = topic->ice_getIdentity().name;
LinkRecordDict::iterator q = _topicRecord.find(name);
if(q == _topicRecord.end())
@@ -452,18 +369,7 @@ TopicI::unlinkByName(const string& name, const Ice::Current&)
throw ex;
}
- //
- // First save and then notify. unlinking we use the "save &
- // notify" strategy. This is important because if there is a
- // failure after the save and before the notify then the
- // downstream service will detect that the topic upstream link
- // does not exist and correctly clean up.
- //
-
- //
- // Copy the record first because we use it after the save.
- //
- LinkRecord rec = q->second;
+ Ice::ObjectPrx subscriber = q->second.obj;
_topicRecord.erase(q);
//
@@ -471,40 +377,13 @@ TopicI::unlinkByName(const string& name, const Ice::Current&)
//
_topics.put(PersistentTopicMap::value_type(_name, _topicRecord));
- //
- // Remove the TopicUpstreamLink servant.
- //
- try
- {
- _instance->objectAdapter()->remove(rec.upstream->ice_getIdentity());
- }
- catch(const Ice::ObjectAdapterDeactivatedException&)
- {
- // Ignore -- this could occur on shutdown.
- }
-
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
out << _name << " unlink " << name;
}
- removeSubscriber(rec.obj);
-
- TopicInternalPrx internal = TopicInternalPrx::checkedCast(rec.theTopic);
- try
- {
- internal->unlinkNotification(_name, rec.upstream);
- }
- catch(const Ice::Exception& e)
- {
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlinkNotification failed: " << name << ": " << e;
- }
- // Ignore. This will be detected upon a restart.
- }
+ removeSubscriber(subscriber);
}
LinkInfoSeq
@@ -531,13 +410,7 @@ TopicI::getLinkInfoSeq(const Ice::Current&) const
void
TopicI::destroy(const Ice::Current&)
{
- //
- // This method must lock both the record mutexes otherwise we can
- // end up with the database record being re-added if
- // TopicManagerI::reap() is called concurrently with destroy/link.
- //
IceUtil::RecMutex::Lock sync(_topicRecordMutex);
- IceUtil::RecMutex::Lock sync2(_upstreamRecordMutex);
if(_destroyed)
{
@@ -556,59 +429,6 @@ TopicI::destroy(const Ice::Current&)
}
}
-void
-TopicI::linkNotification(const string& name, const TopicUpstreamLinkPrx& upstream, const Ice::Current&)
-{
- IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex);
- if(_destroyed)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " linkNotification " << name;
- }
-
- _upstreamRecord.push_back(upstream);
- _instance->keepAlive()->add(upstream);
-
- //
- // Save
- //
- _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord));
-}
-
-void
-TopicI::unlinkNotification(const string& name, const TopicUpstreamLinkPrx& upstream, const Ice::Current&)
-{
- IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex);
- if(_destroyed)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->topic > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
- out << _name << " unlinkNotification " << name;
- }
-
- TopicUpstreamLinkPrxSeq::iterator p = find(_upstreamRecord.begin(), _upstreamRecord.end(), upstream);
- if(p != _upstreamRecord.end())
- {
- _upstreamRecord.erase(p);
- }
- _instance->keepAlive()->remove(upstream);
-
- //
- // Save
- //
- _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord));
-}
-
bool
TopicI::destroyed() const
{
@@ -663,23 +483,6 @@ TopicI::reap()
_topics.put(PersistentTopicMap::value_type(_name, _topicRecord));
}
}
-
- //
- // Now reap any dead upstream topics.
- //
- {
- IceUtil::RecMutex::Lock topicSync(_upstreamRecordMutex);
- if(_destroyed)
- {
- return;
- }
-
- if(_instance->keepAlive()->filter(_upstreamRecord))
- {
- // Save.
- _upstream.put(PersistentUpstreamMap::value_type(_name, _upstreamRecord));
- }
- }
}
void