diff options
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 117 |
1 files changed, 73 insertions, 44 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 6feb9f41403..705dd2e3eb9 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -23,9 +23,12 @@ class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex { public: - MasterDatabaseObserverI(const DatabasePtr& database, ReplicaSessionManager& manager) : + MasterDatabaseObserverI(const ReplicaSessionManager::ThreadPtr& thread, + const DatabasePtr& database, + const ReplicaSessionPrx& session) : + _thread(thread), _database(database), - _manager(manager) + _session(session) { } @@ -33,6 +36,7 @@ public: applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current) { _database->syncApplications(applications); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx)); } virtual void @@ -49,7 +53,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -66,7 +70,7 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -89,13 +93,14 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current) { _database->syncAdapters(adapters); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx)); } virtual void @@ -106,7 +111,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -117,7 +122,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -128,13 +133,14 @@ public: { failure = "adapter `" + id + "' already exists and belongs to an application"; } - _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void objectInit(const ObjectInfoSeq& objects, const Ice::Current& current) { _database->syncObjects(objects); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx)); } virtual void @@ -158,7 +164,7 @@ public: os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity()); failure = os.str(); } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -181,7 +187,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -206,7 +212,7 @@ public: catch(const ObjectNotRegisteredException&) { } - _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } private: @@ -225,8 +231,25 @@ private: return -1; } + void + receivedUpdate(TopicName name, int serial, const string& failure = string()) + { + try + { + _session->receivedUpdate(name, serial, failure); + } + catch(const Ice::LocalException&) + { + } + if(!failure.empty()) + { + _thread->destroyActiveSession(); + } + } + + const ReplicaSessionManager::ThreadPtr _thread; const DatabasePtr _database; - ReplicaSessionManager& _manager; + const ReplicaSessionPrx _session; }; @@ -243,19 +266,13 @@ ReplicaSessionManager::create(const string& name, const WellKnownObjectsManagerPtr& wellKnownObjects, const InternalRegistryPrx& internalRegistry) { - Ice::CommunicatorPtr communicator = database->getCommunicator(); - string instanceName = communicator->getDefaultLocator()->ice_getIdentity().category; + Ice::CommunicatorPtr comm = database->getCommunicator(); + string instName = comm->getDefaultLocator()->ice_getIdentity().category; { Lock sync(*this); - _master = - InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry-Master")); - - Ice::ObjectPrx obsv = - database->getInternalAdapter()->addWithUUID(new MasterDatabaseObserverI(database, *this)); - - _observer = DatabaseObserverPrx::uncheckedCast(obsv); + _master = InternalRegistryPrx::uncheckedCast(comm->stringToProxy(instName + "/InternalRegistry-Master")); _name = name; _info = info; @@ -269,7 +286,7 @@ ReplicaSessionManager::create(const string& name, notifyAll(); } - _thread->tryCreateSession(_master); + _thread->tryCreateSession(0); } void @@ -318,27 +335,6 @@ ReplicaSessionManager::destroy() _thread->terminate(); _thread->getThreadControl().join(); - _thread = 0; -} - -void -ReplicaSessionManager::receivedUpdate(TopicName name, int serial, const string& failure) -{ - ReplicaSessionPrx session = _thread->getSession(); - if(session) - { - try - { - session->receivedUpdate(name, serial, failure); - } - catch(const Ice::LocalException&) - { - } - } - if(!failure.empty()) - { - _thread->destroyActiveSession(); - } } void @@ -403,12 +399,21 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti out << "trying to establish session with master replica"; } - ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry, _observer); + ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry); int t = session->getTimeout(); if(t > 0) { timeout = IceUtil::Time::seconds(t / 2); } + + // + // Create a new database observer servant and give its proxy + // to the session so that it can subscribe it. This call only + // returns once the observer is subscribed and initialized. + // + DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session); + _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant)); + session->setDatabaseObserver(_observer); // // Register all the well-known objects with the replica session. @@ -433,6 +438,18 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti } catch(const Ice::LocalException& ex) { + if(_observer) + { + try + { + _database->getInternalAdapter()->remove(_observer->ice_getIdentity()); + } + catch(const Ice::LocalException&) + { + } + _observer = 0; + } + // // Re-register all the well known objects with the local database. // @@ -468,5 +485,17 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) out << "couldn't destroy master replica session:\n" << ex; } } + + if(_observer) + { + try + { + _database->getInternalAdapter()->remove(_observer->ice_getIdentity()); + } + catch(const Ice::LocalException&) + { + } + _observer = 0; + } } |