diff options
Diffstat (limited to 'cpp/src/IceStorm/Instance.cpp')
-rw-r--r-- | cpp/src/IceStorm/Instance.cpp | 214 |
1 files changed, 101 insertions, 113 deletions
diff --git a/cpp/src/IceStorm/Instance.cpp b/cpp/src/IceStorm/Instance.cpp index ad22c9151ad..f1b5e764fb0 100644 --- a/cpp/src/IceStorm/Instance.cpp +++ b/cpp/src/IceStorm/Instance.cpp @@ -27,109 +27,64 @@ extern IceDB::IceContext dbContext; void TopicReaper::add(const string& name) { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); _topics.push_back(name); } vector<string> TopicReaper::consumeReapedTopics() { - Lock sync(*this); + lock_guard<mutex> lg(_mutex); vector<string> reaped; reaped.swap(_topics); return reaped; } -PersistentInstance::PersistentInstance( - const string& instanceName, - const string& name, - const Ice::CommunicatorPtr& communicator, - const Ice::ObjectAdapterPtr& publishAdapter, - const Ice::ObjectAdapterPtr& topicAdapter, - const Ice::ObjectAdapterPtr& nodeAdapter, - const NodePrx& nodeProxy) : - Instance(instanceName, name, communicator, publishAdapter, topicAdapter, nodeAdapter, nodeProxy), - _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"), - _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2, - IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize"))) -{ - try - { - dbContext.communicator = communicator; - dbContext.encoding.minor = 1; - dbContext.encoding.major = 1; - - IceDB::ReadWriteTxn txn(_dbEnv); - - _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE); - _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey); - - txn.commit(); - } - catch(...) - { - shutdown(); - destroy(); - - throw; - } -} - -void -PersistentInstance::destroy() -{ - _dbEnv.close(); - dbContext.communicator = 0; - - Instance::destroy(); -} - -Instance::Instance( - const string& instanceName, - const string& name, - const Ice::CommunicatorPtr& communicator, - const Ice::ObjectAdapterPtr& publishAdapter, - const Ice::ObjectAdapterPtr& topicAdapter, - const Ice::ObjectAdapterPtr& nodeAdapter, - const NodePrx& nodeProxy) : +Instance::Instance(const string& instanceName, + const string& name, + shared_ptr<Ice::Communicator> communicator, + shared_ptr<Ice::ObjectAdapter> publishAdapter, + shared_ptr<Ice::ObjectAdapter> topicAdapter, + shared_ptr<Ice::ObjectAdapter> nodeAdapter, + shared_ptr<NodePrx> nodeProxy) : _instanceName(instanceName), _serviceName(name), - _communicator(communicator), - _publishAdapter(publishAdapter), - _topicAdapter(topicAdapter), - _nodeAdapter(nodeAdapter), - _nodeProxy(nodeProxy), - _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())), - _discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault( - name + ".Discard.Interval", 60))), // default one minute. - _flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault( - name + ".Flush.Timeout", 1000))), // default one second. + _communicator(move(communicator)), + _publishAdapter(move(publishAdapter)), + _topicAdapter(move(topicAdapter)), + _nodeAdapter(move(nodeAdapter)), + _nodeProxy(move(nodeProxy)), + _traceLevels(make_shared<TraceLevels>(name, _communicator->getProperties(), _communicator->getLogger())), + // default one minute. + _discardInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Discard.Interval", 60)), + // default one second. + _flushInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Flush.Timeout", 1000)), // default one minute. - _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)), - _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)), + _sendTimeout(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)), + _sendQueueSizeMax(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)), _sendQueueSizeMaxPolicy(RemoveSubscriber), - _topicReaper(new TopicReaper()) + _topicReaper(make_shared<TopicReaper>()), + _observers(make_shared<Observers>(_traceLevels)) { try { - __setNoDelete(true); - - Ice::PropertiesPtr properties = communicator->getProperties(); + auto properties = _communicator->getProperties(); if(properties->getProperty(name + ".TopicManager.AdapterId").empty()) { string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints"); if(!p.empty()) { - const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) = communicator->stringToProxy("dummy:" + p); + const_cast<shared_ptr<Ice::ObjectPrx>&>(_topicReplicaProxy) = + _communicator->stringToProxy("dummy:" + p); } p = properties->getProperty(name + ".ReplicatedPublishEndpoints"); if(!p.empty()) { - const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) = communicator->stringToProxy("dummy:" + p); + const_cast<shared_ptr<Ice::ObjectPrx>&>(_publisherReplicaProxy) = + _communicator->stringToProxy("dummy:" + p); } } - _observers = new Observers(this); - _batchFlusher = new IceUtil::Timer(); + _timer = new IceUtil::Timer(); string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy"); @@ -151,28 +106,28 @@ Instance::Instance( // If an Ice metrics observer is setup on the communicator, also // enable metrics for IceStorm. // - IceInternal::CommunicatorObserverIPtr o = - IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver()); + auto o = dynamic_pointer_cast<IceInternal::CommunicatorObserverI>(_communicator->getObserver()); if(o) { - _observer = new TopicManagerObserverI(o->getFacet()); + _observer = make_shared<TopicManagerObserverI>(o->getFacet()); } } - catch(...) + catch(const std::exception&) { shutdown(); destroy(); - __setNoDelete(false); - throw; } - __setNoDelete(false); +} + +Instance::~Instance() +{ } void -Instance::setNode(const NodeIPtr& node) +Instance::setNode(shared_ptr<NodeI> node) { - _node = node; + _node = move(node); } string @@ -187,109 +142,103 @@ Instance::serviceName() const return _serviceName; } -Ice::CommunicatorPtr +shared_ptr<Ice::Communicator> Instance::communicator() const { return _communicator; } -Ice::PropertiesPtr +shared_ptr<Ice::Properties> Instance::properties() const { return _communicator->getProperties(); } -Ice::ObjectAdapterPtr +shared_ptr<Ice::ObjectAdapter> Instance::publishAdapter() const { return _publishAdapter; } -Ice::ObjectAdapterPtr +shared_ptr<Ice::ObjectAdapter> Instance::topicAdapter() const { return _topicAdapter; } -Ice::ObjectAdapterPtr +shared_ptr<Ice::ObjectAdapter> Instance::nodeAdapter() const { return _nodeAdapter; } -ObserversPtr +shared_ptr<Observers> Instance::observers() const { return _observers; } -NodeIPtr +shared_ptr<NodeI> Instance::node() const { return _node; } -NodePrx +shared_ptr<NodePrx> Instance::nodeProxy() const { return _nodeProxy; } -TraceLevelsPtr +shared_ptr<TraceLevels> Instance::traceLevels() const { return _traceLevels; } IceUtil::TimerPtr -Instance::batchFlusher() const -{ - return _batchFlusher; -} - -IceUtil::TimerPtr Instance::timer() const { return _timer; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> Instance::topicReplicaProxy() const { return _topicReplicaProxy; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> Instance::publisherReplicaProxy() const { return _publisherReplicaProxy; } -IceStorm::Instrumentation::TopicManagerObserverPtr +shared_ptr<IceStorm::Instrumentation::TopicManagerObserver> Instance::observer() const { return _observer; } -IceStorm::TopicReaperPtr +shared_ptr<IceStorm::TopicReaper> Instance::topicReaper() const { return _topicReaper; } -IceUtil::Time +chrono::seconds Instance::discardInterval() const { return _discardInterval; } -IceUtil::Time +chrono::milliseconds Instance::flushInterval() const { return _flushInterval; } -int +chrono::milliseconds Instance::sendTimeout() const { return _sendTimeout; @@ -329,19 +278,58 @@ Instance::shutdown() void Instance::destroy() { - if(_batchFlusher) - { - _batchFlusher->destroy(); - } - // The node instance must be cleared as the node holds the // replica (TopicManager) which holds the instance causing a // cyclic reference. - _node = 0; + _node = nullptr; // // The observer instance must be cleared as it holds the - // TopicManagerImpl which hodlds the instance causing a + // TopicManagerImpl which holds the instance causing a // cyclic reference. // - _observer = 0; + _observer = nullptr; +} + +PersistentInstance::PersistentInstance(const string& instanceName, + const string& name, + shared_ptr<Ice::Communicator> communicator, + shared_ptr<Ice::ObjectAdapter> publishAdapter, + shared_ptr<Ice::ObjectAdapter> topicAdapter, + shared_ptr<Ice::ObjectAdapter> nodeAdapter, + shared_ptr<NodePrx> nodeProxy) : + Instance(instanceName, name, communicator, move(publishAdapter), move(topicAdapter), move(nodeAdapter), + move(nodeProxy)), + _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"), + _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2, + IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize"))) +{ + try + { + dbContext.communicator = move(communicator); + dbContext.encoding.minor = 1; + dbContext.encoding.major = 1; + + IceDB::ReadWriteTxn txn(_dbEnv); + + _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE); + _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey); + + txn.commit(); + } + catch(const std::exception&) + { + shutdown(); + destroy(); + + throw; + } +} + +void +PersistentInstance::destroy() +{ + _dbEnv.close(); + dbContext.communicator = nullptr; + + Instance::destroy(); } |