summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Instance.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Instance.cpp')
-rw-r--r--cpp/src/IceStorm/Instance.cpp214
1 files changed, 101 insertions, 113 deletions
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp
index ad22c9151ad..f1b5e764fb0 100644
--- a/cpp/src/IceStorm/Instance.cpp
+++ b/cpp/src/IceStorm/Instance.cpp
@@ -27,109 +27,64 @@ extern IceDB::IceContext dbContext;
void
TopicReaper::add(const string& name)
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
_topics.push_back(name);
}
vector<string>
TopicReaper::consumeReapedTopics()
{
- Lock sync(*this);
+ lock_guard<mutex> lg(_mutex);
vector<string> reaped;
reaped.swap(_topics);
return reaped;
}
-PersistentInstance::PersistentInstance(
- const string& instanceName,
- const string& name,
- const Ice::CommunicatorPtr& communicator,
- const Ice::ObjectAdapterPtr& publishAdapter,
- const Ice::ObjectAdapterPtr& topicAdapter,
- const Ice::ObjectAdapterPtr& nodeAdapter,
- const NodePrx& nodeProxy) :
- Instance(instanceName, name, communicator, publishAdapter, topicAdapter, nodeAdapter, nodeProxy),
- _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"),
- _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2,
- IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize")))
-{
- try
- {
- dbContext.communicator = communicator;
- dbContext.encoding.minor = 1;
- dbContext.encoding.major = 1;
-
- IceDB::ReadWriteTxn txn(_dbEnv);
-
- _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE);
- _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey);
-
- txn.commit();
- }
- catch(...)
- {
- shutdown();
- destroy();
-
- throw;
- }
-}
-
-void
-PersistentInstance::destroy()
-{
- _dbEnv.close();
- dbContext.communicator = 0;
-
- Instance::destroy();
-}
-
-Instance::Instance(
- const string& instanceName,
- const string& name,
- const Ice::CommunicatorPtr& communicator,
- const Ice::ObjectAdapterPtr& publishAdapter,
- const Ice::ObjectAdapterPtr& topicAdapter,
- const Ice::ObjectAdapterPtr& nodeAdapter,
- const NodePrx& nodeProxy) :
+Instance::Instance(const string& instanceName,
+ const string& name,
+ shared_ptr<Ice::Communicator> communicator,
+ shared_ptr<Ice::ObjectAdapter> publishAdapter,
+ shared_ptr<Ice::ObjectAdapter> topicAdapter,
+ shared_ptr<Ice::ObjectAdapter> nodeAdapter,
+ shared_ptr<NodePrx> nodeProxy) :
_instanceName(instanceName),
_serviceName(name),
- _communicator(communicator),
- _publishAdapter(publishAdapter),
- _topicAdapter(topicAdapter),
- _nodeAdapter(nodeAdapter),
- _nodeProxy(nodeProxy),
- _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
- _discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault(
- name + ".Discard.Interval", 60))), // default one minute.
- _flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault(
- name + ".Flush.Timeout", 1000))), // default one second.
+ _communicator(move(communicator)),
+ _publishAdapter(move(publishAdapter)),
+ _topicAdapter(move(topicAdapter)),
+ _nodeAdapter(move(nodeAdapter)),
+ _nodeProxy(move(nodeProxy)),
+ _traceLevels(make_shared<TraceLevels>(name, _communicator->getProperties(), _communicator->getLogger())),
+ // default one minute.
+ _discardInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Discard.Interval", 60)),
+ // default one second.
+ _flushInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Flush.Timeout", 1000)),
// default one minute.
- _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)),
- _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)),
+ _sendTimeout(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)),
+ _sendQueueSizeMax(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)),
_sendQueueSizeMaxPolicy(RemoveSubscriber),
- _topicReaper(new TopicReaper())
+ _topicReaper(make_shared<TopicReaper>()),
+ _observers(make_shared<Observers>(_traceLevels))
{
try
{
- __setNoDelete(true);
-
- Ice::PropertiesPtr properties = communicator->getProperties();
+ auto properties = _communicator->getProperties();
if(properties->getProperty(name + ".TopicManager.AdapterId").empty())
{
string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints");
if(!p.empty())
{
- const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) = communicator->stringToProxy("dummy:" + p);
+ const_cast<shared_ptr<Ice::ObjectPrx>&>(_topicReplicaProxy) =
+ _communicator->stringToProxy("dummy:" + p);
}
p = properties->getProperty(name + ".ReplicatedPublishEndpoints");
if(!p.empty())
{
- const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) = communicator->stringToProxy("dummy:" + p);
+ const_cast<shared_ptr<Ice::ObjectPrx>&>(_publisherReplicaProxy) =
+ _communicator->stringToProxy("dummy:" + p);
}
}
- _observers = new Observers(this);
- _batchFlusher = new IceUtil::Timer();
+
_timer = new IceUtil::Timer();
string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy");
@@ -151,28 +106,28 @@ Instance::Instance(
// If an Ice metrics observer is setup on the communicator, also
// enable metrics for IceStorm.
//
- IceInternal::CommunicatorObserverIPtr o =
- IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver());
+ auto o = dynamic_pointer_cast<IceInternal::CommunicatorObserverI>(_communicator->getObserver());
if(o)
{
- _observer = new TopicManagerObserverI(o->getFacet());
+ _observer = make_shared<TopicManagerObserverI>(o->getFacet());
}
}
- catch(...)
+ catch(const std::exception&)
{
shutdown();
destroy();
- __setNoDelete(false);
-
throw;
}
- __setNoDelete(false);
+}
+
+Instance::~Instance()
+{
}
void
-Instance::setNode(const NodeIPtr& node)
+Instance::setNode(shared_ptr<NodeI> node)
{
- _node = node;
+ _node = move(node);
}
string
@@ -187,109 +142,103 @@ Instance::serviceName() const
return _serviceName;
}
-Ice::CommunicatorPtr
+shared_ptr<Ice::Communicator>
Instance::communicator() const
{
return _communicator;
}
-Ice::PropertiesPtr
+shared_ptr<Ice::Properties>
Instance::properties() const
{
return _communicator->getProperties();
}
-Ice::ObjectAdapterPtr
+shared_ptr<Ice::ObjectAdapter>
Instance::publishAdapter() const
{
return _publishAdapter;
}
-Ice::ObjectAdapterPtr
+shared_ptr<Ice::ObjectAdapter>
Instance::topicAdapter() const
{
return _topicAdapter;
}
-Ice::ObjectAdapterPtr
+shared_ptr<Ice::ObjectAdapter>
Instance::nodeAdapter() const
{
return _nodeAdapter;
}
-ObserversPtr
+shared_ptr<Observers>
Instance::observers() const
{
return _observers;
}
-NodeIPtr
+shared_ptr<NodeI>
Instance::node() const
{
return _node;
}
-NodePrx
+shared_ptr<NodePrx>
Instance::nodeProxy() const
{
return _nodeProxy;
}
-TraceLevelsPtr
+shared_ptr<TraceLevels>
Instance::traceLevels() const
{
return _traceLevels;
}
IceUtil::TimerPtr
-Instance::batchFlusher() const
-{
- return _batchFlusher;
-}
-
-IceUtil::TimerPtr
Instance::timer() const
{
return _timer;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
Instance::topicReplicaProxy() const
{
return _topicReplicaProxy;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
Instance::publisherReplicaProxy() const
{
return _publisherReplicaProxy;
}
-IceStorm::Instrumentation::TopicManagerObserverPtr
+shared_ptr<IceStorm::Instrumentation::TopicManagerObserver>
Instance::observer() const
{
return _observer;
}
-IceStorm::TopicReaperPtr
+shared_ptr<IceStorm::TopicReaper>
Instance::topicReaper() const
{
return _topicReaper;
}
-IceUtil::Time
+chrono::seconds
Instance::discardInterval() const
{
return _discardInterval;
}
-IceUtil::Time
+chrono::milliseconds
Instance::flushInterval() const
{
return _flushInterval;
}
-int
+chrono::milliseconds
Instance::sendTimeout() const
{
return _sendTimeout;
@@ -329,19 +278,58 @@ Instance::shutdown()
void
Instance::destroy()
{
- if(_batchFlusher)
- {
- _batchFlusher->destroy();
- }
-
// The node instance must be cleared as the node holds the
// replica (TopicManager) which holds the instance causing a
// cyclic reference.
- _node = 0;
+ _node = nullptr;
//
// The observer instance must be cleared as it holds the
- // TopicManagerImpl which hodlds the instance causing a
+ // TopicManagerImpl which holds the instance causing a
// cyclic reference.
//
- _observer = 0;
+ _observer = nullptr;
+}
+
+PersistentInstance::PersistentInstance(const string& instanceName,
+ const string& name,
+ shared_ptr<Ice::Communicator> communicator,
+ shared_ptr<Ice::ObjectAdapter> publishAdapter,
+ shared_ptr<Ice::ObjectAdapter> topicAdapter,
+ shared_ptr<Ice::ObjectAdapter> nodeAdapter,
+ shared_ptr<NodePrx> nodeProxy) :
+ Instance(instanceName, name, communicator, move(publishAdapter), move(topicAdapter), move(nodeAdapter),
+ move(nodeProxy)),
+ _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"),
+ _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2,
+ IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize")))
+{
+ try
+ {
+ dbContext.communicator = move(communicator);
+ dbContext.encoding.minor = 1;
+ dbContext.encoding.major = 1;
+
+ IceDB::ReadWriteTxn txn(_dbEnv);
+
+ _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE);
+ _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey);
+
+ txn.commit();
+ }
+ catch(const std::exception&)
+ {
+ shutdown();
+ destroy();
+
+ throw;
+ }
+}
+
+void
+PersistentInstance::destroy()
+{
+ _dbEnv.close();
+ dbContext.communicator = nullptr;
+
+ Instance::destroy();
}