diff options
Diffstat (limited to 'cpp/src/IceStorm/Instance.cpp')
-rw-r--r-- | cpp/src/IceStorm/Instance.cpp | 147 |
1 files changed, 127 insertions, 20 deletions
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp index c99a2f19444..16ab7555954 100644 --- a/cpp/src/IceStorm/Instance.cpp +++ b/cpp/src/IceStorm/Instance.cpp @@ -8,40 +8,68 @@ // ********************************************************************** #include <IceStorm/Instance.h> -#include <IceStorm/BatchFlusher.h> #include <IceStorm/TraceLevels.h> -#include <IceStorm/SubscriberPool.h> +#include <IceStorm/Observers.h> +#include <IceStorm/NodeI.h> +#include <IceUtil/Timer.h> #include <Ice/Communicator.h> #include <Ice/Properties.h> -using namespace IceStorm; using namespace std; +using namespace IceStorm; +using namespace IceStormElection; Instance::Instance( const string& instanceName, const string& name, const Ice::CommunicatorPtr& communicator, - const Ice::ObjectAdapterPtr& adapter) : + const Ice::ObjectAdapterPtr& publishAdapter, + const Ice::ObjectAdapterPtr& topicAdapter, + bool iceGridDeployment, + const Ice::ObjectAdapterPtr& nodeAdapter, + const NodePrx& nodeProxy) : _instanceName(instanceName), + _serviceName(name), _communicator(communicator), - _adapter(adapter), + _publishAdapter(publishAdapter), + _topicAdapter(topicAdapter), + _nodeAdapter(nodeAdapter), + _nodeProxy(nodeProxy), _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())), _discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault( - "IceStorm.Discard.Interval", 60))), // default one minute. + name + ".Discard.Interval", 60))), // default one minute. + _flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault( + name + ".Flush.Timeout", 1000))), // default one second. // default one minute. - _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Send.Timeout", 60 * 1000)) + _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)) { try { __setNoDelete(true); - _batchFlusher = new BatchFlusher(this); - _subscriberPool = new SubscriberPool(this); + if(!iceGridDeployment) + { + Ice::PropertiesPtr properties = communicator->getProperties(); + string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints"); + if(!p.empty()) + { + const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) = communicator->stringToProxy("dummy:" + p); + } + p = properties->getProperty(name + ".ReplicatedPublishEndpoints"); + if(!p.empty()) + { + const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) = communicator->stringToProxy("dummy:" + p); + } + } + _observers = new Observers(this); + _batchFlusher = new IceUtil::Timer(); + _timer = new IceUtil::Timer(); } catch(...) { shutdown(); + destroy(); __setNoDelete(false); throw; @@ -51,6 +79,13 @@ Instance::Instance( Instance::~Instance() { + //cout << "~Instance" << endl; +} + +void +Instance::setNode(const NodeIPtr& node) +{ + _node = node; } string @@ -59,6 +94,12 @@ Instance::instanceName() const return _instanceName; } +string +Instance::serviceName() const +{ + return _serviceName; +} + Ice::CommunicatorPtr Instance::communicator() const { @@ -72,9 +113,39 @@ Instance::properties() const } Ice::ObjectAdapterPtr -Instance::objectAdapter() const +Instance::publishAdapter() const { - return _adapter; + return _publishAdapter; +} + +Ice::ObjectAdapterPtr +Instance::topicAdapter() const +{ + return _topicAdapter; +} + +Ice::ObjectAdapterPtr +Instance::nodeAdapter() const +{ + return _nodeAdapter; +} + +ObserversPtr +Instance::observers() const +{ + return _observers; +} + +NodeIPtr +Instance::node() const +{ + return _node; +} + +NodePrx +Instance::nodeProxy() const +{ + return _nodeProxy; } TraceLevelsPtr @@ -83,16 +154,28 @@ Instance::traceLevels() const return _traceLevels; } -BatchFlusherPtr +IceUtil::TimerPtr Instance::batchFlusher() const { return _batchFlusher; } -SubscriberPoolPtr -Instance::subscriberPool() const +IceUtil::TimerPtr +Instance::timer() const +{ + return _timer; +} + +Ice::ObjectPrx +Instance::topicReplicaProxy() const +{ + return _topicReplicaProxy; +} + +Ice::ObjectPrx +Instance::publisherReplicaProxy() const { - return _subscriberPool; + return _publisherReplicaProxy; } IceUtil::Time @@ -101,6 +184,12 @@ Instance::discardInterval() const return _discardInterval; } +IceUtil::Time +Instance::flushInterval() const +{ + return _flushInterval; +} + int Instance::sendTimeout() const { @@ -110,14 +199,32 @@ Instance::sendTimeout() const void Instance::shutdown() { - if(_batchFlusher) + if(_node) { - _batchFlusher->destroy(); - _batchFlusher->getThreadControl().join(); + _node->destroy(); + assert(_nodeAdapter); + _nodeAdapter->destroy(); + } + + _topicAdapter->destroy(); + _publishAdapter->destroy(); + + if(_timer) + { + _timer->destroy(); } +} - if(_subscriberPool) +void +Instance::destroy() +{ + if(_batchFlusher) { - _subscriberPool->destroy(); + _batchFlusher->destroy(); } + + // The node instance must be cleared as the node holds the + // replica (TopicManager) which holds the instance causing a + // cyclic reference. + _node = 0; } |