//
// NOTE(review): this file appears to have been collapsed onto a few physical
// lines. As written, every `//` line comment extends over the code that follows
// it on the same physical line, and the text that normally sits inside angle
// brackets (the #include targets and template arguments, e.g. `#include` with no
// header name, `shared_ptr communicator`, `const_cast&>(...)`) appears to be
// missing. The line structure and the angle-bracket text must be restored from
// the upstream file before this can compile -- TODO confirm against upstream.
//
// Overview of the definitions contained in the text below:
//
// TopicReaper::add(name)
//     Appends `name` to _topics while holding _mutex.
//
// TopicReaper::consumeReapedTopics()
//     While holding _mutex, swaps _topics into a fresh local vector and returns
//     it, which leaves _topics empty.
//
// Instance::Instance(instanceName, name, communicator, publishAdapter,
//                    topicAdapter, nodeAdapter, nodeProxy)
//     Stores the names, communicator, adapters and node proxy, and creates the
//     trace levels, topic reaper and observers objects. Reads these properties
//     from the communicator (all prefixed with `name`):
//       .Discard.Interval          default 60         (exposed as chrono::seconds)
//       .Flush.Timeout             default 1000       (exposed as chrono::milliseconds)
//       .Send.Timeout              default 60 * 1000  (exposed as chrono::milliseconds)
//       .Send.QueueSizeMax         default -1
//       .Send.QueueSizeMaxPolicy   "RemoveSubscriber" (initial value) or
//                                  "DropEvents"; any other non-empty value only
//                                  logs a warning and keeps the initial value.
//     If .TopicManager.AdapterId is empty, non-empty values of
//     .ReplicatedTopicManagerEndpoints and .ReplicatedPublishEndpoints are
//     turned into proxies via stringToProxy("dummy:" + value).
//     Creates the timer, and creates _observer only when the dynamic cast of
//     the communicator's observer succeeds.
//     If a std::exception escapes the constructor body, shutdown() and
//     destroy() are called and the exception is rethrown.
//     NOTE(review): _topicReplicaProxy, _publisherReplicaProxy and
//     _sendQueueSizeMaxPolicy are assigned through const_cast; presumably they
//     are declared const in the header. Writing to a const object through
//     const_cast is undefined behavior -- confirm the declarations.
//
// Instance accessors (instanceName, serviceName, communicator, properties,
// publishAdapter, topicAdapter, nodeAdapter, observers, node, nodeProxy,
// traceLevels, timer, topicReplicaProxy, publisherReplicaProxy, observer,
// topicReaper, discardInterval, flushInterval, sendTimeout, sendQueueSizeMax,
// sendQueueSizeMaxPolicy)
//     Each returns the corresponding member; properties() returns
//     _communicator->getProperties(). setNode() stores the given node.
//
// Instance::shutdown()
//     If _node is set, destroys the node and then the node adapter (asserted to
//     be non-null). Then destroys the topic adapter and the publish adapter
//     (neither is null-checked), and destroys the timer if one was created.
//
// Instance::destroy()
//     Resets _node and _observer to nullptr; the existing comments state this
//     breaks cyclic references back to the instance.
//
// PersistentInstance::PersistentInstance(...)
//     Forwards to Instance. The database path is the property
//     `name`.LMDB.Path, defaulting to `name`; _dbLock uses path + "/icedb.lock"
//     and _dbEnv is opened on the path with the map size derived from
//     `name`.LMDB.MapSize. The body sets dbContext's communicator and encoding
//     (major 1, minor 1), opens the "llu" and "subscribers" maps with
//     MDB_CREATE inside one read/write transaction and commits it. On
//     std::exception it calls shutdown() and destroy(), then rethrows.
//
// PersistentInstance::destroy()
//     Closes _dbEnv, clears dbContext.communicator, then calls
//     Instance::destroy().
//
// // Copyright (c) ZeroC, Inc. All rights reserved. // #include #include #include #include #include #include #include #include #include #include using namespace std; using namespace IceStorm; using namespace IceStormElection; using namespace IceStormInternal; namespace IceStormInternal { extern IceDB::IceContext dbContext; } void TopicReaper::add(const string& name) { lock_guard lg(_mutex); _topics.push_back(name); } vector TopicReaper::consumeReapedTopics() { lock_guard lg(_mutex); vector reaped; reaped.swap(_topics); return reaped; } Instance::Instance(const string& instanceName, const string& name, shared_ptr communicator, shared_ptr publishAdapter, shared_ptr topicAdapter, shared_ptr nodeAdapter, shared_ptr nodeProxy) : _instanceName(instanceName), _serviceName(name), _communicator(move(communicator)), _publishAdapter(move(publishAdapter)), _topicAdapter(move(topicAdapter)), _nodeAdapter(move(nodeAdapter)), _nodeProxy(move(nodeProxy)), _traceLevels(make_shared(name, _communicator->getProperties(), _communicator->getLogger())), // default one minute. _discardInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Discard.Interval", 60)), // default one second. _flushInterval(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Flush.Timeout", 1000)), // default one minute. 
_sendTimeout(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)), _sendQueueSizeMax(_communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)), _sendQueueSizeMaxPolicy(RemoveSubscriber), _topicReaper(make_shared()), _observers(make_shared(_traceLevels)) { try { auto properties = _communicator->getProperties(); if(properties->getProperty(name + ".TopicManager.AdapterId").empty()) { string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints"); if(!p.empty()) { const_cast&>(_topicReplicaProxy) = _communicator->stringToProxy("dummy:" + p); } p = properties->getProperty(name + ".ReplicatedPublishEndpoints"); if(!p.empty()) { const_cast&>(_publisherReplicaProxy) = _communicator->stringToProxy("dummy:" + p); } } _timer = new IceUtil::Timer(); string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy"); if(policy == "RemoveSubscriber") { const_cast(_sendQueueSizeMaxPolicy) = RemoveSubscriber; } else if(policy == "DropEvents") { const_cast(_sendQueueSizeMaxPolicy) = DropEvents; } else if(!policy.empty()) { Ice::Warning warn(_traceLevels->logger); warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'"; } // // If an Ice metrics observer is setup on the communicator, also // enable metrics for IceStorm. 
// auto o = dynamic_pointer_cast(_communicator->getObserver()); if(o) { _observer = make_shared(o->getFacet()); } } catch(const std::exception&) { shutdown(); destroy(); throw; } } Instance::~Instance() { } void Instance::setNode(shared_ptr node) { _node = move(node); } string Instance::instanceName() const { return _instanceName; } string Instance::serviceName() const { return _serviceName; } shared_ptr Instance::communicator() const { return _communicator; } shared_ptr Instance::properties() const { return _communicator->getProperties(); } shared_ptr Instance::publishAdapter() const { return _publishAdapter; } shared_ptr Instance::topicAdapter() const { return _topicAdapter; } shared_ptr Instance::nodeAdapter() const { return _nodeAdapter; } shared_ptr Instance::observers() const { return _observers; } shared_ptr Instance::node() const { return _node; } shared_ptr Instance::nodeProxy() const { return _nodeProxy; } shared_ptr Instance::traceLevels() const { return _traceLevels; } IceUtil::TimerPtr Instance::timer() const { return _timer; } shared_ptr Instance::topicReplicaProxy() const { return _topicReplicaProxy; } shared_ptr Instance::publisherReplicaProxy() const { return _publisherReplicaProxy; } shared_ptr Instance::observer() const { return _observer; } shared_ptr Instance::topicReaper() const { return _topicReaper; } chrono::seconds Instance::discardInterval() const { return _discardInterval; } chrono::milliseconds Instance::flushInterval() const { return _flushInterval; } chrono::milliseconds Instance::sendTimeout() const { return _sendTimeout; } int Instance::sendQueueSizeMax() const { return _sendQueueSizeMax; } Instance::SendQueueSizeMaxPolicy Instance::sendQueueSizeMaxPolicy() const { return _sendQueueSizeMaxPolicy; } void Instance::shutdown() { if(_node) { _node->destroy(); assert(_nodeAdapter); _nodeAdapter->destroy(); } _topicAdapter->destroy(); _publishAdapter->destroy(); if(_timer) { _timer->destroy(); } } void Instance::destroy() { // The node 
instance must be cleared as the node holds the // replica (TopicManager) which holds the instance causing a // cyclic reference. _node = nullptr; // // The observer instance must be cleared as it holds the // TopicManagerImpl which holds the instance causing a // cyclic reference. // _observer = nullptr; } PersistentInstance::PersistentInstance(const string& instanceName, const string& name, shared_ptr communicator, shared_ptr publishAdapter, shared_ptr topicAdapter, shared_ptr nodeAdapter, shared_ptr nodeProxy) : Instance(instanceName, name, communicator, move(publishAdapter), move(topicAdapter), move(nodeAdapter), move(nodeProxy)), _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"), _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2, IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize"))) { try { dbContext.communicator = move(communicator); dbContext.encoding.minor = 1; dbContext.encoding.major = 1; IceDB::ReadWriteTxn txn(_dbEnv); _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE); _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey); txn.commit(); } catch(const std::exception&) { shutdown(); destroy(); throw; } } void PersistentInstance::destroy() { _dbEnv.close(); dbContext.communicator = nullptr; Instance::destroy(); }