diff options
author | Mark Spruiell <mes@zeroc.com> | 2003-05-16 18:48:45 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2003-05-16 18:48:45 +0000 |
commit | 1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7 (patch) | |
tree | 424b71a50fad11d1ffbd67a5ff1ad97635e23224 /cpp/src/IceStorm/OnewaySubscriber.cpp | |
parent | readline/ncurses update (diff) | |
download | ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.tar.bz2 ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.tar.xz ice-1b193fc1a1d8b3f21e0fb83b7dce5822ba9ae5c7.zip |
Changed the service to queue events to subscribers and links so that only
one thread at a time is delivering to a particular proxy. Event is now
reference-counted, and QueuedProxy was added to manage the event queue.
Before this change, a misbehaved subscriber could cause each invocation
thread in the service to attempt to reconnect to the subscriber.
Diffstat (limited to 'cpp/src/IceStorm/OnewaySubscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/OnewaySubscriber.cpp | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/cpp/src/IceStorm/OnewaySubscriber.cpp b/cpp/src/IceStorm/OnewaySubscriber.cpp index e3d709f4d0c..5b6ba0d47fa 100644 --- a/cpp/src/IceStorm/OnewaySubscriber.cpp +++ b/cpp/src/IceStorm/OnewaySubscriber.cpp @@ -14,19 +14,23 @@ #include <Ice/Ice.h> #include <IceStorm/OnewaySubscriber.h> +#include <IceStorm/SubscriberFactory.h> #include <IceStorm/TraceLevels.h> using namespace IceStorm; using namespace std; -OnewaySubscriber::OnewaySubscriber(const TraceLevelsPtr& traceLevels, const Ice::ObjectPrx& obj) : - Subscriber(traceLevels, obj->ice_getIdentity()), - _obj(obj) +OnewaySubscriber::OnewaySubscriber(const SubscriberFactoryPtr& factory, const TraceLevelsPtr& traceLevels, + const QueuedProxyPtr& obj) : + Subscriber(traceLevels, obj->proxy()->ice_getIdentity()), + _factory(factory), _obj(obj) { + _factory->incProxyUsageCount(_obj); } OnewaySubscriber::~OnewaySubscriber() { + _factory->decProxyUsageCount(_obj); } bool @@ -44,7 +48,7 @@ OnewaySubscriber::unsubscribe() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << "Unsubscribe " << _obj->ice_getIdentity(); + out << "Unsubscribe " << id(); } } @@ -57,35 +61,34 @@ OnewaySubscriber::replace() if(_traceLevels->subscriber > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << "Replace " << _obj->ice_getIdentity(); + out << "Replace " << id(); } } void -OnewaySubscriber::publish(const Event& event) +OnewaySubscriber::publish(const EventPtr& event) { try { - std::vector< ::Ice::Byte> dummy; - _obj->ice_invoke(event.op, ::Ice::Idempotent, event.data, dummy, event.context); + _obj->publish(event); } catch(const Ice::LocalException& e) { IceUtil::Mutex::Lock sync(_stateMutex); - // - // It's possible that the subscriber was unsubscribed, or - // marked invalid by another thread. Don't display a - // diagnostic in this case. - // - if(_state == StateActive) - { - if(_traceLevels->subscriber > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); - out << _obj->ice_getIdentity() << ": publish failed: " << e; - } - _state = StateError; - } - } + // + // It's possible that the subscriber was unsubscribed, or + // marked invalid by another thread. Don't display a + // diagnostic in this case. + // + if(_state == StateActive) + { + if(_traceLevels->subscriber > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->subscriberCat); + out << id() << ": publish failed: " << e; + } + _state = StateError; + } + } } |