diff options
author | Mark Spruiell <mes@zeroc.com> | 2003-05-16 18:48:45 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2003-05-16 18:48:45 +0000 |
commit | 1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7 (patch) | |
tree | 424b71a50fad11d1ffbd67a5ff1ad97635e23224 /cpp/src/IceStorm/LinkSubscriber.cpp | |
parent | readline/ncurses update (diff) | |
download | ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.tar.bz2 ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.tar.xz ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.zip |
Changed the service to queue events to subscribers and links so that only
one thread at a time is delivering to a particular proxy. Event is now
reference-counted, and QueuedProxy was added to manage the event queue.
Before this change, a misbehaved subscriber could cause each invocation
thread in the service to attempt to reconnect to the subscriber.
Diffstat (limited to 'cpp/src/IceStorm/LinkSubscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/LinkSubscriber.cpp | 31 |
1 files changed, 17 insertions, 14 deletions
diff --git a/cpp/src/IceStorm/LinkSubscriber.cpp b/cpp/src/IceStorm/LinkSubscriber.cpp index 31fe7f6051d..e753ad38e2b 100644 --- a/cpp/src/IceStorm/LinkSubscriber.cpp +++ b/cpp/src/IceStorm/LinkSubscriber.cpp @@ -14,20 +14,23 @@ #include <Ice/Ice.h> #include <IceStorm/LinkSubscriber.h> +#include <IceStorm/SubscriberFactory.h> #include <IceStorm/TraceLevels.h> using namespace IceStorm; using namespace std; -LinkSubscriber::LinkSubscriber(const TraceLevelsPtr& traceLevels, const TopicLinkPrx& obj, Ice::Int cost) : - Subscriber(traceLevels, obj->ice_getIdentity()), - _obj(TopicLinkPrx::uncheckedCast(obj->ice_batchOneway())), - _cost(cost) +LinkSubscriber::LinkSubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels, + const QueuedProxyPtr& obj, Ice::Int cost) : + Subscriber(traceLevels, obj->proxy()->ice_getIdentity()), + _factory(factory), _obj(obj), _cost(cost) { + _factory->incProxyUsageCount(_obj); } LinkSubscriber::~LinkSubscriber() { + _factory->decProxyUsageCount(_obj); } bool @@ -51,7 +54,7 @@ LinkSubscriber::unsubscribe() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << "Unsubscribe " << _obj->ice_getIdentity(); + out << "Unsubscribe " << id(); } } @@ -64,26 +67,26 @@ LinkSubscriber::replace() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << "Replace " << _obj->ice_getIdentity(); + out << "Replace " << id(); } } void -LinkSubscriber::publish(const Event& event) +LinkSubscriber::publish(const EventPtr& event) { // // Don't propagate a message that has already been forwarded. // Also, if this link has a non-zero cost, then don't propagate // a message whose cost exceeds the link cost. // - if(event.forwarded || (_cost > 0 && event.cost > _cost)) + if(event->forwarded || (_cost > 0 && event->cost > _cost)) { return; } try { - _obj->forward(event.op, event.mode, event.data, event.context); + _obj->publish(event); } catch(const Ice::ObjectNotExistException& e) { @@ -96,7 +99,7 @@ LinkSubscriber::publish(const Event& event) if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << _obj->ice_getIdentity() << ": link topic publish failed: " << e; + out << id() << ": link topic publish failed: " << e; } } catch(const Ice::LocalException& e) @@ -104,7 +107,7 @@ LinkSubscriber::publish(const Event& event) if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << _obj->ice_getIdentity() << ": link topic publish failed: " << e; + out << id() << ": link topic publish failed: " << e; } } } @@ -114,7 +117,7 @@ LinkSubscriber::flush() { try { - _obj->ice_flush(); + _obj->proxy()->ice_flush(); } catch(const Ice::ObjectNotExistException& e) { @@ -127,7 +130,7 @@ LinkSubscriber::flush() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << _obj->ice_getIdentity() << ": link topic flush failed: " << e; + out << id() << ": link topic flush failed: " << e; } } catch(const Ice::LocalException& e) @@ -135,7 +138,7 @@ LinkSubscriber::flush() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << _obj->ice_getIdentity() << ": link topic flush failed: " << e; + out << id() << ": link topic flush failed: " << e; } } } |