summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/LinkSubscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/LinkSubscriber.cpp')
-rw-r--r--cpp/src/IceStorm/LinkSubscriber.cpp31
1 files changed, 17 insertions, 14 deletions
diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp
index 31fe7f6051d..e753ad38e2b 100644
--- a/cpp/src/IceStorm/LinkSubscriber.cpp
+++ b/cpp/src/IceStorm/LinkSubscriber.cpp
@@ -14,20 +14,23 @@
#include <Ice/Ice.h>
#include <IceStorm/LinkSubscriber.h>
+#include <IceStorm/SubscriberFactory.h>
#include <IceStorm/TraceLevels.h>
using namespace IceStorm;
using namespace std;
-LinkSubscriber::LinkSubscriber(const TraceLevelsPtr& traceLevels, const TopicLinkPrx& obj, Ice::Int cost) :
- Subscriber(traceLevels, obj->ice_getIdentity()),
- _obj(TopicLinkPrx::uncheckedCast(obj->ice_batchOneway())),
- _cost(cost)
+LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels,
+ const QueuedProxyPtr& obj, Ice::Int cost) :
+ Subscriber(traceLevels, obj->proxy()->ice_getIdentity()),
+ _factory(factory), _obj(obj), _cost(cost)
{
+ _factory->incProxyUsageCount(_obj);
}
LinkSubscriber::~LinkSubscriber()
{
+ _factory->decProxyUsageCount(_obj);
}
bool
@@ -51,7 +54,7 @@ LinkSubscriber::unsubscribe()
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << "Unsubscribe " << _obj->ice_getIdentity();
+ out << "Unsubscribe " << id();
}
}
@@ -64,26 +67,26 @@ LinkSubscriber::replace()
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << "Replace " << _obj->ice_getIdentity();
+ out << "Replace " << id();
}
}
void
-LinkSubscriber::publish(const Event& event)
+LinkSubscriber::publish(const EventPtr& event)
{
//
// Don't propagate a message that has already been forwarded.
// Also, if this link has a non-zero cost, then don't propagate
// a message whose cost exceeds the link cost.
//
- if(event.forwarded || (_cost > 0 && event.cost > _cost))
+ if(event->forwarded || (_cost > 0 && event->cost > _cost))
{
return;
}
try
{
- _obj->forward(event.op, event.mode, event.data, event.context);
+ _obj->publish(event);
}
catch(const Ice::ObjectNotExistException& e)
{
@@ -96,7 +99,7 @@ LinkSubscriber::publish(const Event& event)
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << _obj->ice_getIdentity() << ": link topic publish failed: " << e;
+ out << id() << ": link topic publish failed: " << e;
}
}
catch(const Ice::LocalException& e)
@@ -104,7 +107,7 @@ LinkSubscriber::publish(const Event& event)
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << _obj->ice_getIdentity() << ": link topic publish failed: " << e;
+ out << id() << ": link topic publish failed: " << e;
}
}
}
@@ -114,7 +117,7 @@ LinkSubscriber::flush()
{
try
{
- _obj->ice_flush();
+ _obj->proxy()->ice_flush();
}
catch(const Ice::ObjectNotExistException& e)
{
@@ -127,7 +130,7 @@ LinkSubscriber::flush()
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << _obj->ice_getIdentity() << ": link topic flush failed: " << e;
+ out << id() << ": link topic flush failed: " << e;
}
}
catch(const Ice::LocalException& e)
@@ -135,7 +138,7 @@ LinkSubscriber::flush()
if(_traceLevels->subscriber > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat);
- out << _obj->ice_getIdentity() << ": link topic flush failed: " << e;
+ out << id() << ": link topic flush failed: " << e;
}
}
}