summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Instance.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Instance.cpp')
-rw-r--r--cpp/src/IceStorm/Instance.cpp147
1 files changed, 127 insertions, 20 deletions
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp
index c99a2f19444..16ab7555954 100644
--- a/cpp/src/IceStorm/Instance.cpp
+++ b/cpp/src/IceStorm/Instance.cpp
@@ -8,40 +8,68 @@
// **********************************************************************
#include <IceStorm/Instance.h>
-#include <IceStorm/BatchFlusher.h>
#include <IceStorm/TraceLevels.h>
-#include <IceStorm/SubscriberPool.h>
+#include <IceStorm/Observers.h>
+#include <IceStorm/NodeI.h>
+#include <IceUtil/Timer.h>
#include <Ice/Communicator.h>
#include <Ice/Properties.h>
-using namespace IceStorm;
using namespace std;
+using namespace IceStorm;
+using namespace IceStormElection;
Instance::Instance(
const string& instanceName,
const string& name,
const Ice::CommunicatorPtr& communicator,
- const Ice::ObjectAdapterPtr& adapter) :
+ const Ice::ObjectAdapterPtr& publishAdapter,
+ const Ice::ObjectAdapterPtr& topicAdapter,
+ bool iceGridDeployment,
+ const Ice::ObjectAdapterPtr& nodeAdapter,
+ const NodePrx& nodeProxy) :
_instanceName(instanceName),
+ _serviceName(name),
_communicator(communicator),
- _adapter(adapter),
+ _publishAdapter(publishAdapter),
+ _topicAdapter(topicAdapter),
+ _nodeAdapter(nodeAdapter),
+ _nodeProxy(nodeProxy),
_traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
_discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault(
- "IceStorm.Discard.Interval", 60))), // default one minute.
+ name + ".Discard.Interval", 60))), // default one minute.
+ _flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault(
+ name + ".Flush.Timeout", 1000))), // default one second.
// default one minute.
- _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault("IceStorm.Send.Timeout", 60 * 1000))
+ _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000))
{
try
{
__setNoDelete(true);
- _batchFlusher = new BatchFlusher(this);
- _subscriberPool = new SubscriberPool(this);
+ if(!iceGridDeployment)
+ {
+ Ice::PropertiesPtr properties = communicator->getProperties();
+ string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints");
+ if(!p.empty())
+ {
+ const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) = communicator->stringToProxy("dummy:" + p);
+ }
+ p = properties->getProperty(name + ".ReplicatedPublishEndpoints");
+ if(!p.empty())
+ {
+ const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) = communicator->stringToProxy("dummy:" + p);
+ }
+ }
+ _observers = new Observers(this);
+ _batchFlusher = new IceUtil::Timer();
+ _timer = new IceUtil::Timer();
}
catch(...)
{
shutdown();
+ destroy();
__setNoDelete(false);
throw;
@@ -51,6 +79,13 @@ Instance::Instance(
Instance::~Instance()
{
+ //cout << "~Instance" << endl;
+}
+
+void
+Instance::setNode(const NodeIPtr& node)
+{
+ _node = node;
}
string
@@ -59,6 +94,12 @@ Instance::instanceName() const
return _instanceName;
}
+string
+Instance::serviceName() const
+{
+ return _serviceName;
+}
+
Ice::CommunicatorPtr
Instance::communicator() const
{
@@ -72,9 +113,39 @@ Instance::properties() const
}
Ice::ObjectAdapterPtr
-Instance::objectAdapter() const
+Instance::publishAdapter() const
{
- return _adapter;
+ return _publishAdapter;
+}
+
+Ice::ObjectAdapterPtr
+Instance::topicAdapter() const
+{
+ return _topicAdapter;
+}
+
+Ice::ObjectAdapterPtr
+Instance::nodeAdapter() const
+{
+ return _nodeAdapter;
+}
+
+ObserversPtr
+Instance::observers() const
+{
+ return _observers;
+}
+
+NodeIPtr
+Instance::node() const
+{
+ return _node;
+}
+
+NodePrx
+Instance::nodeProxy() const
+{
+ return _nodeProxy;
}
TraceLevelsPtr
@@ -83,16 +154,28 @@ Instance::traceLevels() const
return _traceLevels;
}
-BatchFlusherPtr
+IceUtil::TimerPtr
Instance::batchFlusher() const
{
return _batchFlusher;
}
-SubscriberPoolPtr
-Instance::subscriberPool() const
+IceUtil::TimerPtr
+Instance::timer() const
+{
+ return _timer;
+}
+
+Ice::ObjectPrx
+Instance::topicReplicaProxy() const
+{
+ return _topicReplicaProxy;
+}
+
+Ice::ObjectPrx
+Instance::publisherReplicaProxy() const
{
- return _subscriberPool;
+ return _publisherReplicaProxy;
}
IceUtil::Time
@@ -101,6 +184,12 @@ Instance::discardInterval() const
return _discardInterval;
}
+IceUtil::Time
+Instance::flushInterval() const
+{
+ return _flushInterval;
+}
+
int
Instance::sendTimeout() const
{
@@ -110,14 +199,32 @@ Instance::sendTimeout() const
void
Instance::shutdown()
{
- if(_batchFlusher)
+ if(_node)
{
- _batchFlusher->destroy();
- _batchFlusher->getThreadControl().join();
+ _node->destroy();
+ assert(_nodeAdapter);
+ _nodeAdapter->destroy();
+ }
+
+ _topicAdapter->destroy();
+ _publishAdapter->destroy();
+
+ if(_timer)
+ {
+ _timer->destroy();
}
+}
- if(_subscriberPool)
+void
+Instance::destroy()
+{
+ if(_batchFlusher)
{
- _subscriberPool->destroy();
+ _batchFlusher->destroy();
}
+
+ // The node instance must be cleared as the node holds the
+ // replica (TopicManager) which holds the instance causing a
+ // cyclic reference.
+ _node = 0;
}