diff options
author | Benoit Foucher <benoit@zeroc.com> | 2015-06-08 13:05:22 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2015-06-08 13:05:22 +0200 |
commit | b2538adea86a17fccbacf87814361f050c97ae7e (patch) | |
tree | db900e7f606ca8ce9d114f11cf3ac2e9cc2abaf4 /cpp/src/IceGrid/ServerCache.cpp | |
parent | Fix IceSSL.CertFile property typo (diff) | |
download | ice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.bz2 ice-b2538adea86a17fccbacf87814361f050c97ae7e.tar.xz ice-b2538adea86a17fccbacf87814361f050c97ae7e.zip |
Fixed ICE-6573 - IceGrid application update failure when updating server with allocatables and clients are waiting to allocate the allocatable
Diffstat (limited to 'cpp/src/IceGrid/ServerCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 217 |
1 files changed, 161 insertions, 56 deletions
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index cd0a6382d6d..d6a320b68eb 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -31,13 +31,19 @@ namespace IceGrid _serverCache(serverCache), _entry(entry), _application(application) { } - + void operator()(const CommunicatorDescriptorPtr& desc) { - _serverCache.addCommunicator(desc, _entry, _application); + _serverCache.addCommunicator(0, desc, _entry, _application); + } + + void + operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc) + { + _serverCache.addCommunicator(oldDesc, newDesc, _entry, _application); } - + ServerCache& _serverCache; const ServerEntryPtr _entry; const string _application; @@ -45,7 +51,7 @@ namespace IceGrid struct RemoveCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void> { - RemoveCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry) : + RemoveCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry) : _serverCache(serverCache), _entry(entry) { } @@ -53,7 +59,13 @@ namespace IceGrid void operator()(const CommunicatorDescriptorPtr& desc) { - _serverCache.removeCommunicator(desc, _entry); + _serverCache.removeCommunicator(desc, 0, _entry); + } + + void + operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc) + { + _serverCache.removeCommunicator(oldDesc, newDesc, _entry); } ServerCache& _serverCache; @@ -63,11 +75,11 @@ namespace IceGrid } -CheckUpdateResult::CheckUpdateResult(const string& server, - const string& node, +CheckUpdateResult::CheckUpdateResult(const string& server, + const string& node, bool noRestart, bool remove, - const Ice::AsyncResultPtr& result) : + const Ice::AsyncResultPtr& result) : _server(server), _node(node), _noRestart(noRestart), _result(result) { } @@ -99,36 +111,34 @@ CheckUpdateResult::getResult() os << ex; throw NodeUnreachableException(_node, os.str()); } - return false; + return false; } ServerCache::ServerCache(const Ice::CommunicatorPtr& communicator, const string& instanceName, - NodeCache& nodeCache, - AdapterCache& adapterCache, + NodeCache& nodeCache, + AdapterCache& adapterCache, ObjectCache& objectCache, AllocatableObjectCache& allocatableObjectCache) : _communicator(communicator), _instanceName(instanceName), - _nodeCache(nodeCache), - _adapterCache(adapterCache), + _nodeCache(nodeCache), + _adapterCache(adapterCache), _objectCache(objectCache), _allocatableObjectCache(allocatableObjectCache) { } ServerEntryPtr -ServerCache::add(const ServerInfo& info, bool noRestart) +ServerCache::add(const ServerInfo& info) { Lock sync(*this); + assert(!getImpl(info.descriptor->id)); - ServerEntryPtr entry = getImpl(info.descriptor->id); - if(!entry) - { - entry = new ServerEntry(*this, info.descriptor->id); - addImpl(info.descriptor->id, entry); - } - entry->update(info, noRestart); + ServerEntryPtr entry = new ServerEntry(*this, info.descriptor->id); + addImpl(info.descriptor->id, entry); + + entry->update(info, false); _nodeCache.get(info.node, true)->addServer(entry); forEachCommunicator(AddCommunicator(*this, entry, info.application))(info.descriptor); @@ -136,7 +146,6 @@ ServerCache::add(const ServerInfo& info, bool noRestart) if(_traceLevels && _traceLevels->server > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "added server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')"; } @@ -166,29 +175,78 @@ ServerCache::has(const string& id) const } ServerEntryPtr -ServerCache::remove(const string& id, bool destroy, bool noRestart) +ServerCache::remove(const string& id, bool noRestart) { Lock sync(*this); ServerEntryPtr entry = getImpl(id); + assert(entry); + ServerInfo info = entry->getInfo(); forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor); _nodeCache.get(info.node)->removeServer(entry); - if(destroy) + entry->destroy(noRestart); // This must be done after otherwise some allocatable objects + // might allocate a destroyed server. + + if(_traceLevels && _traceLevels->server > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); + out << "removed server `" << id << "'"; + } + + return entry; +} + +void +ServerCache::preUpdate(const ServerInfo& newInfo, bool noRestart) +{ + Lock sync(*this); + + const string& id = newInfo.descriptor->id; + ServerEntryPtr entry = getImpl(id); + assert(entry); + + if(!noRestart) { - // - // This must be done after otherwise some allocatable objects - // might allocate a destroyed server. - // - entry->destroy(noRestart); + ServerInfo info = entry->getInfo(); + forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor, newInfo.descriptor); + _nodeCache.get(info.node)->removeServer(entry); } if(_traceLevels && _traceLevels->server > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "removed server `" << id << "'"; + out << "updating server `" << id << "'"; + if(noRestart) + { + out << " with no restart"; + } + } +} + +ServerEntryPtr +ServerCache::postUpdate(const ServerInfo& info, bool noRestart) +{ + Lock sync(*this); + + ServerEntryPtr entry = getImpl(info.descriptor->id); + assert(entry); + + ServerInfo oldInfo = entry->getInfo(); + entry->update(info, noRestart); + + if(!noRestart) + { + _nodeCache.get(info.node, true)->addServer(entry); + forEachCommunicator(AddCommunicator(*this, entry, info.application))(oldInfo.descriptor, info.descriptor); + } + + if(_traceLevels && _traceLevels->server > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); + out << "updated server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')"; } return entry; @@ -202,12 +260,29 @@ ServerCache::clear(const string& id) } void -ServerCache::addCommunicator(const CommunicatorDescriptorPtr& comm, +ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc, + const CommunicatorDescriptorPtr& newDesc, const ServerEntryPtr& server, const string& application) { - for(AdapterDescriptorSeq::const_iterator q = comm->adapters.begin() ; q != comm->adapters.end(); ++q) + if(!newDesc) { + return; // Nothing to add + } + for(AdapterDescriptorSeq::const_iterator q = newDesc->adapters.begin() ; q != newDesc->adapters.end(); ++q) + { + AdapterDescriptor oldAdpt; + if(oldDesc) + { + for(AdapterDescriptorSeq::const_iterator p = oldDesc->adapters.begin() ; p != oldDesc->adapters.end(); ++p) + { + if(p->id == q->id) + { + oldAdpt = *p; + break; + } + } + } assert(!q->id.empty()); _adapterCache.addServerAdapter(*q, server, application); @@ -217,23 +292,53 @@ ServerCache::addCommunicator(const CommunicatorDescriptorPtr& comm, } for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r) { - _allocatableObjectCache.add(toObjectInfo(_communicator, *r, q->id), server); + ObjectDescriptorSeq::const_iterator s; + for(s = oldAdpt.allocatables.begin(); s != oldAdpt.allocatables.end() && s->id != r->id; ++s); + if(s == oldAdpt.allocatables.end() || *s != *r) // Only add new or updated allocatables + { + _allocatableObjectCache.add(toObjectInfo(_communicator, *r, q->id), server); + } } } } void -ServerCache::removeCommunicator(const CommunicatorDescriptorPtr& comm, const ServerEntryPtr& /*entry*/) +ServerCache::removeCommunicator(const CommunicatorDescriptorPtr& oldDesc, + const CommunicatorDescriptorPtr& newDesc, + const ServerEntryPtr& /*entry*/) { - for(AdapterDescriptorSeq::const_iterator q = comm->adapters.begin() ; q != comm->adapters.end(); ++q) + if(!oldDesc) { + return; // Nothing to remove + } + for(AdapterDescriptorSeq::const_iterator q = oldDesc->adapters.begin() ; q != oldDesc->adapters.end(); ++q) + { + AdapterDescriptor newAdpt; + if(newDesc) + { + for(AdapterDescriptorSeq::const_iterator p = newDesc->adapters.begin() ; p != newDesc->adapters.end(); ++p) + { + if(p->id == q->id) + { + newAdpt = *p; + break; + } + } + } + for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r) { _objectCache.remove((*r).id); } for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r) { - _allocatableObjectCache.remove((*r).id); + // Don't remove the allocatable if it's still in the new descriptor. + ObjectDescriptorSeq::const_iterator s; + for(s = newAdpt.allocatables.begin(); s != newAdpt.allocatables.end() && s->id != r->id; ++s); + if(s == newAdpt.allocatables.end() || *s != *r) // Only removed updated or removed allocatables + { + _allocatableObjectCache.remove(r->id); + } } _adapterCache.removeServerAdapter(q->id); } @@ -362,7 +467,7 @@ ServerEntry::destroy(bool noRestart) _destroy = _load; } } - + _noRestart = noRestart; _load.reset(0); _loaded.reset(0); @@ -413,7 +518,7 @@ ServerPrx ServerEntry::getProxy(bool upToDate, int timeout) { // - // NOTE: this might throw ServerNotExistException, NodeUnreachableException + // NOTE: this might throw ServerNotExistException, NodeUnreachableException // or DeploymentException. // @@ -426,7 +531,7 @@ ServerPrx ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& node, bool upToDate, int timeout) { // - // NOTE: this might throw ServerNotExistException, NodeUnreachableException + // NOTE: this might throw ServerNotExistException, NodeUnreachableException // or DeploymentException. // while(true) @@ -475,7 +580,7 @@ AdapterPrx ServerEntry::getAdapter(const string& id, bool upToDate) { // - // NOTE: this might throw AdapterNotExistException, NodeUnreachableException + // NOTE: this might throw AdapterNotExistException, NodeUnreachableException // or DeploymentException. // @@ -487,7 +592,7 @@ AdapterPrx ServerEntry::getAdapter(int& activationTimeout, int& deactivationTimeout, const string& id, bool upToDate) { // - // NOTE: this might throw AdapterNotExistException, NodeUnreachableException + // NOTE: this might throw AdapterNotExistException, NodeUnreachableException // or DeploymentException. // while(true) @@ -602,7 +707,7 @@ ServerEntry::syncImpl() noRestart = _noRestart; _synchronizing = true; } - + if(destroy.descriptor) { try @@ -612,7 +717,7 @@ ServerEntry::syncImpl() catch(const NodeNotExistException&) { exception(NodeUnreachableException(destroy.node, "node is not active")); - } + } } else if(load.descriptor) { @@ -624,7 +729,7 @@ ServerEntry::syncImpl() { exception(NodeUnreachableException(load.node, "node is not active")); } - } + } } void @@ -642,7 +747,7 @@ ServerEntry::waitImpl(int timeout) break; // Timeout } } - else + else { wait(); } @@ -681,7 +786,7 @@ ServerEntry::waitImpl(int timeout) } } } - + void ServerEntry::synchronized() { @@ -791,7 +896,7 @@ ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, i catch(const NodeNotExistException&) { exception(NodeUnreachableException(destroy.node, "node is not active")); - } + } } else if(load.descriptor) { @@ -934,7 +1039,7 @@ ServerEntry::checkUpdate(const ServerInfo& info, bool noRestart) { throw ServerNotExistException(); } - + oldInfo = _loaded.get() ? *_loaded : *_load; session = _session; } @@ -1059,7 +1164,7 @@ ServerEntry::allocated(const SessionIPtr& session) if(traceLevels && traceLevels->server > 0) { Ice::Trace out(traceLevels->logger, traceLevels->serverCat); - out << "couldn't add Glacier2 filters for server `" << _id << "' allocated by `" + out << "couldn't add Glacier2 filters for server `" << _id << "' allocated by `" << session->getId() << ":\n" << ex; } } @@ -1071,14 +1176,14 @@ ServerEntry::allocatedNoSync(const SessionIPtr& /*session*/) { { Lock sync(*this); - if(!_updated || - (_loaded.get() && _loaded->descriptor->activation != "session") || + if(!_updated || + (_loaded.get() && _loaded->descriptor->activation != "session") || (_load.get() && _load->descriptor->activation != "session")) { return; } } - + sync(); waitForSyncNoThrow(); } @@ -1092,12 +1197,12 @@ ServerEntry::released(const SessionIPtr& session) } ServerDescriptorPtr desc = _loaded.get() ? _loaded->descriptor : _load->descriptor; - + // // If the server has the session activation mode, we re-load the // server on the node as its deployment might have changed (it's // possible to use ${session.*} variable with server with the - // session activation mode. Synchronizing the server will also + // session activation mode. Synchronizing the server will also // shutdown the server on the node. // if(desc->activation == "session") @@ -1165,8 +1270,8 @@ ServerEntry::releasedNoSync(const SessionIPtr& /*session*/) { { Lock sync(*this); - if(!_updated || - (_loaded.get() && _loaded->descriptor->activation != "session") || + if(!_updated || + (_loaded.get() && _loaded->descriptor->activation != "session") || (_load.get() && _load->descriptor->activation != "session")) { return; |