summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-07-27 13:20:03 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-07-27 13:20:03 +0000
commitdad0bda0d462b4730b168befd187ca43446f12e7 (patch)
tree1a964dc5a3bd2c945e0ee165ddda55c3e4281157 /cpp/src/IceGrid/Database.cpp
parentImproved __checkMode (diff)
downloadice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.bz2
ice-dad0bda0d462b4730b168befd187ca43446f12e7.tar.xz
ice-dad0bda0d462b4730b168befd187ca43446f12e7.zip
More IceGrid replication improvements.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp168
1 files changed, 121 insertions, 47 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 7fee927b7cd..aa683e5344b 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -15,6 +15,7 @@
#include <IceGrid/Util.h>
#include <IceGrid/DescriptorHelper.h>
#include <IceGrid/NodeSessionI.h>
+#include <IceGrid/ReplicaSessionI.h>
#include <IceGrid/Session.h>
#include <IceGrid/Topics.h>
@@ -116,10 +117,8 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
const IceStorm::TopicManagerPrx& topicManager,
- const Ice::ObjectPrx& clientProxy,
- const Ice::ObjectPrx& serverProxy,
const string& instanceName,
- int nodeSessionTimeout,
+ int sessionTimeout,
const TraceLevelsPtr& traceLevels) :
_communicator(registryAdapter->getCommunicator()),
_internalAdapter(registryAdapter),
@@ -127,11 +126,14 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_envName("Registry"),
_instanceName(instanceName),
_traceLevels(traceLevels),
- _replicaCache(_communicator, topicManager, instanceName, clientProxy, serverProxy),
- _nodeCache(_communicator, _replicaCache, nodeSessionTimeout),
+ _sessionTimeout(sessionTimeout),
+ _replicaCache(_communicator, topicManager),
+ _nodeCache(_communicator, _replicaCache),
_objectCache(_communicator),
_allocatableObjectCache(_communicator),
- _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
+ _serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache, sessionTimeout),
+ _clientProxy(_communicator->stringToProxy("dummy")),
+ _serverProxy(_communicator->stringToProxy("dummy")),
_connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)),
_descriptors(_connection, _descriptorDbName),
_objects(_connection, _objectDbName),
@@ -153,6 +155,13 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
}
}
+ _serverCache.setTraceLevels(_traceLevels);
+ _nodeCache.setTraceLevels(_traceLevels);
+ _replicaCache.setTraceLevels(_traceLevels);
+ _adapterCache.setTraceLevels(_traceLevels);
+ _objectCache.setTraceLevels(_traceLevels);
+ _allocatableObjectCache.setTraceLevels(_traceLevels);
+
//
// Register a default servant to manage manually registered object adapters.
//
@@ -177,6 +186,7 @@ void
Database::destroy()
{
_nodeCache.destroy(); // Break cyclic reference count.
+ _replicaCache.destroy();
}
std::string
@@ -207,12 +217,6 @@ Database::clearTopics()
_nodeObserverTopic = 0;
}
-int
-Database::getSessionTimeout() const
-{
- return _nodeCache.getSessionTimeout();
-}
-
void
Database::checkSessionLock(AdminSessionI* session)
{
@@ -261,13 +265,6 @@ Database::initMaster()
{
Lock sync(*this);
- _serverCache.setTraceLevels(_traceLevels);
- _nodeCache.setTraceLevels(_traceLevels);
- _replicaCache.setTraceLevels(_traceLevels);
- _adapterCache.setTraceLevels(_traceLevels);
- _objectCache.setTraceLevels(_traceLevels);
- _allocatableObjectCache.setTraceLevels(_traceLevels);
-
_nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager);
_registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager);
_serial = 0;
@@ -298,19 +295,6 @@ Database::initReplica(int masterSerial,
{
Lock sync(*this);
- if(_serial < 0)
- {
- _serverCache.setTraceLevels(_traceLevels);
- _nodeCache.setTraceLevels(_traceLevels);
- _replicaCache.setTraceLevels(_traceLevels);
- _adapterCache.setTraceLevels(_traceLevels);
- _objectCache.setTraceLevels(_traceLevels);
- _allocatableObjectCache.setTraceLevels(_traceLevels);
- }
- else
- {
-// assert(_serial <= masterSerial); // TODO: Master might have been restarted.
- }
_serial = masterSerial;
ServerEntrySeq entries;
@@ -365,6 +349,47 @@ Database::initReplica(int masterSerial,
}
void
+Database::setClientProxy(const Ice::ObjectPrx& proxy)
+{
+ Lock sync(*this);
+ _clientProxy = proxy;
+}
+
+void
+Database::setServerProxy(const Ice::ObjectPrx& proxy)
+{
+ Lock sync(*this);
+ _serverProxy = proxy;
+}
+
+Ice::ObjectPrx
+Database::getClientProxy() const
+{
+ Lock sync(*this);
+ return _clientProxy;
+}
+
+Ice::ObjectPrx
+Database::getServerProxy() const
+{
+ Lock sync(*this);
+ return _serverProxy;
+}
+
+void
+Database::updateReplicatedWellKnownObjects()
+{
+ Ice::ObjectPrx clientProxy = _replicaCache.getClientProxy(getClientProxy());
+ Ice::Identity id;
+ id.category = _instanceName;
+ id.name = "Query";
+ ObjectInfo info;
+ info.type = Query::ice_staticId();
+ info.proxy = clientProxy->ice_identity(id);
+ addObject(info, true);
+}
+
+void
Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc, int masterSerial)
{
ServerEntrySeq entries;
@@ -640,6 +665,11 @@ void
Database::addNode(const string& name, const NodeSessionIPtr& session)
{
_nodeCache.get(name, true)->setSession(session);
+
+ ObjectInfo info;
+ info.type = Node::ice_staticId();
+ info.proxy = session->getNode();
+ addObject(info, true);
}
NodePrx
@@ -655,16 +685,22 @@ Database::getNodeInfo(const string& name) const
}
void
-Database::removeNode(const string& name)
+Database::removeNode(const string& name, const NodeSessionIPtr& session, bool shutdown)
{
+ if(!shutdown)
+ {
+ removeObject(session->getNode()->ice_getIdentity());
+ }
+
//
// We must notify the observer first (there's an assert in the
// observer to ensure that only nodes which are up are teared
// down).
//
- if(_nodeObserverTopic)
+ NodeObserverTopicPtr topic = getNodeObserverTopic();
+ if(topic)
{
- _nodeObserverTopic->getPublisher()->nodeDown(name);
+ topic->getPublisher()->nodeDown(name);
}
_nodeCache.get(name)->setSession(0);
@@ -673,19 +709,35 @@ Database::removeNode(const string& name)
void
Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
{
- _replicaCache.add(name, session, this);
-}
+ ReplicaEntryPtr entry = _replicaCache.add(name, session);
-InternalRegistryPrxSeq
-Database::getReplicas() const
-{
- return _replicaCache.getAll();
+ ObjectInfo info;
+ info.type = InternalRegistry::ice_staticId();
+ info.proxy = session->getProxy();
+ addObject(info, true);
+
+ RegistryObserverTopicPtr topic = getRegistryObserverTopic();
+ if(topic)
+ {
+ topic->subscribe(session->getObserver());
+ }
}
void
-Database::removeReplica(const string& name)
+Database::removeReplica(const string& name, const ReplicaSessionIPtr& session, bool shutdown)
{
- _replicaCache.remove(name, this);
+ if(!shutdown)
+ {
+ removeObject(session->getProxy()->ice_getIdentity());
+ }
+
+ RegistryObserverTopicPtr topic = getRegistryObserverTopic();
+ if(topic)
+ {
+ topic->unsubscribe(session->getObserver());
+ }
+
+ _replicaCache.remove(name);
}
Ice::StringSeq
@@ -1046,6 +1098,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int
{
int serial;
const Ice::Identity id = info.proxy->ice_getIdentity();
+ bool update = false;
{
Lock sync(*this);
if(_objectCache.has(id))
@@ -1053,9 +1106,16 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int
throw ObjectExistsException(id);
}
- if(!replaceIfExistsInDatabase && _objects.find(id) != _objects.end())
+ if(_objects.find(id) != _objects.end())
{
- throw ObjectExistsException(id);
+ if(!replaceIfExistsInDatabase)
+ {
+ throw ObjectExistsException(id);
+ }
+ else
+ {
+ update = true;
+ }
}
_objects.put(IdentityObjectInfoDict::value_type(id, info));
@@ -1067,13 +1127,27 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int
//
if(_registryObserverTopic)
{
- _registryObserverTopic->getPublisher()->objectAdded(serial, info);
+ if(!update)
+ {
+ _registryObserverTopic->getPublisher()->objectAdded(serial, info);
+ }
+ else
+ {
+ _registryObserverTopic->getPublisher()->objectUpdated(serial, info);
+ }
}
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "added object `" << _communicator->identityToString(id) << "'";
+ if(!update)
+ {
+ out << "added object `" << _communicator->identityToString(id) << "'";
+ }
+ else
+ {
+ out << "updated object `" << _communicator->identityToString(id) << "'";
+ }
}
}