diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 826 |
1 files changed, 351 insertions, 475 deletions
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index b727edb4966..8d397255f72 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -2,7 +2,6 @@ // Copyright (c) ZeroC, Inc. All rights reserved. // -#include <IceUtil/Functional.h> #include <Ice/Communicator.h> #include <Ice/Properties.h> #include <Ice/LoggerUtil.h> @@ -16,361 +15,122 @@ using namespace std; using namespace IceGrid; -namespace IceGrid +namespace { -struct ToInternalServerDescriptor : std::unary_function<CommunicatorDescriptorPtr&, void> +PropertyDescriptor +removeProperty(PropertyDescriptorSeq& properties, const string& name) { - ToInternalServerDescriptor(const InternalServerDescriptorPtr& descriptor, const InternalNodeInfoPtr& node, - int iceVersion) : - _desc(descriptor), - _node(node), - _iceVersion(iceVersion) + string value; + PropertyDescriptorSeq::iterator p = properties.begin(); + while(p != properties.end()) { - } - - void - operator()(const CommunicatorDescriptorPtr& desc) - { - // - // Figure out the configuration file name for the communicator - // (if it's a service, it's "config_<service name>", if it's - // the server, it's just "config"). - // - string filename = "config"; - ServiceDescriptorPtr svc = ServiceDescriptorPtr::dynamicCast(desc); - if(svc) - { - filename += "_" + svc->name; - _desc->services->push_back(svc->name); - } - - PropertyDescriptorSeq& props = _desc->properties[filename]; - PropertyDescriptorSeq communicatorProps = desc->propertySet.properties; - - // - // If this is a service communicator and the IceBox server has Admin - // enabled or Admin endpoints configured, we ignore the server-lifetime attributes - // of the service object adapters and assume it's set to false. - // - bool ignoreServerLifetime = false; - if(svc) - { - if(_iceVersion == 0 || _iceVersion >= 30300) - { - if(getPropertyAsInt(_desc->properties["config"], "Ice.Admin.Enabled") > 0 || - getProperty(_desc->properties["config"], "Ice.Admin.Endpoints") != "") - { - ignoreServerLifetime = true; - } - } - } - // - // Add the adapters and their configuration. - // - for(AdapterDescriptorSeq::const_iterator q = desc->adapters.begin(); q != desc->adapters.end(); ++q) - { - _desc->adapters.push_back(new InternalAdapterDescriptor(q->id, - ignoreServerLifetime ? false : q->serverLifetime)); - - props.push_back(createProperty("# Object adapter " + q->name)); - PropertyDescriptor prop = removeProperty(communicatorProps, q->name + ".Endpoints"); - prop.name = q->name + ".Endpoints"; - props.push_back(prop); - props.push_back(createProperty(q->name + ".AdapterId", q->id)); - if(!q->replicaGroupId.empty()) - { - props.push_back(createProperty(q->name + ".ReplicaGroupId", q->replicaGroupId)); - } - - // - // Ignore the register process attribute if the server is using Ice >= 3.3.0 - // - if(_iceVersion != 0 && _iceVersion < 30300) - { - if(q->registerProcess) - { - props.push_back(createProperty(q->name + ".RegisterProcess", "1")); - _desc->processRegistered = true; - } - } - } - - _desc->logs.insert(_desc->logs.end(), desc->logs.begin(), desc->logs.end()); - - const string dbsPath = _node->dataDir + "/servers/" + _desc->id + "/dbs/"; - for(DbEnvDescriptorSeq::const_iterator p = desc->dbEnvs.begin(); p != desc->dbEnvs.end(); ++p) - { - props.push_back(createProperty("# Database environment " + p->name)); - if(p->dbHome.empty()) - { - _desc->dbEnvs.push_back(new InternalDbEnvDescriptor(p->name, p->properties)); - props.push_back(createProperty("Freeze.DbEnv." + p->name + ".DbHome", dbsPath + p->name)); - } - else - { - props.push_back(createProperty("Freeze.DbEnv." + p->name + ".DbHome", p->dbHome)); - } - } - - // - // Copy the communicator descriptor properties. - // - if(!communicatorProps.empty()) - { - if(svc) - { - props.push_back(createProperty("# Service descriptor properties")); - } - else - { - props.push_back(createProperty("# Server descriptor properties")); - } - copy(communicatorProps.begin(), communicatorProps.end(), back_inserter(props)); - } - - // - // For Ice servers > 3.3.0 escape the properties. - // - if(_iceVersion == 0 || _iceVersion >= 30300) - { - for(PropertyDescriptorSeq::iterator p = props.begin(); p != props.end(); ++p) - { - if(p->name.find('#') != 0 || !p->value.empty()) - { - p->name = escapeProperty(p->name, true); - p->value = escapeProperty(p->value); - } - } - } - } - - PropertyDescriptor - removeProperty(PropertyDescriptorSeq& properties, const string& name) - { - string value; - PropertyDescriptorSeq::iterator p = properties.begin(); - while(p != properties.end()) - { - if(p->name == name) - { - value = p->value; - p = properties.erase(p); - } - else - { - ++p; - } - } - PropertyDescriptor desc; - desc.name = name; - desc.value = value; - return desc; - } - - InternalServerDescriptorPtr _desc; - InternalNodeInfoPtr _node; - int _iceVersion; -}; - -class LoadCB : public virtual IceUtil::Shared -{ -public: - - LoadCB(const TraceLevelsPtr& traceLevels, const ServerEntryPtr& server, const string& node, int timeout) : - _traceLevels(traceLevels), _server(server), _id(server->getId()), _node(node), _timeout(timeout) - { - } - - void - response(const ServerPrx& server, const AdapterPrxDict& adapters, int at, int dt) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "loaded `" << _id << "' on node `" << _node << "'"; - } - - // - // Add the node session timeout on the proxies to ensure the - // timeout is large enough. - // - _server->loadCallback(server, adapters, at + _timeout, dt + _timeout); - } - - void - exception(const Ice::Exception& lex) - { - try - { - lex.ice_throw(); - } - catch(const DeploymentException& ex) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex.reason; - } - - ostringstream os; - os << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex.reason; - _server->exception(DeploymentException(os.str())); - } - catch(const Ice::Exception& ex) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex; - } - - ostringstream os; - os << ex; - _server->exception(NodeUnreachableException(_node, os.str())); - } - } - -private: - - const TraceLevelsPtr _traceLevels; - const ServerEntryPtr _server; - const string _id; - const string _node; - const int _timeout; -}; - -class DestroyCB : public virtual IceUtil::Shared -{ -public: - - DestroyCB(const TraceLevelsPtr& traceLevels, const ServerEntryPtr& server, const string& node) : - _traceLevels(traceLevels), _server(server), _id(server->getId()), _node(node) - { - } - - void - response() - { - if(_traceLevels && _traceLevels->server > 1) + if(p->name == name) { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "unloaded `" << _id << "' on node `" << _node << "'"; - } - _server->destroyCallback(); - } - - void - exception(const Ice::Exception& dex) - { - try - { - dex.ice_throw(); - } - catch(const DeploymentException& ex) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex.reason; - } - - ostringstream os; - os << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex.reason; - _server->exception(DeploymentException(os.str())); + value = p->value; + p = properties.erase(p); } - catch(const Ice::Exception& ex) + else { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex; - } - ostringstream os; - os << ex; - _server->exception(NodeUnreachableException(_node, os.str())); + ++p; } } - -private: - - const TraceLevelsPtr _traceLevels; - const ServerEntryPtr _server; - const string _id; - const string _node; -}; + return { name, value }; +} } -NodeCache::NodeCache(const Ice::CommunicatorPtr& communicator, ReplicaCache& replicaCache, const string& replicaName) : +NodeCache::NodeCache(const shared_ptr<Ice::Communicator>& communicator, + ReplicaCache& replicaCache, const string& replicaName) : _communicator(communicator), _replicaName(replicaName), _replicaCache(replicaCache) { } -NodeEntryPtr +shared_ptr<NodeEntry> NodeCache::get(const string& name, bool create) const { - Lock sync(*this); - NodeEntryPtr entry = getImpl(name); - if(!entry && create) + lock_guard lock(_mutex); + + auto cacheEntry = getImpl(name); + if(!cacheEntry && create) { NodeCache& self = const_cast<NodeCache&>(*this); - entry = new NodeEntry(self, name); - self.addImpl(name, entry); + cacheEntry = make_shared<NodeEntry>(self, name); + self.addImpl(name, cacheEntry); } - if(!entry) + if(!cacheEntry) { throw NodeNotExistException(name); } + + // Get a self removing shared_ptr to the cached NodeEntry which will remove + // itself from the this cache upon destruction + auto entry = cacheEntry->_selfRemovingPtr.lock(); + + if (!entry) + { + // Create self removing shared_ptr of cacheEntry. The cacheEntry maintains a ref count for the case where + // the self removing shared_ptr has no more references but its deleter has yet to run (weak_ptr has expired) + // and at the same time another thread calls NodeCache::get which refreshes the self removing ptr before + // the cached entry can be removed. + entry = shared_ptr<NodeEntry>(const_cast<NodeEntry*>(cacheEntry.get()), + [cache = const_cast<NodeCache*>(this), name](NodeEntry* e) + { + lock_guard cacheLock(cache->_mutex); + if(--e->_selfRemovingRefCount == 0) + { + cache->removeImpl(name); + } + }); + cacheEntry->_selfRemovingRefCount++; + cacheEntry->_selfRemovingPtr = entry; + } + return entry; } NodeEntry::NodeEntry(NodeCache& cache, const std::string& name) : _cache(cache), - _ref(0), _name(name), - _registering(false) -{ -} - -NodeEntry::~NodeEntry() + _registering(false), + _selfRemovingRefCount(0) { } void NodeEntry::addDescriptor(const string& application, const NodeDescriptor& descriptor) { - Lock sync(*this); + lock_guard lock(_mutex); _descriptors.insert(make_pair(application, descriptor)); } void NodeEntry::removeDescriptor(const string& application) { - Lock sync(*this); + lock_guard lock(_mutex); _descriptors.erase(application); } void -NodeEntry::addServer(const ServerEntryPtr& entry) +NodeEntry::addServer(const shared_ptr<ServerEntry>& entry) { - Lock sync(*this); + lock_guard lock(_mutex); _servers.insert(make_pair(entry->getId(), entry)); } void -NodeEntry::removeServer(const ServerEntryPtr& entry) +NodeEntry::removeServer(const shared_ptr<ServerEntry>& entry) { - Lock sync(*this); + lock_guard lock(_mutex); _servers.erase(entry->getId()); } void -NodeEntry::setSession(const NodeSessionIPtr& session) +NodeEntry::setSession(const shared_ptr<NodeSessionI>& session) { - Lock sync(*this); + unique_lock lock(_mutex); if(session) { @@ -380,12 +140,12 @@ NodeEntry::setSession(const NodeSessionIPtr& session) { // If the current session has just been destroyed, wait for the setSession(0) call. assert(session != _session); - wait(); + _condVar.wait(lock); } else { - NodeSessionIPtr s = _session; - sync.release(); + auto s = _session; + lock.unlock(); try { s->getNode()->ice_ping(); @@ -401,7 +161,7 @@ NodeEntry::setSession(const NodeSessionIPtr& session) { } } - sync.acquire(); + lock.lock(); } } @@ -410,7 +170,7 @@ NodeEntry::setSession(const NodeSessionIPtr& session) // so we won't need anymore to try to register it with this // registry. // - _proxy = 0; + _proxy = nullptr; } else { @@ -421,12 +181,12 @@ NodeEntry::setSession(const NodeSessionIPtr& session) } _session = session; - notifyAll(); + _condVar.notify_all(); if(_registering) { _registering = false; - notifyAll(); + _condVar.notify_all(); } if(session) @@ -447,28 +207,28 @@ NodeEntry::setSession(const NodeSessionIPtr& session) } } -NodePrx +shared_ptr<NodePrx> NodeEntry::getProxy() const { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); return _session->getNode(); } -InternalNodeInfoPtr +shared_ptr<InternalNodeInfo> NodeEntry::getInfo() const { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); return _session->getInfo(); } ServerEntrySeq NodeEntry::getServers() const { - Lock sync(*this); + lock_guard lock(_mutex); ServerEntrySeq entries; - for(map<string, ServerEntryPtr>::const_iterator p = _servers.begin(); p != _servers.end(); ++p) + for(map<string, shared_ptr<ServerEntry>>::const_iterator p = _servers.begin(); p != _servers.end(); ++p) { entries.push_back(p->second); } @@ -478,8 +238,8 @@ NodeEntry::getServers() const LoadInfo NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor) const { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application); if(p == _descriptors.end()) @@ -523,46 +283,45 @@ NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor return _session->getLoadInfo(); } -NodeSessionIPtr +shared_ptr<NodeSessionI> NodeEntry::getSession() const { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); return _session; } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> NodeEntry::getAdminProxy() const { - Ice::ObjectPrx prx = getProxy(); + auto prx = getProxy(); assert(prx); - Ice::Identity adminId; - adminId.name = "NodeAdmin-" + _name ; - adminId.category = prx->ice_getIdentity().category; - return prx->ice_identity(adminId); + return prx->ice_identity({ "NodeAdmin-" + _name, prx->ice_getIdentity().category }); } bool NodeEntry::canRemove() { - Lock sync(*this); - return _servers.empty() && !_session && _descriptors.empty(); + lock_guard lock(_mutex); + + // The cache mutex must be locked to acesss _selfRemovingRefCount + return _servers.empty() && !_session && _descriptors.empty() && _selfRemovingRefCount == 0; } void -NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, const SessionIPtr& session, int timeout, - bool noRestart) +NodeEntry::loadServer(const shared_ptr<ServerEntry>& entry, const ServerInfo& server, + const shared_ptr<SessionI>& session, chrono::seconds timeout, bool noRestart) { try { - NodePrx node; - int sessionTimeout; - InternalServerDescriptorPtr desc; + shared_ptr<NodePrx> node; + chrono::seconds sessionTimeout; + shared_ptr<InternalServerDescriptor> desc; { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); node = _session->getNode(); - sessionTimeout = _session->getTimeout(Ice::emptyCurrent); + sessionTimeout = chrono::seconds(_session->getTimeout(Ice::emptyCurrent)); // // Check if we should use a specific timeout (the load @@ -570,9 +329,10 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con // time to deactivate, up to "deactivation-timeout" // seconds). // - if(timeout > 0) + if(timeout > 0s) { - node = NodePrx::uncheckedCast(node->ice_invocationTimeout(timeout * 1000)); + auto timeoutInMilliseconds = secondsToInt(timeout) * 1000; + node = Ice::uncheckedCast<NodePrx>(node->ice_invocationTimeout(move(timeoutInMilliseconds))); } ServerInfo info = server; @@ -603,38 +363,79 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con } } + auto response = [traceLevels = _cache.getTraceLevels(), entry, name = _name, sessionTimeout] + (shared_ptr<ServerPrx> serverPrx, AdapterPrxDict adapters, int at, int dt) + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "loaded `" << entry->getId() << "' on node `" << name << "'"; + } + + // + // Add the node session timeout on the proxies to ensure the + // timeout is large enough. + // + entry->loadCallback(move(serverPrx), move(adapters), + chrono::seconds(at) + sessionTimeout, + chrono::seconds(dt) + sessionTimeout); + + }; + + auto exception = [traceLevels = _cache.getTraceLevels(), entry, name = _name](auto exptr) + { + try + { + rethrow_exception(exptr); + } + catch(const DeploymentException& ex) + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason; + } + + ostringstream os; + os << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason; + entry->exception(make_exception_ptr(DeploymentException(os.str()))); + } + catch(const Ice::Exception& ex) + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex; + } + + entry->exception(make_exception_ptr(NodeUnreachableException(name, ex.what()))); + } + }; + if(noRestart) { - node->begin_loadServerWithoutRestart(desc, _cache.getReplicaName(), - newCallback_Node_loadServerWithoutRestart( - new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout), - &LoadCB::response, - &LoadCB::exception)); + node->loadServerWithoutRestartAsync(desc, _cache.getReplicaName(), move(response), move(exception)); } else { - node->begin_loadServer(desc, _cache.getReplicaName(), - newCallback_Node_loadServer( - new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout), - &LoadCB::response, - &LoadCB::exception)); + node->loadServerAsync(desc, _cache.getReplicaName(), move(response), move(exception)); } } - catch(const NodeUnreachableException& ex) + catch(const NodeUnreachableException&) { - entry->exception(ex); + entry->exception(current_exception()); } } void -NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, int timeout, bool noRestart) +NodeEntry::destroyServer(const shared_ptr<ServerEntry>& entry, const ServerInfo& info, chrono::seconds timeout, bool noRestart) { try { - NodePrx node; + shared_ptr<NodePrx> node; { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); node = _session->getNode(); // @@ -643,9 +444,10 @@ NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, in // time to deactivate, up to "deactivation-timeout" // seconds). // - if(timeout > 0) + if(timeout > 0s) { - node = NodePrx::uncheckedCast(node->ice_invocationTimeout(timeout * 1000)); + int timeoutInMilliseconds = secondsToInt(timeout) * 1000; + node = Ice::uncheckedCast<NodePrx>(node->ice_invocationTimeout(move(timeoutInMilliseconds))); } } @@ -655,35 +457,67 @@ NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, in out << "unloading `" << info.descriptor->id << "' on node `" << _name << "'"; } + auto response = [traceLevels = _cache.getTraceLevels(), entry, name = _name] + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "unloaded `" << entry->getId() << "' on node `" << name << "'"; + } + entry->destroyCallback(); + }; + + auto exception = [traceLevels = _cache.getTraceLevels(), entry, name = _name](auto exptr) + { + try + { + rethrow_exception(exptr); + } + catch(const DeploymentException& ex) + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason; + } + + ostringstream os; + os << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason; + entry->exception(make_exception_ptr(DeploymentException(os.str()))); + } + catch(const Ice::Exception& ex) + { + if(traceLevels && traceLevels->server > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->serverCat); + out << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex; + } + entry->exception(make_exception_ptr(NodeUnreachableException(name, ex.what()))); + } + }; + if(noRestart) { - node->begin_destroyServerWithoutRestart(info.descriptor->id, info.uuid, info.revision, - _cache.getReplicaName(), - newCallback_Node_destroyServerWithoutRestart( - new DestroyCB(_cache.getTraceLevels(), entry, _name), - &DestroyCB::response, - &DestroyCB::exception)); + node->destroyServerWithoutRestartAsync(info.descriptor->id, info.uuid, info.revision, + _cache.getReplicaName(), move(response), move(exception)); } else { - node->begin_destroyServer(info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName(), - newCallback_Node_destroyServer( - new DestroyCB(_cache.getTraceLevels(), entry, _name), - &DestroyCB::response, - &DestroyCB::exception)); + node->destroyServerAsync(info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName(), + move(response), move(exception)); } } - catch(const NodeUnreachableException& ex) + catch(const NodeUnreachableException&) { - entry->exception(ex); + entry->exception(current_exception()); } } ServerInfo -NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session) +NodeEntry::getServerInfo(const ServerInfo& server, const shared_ptr<SessionI>& session) { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); ServerInfo info = server; info.descriptor = getServerDescriptor(server, session); @@ -691,11 +525,11 @@ NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session) return info; } -InternalServerDescriptorPtr -NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const SessionIPtr& session) +shared_ptr<InternalServerDescriptor> +NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const shared_ptr<SessionI>& session) { - Lock sync(*this); - checkSession(); + unique_lock lock(_mutex); + checkSession(lock); ServerInfo info = server; try @@ -715,54 +549,7 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const SessionIP } void -NodeEntry::__incRef() -{ - Lock sync(*this); - assert(_ref >= 0); - ++_ref; -} - -void -NodeEntry::__decRef() -{ - // - // The node entry implements its own reference counting. If the - // reference count drops to 1, this means that only the cache - // holds a reference on the node entry. If that's the case, we - // check if the node entry can be removed or not and if it can be - // removed we remove it from the cache map. - // - - bool doRemove = false; - bool doDelete = false; - { - Lock sync(*this); // We use a recursive mutex so it's fine to - // create Ptr with the mutex locked. - assert(_ref > 0); - --_ref; - - if(_ref == 1) - { - doRemove = canRemove(); - } - else if(_ref == 0) - { - doDelete = true; - } - } - - if(doRemove) - { - _cache.remove(_name); - } - else if(doDelete) - { - delete this; - } -} - -void -NodeEntry::checkSession() const +NodeEntry::checkSession(unique_lock<mutex>& lock) const { if(_session) { @@ -796,22 +583,27 @@ NodeEntry::checkSession() const // hang in the while loop. // _registering = true; - NodeEntry* self = const_cast<NodeEntry*>(this); - _proxy->begin_registerWithReplica(_cache.getReplicaCache().getInternalRegistry(), - newCallback_Node_registerWithReplica(self, - &NodeEntry::finishedRegistration, - &NodeEntry::finishedRegistration)); - _proxy = 0; // Registration with the proxy is only attempted once. - } - while(_registering) - { - if(!timedWait(IceUtil::Time::seconds(10))) - { - break; // Consider the node down if it doesn't respond promptly. - } + // 'this' is only ever accessed though the self removing pointer, ensuring _selfRemovingPtr is always + // valid and its access is thread safe + auto self = _selfRemovingPtr.lock(); + assert(self); + _proxy->registerWithReplicaAsync(_cache.getReplicaCache().getInternalRegistry(), + [self] + { + self->finishedRegistration(); + }, + [self] (exception_ptr ex) + { + self->finishedRegistration(ex); + }); + _proxy = nullptr; // Registration with the proxy is only attempted once. + } + // Consider the node down if it doesn't respond promptly. + _condVar.wait_for(lock, 10s, [this] { return !_registering; }); + if(!_session || _session->isDestroyed()) { throw NodeUnreachableException(_name, "the node is not active"); @@ -819,9 +611,9 @@ NodeEntry::checkSession() const } void -NodeEntry::setProxy(const NodePrx& node) +NodeEntry::setProxy(const shared_ptr<NodePrx>& node) { - Lock sync(*this); + lock_guard lock(_mutex); // // If the node has already established a session with the @@ -838,7 +630,7 @@ NodeEntry::setProxy(const NodePrx& node) void NodeEntry::finishedRegistration() { - Lock sync(*this); + lock_guard lock(_mutex); if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) { Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); @@ -855,29 +647,36 @@ NodeEntry::finishedRegistration() if(_registering) { _registering = false; - notifyAll(); + _condVar.notify_all(); } } void -NodeEntry::finishedRegistration(const Ice::Exception& ex) +NodeEntry::finishedRegistration(exception_ptr exptr) { - Lock sync(*this); + lock_guard lock(_mutex); if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) { - Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); - out << "node `" << _name << "' session creation failed:\n" << ex; + try + { + rethrow_exception(exptr); + } + catch(const std::exception& ex) + { + Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); + out << "node `" << _name << "' session creation failed:\n" << ex.what(); + } } if(_registering) { _registering = false; - notifyAll(); + _condVar.notify_all(); } } -ServerDescriptorPtr -NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& session) +shared_ptr<ServerDescriptor> +NodeEntry::getServerDescriptor(const ServerInfo& server, const shared_ptr<SessionI>& session) { assert(_session); @@ -891,7 +690,7 @@ NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& sess resolve.setReserved("session.id", session->getId()); } - IceBoxDescriptorPtr iceBox = IceBoxDescriptorPtr::dynamicCast(server.descriptor); + auto iceBox = dynamic_pointer_cast<IceBoxDescriptor>(server.descriptor); if(iceBox) { return IceBoxHelper(iceBox).instantiate(resolve, PropertyDescriptorSeq(), PropertySetDescriptorDict()); @@ -903,7 +702,7 @@ NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& sess } } -InternalServerDescriptorPtr +shared_ptr<InternalServerDescriptor> NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const { // @@ -911,7 +710,7 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const // assert(_session); - InternalServerDescriptorPtr server = new InternalServerDescriptor(); + shared_ptr<InternalServerDescriptor> server = make_shared<InternalServerDescriptor>(); server->id = info.descriptor->id; server->application = info.application; server->uuid = info.uuid; @@ -929,7 +728,6 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const // server->logs: assigned for each communicator (see below) // server->adapters: assigned for each communicator (see below) - // server->dbEnvs: assigned for each communicator (see below) // server->properties: assigned for each communicator (see below) // @@ -966,7 +764,8 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const } else { - props.push_back(createProperty("Ice.Admin.Endpoints", "tcp -h localhost")); + props.push_back(createProperty("Ice.Admin.Endpoints", "tcp -h 127.0.0.1")); + props.push_back(createProperty("Ice.Admin.ServerName", "127.0.0.1")); server->processRegistered = true; } } @@ -985,12 +784,12 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const // Add IceBox properties. // string servicesStr; - IceBoxDescriptorPtr iceBox = IceBoxDescriptorPtr::dynamicCast(info.descriptor); + auto iceBox = dynamic_pointer_cast<IceBoxDescriptor>(info.descriptor); if(iceBox) { - for(ServiceInstanceDescriptorSeq::const_iterator p = iceBox->services.begin(); p != iceBox->services.end();++p) + for(const auto& serviceInstance : iceBox->services) { - ServiceDescriptorPtr s = p->descriptor; + const auto& s = serviceInstance.descriptor; const string path = _session->getInfo()->dataDir + "/servers/" + server->id + "/config/config_" + s->name; // @@ -1012,38 +811,115 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const } props.push_back(createProperty("IceBox.LoadOrder", servicesStr)); + } - if(iceVersion != 0 && iceVersion < 30300) + // + // Now, for each communicator of the descriptor, add the necessary + // logs, adapters and properties to the internal server + // descriptor. + // + forEachCommunicator(info.descriptor, + [server, node = _session->getInfo(), iceVersion](const auto& desc) { - if(hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.RegisterProcess")) + // + // Figure out the configuration file name for the communicator + // (if it's a service, it's "config_<service name>", if it's + // the server, it's just "config"). + // + string filename = "config"; + auto svc = dynamic_pointer_cast<ServiceDescriptor>(desc); + if(svc) { - if(getProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.RegisterProcess") != "0") + filename += "_" + svc->name; + server->services->push_back(svc->name); + } + + PropertyDescriptorSeq& serverProps = server->properties[filename]; + PropertyDescriptorSeq communicatorProps = desc->propertySet.properties; + + // + // If this is a service communicator and the IceBox server has Admin + // enabled or Admin endpoints configured, we ignore the server-lifetime attributes + // of the service object adapters and assume it's set to false. + // + bool ignoreServerLifetime = false; + if(svc) + { + if(iceVersion == 0 || iceVersion >= 30300) { - server->processRegistered = true; + if(getPropertyAsInt(server->properties["config"], "Ice.Admin.Enabled") > 0 || + getProperty(server->properties["config"], "Ice.Admin.Endpoints") != "") + { + ignoreServerLifetime = true; + } } } - else + // + // Add the adapters and their configuration. + // + for(const auto& adapter : desc->adapters) { - props.push_back(createProperty("IceBox.ServiceManager.RegisterProcess", "1")); - server->processRegistered = true; + server->adapters.push_back(make_shared<InternalAdapterDescriptor>(adapter.id, + ignoreServerLifetime ? false : + adapter.serverLifetime)); + + serverProps.push_back(createProperty("# Object adapter " + adapter.name)); + + PropertyDescriptor prop = removeProperty(communicatorProps, adapter.name + ".Endpoints"); + prop.name = adapter.name + ".Endpoints"; + serverProps.push_back(prop); + serverProps.push_back(createProperty(adapter.name + ".AdapterId", adapter.id)); + + if(!adapter.replicaGroupId.empty()) + { + serverProps.push_back(createProperty(adapter.name + ".ReplicaGroupId", adapter.replicaGroupId)); + } + + // + // Ignore the register process attribute if the server is using Ice >= 3.3.0 + // + if(iceVersion != 0 && iceVersion < 30300) + { + if(adapter.registerProcess) + { + serverProps.push_back(createProperty(adapter.name + ".RegisterProcess", "1")); + server->processRegistered = true; + } + } } - if(!hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.Endpoints")) + + server->logs.insert(server->logs.end(), desc->logs.begin(), desc->logs.end()); + + // + // Copy the communicator descriptor properties. + // + if(!communicatorProps.empty()) { - props.push_back(createProperty("IceBox.ServiceManager.Endpoints", "tcp -h 127.0.0.1")); + if(svc) + { + serverProps.push_back(createProperty("# Service descriptor properties")); + } + else + { + serverProps.push_back(createProperty("# Server descriptor properties")); + } + copy(communicatorProps.begin(), communicatorProps.end(), back_inserter(serverProps)); } - } - if(!hasProperty(info.descriptor->propertySet.properties, "IceBox.InstanceName") && - hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.Endpoints")) - { - props.push_back(createProperty("IceBox.InstanceName", server->id)); - } - } - // - // Now, for each communicator of the descriptor, add the necessary - // logs, adapters, db envs and properties to the internal server - // descriptor. - // - forEachCommunicator(ToInternalServerDescriptor(server, _session->getInfo(), iceVersion))(info.descriptor); + // + // For Ice servers > 3.3.0 escape the properties. + // + if(iceVersion == 0 || iceVersion >= 30300) + { + for(PropertyDescriptorSeq::iterator p = serverProps.begin(); p != serverProps.end(); ++p) + { + if(p->name.find('#') != 0 || !p->value.empty()) + { + p->name = escapeProperty(p->name, true); + p->value = escapeProperty(p->value); + } + } + } + }); return server; } |