summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2002-02-28 17:30:43 +0000
committerMatthew Newhook <matthew@zeroc.com>2002-02-28 17:30:43 +0000
commita0262246c57db3d224f2c6bc60b93b0af9ba091e (patch)
treee19a2b3ea3bddcd94c03e4fc5012fb4ad6a10328 /cpp/src
parentfixes (diff)
downloadice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.bz2
ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.tar.xz
ice-a0262246c57db3d224f2c6bc60b93b0af9ba091e.zip
IceStorm changes
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceStorm/Flusher.cpp6
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.cpp13
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.h1
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.cpp19
-rw-r--r--cpp/src/IceStorm/OnewayBatchSubscriber.h3
-rw-r--r--cpp/src/IceStorm/OnewaySubscriber.cpp13
-rw-r--r--cpp/src/IceStorm/OnewaySubscriber.h1
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp6
-rw-r--r--cpp/src/IceStorm/Subscriber.h67
-rw-r--r--cpp/src/IceStorm/TopicI.cpp179
-rw-r--r--cpp/src/IceStorm/TopicI.h4
-rw-r--r--cpp/src/IceStorm/TopicManagerI.cpp105
-rw-r--r--cpp/src/IceStorm/TopicManagerI.h5
13 files changed, 221 insertions, 201 deletions
diff --git a/cpp/src/IceStorm/Flusher.cpp b/cpp/src/IceStorm/Flusher.cpp
index 913612843c2..70558d76eac 100644
--- a/cpp/src/IceStorm/Flusher.cpp
+++ b/cpp/src/IceStorm/Flusher.cpp
@@ -120,12 +120,6 @@ private:
//IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
//
- // Using standard algorithms I don't think there is a way to
- // do this in one pass. For instance, I thought about using
- // remove_if - but the predicate needs to be a pure function
- // (see Meyers for details). If this is fixed then fix TopicI
- // also.
- //
// remove_if doesn't work with handle types. remove_if also
// isn't present in the STLport implementation
//
diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp
index b04cfd8f9cc..b13ee7ec91e 100644
--- a/cpp/src/IceStorm/LinkSubscriber.cpp
+++ b/cpp/src/IceStorm/LinkSubscriber.cpp
@@ -54,6 +54,19 @@ LinkSubscriber::unsubscribe()
}
void
+LinkSubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+}
+
+void
LinkSubscriber::publish(const Event& event)
{
//
diff --git a/cpp/src/IceStorm/LinkSubscriber.h b/cpp/src/IceStorm/LinkSubscriber.h
index 41722cacfc1..00f62683f8f 100644
--- a/cpp/src/IceStorm/LinkSubscriber.h
+++ b/cpp/src/IceStorm/LinkSubscriber.h
@@ -29,6 +29,7 @@ public:
virtual bool persistent() const;
virtual bool inactive() const;
virtual void unsubscribe();
+ virtual void replace();
virtual void publish(const Event&);
virtual void flush();
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
index 4a9915af023..e42a9728ef1 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.cpp
@@ -48,6 +48,25 @@ OnewayBatchSubscriber::unsubscribe()
_flusher->remove(this);
}
+void
+OnewayBatchSubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+
+ //
+ // If this subscriber has been registered with the flusher then
+ // remove it.
+ //
+ _flusher->remove(this);
+}
+
bool
OnewayBatchSubscriber::inactive() const
{
diff --git a/cpp/src/IceStorm/OnewayBatchSubscriber.h b/cpp/src/IceStorm/OnewayBatchSubscriber.h
index 3aaea29e47b..4b90d33d1a6 100644
--- a/cpp/src/IceStorm/OnewayBatchSubscriber.h
+++ b/cpp/src/IceStorm/OnewayBatchSubscriber.h
@@ -27,10 +27,11 @@ class OnewayBatchSubscriber : public OnewaySubscriber, public Flushable
{
public:
- OnewayBatchSubscriber(const TraceLevelsPtr& traceLevels, const FlusherPtr&, const Ice::ObjectPrx&);
+ OnewayBatchSubscriber(const TraceLevelsPtr&, const FlusherPtr&, const Ice::ObjectPrx&);
~OnewayBatchSubscriber();
virtual void unsubscribe();
+ virtual void replace();
virtual bool inactive() const;
virtual void flush();
diff --git a/cpp/src/IceStorm/OnewaySubscriber.cpp b/cpp/src/IceStorm/OnewaySubscriber.cpp
index b1007792355..5c547ad4f8f 100644
--- a/cpp/src/IceStorm/OnewaySubscriber.cpp
+++ b/cpp/src/IceStorm/OnewaySubscriber.cpp
@@ -46,6 +46,19 @@ OnewaySubscriber::unsubscribe()
}
void
+OnewaySubscriber::replace()
+{
+ IceUtil::Mutex::Lock sync(_stateMutex);
+ _state = StateReplaced;
+
+ if (_traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
+ out << "Replace " << _obj->ice_getIdentity();
+ }
+}
+
+void
OnewaySubscriber::publish(const Event& event)
{
try
diff --git a/cpp/src/IceStorm/OnewaySubscriber.h b/cpp/src/IceStorm/OnewaySubscriber.h
index 640532b1cab..032821f271a 100644
--- a/cpp/src/IceStorm/OnewaySubscriber.h
+++ b/cpp/src/IceStorm/OnewaySubscriber.h
@@ -25,6 +25,7 @@ public:
virtual bool persistent() const;
virtual void unsubscribe();
+ virtual void replace();
virtual void publish(const Event&);
protected:
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index ac9226f1f85..62bb7eaab8d 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -32,11 +32,11 @@ Subscriber::inactive() const
return _state != StateActive;;
}
-Subscriber::State
-Subscriber::state() const
+bool
+Subscriber::error() const
{
IceUtil::Mutex::Lock sync(_stateMutex);
- return _state;
+ return _state == StateError;
}
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
index b2db80e3695..453c8b125b3 100644
--- a/cpp/src/IceStorm/Subscriber.h
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -56,35 +56,15 @@ public:
virtual bool persistent() const = 0;
//
- // Subscriber state.
- //
- enum State
- {
- //
- // The Subscriber is active
- //
- StateActive,
- //
- // The Subscriber encountered an error during event
- // transmission
- //
- StateError,
- //
- // The Subscriber has been unsubscribed
- //
- StateUnsubscribed
- };
-
- //
- // This method is for ease of use with STL algorithms.
- // Return _state != StateActive
+ // Return true if the Subscriber is not active, false otherwise.
//
bool inactive() const;
//
- // Retrieve the state of the Subscriber.
+ // Retrieve true if the Subscriber is in the error state, false
+ // otherwise.
//
- State state() const;
+ bool error() const;
//
// Retrieve the identity of the Subscriber.
@@ -97,8 +77,13 @@ public:
virtual void unsubscribe() = 0;
//
- // Publish the given event. Mark the state as Error in the event
- // of a problem.
+ // Unsubscribe. Mark the state as Replaced.
+ //
+ virtual void replace() = 0;
+
+ //
+ // Publish the given event. Mark the state as Error in the event of
+ // a problem.
//
virtual void publish(const Event&) = 0;
@@ -107,16 +92,40 @@ protected:
// Immutable
TraceLevelsPtr _traceLevels;
+ //
+ // Subscriber state.
+ //
+ enum State
+ {
+ //
+ // The Subscriber is active.
+ //
+ StateActive,
+ //
+ // The Subscriber encountered an error during event
+ // transmission.
+ //
+ StateError,
+ //
+ // The Subscriber has been unsubscribed.
+ //
+ StateUnsubscribed,
+ //
+ // The Subscriber has been replaced.
+ //
+ StateReplaced
+ };
+
IceUtil::Mutex _stateMutex;
State _state;
private:
//
- // This id is the full id of the subscriber for a particular topic
- // (that is <prefix>#<topicname>
+ // This id is the full id of the subscriber for a particular topic.
+ //
+ // Immutable.
//
- // Immutable
Ice::Identity _id;
};
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index 1e35b5d9308..f4d93751563 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -105,7 +105,31 @@ public:
void
add(const SubscriberPtr& subscriber)
{
+ Ice::Identity id = subscriber->id();
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
+
+ //
+ // If a subscriber with this identity is already subscribed
+ // then mark the subscriber as replaced.
+ //
+ // Note that this doesn't actually remove the Subscribe from
+ // the list of subscribers - it marks the Subscriber as
+ // replaced, and it's removed on the next event publish.
+ //
+ for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ {
+ if ((*i)->id() == id)
+ {
+ //
+ // This marks the subscriber as invalid. It will be
+ // removed on the next event publish.
+ //
+ (*i)->replace();
+ break;
+ }
+ }
+
//
// Add to the set of subscribers
@@ -116,16 +140,17 @@ public:
//
// Unsubscribe the Subscriber with the given identity. Note that
// this doesn't remove the Subscriber from the list of subscribers
- // - it marks the Subscriber as Unsubscribed, and it's removed on
+ // - it marks the Subscriber as unsubscribed, and it's removed on
// the next event publish.
//
void
- unsubscribe(const Ice::Identity& id)
+ remove(const Ice::ObjectPrx& obj)
{
+ Ice::Identity id = obj->ice_getIdentity();
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
- SubscriberList::iterator i;
- for (i = _subscribers.begin() ; i != _subscribers.end(); ++i)
+ for (SubscriberList::iterator i = _subscribers.begin() ; i != _subscribers.end(); ++i)
{
if ((*i)->id() == id)
{
@@ -134,20 +159,17 @@ public:
// removed on the next event publish.
//
(*i)->unsubscribe();
- break;
+ return;
}
}
//
// If the subscriber was not found then display a diagnostic
//
- if (i == _subscribers.end())
+ if (_traceLevels->topic > 0)
{
- if (_traceLevels->topic > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
- out << id << ": not subscribed.";
- }
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
+ out << id << ": not subscribed.";
}
}
@@ -161,55 +183,61 @@ public:
// advantage that publishes can occur in parallel and less
// subscriber list iterations.
//
-
void
publish(const Event& event)
{
- IceUtil::Mutex::Lock sync(_subscribersMutex);
-
//
- // Using standard algorithms I don't think there is a way to
- // do this in one pass. For instance, I thought about using
- // remove_if - but the predicate needs to be a pure function
- // (see Meyers for details). If this is fixed then fix Flusher
- // also.
+ // Copy of the subscriber list so that event publishing can
+ // occur in parallel.
//
- // remove_if doesn't work with handle types. remove_if also
- // isn't present in the STLport implementation
+ // TODO: Find out whether this is a false optimization - how
+ // expensive is the cost of copying vs. lack of parallelism?
//
- // _subscribers.remove_if(IceUtil::constMemFun(&Subscriber::inactive));
- //
+ SubscriberList copy;
- //
- // Erase the inactive subscribers from the _subscribers
- // list. Copy the subscribers in error to the error list.
- //
- SubscriberList::iterator p = remove_if(_subscribers.begin(), _subscribers.end(),
- IceUtil::constMemFun(&Subscriber::inactive));
-
- if (p != _subscribers.end())
{
- IceUtil::Mutex::Lock errorSync(_errorMutex);
+ IceUtil::Mutex::Lock sync(_subscribersMutex);
+
//
- // Copy each of the invalid subscribers that was not
- // unsubscribed. Note that there is no copy_if algorithm.
+ // Copy of the subscribers that are in error.
//
- // Note that this could also be written in terms of splice
- // & remove_if.
+ SubscriberList e;
+
+ //
+ // Erase the inactive subscribers from the _subscribers
+ // list. Copy the subscribers in error to the error list.
//
+ SubscriberList::iterator p = _subscribers.begin();
while (p != _subscribers.end())
{
- if ((*p)->state() == Subscriber::StateError)
+ if ((*p)->inactive())
+ {
+ if ((*p)->error())
+ {
+ e.push_back(*p);
+ }
+
+ SubscriberList::iterator tmp = p;
+ ++p;
+ _subscribers.erase(tmp);
+ }
+ else
{
- _error.push_front(*p);
+ copy.push_back(*p);
+ ++p;
}
- _subscribers.erase(p++);
+ }
+
+ if (!e.empty())
+ {
+ IceUtil::Mutex::Lock errorSync(_errorMutex);
+ _error.splice(_error.begin(), e);
}
}
- for (SubscriberList::iterator i = _subscribers.begin(); i != _subscribers.end(); ++i)
+ for (SubscriberList::iterator p = copy.begin(); p != copy.end(); ++p)
{
- (*i)->publish(event);
+ (*p)->publish(event);
}
}
@@ -217,7 +245,7 @@ public:
// Clear & return the set of subscribers that are in error.
//
SubscriberList
- clearErrorList() const
+ clearErrorList()
{
//
// Uses splice for efficiency
@@ -229,17 +257,20 @@ public:
}
private:
-
+
TraceLevelsPtr _traceLevels;
+ //
+ // TODO: Should there be a map from identity to subscriber?
+ //
IceUtil::Mutex _subscribersMutex;
SubscriberList _subscribers;
//
- // Set of subscribers that encountered an error.
+ // Set of subscribers that have encountered an error.
//
IceUtil::Mutex _errorMutex;
- mutable SubscriberList _error;
+ SubscriberList _error;
};
} // End namespace IceStorm
@@ -319,7 +350,7 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
//
// Create a servant per Topic to receive linked event data. The
// servants Identity is category=<topicname>, name=link. Activate
- // the object and save a reference to give to linked topics..
+ // the object and save a reference to give to linked topics.
//
_link = new TopicLinkI(_subscribers);
@@ -327,16 +358,20 @@ TopicI::TopicI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& trace
_linkPrx = TopicLinkPrx::uncheckedCast(_adapter->add(_link, id));
//
- // Run through link database - re-establishing linked subscribers
+ // Run through link database re-establishing linked subscribers.
//
for (IdentityLinkDict::const_iterator p = _links.begin(); p != _links.end(); ++p)
{
if (_traceLevels->topic > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
- out << _name << " relink " << p->second.obj->ice_getIdentity();
+ out << _name << " relink " << p->first;
}
+ //
+ // Create the Subscriber object and add to the set of
+ // Subscribers.
+ //
SubscriberPtr subscriber = _factory->createLinkSubscriber(p->second.obj, p->second.info.cost);
_subscribers->add(subscriber);
}
@@ -411,12 +446,15 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
}
//
- // Retrieve the TopicLink
+ // Retrieve the TopicLink.
//
TopicInternalPrx internal = TopicInternalPrx::checkedCast(topic);
TopicLinkPrx link = internal->getLinkProxy();
Ice::Identity ident = link->ice_getIdentity();
+ //
+ // Create the LinkDB record.
+ //
LinkDB dbInfo;
dbInfo.obj = link;
dbInfo.info.topic = topic;
@@ -424,20 +462,9 @@ TopicI::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
dbInfo.info.cost = cost;
//
- // If the link already exists then remove the original subscriber.
- //
- // Note: If events arrive before the new subscriber is added then
- // they will be lost. An alternative to this strategy would be to
- // update the subscriber information.
+ // Create the Subscriber object and add to the setup of
+ // subscribers.
//
- IdentityLinkDict::iterator p = _links.find(ident);
- if (p != _links.end())
- {
- _subscribers->unsubscribe(ident);
- }
-
- _links.insert(make_pair(ident, dbInfo));
-
SubscriberPtr subscriber = _factory->createLinkSubscriber(dbInfo.obj, dbInfo.info.cost);
_subscribers->add(subscriber);
}
@@ -464,7 +491,7 @@ TopicI::unlink(const TopicPrx& topic, const Ice::Current&)
Ice::Trace out(_traceLevels->logger, _traceLevels->topicCat);
out << _name << " unlink " << topic->getName();
}
- _subscribers->unsubscribe(link->ice_getIdentity());
+ _subscribers->remove(link);
}
else
{
@@ -514,7 +541,7 @@ TopicI::destroyed() const
}
void
-TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
+TopicI::subscribe(const Ice::ObjectPrx& obj, const QoS& qos)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -526,23 +553,14 @@ TopicI::subscribe(const Ice::ObjectPrx& tmpl, const string& id, const QoS& qos)
reap();
//
- // Create the identity category=id, name=<topicname>.
- //
- Ice::Identity ident;
- ident.category = id;
- ident.name = _name;
- Ice::ObjectPrx obj = tmpl->ice_newIdentity(ident);
-
- //
- // Create the subscriber with this id and add to the set of
- // subscribers.
+ // Add this subscriber to the set of subscribers.
//
SubscriberPtr subscriber = _factory->createSubscriber(qos, obj);
_subscribers->add(subscriber);
}
void
-TopicI::unsubscribe(const string& id)
+TopicI::unsubscribe(const Ice::ObjectPrx& obj)
{
IceUtil::RecMutex::Lock sync(*this);
@@ -554,16 +572,9 @@ TopicI::unsubscribe(const string& id)
reap();
//
- // Create the identity category=id, name=<topicname>.
- //
- Ice::Identity ident;
- ident.category = id;
- ident.name = _name;
-
- //
// Unsubscribe the subscriber with this identity.
//
- _subscribers->unsubscribe(ident);
+ _subscribers->remove(obj);
}
void
@@ -584,7 +595,7 @@ TopicI::reap()
for (SubscriberList::iterator p = error.begin(); p != error.end(); ++p)
{
SubscriberPtr subscriber = *p;
- assert(subscriber->state() == Subscriber::StateError);
+ assert(subscriber->error());
if (subscriber->persistent())
{
if (_links.erase(subscriber->id()) > 0)
diff --git a/cpp/src/IceStorm/TopicI.h b/cpp/src/IceStorm/TopicI.h
index f920310bc7c..26e670203ea 100644
--- a/cpp/src/IceStorm/TopicI.h
+++ b/cpp/src/IceStorm/TopicI.h
@@ -55,8 +55,8 @@ public:
// Internal methods
bool destroyed() const;
- void subscribe(const Ice::ObjectPrx&, const std::string&, const QoS&);
- void unsubscribe(const std::string&);
+ void subscribe(const Ice::ObjectPrx&, const QoS&);
+ void unsubscribe(const Ice::ObjectPrx&);
void reap();
diff --git a/cpp/src/IceStorm/TopicManagerI.cpp b/cpp/src/IceStorm/TopicManagerI.cpp
index 9cf3616b867..8bac763e306 100644
--- a/cpp/src/IceStorm/TopicManagerI.cpp
+++ b/cpp/src/IceStorm/TopicManagerI.cpp
@@ -51,9 +51,8 @@ TopicManagerI::TopicManagerI(const Ice::CommunicatorPtr& communicator, const Ice
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << ex;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << ex;
}
StringBoolDict::iterator tmp = p;
++p;
@@ -137,104 +136,66 @@ TopicManagerI::retrieveAll(const Ice::Current&)
}
void
-TopicManagerI::subscribe(const string& id, const QoS& qos, const StringSeq& topics, const Ice::ObjectPrx& tmpl,
- const Ice::Current&)
+TopicManagerI::subscribe(const QoS& qos, const Ice::ObjectPrx& subscriber, const Ice::Current&)
{
IceUtil::Mutex::Lock sync(*this);
+ Ice::Identity ident = subscriber->ice_getIdentity();
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Subscribe: " << id;
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << "Subscribe: " << Ice::identityToString(ident);
if (_traceLevels->topicMgr > 1)
{
- s << " QoS: ";
+ out << " QoS: ";
for (QoS::const_iterator qi = qos.begin(); qi != qos.end() ; ++qi)
{
if (qi != qos.begin())
{
- s << ',';
+ out << ',';
}
- s << '[' << qi->first << "," << qi->second << ']';
- }
- s << " Topics: ";
- for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
- {
- if (ti != topics.begin())
- {
- s << ",";
- }
- s << *ti;
+ out << '[' << qi->first << "," << qi->second << ']';
}
}
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
}
//
- // First scan the set of topics to ensure that each exists.
+ // Ensure that the identities category refers to an existing
+ // channel.
//
- // TODO: This could be slightly optimized by remembering the
- // TopicIPtr's so that the list doesn't need to scanned again.
- //
- StringSeq::const_iterator i;
- for (i = topics.begin() ; i != topics.end() ; ++i)
+ TopicIMap::iterator elem = _topicIMap.find(ident.category);
+ if (elem == _topicIMap.end())
{
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem == _topicIMap.end())
- {
- NoSuchTopic ex;
- ex.name = *i;
- throw ex;
- }
+ NoSuchTopic ex;
+ ex.name = ident.category;
+ throw ex;
}
//
- // Subscribe to each Topic.
+ // Subscribe to the topic.
//
- for (i = topics.begin() ; i != topics.end() ; ++i)
- {
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem != _topicIMap.end())
- {
- elem->second->subscribe(tmpl, id, qos);
- }
- }
+ elem->second->subscribe(subscriber, qos);
}
void
-TopicManagerI::unsubscribe(const string& id, const StringSeq& topics, const Ice::Current&)
+TopicManagerI::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
{
IceUtil::Mutex::Lock sync(*this);
+ Ice::Identity ident = subscriber->ice_getIdentity();
+
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Unsubscribe: " << id;
- if (_traceLevels->topicMgr > 1)
- {
- s << " Topics: ";
- for (StringSeq::const_iterator ti = topics.begin(); ti != topics.end() ; ++ti)
- {
- if (ti != topics.begin())
- {
- s << ",";
- }
- s << *ti;
- }
- }
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+
+ out << "Unsubscribe: " << Ice::identityToString(ident);
}
- //
- // Unsubscribe to each Topic.
- //
- for (StringSeq::const_iterator i = topics.begin() ; i != topics.end() ; ++i)
+
+ TopicIMap::iterator elem = _topicIMap.find(ident.category);
+ if (elem != _topicIMap.end())
{
- TopicIMap::iterator elem = _topicIMap.find(*i);
- if (elem != _topicIMap.end())
- {
- elem->second->unsubscribe(id);
- }
+ elem->second->unsubscribe(subscriber);
}
}
@@ -260,9 +221,8 @@ TopicManagerI::reap()
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << "Reaping " << i->first;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << "Reaping " << i->first;
}
_topics.erase(i->first);
_topicIMap.erase(i++);
@@ -279,9 +239,8 @@ TopicManagerI::installTopic(const std::string& message, const std::string& name,
{
if (_traceLevels->topicMgr > 0)
{
- ostringstream s;
- s << message << ' ' << name;
- _communicator->getLogger()->trace(_traceLevels->topicMgrCat, s.str());
+ Ice::Trace out(_traceLevels->logger, _traceLevels->topicMgrCat);
+ out << message << ' ' << name;
}
// TODO: instance
diff --git a/cpp/src/IceStorm/TopicManagerI.h b/cpp/src/IceStorm/TopicManagerI.h
index 4df698d4743..0aa5b91f784 100644
--- a/cpp/src/IceStorm/TopicManagerI.h
+++ b/cpp/src/IceStorm/TopicManagerI.h
@@ -51,9 +51,8 @@ public:
virtual TopicPrx create(const std::string&, const Ice::Current&);
virtual TopicPrx retrieve(const std::string&, const Ice::Current&);
virtual TopicDict retrieveAll(const Ice::Current&);
- virtual void subscribe(const std::string&, const QoS&, const StringSeq&, const Ice::ObjectPrx&,
- const Ice::Current&);
- virtual void unsubscribe(const std::string&, const StringSeq&, const Ice::Current&);
+ virtual void subscribe(const QoS&, const Ice::ObjectPrx&, const Ice::Current&);
+ virtual void unsubscribe(const Ice::ObjectPrx&, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
void reap();