summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-24 14:10:34 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-24 14:10:34 +0000
commitdf8bf3b419976462a82ea6cc5ba55513f50f7a54 (patch)
treeb3befacd3c9ac0b6ee28f9a1845134b5d292eee1 /cpp/src
parent*** empty log message *** (diff)
downloadice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.tar.bz2
ice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.tar.xz
ice-df8bf3b419976462a82ea6cc5ba55513f50f7a54.zip
Code cleanup
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdminI.cpp42
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp28
-rw-r--r--cpp/src/IceGrid/Database.cpp250
-rw-r--r--cpp/src/IceGrid/Database.h49
-rw-r--r--cpp/src/IceGrid/Internal.ice8
-rw-r--r--cpp/src/IceGrid/LocatorRegistryI.cpp2
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp2
-rw-r--r--cpp/src/IceGrid/NodeSessionI.cpp63
-rw-r--r--cpp/src/IceGrid/NodeSessionI.h4
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp4
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp2
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.cpp53
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp11
-rw-r--r--cpp/src/IceGrid/ServerCache.cpp12
-rw-r--r--cpp/src/IceGrid/ServerCache.h5
-rw-r--r--cpp/src/IceGrid/ServerI.cpp8
-rw-r--r--cpp/src/IceGrid/SessionI.cpp6
-rw-r--r--cpp/src/IceGrid/WellKnownObjectsManager.cpp2
18 files changed, 244 insertions, 307 deletions
diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp
index 7e3934a88d6..9d8e8032a09 100644
--- a/cpp/src/IceGrid/AdminI.cpp
+++ b/cpp/src/IceGrid/AdminI.cpp
@@ -35,7 +35,7 @@ public:
ServerProxyWrapper(const DatabasePtr& database, const string& id) : _id(id)
{
- _proxy = database->getServerWithTimeouts(id, _activationTimeout, _deactivationTimeout, _node);
+ _proxy = database->getServer(_id)->getProxy(_activationTimeout, _deactivationTimeout, _node);
}
void
@@ -372,8 +372,9 @@ AdminI::patchApplication_async(const AMD_Admin_patchApplicationPtr& amdCB,
AMI_Node_patchPtr cb = new PatchCB(aggregator, *p);
try
{
- Resolver resolve(_database->getNodeInfo(*p), _database->getCommunicator());
- _database->getNode(*p)->patch_async(cb, name, "", resolve(appDistrib), shutdown);
+ NodeEntryPtr node = _database->getNode(*p);
+ Resolver resolve(node->getInfo(), _database->getCommunicator());
+ node->getProxy()->patch_async(cb, name, "", resolve(appDistrib), shutdown);
}
catch(const Ice::Exception& ex)
{
@@ -451,7 +452,7 @@ AdminI::getAllApplicationNames(const Current&) const
ServerInfo
AdminI::getServerInfo(const string& id, const Current&) const
{
- return _database->getServerInfo(id, true);
+ return _database->getServer(id)->getInfo(true);
}
ServerState
@@ -521,7 +522,7 @@ void
AdminI::patchServer_async(const AMD_Admin_patchServerPtr& amdCB, const string& id, bool shutdown,
const Current& current)
{
- ServerInfo info = _database->getServerInfo(id);
+ ServerInfo info = _database->getServer(id)->getInfo();
ApplicationInfo appInfo = _database->getApplicationInfo(info.application);
ApplicationHelper helper(current.adapter->getCommunicator(), appInfo.descriptor);
DistributionDescriptor appDistrib;
@@ -540,8 +541,9 @@ AdminI::patchServer_async(const AMD_Admin_patchServerPtr& amdCB, const string& i
AMI_Node_patchPtr amiCB = new ServerPatchCB(amdCB, _traceLevels, id, *p);
try
{
- Resolver resolve(_database->getNodeInfo(*p), _database->getCommunicator());
- _database->getNode(*p)->patch_async(amiCB, info.application, id, resolve(appDistrib), shutdown);
+ NodeEntryPtr node = _database->getNode(*p);
+ Resolver resolve(node->getInfo(), _database->getCommunicator());
+ node->getProxy()->patch_async(amiCB, info.application, id, resolve(appDistrib), shutdown);
}
catch(const Ice::Exception& ex)
{
@@ -577,11 +579,10 @@ AdminI::writeMessage(const string& id, const string& message, Int fd, const Curr
}
}
-
StringSeq
AdminI::getAllServerIds(const Current&) const
{
- return _database->getAllServers();
+ return _database->getServerCache().getAll("");
}
void
@@ -716,7 +717,7 @@ AdminI::getAllObjectInfos(const string& expression, const Ice::Current&) const
NodeInfo
AdminI::getNodeInfo(const string& name, const Ice::Current&) const
{
- return _database->getNodeInfo(name);
+ return _database->getNode(name)->getInfo();
}
bool
@@ -724,7 +725,7 @@ AdminI::pingNode(const string& name, const Current&) const
{
try
{
- _database->getNode(name)->ice_ping();
+ _database->getNode(name)->getProxy()->ice_ping();
return true;
}
catch(const NodeUnreachableException&)
@@ -746,7 +747,7 @@ AdminI::getNodeLoad(const string& name, const Current&) const
{
try
{
- return _database->getNode(name)->getLoad();
+ return _database->getNode(name)->getProxy()->getLoad();
}
catch(const Ice::ObjectNotExistException&)
{
@@ -764,10 +765,9 @@ AdminI::getNodeLoad(const string& name, const Current&) const
void
AdminI::shutdownNode(const string& name, const Current&)
{
- NodePrx node = _database->getNode(name);
try
{
- node->shutdown();
+ _database->getNode(name)->getProxy()->shutdown();
}
catch(const Ice::ObjectNotExistException&)
{
@@ -784,10 +784,9 @@ AdminI::shutdownNode(const string& name, const Current&)
string
AdminI::getNodeHostname(const string& name, const Current&) const
{
- NodePrx node = _database->getNode(name);
try
{
- return node->getHostname();
+ return _database->getNode(name)->getInfo().hostname;
}
catch(const Ice::ObjectNotExistException&)
{
@@ -806,7 +805,7 @@ AdminI::getNodeHostname(const string& name, const Current&) const
StringSeq
AdminI::getAllNodeNames(const Current&) const
{
- return _database->getAllNodes();
+ return _database->getNodeCache().getAll("");
}
RegistryInfo
@@ -818,7 +817,7 @@ AdminI::getRegistryInfo(const string& name, const Ice::Current&) const
}
else
{
- return _database->getReplicaInfo(name);
+ return _database->getReplica(name)->getInfo();
}
}
@@ -832,7 +831,7 @@ AdminI::pingRegistry(const string& name, const Current&) const
try
{
- _database->getReplica(name)->ice_ping();
+ _database->getReplica(name)->getProxy()->ice_ping();
return true;
}
catch(const Ice::ObjectNotExistException&)
@@ -855,10 +854,9 @@ AdminI::shutdownRegistry(const string& name, const Current&)
return;
}
- InternalRegistryPrx registry = _database->getReplica(name);
try
{
- registry->shutdown();
+ _database->getReplica(name)->getProxy()->shutdown();
}
catch(const Ice::ObjectNotExistException&)
{
@@ -875,7 +873,7 @@ AdminI::shutdownRegistry(const string& name, const Current&)
StringSeq
AdminI::getAllRegistryNames(const Current&) const
{
- Ice::StringSeq replicas = _database->getAllReplicas();
+ Ice::StringSeq replicas = _database->getReplicaCache().getAll("");
replicas.push_back(_registry->getName());
return replicas;
}
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index db49207138d..e3b4b7a87de 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -195,38 +195,54 @@ AdminSessionI::getReplicaName(const Ice::Current& current) const
FileIteratorPrx
AdminSessionI::openServerStdOut(const std::string& id, const Ice::Current& current)
{
- return addFileIterator(_database->getServer(id), "stdout", current);
+ return addFileIterator(_database->getServer(id)->getProxy(), "stdout", current);
}
FileIteratorPrx
AdminSessionI::openServerStdErr(const std::string& id, const Ice::Current& current)
{
- return addFileIterator(_database->getServer(id), "stderr", current);
+ return addFileIterator(_database->getServer(id)->getProxy(), "stderr", current);
}
FileIteratorPrx
AdminSessionI::openNodeStdOut(const std::string& name, const Ice::Current& current)
{
- return addFileIterator(_database->getNode(name), "stdout", current);
+ return addFileIterator(_database->getNode(name)->getProxy(), "stdout", current);
}
FileIteratorPrx
AdminSessionI::openNodeStdErr(const std::string& name, const Ice::Current& current)
{
- return addFileIterator(_database->getNode(name), "stderr", current);
+ return addFileIterator(_database->getNode(name)->getProxy(), "stderr", current);
}
FileIteratorPrx
AdminSessionI::openRegistryStdOut(const std::string& name, const Ice::Current& current)
{
- FileReaderPrx reader = name == _replicaName ? _database->getInternalRegistry() : _database->getReplica(name);
+ FileReaderPrx reader;
+ if(name == _replicaName)
+ {
+ reader = _database->getReplicaCache().getInternalRegistry();
+ }
+ else
+ {
+ reader = _database->getReplica(name)->getProxy();
+ }
return addFileIterator(reader, "stdout", current);
}
FileIteratorPrx
AdminSessionI::openRegistryStdErr(const std::string& name, const Ice::Current& current)
{
- FileReaderPrx reader = name == _replicaName ? _database->getInternalRegistry() : _database->getReplica(name);
+ FileReaderPrx reader;
+ if(name == _replicaName)
+ {
+ reader = _database->getReplicaCache().getInternalRegistry();
+ }
+ else
+ {
+ reader = _database->getReplica(name)->getProxy();
+ }
return addFileIterator(reader, "stderr", current);
}
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index d9126730814..95e8d66e7a1 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -348,12 +348,6 @@ Database::syncObjects(const ObjectInfoSeq& objects)
txHolder.commit();
}
-Ice::ObjectPrx
-Database::getReplicatedEndpoints(const string& name, const Ice::ObjectPrx& proxy)
-{
- return _replicaCache.getEndpoints(name, proxy);
-}
-
void
Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
@@ -627,104 +621,13 @@ Database::getAllApplications(const string& expression)
return getMatchingKeys<StringApplicationInfoDict>(descriptors, expression);
}
-void
-Database::addNode(const string& name, const NodeSessionIPtr& session)
-{
- _nodeCache.get(name, true)->setSession(session);
-
- ObjectInfo info;
- info.type = Node::ice_staticId();
- info.proxy = session->getNode();
- addInternalObject(info, true);
-}
-
void
-Database::setNodeProxy(const string& name, const NodePrx& node)
-{
- _nodeCache.get(name)->setProxy(node);
-}
-
-NodePrx
-Database::getNode(const string& name) const
-{
- return _nodeCache.get(name)->getProxy();
-}
-
-NodeInfo
-Database::getNodeInfo(const string& name) const
-{
- return _nodeCache.get(name)->getInfo();
-}
-
-void
-Database::removeNode(const string& name, const NodeSessionIPtr& session, bool shutdown)
-{
- //
- // If the registry isn't being shutdown and this registry is the
- // master we remove the node well-known proxy from the object
- // adapter. Replicas will be notified through replication.
- //
- if(!shutdown)
- {
- removeInternalObject(session->getNode()->ice_getIdentity());
- }
-
- //
- // We must notify the observer first (there's an assert in the
- // observer to ensure that only nodes which are up are teared
- // down).
- //
- _nodeObserverTopic->nodeDown(name);
-
- //
- // Clear the node session. Once this is called, the node can
- // create a new session.
- //
- _nodeCache.get(name)->setSession(0);
-}
-
-Ice::StringSeq
-Database::getAllNodes(const string& expression)
-{
- return _nodeCache.getAll(expression);
-}
-
-void
-Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
-{
- _replicaCache.add(name, session);
- _registryObserverTopic->registryUp(session->getInfo());
-}
-
-InternalRegistryPrx
-Database::getReplica(const string& name) const
-{
- return _replicaCache.get(name)->getProxy();
-}
-
-RegistryInfo
-Database::getReplicaInfo(const string& name) const
-{
- return _replicaCache.get(name)->getInfo();
-}
-
-void
-Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure)
-{
- ObserverTopicPtr topic = getObserverTopic(name);
- if(topic)
- {
- topic->receivedUpdate(replica, serial, failure);
- }
-}
-
-void
-Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb,
- const string& application,
- int revision)
+Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
+ const string& application,
+ int revision)
{
Lock sync(*this);
- map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application);
+ map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(application);
if(p != _updating.end())
{
p->second.push_back(cb);
@@ -735,73 +638,52 @@ Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplication
}
}
-void
-Database::removeReplica(const string& name, bool shutdown)
+NodeCache&
+Database::getNodeCache()
{
- _registryObserverTopic->registryDown(name);
- _replicaCache.remove(name, shutdown);
+ return _nodeCache;
}
-Ice::StringSeq
-Database::getAllReplicas(const string& expression)
+NodeEntryPtr
+Database::getNode(const string& name, bool create) const
{
- return _replicaCache.getAll(expression);
+ return _nodeCache.get(name, create);
}
-void
-Database::setInternalRegistry(const InternalRegistryPrx& proxy)
+ReplicaCache&
+Database::getReplicaCache()
{
- _replicaCache.setInternalRegistry(proxy);
+ return _replicaCache;
}
-InternalRegistryPrx
-Database::getInternalRegistry() const
-{
- return _replicaCache.getInternalRegistry();
-}
-
-void
-Database::loadServer(const std::string& id)
-{
- _serverCache.get(id)->load();
-}
-
-void
-Database::unloadServer(const std::string& id)
+ReplicaEntryPtr
+Database::getReplica(const string& name) const
{
- _serverCache.get(id)->unload();
+ return _replicaCache.get(name);
}
-ServerInfo
-Database::getServerInfo(const std::string& id, bool resolve)
+ServerCache&
+Database::getServerCache()
{
- return _serverCache.get(id)->getServerInfo(resolve);
+ return _serverCache;
}
-ServerPrx
-Database::getServer(const string& id, bool upToDate)
+ServerEntryPtr
+Database::getServer(const string& id) const
{
- int activationTimeout, deactivationTimeout;
- string node;
- return getServerWithTimeouts(id, activationTimeout, deactivationTimeout, node, upToDate);
+ return _serverCache.get(id);
}
-ServerPrx
-Database::getServerWithTimeouts(const string& id, int& actTimeout, int& deactTimeout, string& node, bool upToDate)
+AllocatableObjectCache&
+Database::getAllocatableObjectCache()
{
- return _serverCache.get(id)->getProxy(actTimeout, deactTimeout, node, upToDate);
+ return _allocatableObjectCache;
}
-Ice::StringSeq
-Database::getAllServers(const string& expression)
+AllocatableObjectEntryPtr
+Database::getAllocatableObject(const Ice::Identity& id) const
{
- return _serverCache.getAll(expression);
-}
-
-Ice::StringSeq
-Database::getAllNodeServers(const string& node)
-{
- return _nodeCache.get(node)->getServers();
+ return _allocatableObjectCache.get(id);
}
bool
@@ -1096,7 +978,7 @@ Database::getAllAdapters(const string& expression)
}
void
-Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
+Database::addObject(const ObjectInfo& info)
{
Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
@@ -1106,40 +988,48 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
throw ObjectExistsException(id);
}
- bool update = false;
if(_objects.find(id) != _objects.end())
{
- if(!replaceIfExistsInDatabase)
- {
- throw ObjectExistsException(id);
- }
- else
- {
- update = true;
- }
+ throw ObjectExistsException(id);
}
_objects.put(IdentityObjectInfoDict::value_type(id, info));
- if(!update)
+ _objectObserverTopic->objectAdded(info);
+
+ if(_traceLevels->object > 0)
{
- _objectObserverTopic->objectAdded(info);
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "added object `" << _communicator->identityToString(id) << "'";
}
- else
+}
+
+void
+Database::addOrUpdateObject(const ObjectInfo& info)
+{
+ Lock sync(*this);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ bool update = _objects.find(id) != _objects.end();
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ if(update)
{
_objectObserverTopic->objectUpdated(info);
}
-
+ else
+ {
+ _objectObserverTopic->objectAdded(info);
+ }
+
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- if(!update)
- {
- out << "added object `" << _communicator->identityToString(id) << "'";
- }
- else
- {
- out << "updated object `" << _communicator->identityToString(id) << "'";
- }
+ out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'";
}
}
@@ -1241,24 +1131,6 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
txHolder.commit();
}
-void
-Database::allocateObject(const Ice::Identity& id, const ObjectAllocationRequestPtr& request)
-{
- _allocatableObjectCache.get(id)->allocate(request);
-}
-
-void
-Database::allocateObjectByType(const string& type, const ObjectAllocationRequestPtr& request)
-{
- _allocatableObjectCache.allocateByType(type, request);
-}
-
-void
-Database::releaseObject(const Ice::Identity& id, const SessionIPtr& session)
-{
- _allocatableObjectCache.get(id)->release(session);
-}
-
Ice::ObjectPrx
Database::getObjectProxy(const Ice::Identity& id)
{
@@ -1752,16 +1624,16 @@ void
Database::startUpdating(const string& name)
{
// Must be called within the synchronization.
- _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>()));
+ _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationUpdatePtr>()));
}
void
Database::finishUpdating(const string& name)
{
// Must be called within the synchronization.
- map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name);
+ map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(name);
assert(p != _updating.end());
- for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin();
+ for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->second.begin();
q != p->second.end(); ++q)
{
(*q)->ice_response();
diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h
index 08825a5d98a..0cc38449a90 100644
--- a/cpp/src/IceGrid/Database.h
+++ b/cpp/src/IceGrid/Database.h
@@ -69,42 +69,27 @@ public:
void syncApplications(const ApplicationInfoSeq&);
void syncAdapters(const AdapterInfoSeq&);
void syncObjects(const ObjectInfoSeq&);
- Ice::ObjectPrx getReplicatedEndpoints(const std::string&, const Ice::ObjectPrx&);
void addApplication(const ApplicationInfo&, AdminSessionI* = 0);
void updateApplication(const ApplicationUpdateInfo&, AdminSessionI* = 0);
void syncApplicationDescriptor(const ApplicationDescriptor&, AdminSessionI* = 0);
void instantiateServer(const std::string&, const std::string&, const ServerInstanceDescriptor&, AdminSessionI* =0);
void removeApplication(const std::string&, AdminSessionI* = 0);
-
ApplicationInfo getApplicationInfo(const std::string&);
Ice::StringSeq getAllApplications(const std::string& = std::string());
+ void waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr&, const std::string&, int);
+
+ NodeCache& getNodeCache();
+ NodeEntryPtr getNode(const std::string&, bool = false) const;
+
+ ReplicaCache& getReplicaCache();
+ ReplicaEntryPtr getReplica(const std::string&) const;
+
+ ServerCache& getServerCache();
+ ServerEntryPtr getServer(const std::string&) const;
- void addNode(const std::string&, const NodeSessionIPtr&);
- void setNodeProxy(const std::string&, const NodePrx&);
- NodePrx getNode(const std::string&) const;
- NodeInfo getNodeInfo(const std::string&) const;
- void removeNode(const std::string&, const NodeSessionIPtr&, bool);
- Ice::StringSeq getAllNodes(const std::string& = std::string());
-
- void addReplica(const std::string&, const ReplicaSessionIPtr&);
- RegistryInfo getReplicaInfo(const std::string&) const;
- InternalRegistryPrx getReplica(const std::string&) const;
- void replicaReceivedUpdate(const std::string&, TopicName, int, const std::string&);
- void waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr&, const std::string&,
- int);
- void removeReplica(const std::string&, bool);
- Ice::StringSeq getAllReplicas(const std::string& = std::string());
- void setInternalRegistry(const InternalRegistryPrx&);
- InternalRegistryPrx getInternalRegistry() const;
-
- ServerInfo getServerInfo(const std::string&, bool = false);
- ServerPrx getServer(const std::string&, bool = true);
- void loadServer(const std::string&);
- void unloadServer(const std::string&);
- ServerPrx getServerWithTimeouts(const std::string&, int&, int&, std::string&, bool = true);
- Ice::StringSeq getAllServers(const std::string& = std::string());
- Ice::StringSeq getAllNodeServers(const std::string&);
+ AllocatableObjectCache& getAllocatableObjectCache();
+ AllocatableObjectEntryPtr getAllocatableObject(const Ice::Identity&) const;
bool setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&);
Ice::ObjectPrx getAdapterDirectProxy(const std::string&);
@@ -114,17 +99,13 @@ public:
AdapterInfoSeq getAdapterInfo(const std::string&);
Ice::StringSeq getAllAdapters(const std::string& = std::string());
- void addObject(const ObjectInfo&, bool = false);
+ void addObject(const ObjectInfo&);
+ void addOrUpdateObject(const ObjectInfo&);
void removeObject(const Ice::Identity&);
void updateObject(const Ice::ObjectPrx&);
-
int addOrUpdateObjectsInDatabase(const ObjectInfoSeq&);
void removeObjectsInDatabase(const ObjectInfoSeq&);
- void allocateObject(const Ice::Identity&, const ObjectAllocationRequestPtr&);
- void allocateObjectByType(const std::string&, const ObjectAllocationRequestPtr&);
- void releaseObject(const Ice::Identity&, const SessionIPtr&);
-
Ice::ObjectPrx getObjectProxy(const Ice::Identity&);
Ice::ObjectPrx getObjectByType(const std::string&);
Ice::ObjectPrx getObjectByTypeOnLeastLoadedNode(const std::string&, LoadSample);
@@ -198,7 +179,7 @@ private:
int _replicaApplicationSerial;
int _adapterSerial;
int _objectSerial;
- std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationReplicationPtr> > _updating;
+ std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationUpdatePtr> > _updating;
};
typedef IceUtil::Handle<Database> DatabasePtr;
diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice
index bb915f43924..f4f57a0d702 100644
--- a/cpp/src/IceGrid/Internal.ice
+++ b/cpp/src/IceGrid/Internal.ice
@@ -332,10 +332,14 @@ interface NodeSession
/**
*
- * Wait for the replication of the given application to be done.
+ * Wait for the application update to complete (the application is
+ * completely updated once all the registry replicas have been
+ * updated). This is used by the node to ensure that before to
+ * start a server all the replicas have the up-to-date descriptor
+ * of the server.
*
**/
- ["amd", "ami", "cpp:const"] void waitForApplicationReplication(string application, int revision);
+ ["amd", "ami", "cpp:const"] void waitForApplicationUpdate(string application, int revision);
/**
*
diff --git a/cpp/src/IceGrid/LocatorRegistryI.cpp b/cpp/src/IceGrid/LocatorRegistryI.cpp
index 38b9c005255..12b0c44f8ce 100644
--- a/cpp/src/IceGrid/LocatorRegistryI.cpp
+++ b/cpp/src/IceGrid/LocatorRegistryI.cpp
@@ -415,7 +415,7 @@ LocatorRegistryI::setServerProcessProxy_async(const Ice::AMD_LocatorRegistry_set
// the server is released during the server startup.
//
AMI_Server_setProcessPtr amiCB = new AMI_Server_setProcessI(cb);
- _database->getServer(id, false)->setProcess_async(amiCB, proxy);
+ _database->getServer(id)->getProxy(false)->setProcess_async(amiCB, proxy);
const TraceLevelsPtr traceLevels = _database->getTraceLevels();
if(traceLevels->locator > 1)
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index a43e3335798..c765115d331 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -549,7 +549,7 @@ NodeEntry::__decRef()
if(_ref == 1)
{
- doRemove = _servers.empty() && !_session && _descriptors.empty();
+ doRemove = canRemove();
}
else if(_ref == 0)
{
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp
index 8df3407c322..fa3f29903d4 100644
--- a/cpp/src/IceGrid/NodeSessionI.cpp
+++ b/cpp/src/IceGrid/NodeSessionI.cpp
@@ -33,7 +33,12 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database,
__setNoDelete(true);
try
{
- _database->addNode(name, this);
+ _database->getNode(name, true)->setSession(this);
+
+ ObjectInfo info;
+ info.type = Node::ice_staticId();
+ info.proxy = _node;
+ _database->addInternalObject(info, true); // Add or update previous node proxy.
}
catch(...)
{
@@ -72,15 +77,7 @@ NodeSessionI::getTimeout(const Ice::Current& current) const
NodeObserverPrx
NodeSessionI::getObserver(const Ice::Current& current) const
{
- NodeObserverTopicPtr topic = NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName));
- if(topic)
- {
- return topic->getPublisher();
- }
- else
- {
- return 0;
- }
+ return NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->getPublisher();
}
void
@@ -89,12 +86,12 @@ NodeSessionI::loadServers(const Ice::Current& current) const
//
// Get the server proxies to load them on the node.
//
- Ice::StringSeq servers = _database->getAllNodeServers(_name);
+ Ice::StringSeq servers = _database->getNode(_name)->getServers();
for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
try
{
- _database->loadServer(*p);
+ _database->getServer(*p)->load();
}
catch(const Ice::UserException&)
{
@@ -106,21 +103,23 @@ NodeSessionI::loadServers(const Ice::Current& current) const
Ice::StringSeq
NodeSessionI::getServers(const Ice::Current& current) const
{
- return _database->getAllNodeServers(_name);
+ return _database->getNode(_name)->getServers();
}
void
-NodeSessionI::waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr& cb,
- const std::string& application,
- int revision,
- const Ice::Current&) const
+NodeSessionI::waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
+ const std::string& application,
+ int revision,
+ const Ice::Current&) const
{
- _database->waitForApplicationReplication(cb, application, revision);
+ _database->waitForApplicationUpdate(cb, application, revision);
}
void
NodeSessionI::destroy(const Ice::Current& current)
{
+ const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp
+
{
Lock sync(*this);
if(_destroy)
@@ -130,12 +129,12 @@ NodeSessionI::destroy(const Ice::Current& current)
_destroy = true;
}
- Ice::StringSeq servers = _database->getAllNodeServers(_name);
+ Ice::StringSeq servers = _database->getNode(_name)->getServers();
for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
try
{
- _database->unloadServer(*p);
+ _database->getServer(*p)->unload();
}
catch(const Ice::UserException&)
{
@@ -143,9 +142,29 @@ NodeSessionI::destroy(const Ice::Current& current)
}
}
- _database->removeNode(_name, this, !current.adapter);
+ //
+ // If the registry isn't being shutdown we remove the node
+ // internal proxy from the database.
+ //
+ if(!shutdown)
+ {
+ _database->removeInternalObject(_node->ice_getIdentity());
+ }
+
+ //
+ // Next we notify the observer.
+ //
+ NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_name);
+
+
+ //
+ // Finally, we clear the session, this must be done last. As soon
+ // as the node entry session is set to 0 another session might be
+ // created.
+ //
+ _database->getNode(_name)->setSession(0);
- if(current.adapter)
+ if(!shutdown)
{
try
{
diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h
index 9c89c065db8..eeb66cf5bac 100644
--- a/cpp/src/IceGrid/NodeSessionI.h
+++ b/cpp/src/IceGrid/NodeSessionI.h
@@ -32,8 +32,8 @@ public:
virtual NodeObserverPrx getObserver(const Ice::Current&) const;
virtual void loadServers(const Ice::Current&) const;
virtual Ice::StringSeq getServers(const Ice::Current&) const;
- virtual void waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr&,
- const std::string&, int, const Ice::Current&) const;
+ virtual void waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr&,
+ const std::string&, int, const Ice::Current&) const;
virtual void destroy(const Ice::Current&);
const NodePrx& getNode() const;
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 1e4bf5bf07d..ad9c6ff96a7 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -457,7 +457,7 @@ RegistryI::setupInternalRegistry(const Ice::ObjectAdapterPtr& registryAdapter)
_wellKnownObjects->add(proxy, InternalRegistry::ice_staticId());
InternalRegistryPrx registry = InternalRegistryPrx::uncheckedCast(proxy);
- _database->setInternalRegistry(registry);
+ _database->getReplicaCache().setInternalRegistry(registry);
return registry;
}
@@ -1225,7 +1225,7 @@ RegistryI::registerNodes(const InternalRegistryPrx& internalRegistry, const Node
assert((*p)->ice_getIdentity().name.find(prefix) != string::npos);
try
{
- _database->setNodeProxy((*p)->ice_getIdentity().name.substr(prefix.size()), *p);
+ _database->getNode((*p)->ice_getIdentity().name.substr(prefix.size()))->setProxy(*p);
}
catch(const NodeNotExistException&)
{
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index c04f680deba..cb4ace333da 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -104,7 +104,7 @@ ReplicaCache::remove(const string& name, bool shutdown)
{
try
{
- _nodes->replicaRemoved(entry->getSession()->getInternalRegistry());
+ _nodes->replicaRemoved(entry->getProxy());
}
catch(const Ice::ConnectionRefusedException&)
{
diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp
index f558e3339d2..43d68dd200f 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionI.cpp
@@ -16,6 +16,17 @@
using namespace std;
using namespace IceGrid;
+namespace IceGrid
+{
+
+static bool
+operator==(const ObjectInfo& info, const Ice::Identity& id)
+{
+ return info.proxy->ice_getIdentity() == id;
+}
+
+}
+
ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database,
const WellKnownObjectsManagerPtr& wellKnownObjects,
const string& name,
@@ -35,7 +46,9 @@ ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database,
__setNoDelete(true);
try
{
- _database->addReplica(name, this);
+ _database->getReplicaCache().add(name, this);
+ ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName);
+ RegistryObserverTopicPtr::dynamicCast(obsv)->registryUp(_info);
}
catch(...)
{
@@ -130,15 +143,20 @@ ReplicaSessionI::setAdapterDirectProxy(const string& adapterId,
}
void
-ReplicaSessionI::receivedUpdate(TopicName topic, int serial, const string& failure, const Ice::Current&)
+ReplicaSessionI::receivedUpdate(TopicName topicName, int serial, const string& failure, const Ice::Current&)
{
- _database->replicaReceivedUpdate(_name, topic, serial, failure);
+ ObserverTopicPtr topic = _database->getObserverTopic(topicName);
+ if(topic)
+ {
+ topic->receivedUpdate(_name, serial, failure);
+ }
}
void
ReplicaSessionI::destroy(const Ice::Current& current)
{
- bool shutdown = !current.adapter;
+ const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp
+
{
Lock sync(*this);
if(_destroy)
@@ -157,14 +175,16 @@ ReplicaSessionI::destroy(const Ice::Current& current)
if(!_replicaWellKnownObjects.empty())
{
- _database->removeObjectsInDatabase(_replicaWellKnownObjects);
- if(shutdown)
+ if(shutdown) // Don't remove the replica proxy from the database if the registry is being shutdown.
{
- ObjectInfo info;
- info.type = InternalRegistry::ice_staticId();
- info.proxy = _internalRegistry;
- _database->addObject(info, true);
+ ObjectInfoSeq::iterator p = find(_replicaWellKnownObjects.begin(), _replicaWellKnownObjects.end(),
+ _internalRegistry->ice_getIdentity());
+ if(p != _replicaWellKnownObjects.end())
+ {
+ _replicaWellKnownObjects.erase(p);
+ }
}
+ _database->removeObjectsInDatabase(_replicaWellKnownObjects);
}
if(!shutdown)
@@ -172,7 +192,18 @@ ReplicaSessionI::destroy(const Ice::Current& current)
_wellKnownObjects->updateReplicatedWellKnownObjects(); // No need to update these if we're shutting down.
}
- _database->removeReplica(_name, shutdown);
+ //
+ // Notify the observer that the registry is down.
+ //
+ ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName);
+ RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_name);
+
+ //
+ // Remove the replica from the cache. This must be done last. As
+ // soon as the replica is removed another session might be
+ // created.
+ //
+ _database->getReplicaCache().remove(_name, shutdown);
if(current.adapter)
{
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index 6c6d3f91d92..cabd6ef264a 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -149,7 +149,7 @@ public:
string failure;
try
{
- _database->addObject(info, true);
+ _database->addOrUpdateObject(info);
}
catch(const ObjectExistsException& ex)
{
@@ -167,7 +167,14 @@ public:
string failure;
try
{
- _database->addObject(info, true);
+ _database->addOrUpdateObject(info);
+ }
+ catch(const ObjectExistsException& ex)
+ {
+ ostringstream os;
+ os << ex << ":\n";
+ os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity());
+ failure = os.str();
}
catch(const DeploymentException& ex)
{
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp
index 124359630d2..774b8b074f3 100644
--- a/cpp/src/IceGrid/ServerCache.cpp
+++ b/cpp/src/IceGrid/ServerCache.cpp
@@ -127,7 +127,7 @@ ServerCache::remove(const string& id, bool destroy)
Lock sync(*this);
ServerEntryPtr entry = getImpl(id);
- ServerInfo info = entry->getServerInfo();
+ ServerInfo info = entry->getInfo();
if(destroy)
{
entry->destroy();
@@ -296,7 +296,7 @@ ServerEntry::destroy()
}
ServerInfo
-ServerEntry::getServerInfo(bool resolve) const
+ServerEntry::getInfo(bool resolve) const
{
ServerInfo info;
SessionIPtr session;
@@ -336,6 +336,14 @@ ServerEntry::getId() const
}
ServerPrx
+ServerEntry::getProxy(bool upToDate)
+{
+ int actTimeout, deactTimeout;
+ string node;
+ return getProxy(actTimeout, deactTimeout, node, upToDate);
+}
+
+ServerPrx
ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& node, bool upToDate)
{
{
diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h
index d056e43c852..892e89d140a 100644
--- a/cpp/src/IceGrid/ServerCache.h
+++ b/cpp/src/IceGrid/ServerCache.h
@@ -42,10 +42,11 @@ public:
void update(const ServerInfo&);
void destroy();
- ServerInfo getServerInfo(bool = false) const;
+ ServerInfo getInfo(bool = false) const;
std::string getId() const;
- ServerPrx getProxy(int&, int&, std::string&, bool);
+ ServerPrx getProxy(int&, int&, std::string&, bool = true);
+ ServerPrx getProxy(bool = true);
AdapterPrx getAdapter(const std::string&, bool);
float getLoad(LoadSample) const;
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index e64801fc89e..6ff039f625b 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -167,11 +167,11 @@ private:
const TraceLevelsPtr _traceLevels;
};
-class WaitForApplicationReplicationCB : public AMI_NodeSession_waitForApplicationReplication
+class WaitForApplicationUpdateCB : public AMI_NodeSession_waitForApplicationUpdate
{
public:
- WaitForApplicationReplicationCB(const ServerIPtr& server) : _server(server)
+ WaitForApplicationUpdateCB(const ServerIPtr& server) : _server(server)
{
}
@@ -1352,8 +1352,8 @@ ServerI::activate()
NodeSessionPrx session = _node->getMasterNodeSession();
if(session)
{
- AMI_NodeSession_waitForApplicationReplicationPtr cb = new WaitForApplicationReplicationCB(this);
- _node->getMasterNodeSession()->waitForApplicationReplication_async(cb, info.uuid, info.revision);
+ AMI_NodeSession_waitForApplicationUpdatePtr cb = new WaitForApplicationUpdateCB(this);
+ _node->getMasterNodeSession()->waitForApplicationUpdate_async(cb, info.uuid, info.revision);
return;
}
}
diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp
index 65ae312a1de..9f099595fa4 100644
--- a/cpp/src/IceGrid/SessionI.cpp
+++ b/cpp/src/IceGrid/SessionI.cpp
@@ -226,7 +226,7 @@ SessionI::allocateObjectById_async(const AMD_Session_allocateObjectByIdPtr& cb,
const Ice::Identity& id,
const Ice::Current&)
{
- _database->allocateObject(id, newAllocateObject(this, cb));
+ _database->getAllocatableObject(id)->allocate(newAllocateObject(this, cb));
}
void
@@ -234,13 +234,13 @@ SessionI::allocateObjectByType_async(const AMD_Session_allocateObjectByTypePtr&
const string& type,
const Ice::Current&)
{
- _database->allocateObjectByType(type, newAllocateObject(this, cb));
+ _database->getAllocatableObjectCache().allocateByType(type, newAllocateObject(this, cb));
}
void
SessionI::releaseObject(const Ice::Identity& id, const Ice::Current&)
{
- _database->releaseObject(id, this);
+ _database->getAllocatableObject(id)->release(this);
}
void
diff --git a/cpp/src/IceGrid/WellKnownObjectsManager.cpp b/cpp/src/IceGrid/WellKnownObjectsManager.cpp
index d09bdde2f65..e1bb5b9607a 100644
--- a/cpp/src/IceGrid/WellKnownObjectsManager.cpp
+++ b/cpp/src/IceGrid/WellKnownObjectsManager.cpp
@@ -93,7 +93,7 @@ WellKnownObjectsManager::updateReplicatedWellKnownObjects()
Lock sync(*this);
- Ice::ObjectPrx replicatedClientProxy = _database->getReplicatedEndpoints("Client", _endpoints["Client"]);
+ Ice::ObjectPrx replicatedClientProxy = _database->getReplicaCache().getEndpoints("Client", _endpoints["Client"]);
id.name = "Query";
info.type = Query::ice_staticId();