summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp328
1 files changed, 227 insertions, 101 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 1ee45b553e8..bb7e5a1e86f 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -16,6 +16,7 @@
#include <IceGrid/DescriptorHelper.h>
#include <IceGrid/NodeSessionI.h>
#include <IceGrid/Session.h>
+#include <IceGrid/Topics.h>
#include <algorithm>
#include <functional>
@@ -108,59 +109,45 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
}
-Database::Database(const Ice::ObjectAdapterPtr& adapter,
- const string& envName,
+Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
+ const IceStorm::TopicManagerPrx& topicManager,
+ const Ice::ObjectPrx& clientProxy,
+ const Ice::ObjectPrx& serverProxy,
const string& instanceName,
int nodeSessionTimeout,
const TraceLevelsPtr& traceLevels) :
- _communicator(adapter->getCommunicator()),
- _internalAdapter(adapter),
- _envName(envName),
+ _communicator(registryAdapter->getCommunicator()),
+ _internalAdapter(registryAdapter),
+ _topicManager(topicManager),
+ _envName("Registry"),
_instanceName(instanceName),
- _traceLevels(traceLevels),
- _nodeCache(_communicator, nodeSessionTimeout),
+ _traceLevels(traceLevels),
+ _replicaCache(_communicator, topicManager, instanceName, clientProxy, serverProxy),
+ _nodeCache(_communicator, _replicaCache, nodeSessionTimeout),
_objectCache(_communicator),
_allocatableObjectCache(_communicator),
_serverCache(_communicator, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
- _connection(Freeze::createConnection(adapter->getCommunicator(), envName)),
+ _connection(Freeze::createConnection(registryAdapter->getCommunicator(), _envName)),
_descriptors(_connection, _descriptorDbName),
_objects(_connection, _objectDbName),
_adapters(_connection, _adapterDbName),
_lock(0),
- _serial(0)
+ _serial(-1)
{
//
- // Cache the servers & adapters.
+ // Register a default servant to manage manually registered object adapters.
//
- ServerEntrySeq entries;
- for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
+ __setNoDelete(true);
+ try
{
- try
- {
- load(ApplicationHelper(_communicator, p->second), entries);
- }
- catch(const DeploymentException& ex)
- {
- Ice::Warning warn(_traceLevels->logger);
- warn << "invalid application `" << p->first << "':\n" << ex.reason;
- }
+ _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter");
}
-
- _serverCache.setTraceLevels(_traceLevels);
- _nodeCache.setTraceLevels(_traceLevels);
- _adapterCache.setTraceLevels(_traceLevels);
- _objectCache.setTraceLevels(_traceLevels);
- _allocatableObjectCache.setTraceLevels(_traceLevels);
-
- //
- // Register a default servant to manage manually registered object adapters.
- //
- // NOTE: This must be done only once we're sure this constructor
- // won't throw. The servant locator is holding a handle on this
- // object and if an exception was thrown a bogus database object
- // won't be referenced from the servant locator.
- //
- _internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter");
+ catch(...)
+ {
+ __setNoDelete(false);
+ throw;
+ }
+ __setNoDelete(false);
}
Database::~Database()
@@ -179,41 +166,32 @@ Database::getInstanceName() const
return _instanceName;
}
-void
-Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver)
+RegistryObserverTopicPtr
+Database::getRegistryObserverTopic() const
{
- int serial;
- ApplicationDescriptorSeq applications;
- AdapterInfoSeq adapters;
- ObjectInfoSeq objects;
+ Lock sync(*this);
+ return _registryObserverTopic;
+}
- _registryObserver = registryObserver;
- _nodeObserver = nodeObserver;
- serial = _serial;
-
- for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
- {
- applications.push_back(p->second);
- }
-
- for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q)
- {
- adapters.push_back(q->second);
- if(adapters.back().id.empty())
- {
- adapters.back().id = q->first;
- }
- }
-
- for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r)
- {
- objects.push_back(r->second);
- }
+NodeObserverTopicPtr
+Database::getNodeObserverTopic() const
+{
+ Lock sync(*this);
+ return _nodeObserverTopic;
+}
- //
- // Notify the observers.
- //
- _registryObserver->init(serial, applications, adapters, objects);
+void
+Database::clearTopics()
+{
+ Lock sync(*this);
+ _registryObserverTopic = 0;
+ _nodeObserverTopic = 0;
+}
+
+int
+Database::getSessionTimeout() const
+{
+ return _nodeCache.getSessionTimeout();
}
void
@@ -260,7 +238,103 @@ Database::unlock(AdminSessionI* session)
}
void
-Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc)
+Database::init(int serial)
+{
+ ApplicationDescriptorSeq applications;
+ AdapterInfoSeq adapters;
+ ObjectInfoSeq objects;
+
+ //
+ // Cache the servers & adapters.
+ //
+ ServerEntrySeq entries;
+ for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
+ {
+ applications.push_back(p->second);
+ try
+ {
+ load(ApplicationHelper(_communicator, p->second), entries);
+ }
+ catch(const DeploymentException& ex)
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << "invalid application `" << p->first << "':\n" << ex.reason;
+ }
+ }
+
+ for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q)
+ {
+ adapters.push_back(q->second);
+ if(adapters.back().id.empty())
+ {
+ adapters.back().id = q->first;
+ }
+ }
+
+ for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r)
+ {
+ objects.push_back(r->second);
+ }
+
+ _serverCache.setTraceLevels(_traceLevels);
+ _nodeCache.setTraceLevels(_traceLevels);
+ _replicaCache.setTraceLevels(_traceLevels);
+ _adapterCache.setTraceLevels(_traceLevels);
+ _objectCache.setTraceLevels(_traceLevels);
+ _allocatableObjectCache.setTraceLevels(_traceLevels);
+
+ _serial = serial;
+
+ if(_registryObserverTopic)
+ {
+ //
+ // Initialize the topic cache.
+ //
+ _registryObserverTopic->getPublisher()->init(_serial, applications, adapters, objects);
+ }
+}
+
+void
+Database::initMaster()
+{
+ Lock sync(*this);
+ _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager);
+ _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager);
+ init(0);
+}
+
+void
+Database::initReplica(int masterSerial,
+ const ApplicationDescriptorSeq& applications,
+ const AdapterInfoSeq& adapters,
+ const ObjectInfoSeq& objects)
+{
+ Lock sync(*this);
+
+ _descriptors.clear();
+ for(ApplicationDescriptorSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
+ {
+ _descriptors.put(StringApplicationDescriptorDict::value_type(p->name, *p));
+ }
+
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ {
+ _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
+ }
+
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator r = adapters.end(); r != adapters.end(); ++r)
+ {
+ _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ }
+
+ init(masterSerial);
+ notifyAll();
+}
+
+void
+Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc, int masterSerial)
{
ServerEntrySeq entries;
{
@@ -326,7 +400,10 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc
//
// Notify the observers.
//
- _registryObserver->applicationAdded(serial, desc);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->applicationAdded(serial, desc);
+ }
if(_traceLevels->application > 0)
{
@@ -336,7 +413,8 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc
}
void
-Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update)
+Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update,
+ int masterSerial)
{
ServerEntrySeq entries;
ApplicationDescriptor oldDesc;
@@ -449,7 +527,7 @@ Database::instantiateServer(AdminSessionI* session,
}
void
-Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name)
+Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name, int masterSerial)
{
ServerEntrySeq entries;
int serial;
@@ -490,7 +568,10 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string&
//
// Notify the observers
//
- _registryObserver->applicationRemoved(serial, name);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->applicationRemoved(serial, name);
+ }
if(_traceLevels->application > 0)
{
@@ -527,6 +608,17 @@ Database::getAllApplications(const string& expression)
void
Database::addNode(const string& name, const NodeSessionIPtr& session)
{
+ {
+ //
+ // Wait for the database to be initialized before to add a
+ // node.
+ //
+ Lock sync(*this);
+ while(_serial < 0)
+ {
+ wait();
+ }
+ }
_nodeCache.get(name, true)->setSession(session);
}
@@ -550,10 +642,26 @@ Database::removeNode(const string& name)
// observer to ensure that only nodes which are up are teared
// down).
//
- _nodeObserver->nodeDown(name);
+ if(_nodeObserverTopic)
+ {
+ _nodeObserverTopic->getPublisher()->nodeDown(name);
+ }
+
_nodeCache.get(name)->setSession(0);
}
+void
+Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
+{
+ _replicaCache.add(name, session, this);
+}
+
+void
+Database::removeReplica(const string& name)
+{
+ _replicaCache.remove(name, this);
+}
+
Ice::StringSeq
Database::getAllNodes(const string& expression)
{
@@ -593,7 +701,8 @@ Database::getAllNodeServers(const string& node)
}
bool
-Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
+Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
+ int masterSerial)
{
AdapterInfo info;
int serial;
@@ -646,21 +755,24 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
}
}
- if(proxy)
+ if(_registryObserverTopic)
{
- if(updated)
+ if(proxy)
{
- _registryObserver->adapterUpdated(serial, info);
+ if(updated)
+ {
+ _registryObserverTopic->getPublisher()->adapterUpdated(serial, info);
+ }
+ else
+ {
+ _registryObserverTopic->getPublisher()->adapterAdded(serial, info);
+ }
}
else
{
- _registryObserver->adapterAdded(serial, info);
+ _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId);
}
}
- else
- {
- _registryObserver->adapterRemoved(serial, adapterId);
- }
return true;
}
@@ -736,16 +848,19 @@ Database::removeAdapter(const string& adapterId)
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
}
-
- if(infos.empty())
- {
- _registryObserver->adapterRemoved(serial, adapterId);
- }
- else
+
+ if(_registryObserverTopic)
{
- for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ if(infos.empty())
{
- _registryObserver->adapterUpdated(++serial, *p);
+ _registryObserverTopic->getPublisher()->adapterRemoved(serial, adapterId);
+ }
+ else
+ {
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ _registryObserverTopic->getPublisher()->adapterUpdated(++serial, *p);
+ }
}
}
}
@@ -901,7 +1016,7 @@ Database::getAllAdapters(const string& expression)
}
void
-Database::addObject(const ObjectInfo& info)
+Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase, int masterSerial)
{
int serial;
const Ice::Identity id = info.proxy->ice_getIdentity();
@@ -912,7 +1027,7 @@ Database::addObject(const ObjectInfo& info)
throw ObjectExistsException(id);
}
- if(_objects.find(id) != _objects.end())
+ if(!replaceIfExistsInDatabase && _objects.find(id) != _objects.end())
{
throw ObjectExistsException(id);
}
@@ -924,7 +1039,10 @@ Database::addObject(const ObjectInfo& info)
//
// Notify the observers.
//
- _registryObserver->objectAdded(serial, info);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->objectAdded(serial, info);
+ }
if(_traceLevels->object > 0)
{
@@ -934,7 +1052,7 @@ Database::addObject(const ObjectInfo& info)
}
void
-Database::removeObject(const Ice::Identity& id)
+Database::removeObject(const Ice::Identity& id, int masterSerial)
{
int serial;
{
@@ -964,7 +1082,10 @@ Database::removeObject(const Ice::Identity& id)
//
// Notify the observers.
//
- _registryObserver->objectRemoved(serial, id);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->objectRemoved(serial, id);
+ }
if(_traceLevels->object > 0)
{
@@ -974,7 +1095,7 @@ Database::removeObject(const Ice::Identity& id)
}
void
-Database::updateObject(const Ice::ObjectPrx& proxy)
+Database::updateObject(const Ice::ObjectPrx& proxy, int masterSerial)
{
const Ice::Identity id = proxy->ice_getIdentity();
int serial;
@@ -1009,7 +1130,10 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
//
// Notify the observers.
//
- _registryObserver->objectUpdated(serial, info);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->objectUpdated(serial, info);
+ }
if(_traceLevels->object > 0)
{
@@ -1464,13 +1588,15 @@ Database::finishUpdate(ServerEntrySeq& entries,
//
// Notify the observers.
//
- _registryObserver->applicationUpdated(serial, update);
+ if(_registryObserverTopic)
+ {
+ _registryObserverTopic->getPublisher()->applicationUpdated(serial, update);
+ }
if(_traceLevels->application > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
out << "updated application `" << update.name << "'";
}
-
}