summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp219
1 files changed, 152 insertions, 67 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index 9b02f2f35df..e83f7529aa0 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -11,6 +11,7 @@
#include <IceGrid/ReplicaSessionManager.h>
#include <IceGrid/TraceLevels.h>
#include <IceGrid/Database.h>
+#include <IceGrid/WellKnownObjectsManager.h>
using namespace std;
using namespace IceGrid;
@@ -18,110 +19,138 @@ using namespace IceGrid;
namespace IceGrid
{
-class MasterRegistryObserverI : public RegistryObserver
+class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex
{
public:
- MasterRegistryObserverI(const DatabasePtr& database) : _database(database)
+ MasterDatabaseObserverI(const DatabasePtr& database, ReplicaSessionManager& manager) :
+ _database(database),
+ _manager(manager)
{
}
virtual void
- init(int serial,
- const ApplicationInfoSeq& applications,
- const AdapterInfoSeq& adapters,
- const ObjectInfoSeq& objects,
- const Ice::Current&)
+ applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current&)
{
- _database->initReplica(serial, applications, adapters, objects);
- }
+ _database->syncApplications(applications);
+ _manager.incInitCount();
+ }
virtual void
- applicationAdded(int serial, const ApplicationInfo& application, const Ice::Current&)
+ applicationAdded(int, const ApplicationInfo& application, const Ice::Current&)
{
- _database->addApplicationDescriptor(0, application.descriptor, serial);
+ _database->addApplicationDescriptor(0, application.descriptor);
}
virtual void
- applicationRemoved(int serial, const std::string& name, const Ice::Current&)
+ applicationRemoved(int, const std::string& name, const Ice::Current&)
{
- _database->removeApplicationDescriptor(0, name, serial);
+ _database->removeApplicationDescriptor(0, name);
}
virtual void
- applicationUpdated(int serial, const ApplicationUpdateInfo& update, const Ice::Current&)
+ applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current&)
+ {
+ _database->updateApplicationDescriptor(0, update.descriptor);
+ }
+
+ virtual void
+ adapterInit(const AdapterInfoSeq& adapters, const Ice::Current&)
{
- _database->updateApplicationDescriptor(0, update.descriptor, serial);
+ _database->syncAdapters(adapters);
+ _manager.incInitCount();
}
virtual void
- adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
+ adapterAdded(const AdapterInfo& info, const Ice::Current&)
{
- _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial);
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy);
}
virtual void
- adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
+ adapterUpdated(const AdapterInfo& info, const Ice::Current&)
{
- _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial);
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy);
}
virtual void
- adapterRemoved(int serial, const std::string& id, const Ice::Current&)
+ adapterRemoved(const std::string& id, const Ice::Current&)
{
- _database->setAdapterDirectProxy(id, "", 0, serial);
+ _database->setAdapterDirectProxy(id, "", 0);
}
-
+
+ virtual void
+ objectInit(const ObjectInfoSeq& objects, const Ice::Current&)
+ {
+ _database->syncObjects(objects);
+ _manager.incInitCount();
+ }
+
virtual void
- objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
+ objectAdded(const ObjectInfo& info, const Ice::Current&)
{
- _database->addObject(info, false, serial);
+ _database->addObject(info, false);
}
virtual void
- objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
+ objectUpdated(const ObjectInfo& info, const Ice::Current&)
{
- _database->updateObject(info.proxy, serial);
+ _database->updateObject(info.proxy);
}
virtual void
- objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
+ objectRemoved(const Ice::Identity& id, const Ice::Current&)
{
- _database->removeObject(id, serial);
+ _database->removeObject(id);
}
private:
const DatabasePtr _database;
+ ReplicaSessionManager& _manager;
};
+
};
-ReplicaSessionManager::ReplicaSessionManager()
+ReplicaSessionManager::ReplicaSessionManager() : _initCount(0)
{
}
void
-ReplicaSessionManager::create(const string& name, const DatabasePtr& database, const InternalRegistryPrx& replica)
+ReplicaSessionManager::create(const string& name,
+ const RegistryInfo& info,
+ const DatabasePtr& database,
+ const WellKnownObjectsManagerPtr& wellKnownObjects,
+ const InternalRegistryPrx& internalRegistry)
{
Ice::CommunicatorPtr communicator = database->getCommunicator();
string instanceName = communicator->getDefaultLocator()->ice_getIdentity().category;
- Lock sync(*this);
-
- _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry"));
-
- Ice::ObjectPrx observer = database->getInternalAdapter()->addWithUUID(new MasterRegistryObserverI(database));
- _observer = RegistryObserverPrx::uncheckedCast(observer);
-
- _name = name;
- _replica = replica;
- _database = database;
- _traceLevels = _database->getTraceLevels();
+ {
+ Lock sync(*this);
+
+ _master =
+ InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry-Master"));
+
+ Ice::ObjectPrx obsv =
+ database->getInternalAdapter()->addWithUUID(new MasterDatabaseObserverI(database, *this));
- _thread = new Thread(*this, _master);
- _thread->start();
- notifyAll();
+ _observer = DatabaseObserverPrx::uncheckedCast(obsv);
+
+ _name = name;
+ _info = info;
+ _internalRegistry = internalRegistry;
+ _database = database;
+ _wellKnownObjects = wellKnownObjects;
+ _traceLevels = _database->getTraceLevels();
+
+ _thread = new Thread(*this, _master);
+ _thread->start();
+ notifyAll();
+ }
+
+ _thread->tryCreateSession(_master);
}
void
@@ -144,16 +173,6 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica)
_thread->tryCreateSession(replica);
}
-void
-ReplicaSessionManager::activate()
-{
- ReplicaSessionPrx session = _thread->getSession();
- if(session)
- {
- session->setClientAndServerProxies(_database->getClientProxy(), _database->getServerProxy());
- }
-}
-
NodePrxSeq
ReplicaSessionManager::getNodes() const
{
@@ -163,7 +182,7 @@ ReplicaSessionManager::getNodes() const
}
catch(const Ice::LocalException&)
{
- return _replica->getNodes();
+ return _internalRegistry->getNodes();
}
}
@@ -183,8 +202,44 @@ ReplicaSessionManager::destroy()
_thread = 0;
}
+void
+ReplicaSessionManager::incInitCount()
+{
+ Lock sync(*this);
+ if(++_initCount == 3)
+ {
+ notifyAll();
+ }
+}
+
+void
+ReplicaSessionManager::registerAllWellKnownObjects()
+{
+ //
+ // Try to create the session if it doesn't already exists.
+ //
+ _thread->tryCreateSession(0);
+
+ //
+ // If there's an active session, register the well-known objects
+ // with the session.
+ //
+ ReplicaSessionPrx session = _thread->getSession();
+ if(session)
+ {
+ try
+ {
+ _wellKnownObjects->registerAll(session);
+ return;
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+}
+
bool
-ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) const
+ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
{
try
{
@@ -209,7 +264,7 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) const
}
ReplicaSessionPrx
-ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const
+ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
{
try
{
@@ -219,13 +274,43 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
out << "trying to establish session with master replica";
}
- ReplicaSessionPrx session = registry->registerReplica(_name, _replica, _observer);
+ {
+ Lock sync(*this);
+ _initCount = 0;
+ }
+
+ ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry, _observer);
int t = session->getTimeout();
if(t > 0)
{
timeout = IceUtil::Time::seconds(t);
- }
- session->setClientAndServerProxies(_database->getClientProxy(), _database->getServerProxy());
+ }
+
+ //
+ // Wait for the database to be synchronized with the
+ // master. This is necessary to ensure that we don't try to
+ // modify the database before or at the same time as the
+ // master is trying to send us the copy of the database.
+ //
+ {
+ Lock sync(*this);
+ while(_initCount != 3)
+ {
+ if(t > 0)
+ {
+ timedWait(timeout);
+ }
+ else
+ {
+ wait();
+ }
+ }
+ }
+
+ //
+ // Register all the well-known objects with the replica session.
+ //
+ _wellKnownObjects->registerAll(session);
if(_traceLevels && _traceLevels->replica > 0)
{
@@ -245,12 +330,11 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
}
catch(const Ice::LocalException& ex)
{
- ObjectInfo info;
- info.type = InternalRegistry::ice_staticId();
- info.proxy = _replica;
- _database->addObject(info, true);
- _database->updateReplicatedWellKnownObjects();
-
+ //
+ // Re-register all the well known objects with the local database.
+ //
+ _wellKnownObjects->registerAll();
+
if(_traceLevels && _traceLevels->replica > 1)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
@@ -261,7 +345,7 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
}
void
-ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) const
+ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session)
{
try
{
@@ -282,3 +366,4 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) const
}
}
}
+