diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-24 14:10:34 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-24 14:10:34 +0000 |
commit | df8bf3b419976462a82ea6cc5ba55513f50f7a54 (patch) | |
tree | b3befacd3c9ac0b6ee28f9a1845134b5d292eee1 /cpp/src | |
parent | *** empty log message *** (diff) | |
download | ice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.tar.bz2 ice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.tar.xz ice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.zip |
Code cleanup
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/AdminI.cpp | 42 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 28 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 250 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 49 | ||||
-rw-r--r-- | cpp/src/IceGrid/Internal.ice | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorRegistryI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 63 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.cpp | 53 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 12 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.h | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionI.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/WellKnownObjectsManager.cpp | 2 |
18 files changed, 244 insertions, 307 deletions
diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp index 7e3934a88d6..9d8e8032a09 100644 --- a/cpp/src/IceGrid/AdminI.cpp +++ b/cpp/src/IceGrid/AdminI.cpp @@ -35,7 +35,7 @@ public: ServerProxyWrapper(const DatabasePtr& database, const string& id) : _id(id) { - _proxy = database->getServerWithTimeouts(id, _activationTimeout, _deactivationTimeout, _node); + _proxy = database->getServer(_id)->getProxy(_activationTimeout, _deactivationTimeout, _node); } void @@ -372,8 +372,9 @@ AdminI::patchApplication_async(const AMD_Admin_patchApplicationPtr& amdCB, AMI_Node_patchPtr cb = new PatchCB(aggregator, *p); try { - Resolver resolve(_database->getNodeInfo(*p), _database->getCommunicator()); - _database->getNode(*p)->patch_async(cb, name, "", resolve(appDistrib), shutdown); + NodeEntryPtr node = _database->getNode(*p); + Resolver resolve(node->getInfo(), _database->getCommunicator()); + node->getProxy()->patch_async(cb, name, "", resolve(appDistrib), shutdown); } catch(const Ice::Exception& ex) { @@ -451,7 +452,7 @@ AdminI::getAllApplicationNames(const Current&) const ServerInfo AdminI::getServerInfo(const string& id, const Current&) const { - return _database->getServerInfo(id, true); + return _database->getServer(id)->getInfo(true); } ServerState @@ -521,7 +522,7 @@ void AdminI::patchServer_async(const AMD_Admin_patchServerPtr& amdCB, const string& id, bool shutdown, const Current& current) { - ServerInfo info = _database->getServerInfo(id); + ServerInfo info = _database->getServer(id)->getInfo(); ApplicationInfo appInfo = _database->getApplicationInfo(info.application); ApplicationHelper helper(current.adapter->getCommunicator(), appInfo.descriptor); DistributionDescriptor appDistrib; @@ -540,8 +541,9 @@ AdminI::patchServer_async(const AMD_Admin_patchServerPtr& amdCB, const string& i AMI_Node_patchPtr amiCB = new ServerPatchCB(amdCB, _traceLevels, id, *p); try { - Resolver resolve(_database->getNodeInfo(*p), _database->getCommunicator()); - _database->getNode(*p)->patch_async(amiCB, info.application, id, resolve(appDistrib), shutdown); + NodeEntryPtr node = _database->getNode(*p); + Resolver resolve(node->getInfo(), _database->getCommunicator()); + node->getProxy()->patch_async(amiCB, info.application, id, resolve(appDistrib), shutdown); } catch(const Ice::Exception& ex) { @@ -577,11 +579,10 @@ AdminI::writeMessage(const string& id, const string& message, Int fd, const Curr } } - StringSeq AdminI::getAllServerIds(const Current&) const { - return _database->getAllServers(); + return _database->getServerCache().getAll(""); } void @@ -716,7 +717,7 @@ AdminI::getAllObjectInfos(const string& expression, const Ice::Current&) const NodeInfo AdminI::getNodeInfo(const string& name, const Ice::Current&) const { - return _database->getNodeInfo(name); + return _database->getNode(name)->getInfo(); } bool @@ -724,7 +725,7 @@ AdminI::pingNode(const string& name, const Current&) const { try { - _database->getNode(name)->ice_ping(); + _database->getNode(name)->getProxy()->ice_ping(); return true; } catch(const NodeUnreachableException&) @@ -746,7 +747,7 @@ AdminI::getNodeLoad(const string& name, const Current&) const { try { - return _database->getNode(name)->getLoad(); + return _database->getNode(name)->getProxy()->getLoad(); } catch(const Ice::ObjectNotExistException&) { @@ -764,10 +765,9 @@ AdminI::getNodeLoad(const string& name, const Current&) const void AdminI::shutdownNode(const string& name, const Current&) { - NodePrx node = _database->getNode(name); try { - node->shutdown(); + _database->getNode(name)->getProxy()->shutdown(); } catch(const Ice::ObjectNotExistException&) { @@ -784,10 +784,9 @@ AdminI::shutdownNode(const string& name, const Current&) string AdminI::getNodeHostname(const string& name, const Current&) const { - NodePrx node = _database->getNode(name); try { - return node->getHostname(); + return _database->getNode(name)->getInfo().hostname; } catch(const Ice::ObjectNotExistException&) { @@ -806,7 +805,7 @@ AdminI::getNodeHostname(const string& name, const Current&) const StringSeq AdminI::getAllNodeNames(const Current&) const { - return _database->getAllNodes(); + return _database->getNodeCache().getAll(""); } RegistryInfo @@ -818,7 +817,7 @@ AdminI::getRegistryInfo(const string& name, const Ice::Current&) const } else { - return _database->getReplicaInfo(name); + return _database->getReplica(name)->getInfo(); } } @@ -832,7 +831,7 @@ AdminI::pingRegistry(const string& name, const Current&) const try { - _database->getReplica(name)->ice_ping(); + _database->getReplica(name)->getProxy()->ice_ping(); return true; } catch(const Ice::ObjectNotExistException&) @@ -855,10 +854,9 @@ AdminI::shutdownRegistry(const string& name, const Current&) return; } - InternalRegistryPrx registry = _database->getReplica(name); try { - registry->shutdown(); + _database->getReplica(name)->getProxy()->shutdown(); } catch(const Ice::ObjectNotExistException&) { @@ -875,7 +873,7 @@ AdminI::shutdownRegistry(const string& name, const Current&) StringSeq AdminI::getAllRegistryNames(const Current&) const { - Ice::StringSeq replicas = _database->getAllReplicas(); + Ice::StringSeq replicas = _database->getReplicaCache().getAll(""); replicas.push_back(_registry->getName()); return replicas; } diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index db49207138d..e3b4b7a87de 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -195,38 +195,54 @@ AdminSessionI::getReplicaName(const Ice::Current& current) const FileIteratorPrx AdminSessionI::openServerStdOut(const std::string& id, const Ice::Current& current) { - return addFileIterator(_database->getServer(id), "stdout", current); + return addFileIterator(_database->getServer(id)->getProxy(), "stdout", current); } FileIteratorPrx AdminSessionI::openServerStdErr(const std::string& id, const Ice::Current& current) { - return addFileIterator(_database->getServer(id), "stderr", current); + return addFileIterator(_database->getServer(id)->getProxy(), "stderr", current); } FileIteratorPrx AdminSessionI::openNodeStdOut(const std::string& name, const Ice::Current& current) { - return addFileIterator(_database->getNode(name), "stdout", current); + return addFileIterator(_database->getNode(name)->getProxy(), "stdout", current); } FileIteratorPrx AdminSessionI::openNodeStdErr(const std::string& name, const Ice::Current& current) { - return addFileIterator(_database->getNode(name), "stderr", current); + return addFileIterator(_database->getNode(name)->getProxy(), "stderr", current); } FileIteratorPrx AdminSessionI::openRegistryStdOut(const std::string& name, const Ice::Current& current) { - FileReaderPrx reader = name == _replicaName ? _database->getInternalRegistry() : _database->getReplica(name); + FileReaderPrx reader; + if(name == _replicaName) + { + reader = _database->getReplicaCache().getInternalRegistry(); + } + else + { + reader = _database->getReplica(name)->getProxy(); + } return addFileIterator(reader, "stdout", current); } FileIteratorPrx AdminSessionI::openRegistryStdErr(const std::string& name, const Ice::Current& current) { - FileReaderPrx reader = name == _replicaName ? _database->getInternalRegistry() : _database->getReplica(name); + FileReaderPrx reader; + if(name == _replicaName) + { + reader = _database->getReplicaCache().getInternalRegistry(); + } + else + { + reader = _database->getReplica(name)->getProxy(); + } return addFileIterator(reader, "stderr", current); } diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index d9126730814..95e8d66e7a1 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -348,12 +348,6 @@ Database::syncObjects(const ObjectInfoSeq& objects) txHolder.commit(); } -Ice::ObjectPrx -Database::getReplicatedEndpoints(const string& name, const Ice::ObjectPrx& proxy) -{ - return _replicaCache.getEndpoints(name, proxy); -} - void Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) { @@ -627,104 +621,13 @@ Database::getAllApplications(const string& expression) return getMatchingKeys<StringApplicationInfoDict>(descriptors, expression); } -void -Database::addNode(const string& name, const NodeSessionIPtr& session) -{ - _nodeCache.get(name, true)->setSession(session); - - ObjectInfo info; - info.type = Node::ice_staticId(); - info.proxy = session->getNode(); - addInternalObject(info, true); -} - void -Database::setNodeProxy(const string& name, const NodePrx& node) -{ - _nodeCache.get(name)->setProxy(node); -} - -NodePrx -Database::getNode(const string& name) const -{ - return _nodeCache.get(name)->getProxy(); -} - -NodeInfo -Database::getNodeInfo(const string& name) const -{ - return _nodeCache.get(name)->getInfo(); -} - -void -Database::removeNode(const string& name, const NodeSessionIPtr& session, bool shutdown) -{ - // - // If the registry isn't being shutdown and this registry is the - // master we remove the node well-known proxy from the object - // adapter. Replicas will be notified through replication. - // - if(!shutdown) - { - removeInternalObject(session->getNode()->ice_getIdentity()); - } - - // - // We must notify the observer first (there's an assert in the - // observer to ensure that only nodes which are up are teared - // down). - // - _nodeObserverTopic->nodeDown(name); - - // - // Clear the node session. Once this is called, the node can - // create a new session. - // - _nodeCache.get(name)->setSession(0); -} - -Ice::StringSeq -Database::getAllNodes(const string& expression) -{ - return _nodeCache.getAll(expression); -} - -void -Database::addReplica(const string& name, const ReplicaSessionIPtr& session) -{ - _replicaCache.add(name, session); - _registryObserverTopic->registryUp(session->getInfo()); -} - -InternalRegistryPrx -Database::getReplica(const string& name) const -{ - return _replicaCache.get(name)->getProxy(); -} - -RegistryInfo -Database::getReplicaInfo(const string& name) const -{ - return _replicaCache.get(name)->getInfo(); -} - -void -Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure) -{ - ObserverTopicPtr topic = getObserverTopic(name); - if(topic) - { - topic->receivedUpdate(replica, serial, failure); - } -} - -void -Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb, - const string& application, - int revision) +Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb, + const string& application, + int revision) { Lock sync(*this); - map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application); + map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(application); if(p != _updating.end()) { p->second.push_back(cb); @@ -735,73 +638,52 @@ Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplication } } -void -Database::removeReplica(const string& name, bool shutdown) +NodeCache& +Database::getNodeCache() { - _registryObserverTopic->registryDown(name); - _replicaCache.remove(name, shutdown); + return _nodeCache; } -Ice::StringSeq -Database::getAllReplicas(const string& expression) +NodeEntryPtr +Database::getNode(const string& name, bool create) const { - return _replicaCache.getAll(expression); + return _nodeCache.get(name, create); } -void -Database::setInternalRegistry(const InternalRegistryPrx& proxy) +ReplicaCache& +Database::getReplicaCache() { - _replicaCache.setInternalRegistry(proxy); + return _replicaCache; } -InternalRegistryPrx -Database::getInternalRegistry() const -{ - return _replicaCache.getInternalRegistry(); -} - -void -Database::loadServer(const std::string& id) -{ - _serverCache.get(id)->load(); -} - -void -Database::unloadServer(const std::string& id) +ReplicaEntryPtr +Database::getReplica(const string& name) const { - _serverCache.get(id)->unload(); + return _replicaCache.get(name); } -ServerInfo -Database::getServerInfo(const std::string& id, bool resolve) +ServerCache& +Database::getServerCache() { - return _serverCache.get(id)->getServerInfo(resolve); + return _serverCache; } -ServerPrx -Database::getServer(const string& id, bool upToDate) +ServerEntryPtr +Database::getServer(const string& id) const { - int activationTimeout, deactivationTimeout; - string node; - return getServerWithTimeouts(id, activationTimeout, deactivationTimeout, node, upToDate); + return _serverCache.get(id); } -ServerPrx -Database::getServerWithTimeouts(const string& id, int& actTimeout, int& deactTimeout, string& node, bool upToDate) +AllocatableObjectCache& +Database::getAllocatableObjectCache() { - return _serverCache.get(id)->getProxy(actTimeout, deactTimeout, node, upToDate); + return _allocatableObjectCache; } -Ice::StringSeq -Database::getAllServers(const string& expression) +AllocatableObjectEntryPtr +Database::getAllocatableObject(const Ice::Identity& id) const { - return _serverCache.getAll(expression); -} - -Ice::StringSeq -Database::getAllNodeServers(const string& node) -{ - return _nodeCache.get(node)->getServers(); + return _allocatableObjectCache.get(id); } bool @@ -1096,7 +978,7 @@ Database::getAllAdapters(const string& expression) } void -Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) +Database::addObject(const ObjectInfo& info) { Lock sync(*this); const Ice::Identity id = info.proxy->ice_getIdentity(); @@ -1106,40 +988,48 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) throw ObjectExistsException(id); } - bool update = false; if(_objects.find(id) != _objects.end()) { - if(!replaceIfExistsInDatabase) - { - throw ObjectExistsException(id); - } - else - { - update = true; - } + throw ObjectExistsException(id); } _objects.put(IdentityObjectInfoDict::value_type(id, info)); - if(!update) + _objectObserverTopic->objectAdded(info); + + if(_traceLevels->object > 0) { - _objectObserverTopic->objectAdded(info); + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "added object `" << _communicator->identityToString(id) << "'"; } - else +} + +void +Database::addOrUpdateObject(const ObjectInfo& info) +{ + Lock sync(*this); + const Ice::Identity id = info.proxy->ice_getIdentity(); + + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + + bool update = _objects.find(id) != _objects.end(); + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + + if(update) { _objectObserverTopic->objectUpdated(info); } - + else + { + _objectObserverTopic->objectAdded(info); + } + if(_traceLevels->object > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - if(!update) - { - out << "added object `" << _communicator->identityToString(id) << "'"; - } - else - { - out << "updated object `" << _communicator->identityToString(id) << "'"; - } + out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'"; } } @@ -1241,24 +1131,6 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) txHolder.commit(); } -void -Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request) -{ - _allocatableObjectCache.get(id)->allocate(request); -} - -void -Database::allocateObjectByType(const string& type, const ObjectAllocationRequestPtr& request) -{ - _allocatableObjectCache.allocateByType(type, request); -} - -void -Database::releaseObject(const Ice::Identity& id, const SessionIPtr& session) -{ - _allocatableObjectCache.get(id)->release(session); -} - Ice::ObjectPrx Database::getObjectProxy(const Ice::Identity& id) { @@ -1752,16 +1624,16 @@ void Database::startUpdating(const string& name) { // Must be called within the synchronization. - _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>())); + _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationUpdatePtr>())); } void Database::finishUpdating(const string& name) { // Must be called within the synchronization. - map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name); + map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(name); assert(p != _updating.end()); - for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin(); + for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) { (*q)->ice_response(); diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index 08825a5d98a..0cc38449a90 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -69,42 +69,27 @@ public: void syncApplications(const ApplicationInfoSeq&); void syncAdapters(const AdapterInfoSeq&); void syncObjects(const ObjectInfoSeq&); - Ice::ObjectPrx getReplicatedEndpoints(const std::string&, const Ice::ObjectPrx&); void addApplication(const ApplicationInfo&, AdminSessionI* = 0); void updateApplication(const ApplicationUpdateInfo&, AdminSessionI* = 0); void syncApplicationDescriptor(const ApplicationDescriptor&, AdminSessionI* = 0); void instantiateServer(const std::string&, const std::string&, const ServerInstanceDescriptor&, AdminSessionI* =0); void removeApplication(const std::string&, AdminSessionI* = 0); - ApplicationInfo getApplicationInfo(const std::string&); Ice::StringSeq getAllApplications(const std::string& = std::string()); + void waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr&, const std::string&, int); + + NodeCache& getNodeCache(); + NodeEntryPtr getNode(const std::string&, bool = false) const; + + ReplicaCache& getReplicaCache(); + ReplicaEntryPtr getReplica(const std::string&) const; + + ServerCache& getServerCache(); + ServerEntryPtr getServer(const std::string&) const; - void addNode(const std::string&, const NodeSessionIPtr&); - void setNodeProxy(const std::string&, const NodePrx&); - NodePrx getNode(const std::string&) const; - NodeInfo getNodeInfo(const std::string&) const; - void removeNode(const std::string&, const NodeSessionIPtr&, bool); - Ice::StringSeq getAllNodes(const std::string& = std::string()); - - void addReplica(const std::string&, const ReplicaSessionIPtr&); - RegistryInfo getReplicaInfo(const std::string&) const; - InternalRegistryPrx getReplica(const std::string&) const; - void replicaReceivedUpdate(const std::string&, TopicName, int, const std::string&); - void waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr&, const std::string&, - int); - void removeReplica(const std::string&, bool); - Ice::StringSeq getAllReplicas(const std::string& = std::string()); - void setInternalRegistry(const InternalRegistryPrx&); - InternalRegistryPrx getInternalRegistry() const; - - ServerInfo getServerInfo(const std::string&, bool = false); - ServerPrx getServer(const std::string&, bool = true); - void loadServer(const std::string&); - void unloadServer(const std::string&); - ServerPrx getServerWithTimeouts(const std::string&, int&, int&, std::string&, bool = true); - Ice::StringSeq getAllServers(const std::string& = std::string()); - Ice::StringSeq getAllNodeServers(const std::string&); + AllocatableObjectCache& getAllocatableObjectCache(); + AllocatableObjectEntryPtr getAllocatableObject(const Ice::Identity&) const; bool setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&); Ice::ObjectPrx getAdapterDirectProxy(const std::string&); @@ -114,17 +99,13 @@ public: AdapterInfoSeq getAdapterInfo(const std::string&); Ice::StringSeq getAllAdapters(const std::string& = std::string()); - void addObject(const ObjectInfo&, bool = false); + void addObject(const ObjectInfo&); + void addOrUpdateObject(const ObjectInfo&); void removeObject(const Ice::Identity&); void updateObject(const Ice::ObjectPrx&); - int addOrUpdateObjectsInDatabase(const ObjectInfoSeq&); void removeObjectsInDatabase(const ObjectInfoSeq&); - void allocateObject(const Ice::Identity&, const ObjectAllocationRequestPtr&); - void allocateObjectByType(const std::string&, const ObjectAllocationRequestPtr&); - void releaseObject(const Ice::Identity&, const SessionIPtr&); - Ice::ObjectPrx getObjectProxy(const Ice::Identity&); Ice::ObjectPrx getObjectByType(const std::string&); Ice::ObjectPrx getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample); @@ -198,7 +179,7 @@ private: int _replicaApplicationSerial; int _adapterSerial; int _objectSerial; - std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationReplicationPtr> > _updating; + std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationUpdatePtr> > _updating; }; typedef IceUtil::Handle<Database> DatabasePtr; diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index bb915f43924..f4f57a0d702 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -332,10 +332,14 @@ interface NodeSession /** * - * Wait for the replication of the given application to be done. + * Wait for the application update to complete (the application is + * completely updated once all the registry replicas have been + * updated). This is used by the node to ensure that before to + * start a server all the replicas have the up-to-date descriptor + * of the server. * **/ - ["amd", "ami", "cpp:const"] void waitForApplicationReplication(string application, int revision); + ["amd", "ami", "cpp:const"] void waitForApplicationUpdate(string application, int revision); /** * diff --git a/cpp/src/IceGrid/LocatorRegistryI.cpp b/cpp/src/IceGrid/LocatorRegistryI.cpp index 38b9c005255..12b0c44f8ce 100644 --- a/cpp/src/IceGrid/LocatorRegistryI.cpp +++ b/cpp/src/IceGrid/LocatorRegistryI.cpp @@ -415,7 +415,7 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set // the server is released during the server startup. // AMI_Server_setProcessPtr amiCB = new AMI_Server_setProcessI(cb); - _database->getServer(id, false)->setProcess_async(amiCB, proxy); + _database->getServer(id)->getProxy(false)->setProcess_async(amiCB, proxy); const TraceLevelsPtr traceLevels = _database->getTraceLevels(); if(traceLevels->locator > 1) diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index a43e3335798..c765115d331 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -549,7 +549,7 @@ NodeEntry::__decRef() if(_ref == 1) { - doRemove = _servers.empty() && !_session && _descriptors.empty(); + doRemove = canRemove(); } else if(_ref == 0) { diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index 8df3407c322..fa3f29903d4 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -33,7 +33,12 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database, __setNoDelete(true); try { - _database->addNode(name, this); + _database->getNode(name, true)->setSession(this); + + ObjectInfo info; + info.type = Node::ice_staticId(); + info.proxy = _node; + _database->addInternalObject(info, true); // Add or update previous node proxy. } catch(...) { @@ -72,15 +77,7 @@ NodeSessionI::getTimeout(const Ice::Current& current) const NodeObserverPrx NodeSessionI::getObserver(const Ice::Current& current) const { - NodeObserverTopicPtr topic = NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName)); - if(topic) - { - return topic->getPublisher(); - } - else - { - return 0; - } + return NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->getPublisher(); } void @@ -89,12 +86,12 @@ NodeSessionI::loadServers(const Ice::Current& current) const // // Get the server proxies to load them on the node. // - Ice::StringSeq servers = _database->getAllNodeServers(_name); + Ice::StringSeq servers = _database->getNode(_name)->getServers(); for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) { try { - _database->loadServer(*p); + _database->getServer(*p)->load(); } catch(const Ice::UserException&) { @@ -106,21 +103,23 @@ NodeSessionI::loadServers(const Ice::Current& current) const Ice::StringSeq NodeSessionI::getServers(const Ice::Current& current) const { - return _database->getAllNodeServers(_name); + return _database->getNode(_name)->getServers(); } void -NodeSessionI::waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr& cb, - const std::string& application, - int revision, - const Ice::Current&) const +NodeSessionI::waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr& cb, + const std::string& application, + int revision, + const Ice::Current&) const { - _database->waitForApplicationReplication(cb, application, revision); + _database->waitForApplicationUpdate(cb, application, revision); } void NodeSessionI::destroy(const Ice::Current& current) { + const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp + { Lock sync(*this); if(_destroy) @@ -130,12 +129,12 @@ NodeSessionI::destroy(const Ice::Current& current) _destroy = true; } - Ice::StringSeq servers = _database->getAllNodeServers(_name); + Ice::StringSeq servers = _database->getNode(_name)->getServers(); for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) { try { - _database->unloadServer(*p); + _database->getServer(*p)->unload(); } catch(const Ice::UserException&) { @@ -143,9 +142,29 @@ NodeSessionI::destroy(const Ice::Current& current) } } - _database->removeNode(_name, this, !current.adapter); + // + // If the registry isn't being shutdown we remove the node + // internal proxy from the database. + // + if(!shutdown) + { + _database->removeInternalObject(_node->ice_getIdentity()); + } + + // + // Next we notify the observer. + // + NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_name); + + + // + // Finally, we clear the session, this must be done last. As soon + // as the node entry session is set to 0 another session might be + // created. + // + _database->getNode(_name)->setSession(0); - if(current.adapter) + if(!shutdown) { try { diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index 9c89c065db8..eeb66cf5bac 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -32,8 +32,8 @@ public: virtual NodeObserverPrx getObserver(const Ice::Current&) const; virtual void loadServers(const Ice::Current&) const; virtual Ice::StringSeq getServers(const Ice::Current&) const; - virtual void waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr&, - const std::string&, int, const Ice::Current&) const; + virtual void waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr&, + const std::string&, int, const Ice::Current&) const; virtual void destroy(const Ice::Current&); const NodePrx& getNode() const; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 1e4bf5bf07d..ad9c6ff96a7 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -457,7 +457,7 @@ RegistryI::setupInternalRegistry(const Ice::ObjectAdapterPtr& registryAdapter) _wellKnownObjects->add(proxy, InternalRegistry::ice_staticId()); InternalRegistryPrx registry = InternalRegistryPrx::uncheckedCast(proxy); - _database->setInternalRegistry(registry); + _database->getReplicaCache().setInternalRegistry(registry); return registry; } @@ -1225,7 +1225,7 @@ RegistryI::registerNodes(const InternalRegistryPrx& internalRegistry, const Node assert((*p)->ice_getIdentity().name.find(prefix) != string::npos); try { - _database->setNodeProxy((*p)->ice_getIdentity().name.substr(prefix.size()), *p); + _database->getNode((*p)->ice_getIdentity().name.substr(prefix.size()))->setProxy(*p); } catch(const NodeNotExistException&) { diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index c04f680deba..cb4ace333da 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -104,7 +104,7 @@ ReplicaCache::remove(const string& name, bool shutdown) { try { - _nodes->replicaRemoved(entry->getSession()->getInternalRegistry()); + _nodes->replicaRemoved(entry->getProxy()); } catch(const Ice::ConnectionRefusedException&) { diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index f558e3339d2..43d68dd200f 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -16,6 +16,17 @@ using namespace std; using namespace IceGrid; +namespace IceGrid +{ + +static bool +operator==(const ObjectInfo& info, const Ice::Identity& id) +{ + return info.proxy->ice_getIdentity() == id; +} + +} + ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database, const WellKnownObjectsManagerPtr& wellKnownObjects, const string& name, @@ -35,7 +46,9 @@ ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database, __setNoDelete(true); try { - _database->addReplica(name, this); + _database->getReplicaCache().add(name, this); + ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName); + RegistryObserverTopicPtr::dynamicCast(obsv)->registryUp(_info); } catch(...) { @@ -130,15 +143,20 @@ ReplicaSessionI::setAdapterDirectProxy(const string& adapterId, } void -ReplicaSessionI::receivedUpdate(TopicName topic, int serial, const string& failure, const Ice::Current&) +ReplicaSessionI::receivedUpdate(TopicName topicName, int serial, const string& failure, const Ice::Current&) { - _database->replicaReceivedUpdate(_name, topic, serial, failure); + ObserverTopicPtr topic = _database->getObserverTopic(topicName); + if(topic) + { + topic->receivedUpdate(_name, serial, failure); + } } void ReplicaSessionI::destroy(const Ice::Current& current) { - bool shutdown = !current.adapter; + const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp + { Lock sync(*this); if(_destroy) @@ -157,14 +175,16 @@ ReplicaSessionI::destroy(const Ice::Current& current) if(!_replicaWellKnownObjects.empty()) { - _database->removeObjectsInDatabase(_replicaWellKnownObjects); - if(shutdown) + if(shutdown) // Don't remove the replica proxy from the database if the registry is being shutdown. { - ObjectInfo info; - info.type = InternalRegistry::ice_staticId(); - info.proxy = _internalRegistry; - _database->addObject(info, true); + ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(), + _internalRegistry->ice_getIdentity()); + if(p != _replicaWellKnownObjects.end()) + { + _replicaWellKnownObjects.erase(p); + } } + _database->removeObjectsInDatabase(_replicaWellKnownObjects); } if(!shutdown) @@ -172,7 +192,18 @@ ReplicaSessionI::destroy(const Ice::Current& current) _wellKnownObjects->updateReplicatedWellKnownObjects(); // No need to update these if we're shutting down. } - _database->removeReplica(_name, shutdown); + // + // Notify the observer that the registry is down. + // + ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName); + RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_name); + + // + // Remove the replica from the cache. This must be done last. As + // soon as the replica is removed another session might be + // created. + // + _database->getReplicaCache().remove(_name, shutdown); if(current.adapter) { diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 6c6d3f91d92..cabd6ef264a 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -149,7 +149,7 @@ public: string failure; try { - _database->addObject(info, true); + _database->addOrUpdateObject(info); } catch(const ObjectExistsException& ex) { @@ -167,7 +167,14 @@ public: string failure; try { - _database->addObject(info, true); + _database->addOrUpdateObject(info); + } + catch(const ObjectExistsException& ex) + { + ostringstream os; + os << ex << ":\n"; + os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity()); + failure = os.str(); } catch(const DeploymentException& ex) { diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index 124359630d2..774b8b074f3 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -127,7 +127,7 @@ ServerCache::remove(const string& id, bool destroy) Lock sync(*this); ServerEntryPtr entry = getImpl(id); - ServerInfo info = entry->getServerInfo(); + ServerInfo info = entry->getInfo(); if(destroy) { entry->destroy(); @@ -296,7 +296,7 @@ ServerEntry::destroy() } ServerInfo -ServerEntry::getServerInfo(bool resolve) const +ServerEntry::getInfo(bool resolve) const { ServerInfo info; SessionIPtr session; @@ -336,6 +336,14 @@ ServerEntry::getId() const } ServerPrx +ServerEntry::getProxy(bool upToDate) +{ + int actTimeout, deactTimeout; + string node; + return getProxy(actTimeout, deactTimeout, node, upToDate); +} + +ServerPrx ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& node, bool upToDate) { { diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h index d056e43c852..892e89d140a 100644 --- a/cpp/src/IceGrid/ServerCache.h +++ b/cpp/src/IceGrid/ServerCache.h @@ -42,10 +42,11 @@ public: void update(const ServerInfo&); void destroy(); - ServerInfo getServerInfo(bool = false) const; + ServerInfo getInfo(bool = false) const; std::string getId() const; - ServerPrx getProxy(int&, int&, std::string&, bool); + ServerPrx getProxy(int&, int&, std::string&, bool = true); + ServerPrx getProxy(bool = true); AdapterPrx getAdapter(const std::string&, bool); float getLoad(LoadSample) const; diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index e64801fc89e..6ff039f625b 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -167,11 +167,11 @@ private: const TraceLevelsPtr _traceLevels; }; -class WaitForApplicationReplicationCB : public AMI_NodeSession_waitForApplicationReplication +class WaitForApplicationUpdateCB : public AMI_NodeSession_waitForApplicationUpdate { public: - WaitForApplicationReplicationCB(const ServerIPtr& server) : _server(server) + WaitForApplicationUpdateCB(const ServerIPtr& server) : _server(server) { } @@ -1352,8 +1352,8 @@ ServerI::activate() NodeSessionPrx session = _node->getMasterNodeSession(); if(session) { - AMI_NodeSession_waitForApplicationReplicationPtr cb = new WaitForApplicationReplicationCB(this); - _node->getMasterNodeSession()->waitForApplicationReplication_async(cb, info.uuid, info.revision); + AMI_NodeSession_waitForApplicationUpdatePtr cb = new WaitForApplicationUpdateCB(this); + _node->getMasterNodeSession()->waitForApplicationUpdate_async(cb, info.uuid, info.revision); return; } } diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp index 65ae312a1de..9f099595fa4 100644 --- a/cpp/src/IceGrid/SessionI.cpp +++ b/cpp/src/IceGrid/SessionI.cpp @@ -226,7 +226,7 @@ SessionI::allocateObjectById_async(const AMD_Session_allocateObjectByIdPtr& cb, const Ice::Identity& id, const Ice::Current&) { - _database->allocateObject(id, newAllocateObject(this, cb)); + _database->getAllocatableObject(id)->allocate(newAllocateObject(this, cb)); } void @@ -234,13 +234,13 @@ SessionI::allocateObjectByType_async(const AMD_Session_allocateObjectByTypePtr& const string& type, const Ice::Current&) { - _database->allocateObjectByType(type, newAllocateObject(this, cb)); + _database->getAllocatableObjectCache().allocateByType(type, newAllocateObject(this, cb)); } void SessionI::releaseObject(const Ice::Identity& id, const Ice::Current&) { - _database->releaseObject(id, this); + _database->getAllocatableObject(id)->release(this); } void diff --git a/cpp/src/IceGrid/WellKnownObjectsManager.cpp b/cpp/src/IceGrid/WellKnownObjectsManager.cpp index d09bdde2f65..e1bb5b9607a 100644 --- a/cpp/src/IceGrid/WellKnownObjectsManager.cpp +++ b/cpp/src/IceGrid/WellKnownObjectsManager.cpp @@ -93,7 +93,7 @@ WellKnownObjectsManager::updateReplicatedWellKnownObjects() Lock sync(*this); - Ice::ObjectPrx replicatedClientProxy = _database->getReplicatedEndpoints("Client", _endpoints["Client"]); + Ice::ObjectPrx replicatedClientProxy = _database->getReplicaCache().getEndpoints("Client", _endpoints["Client"]); id.name = "Query"; info.type = Query::ice_staticId(); |