diff options
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 219 |
1 files changed, 152 insertions, 67 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 9b02f2f35df..e83f7529aa0 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -11,6 +11,7 @@ #include <IceGrid/ReplicaSessionManager.h> #include <IceGrid/TraceLevels.h> #include <IceGrid/Database.h> +#include <IceGrid/WellKnownObjectsManager.h> using namespace std; using namespace IceGrid; @@ -18,110 +19,138 @@ using namespace IceGrid; namespace IceGrid { -class MasterRegistryObserverI : public RegistryObserver +class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex { public: - MasterRegistryObserverI(const DatabasePtr& database) : _database(database) + MasterDatabaseObserverI(const DatabasePtr& database, ReplicaSessionManager& manager) : + _database(database), + _manager(manager) { } virtual void - init(int serial, - const ApplicationInfoSeq& applications, - const AdapterInfoSeq& adapters, - const ObjectInfoSeq& objects, - const Ice::Current&) + applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current&) { - _database->initReplica(serial, applications, adapters, objects); - } + _database->syncApplications(applications); + _manager.incInitCount(); + } virtual void - applicationAdded(int serial, const ApplicationInfo& application, const Ice::Current&) + applicationAdded(int, const ApplicationInfo& application, const Ice::Current&) { - _database->addApplicationDescriptor(0, application.descriptor, serial); + _database->addApplicationDescriptor(0, application.descriptor); } virtual void - applicationRemoved(int serial, const std::string& name, const Ice::Current&) + applicationRemoved(int, const std::string& name, const Ice::Current&) { - _database->removeApplicationDescriptor(0, name, serial); + _database->removeApplicationDescriptor(0, name); } virtual void - applicationUpdated(int serial, const ApplicationUpdateInfo& update, const Ice::Current&) + applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current&) + { + _database->updateApplicationDescriptor(0, update.descriptor); + } + + virtual void + adapterInit(const AdapterInfoSeq& adapters, const Ice::Current&) { - _database->updateApplicationDescriptor(0, update.descriptor, serial); + _database->syncAdapters(adapters); + _manager.incInitCount(); } virtual void - adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&) + adapterAdded(const AdapterInfo& info, const Ice::Current&) { - _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial); + _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy); } virtual void - adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&) + adapterUpdated(const AdapterInfo& info, const Ice::Current&) { - _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, serial); + _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy); } virtual void - adapterRemoved(int serial, const std::string& id, const Ice::Current&) + adapterRemoved(const std::string& id, const Ice::Current&) { - _database->setAdapterDirectProxy(id, "", 0, serial); + _database->setAdapterDirectProxy(id, "", 0); } - + + virtual void + objectInit(const ObjectInfoSeq& objects, const Ice::Current&) + { + _database->syncObjects(objects); + _manager.incInitCount(); + } + virtual void - objectAdded(int serial, const ObjectInfo& info, const Ice::Current&) + objectAdded(const ObjectInfo& info, const Ice::Current&) { - _database->addObject(info, false, serial); + _database->addObject(info, false); } virtual void - objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&) + objectUpdated(const ObjectInfo& info, const Ice::Current&) { - _database->updateObject(info.proxy, serial); + _database->updateObject(info.proxy); } virtual void - objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&) + objectRemoved(const Ice::Identity& id, const Ice::Current&) { - _database->removeObject(id, serial); + _database->removeObject(id); } private: const DatabasePtr _database; + ReplicaSessionManager& _manager; }; + }; -ReplicaSessionManager::ReplicaSessionManager() +ReplicaSessionManager::ReplicaSessionManager() : _initCount(0) { } void -ReplicaSessionManager::create(const string& name, const DatabasePtr& database, const InternalRegistryPrx& replica) +ReplicaSessionManager::create(const string& name, + const RegistryInfo& info, + const DatabasePtr& database, + const WellKnownObjectsManagerPtr& wellKnownObjects, + const InternalRegistryPrx& internalRegistry) { Ice::CommunicatorPtr communicator = database->getCommunicator(); string instanceName = communicator->getDefaultLocator()->ice_getIdentity().category; - Lock sync(*this); - - _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry")); - - Ice::ObjectPrx observer = database->getInternalAdapter()->addWithUUID(new MasterRegistryObserverI(database)); - _observer = RegistryObserverPrx::uncheckedCast(observer); - - _name = name; - _replica = replica; - _database = database; - _traceLevels = _database->getTraceLevels(); + { + Lock sync(*this); + + _master = + InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry-Master")); + + Ice::ObjectPrx obsv = + database->getInternalAdapter()->addWithUUID(new MasterDatabaseObserverI(database, *this)); - _thread = new Thread(*this, _master); - _thread->start(); - notifyAll(); + _observer = DatabaseObserverPrx::uncheckedCast(obsv); + + _name = name; + _info = info; + _internalRegistry = internalRegistry; + _database = database; + _wellKnownObjects = wellKnownObjects; + _traceLevels = _database->getTraceLevels(); + + _thread = new Thread(*this, _master); + _thread->start(); + notifyAll(); + } + + _thread->tryCreateSession(_master); } void @@ -144,16 +173,6 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica) _thread->tryCreateSession(replica); } -void -ReplicaSessionManager::activate() -{ - ReplicaSessionPrx session = _thread->getSession(); - if(session) - { - session->setClientAndServerProxies(_database->getClientProxy(), _database->getServerProxy()); - } -} - NodePrxSeq ReplicaSessionManager::getNodes() const { @@ -163,7 +182,7 @@ ReplicaSessionManager::getNodes() const } catch(const Ice::LocalException&) { - return _replica->getNodes(); + return _internalRegistry->getNodes(); } } @@ -183,8 +202,44 @@ ReplicaSessionManager::destroy() _thread = 0; } +void +ReplicaSessionManager::incInitCount() +{ + Lock sync(*this); + if(++_initCount == 3) + { + notifyAll(); + } +} + +void +ReplicaSessionManager::registerAllWellKnownObjects() +{ + // + // Try to create the session if it doesn't already exists. + // + _thread->tryCreateSession(0); + + // + // If there's an active session, register the well-known objects + // with the session. + // + ReplicaSessionPrx session = _thread->getSession(); + if(session) + { + try + { + _wellKnownObjects->registerAll(session); + return; + } + catch(const Ice::LocalException&) + { + } + } +} + bool -ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) const +ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) { try { @@ -209,7 +264,7 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) const } ReplicaSessionPrx -ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) const +ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) { try { @@ -219,13 +274,43 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti out << "trying to establish session with master replica"; } - ReplicaSessionPrx session = registry->registerReplica(_name, _replica, _observer); + { + Lock sync(*this); + _initCount = 0; + } + + ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry, _observer); int t = session->getTimeout(); if(t > 0) { timeout = IceUtil::Time::seconds(t); - } - session->setClientAndServerProxies(_database->getClientProxy(), _database->getServerProxy()); + } + + // + // Wait for the database to be synchronized with the + // master. This is necessary to ensure that we don't try to + // modify the database before or at the same time as the + // master is trying to send us the copy of the database. + // + { + Lock sync(*this); + while(_initCount != 3) + { + if(t > 0) + { + timedWait(timeout); + } + else + { + wait(); + } + } + } + + // + // Register all the well-known objects with the replica session. + // + _wellKnownObjects->registerAll(session); if(_traceLevels && _traceLevels->replica > 0) { @@ -245,12 +330,11 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti } catch(const Ice::LocalException& ex) { - ObjectInfo info; - info.type = InternalRegistry::ice_staticId(); - info.proxy = _replica; - _database->addObject(info, true); - _database->updateReplicatedWellKnownObjects(); - + // + // Re-register all the well known objects with the local database. + // + _wellKnownObjects->registerAll(); + if(_traceLevels && _traceLevels->replica > 1) { Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); @@ -261,7 +345,7 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti } void -ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) const +ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) { try { @@ -282,3 +366,4 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) const } } } + |