diff options
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 155 |
1 files changed, 107 insertions, 48 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index a706df8a4f4..88992d1f81f 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -35,17 +35,19 @@ public: virtual void applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current) { - _database->syncApplications(applications); - receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx)); + int serial; + _database->syncApplications(applications, getSerials(current.ctx, serial)); + receivedUpdate(ApplicationObserverTopicName, serial); } virtual void applicationAdded(int, const ApplicationInfo& application, const Ice::Current& current) { + int serial; string failure; try { - _database->addApplication(application); + _database->addApplication(application, 0, getSerials(current.ctx, serial)); } catch(const DeploymentException& ex) { @@ -53,16 +55,17 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, serial, failure); } virtual void applicationRemoved(int, const std::string& name, const Ice::Current& current) { + int serial; string failure; try { - _database->removeApplication(name); + _database->removeApplication(name, 0, getSerials(current.ctx, serial)); } catch(const ApplicationNotExistException& ex) { @@ -70,16 +73,17 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, serial, failure); } virtual void applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current& current) { + int serial; string failure; try { - _database->updateApplication(update, false); + _database->updateApplication(update, false, 0, getSerials(current.ctx, serial)); } catch(const DeploymentException& ex) { @@ -93,75 +97,81 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ApplicationObserverTopicName, serial, failure); } virtual void adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current) { - _database->syncAdapters(adapters); - receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx)); + int serial; + _database->syncAdapters(adapters, getSerials(current.ctx, serial)); + receivedUpdate(AdapterObserverTopicName, serial); } virtual void adapterAdded(const AdapterInfo& info, const Ice::Current& current) { + int serial; string failure; try { - _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy); + _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, getSerials(current.ctx, serial)); } catch(const AdapterExistsException&) { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, serial, failure); } virtual void adapterUpdated(const AdapterInfo& info, const Ice::Current& current) { + int serial; string failure; try { - _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy); + _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, getSerials(current.ctx, serial)); } catch(const AdapterExistsException&) { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, serial, failure); } virtual void adapterRemoved(const std::string& id, const Ice::Current& current) { + int serial; string failure; try { - _database->setAdapterDirectProxy(id, "", 0); + _database->setAdapterDirectProxy(id, "", 0, getSerials(current.ctx, serial)); } catch(const AdapterExistsException&) { failure = "adapter `" + id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(AdapterObserverTopicName, serial, failure); } virtual void objectInit(const ObjectInfoSeq& objects, const Ice::Current& current) { - _database->syncObjects(objects); - receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx)); + int serial; + _database->syncObjects(objects, getSerials(current.ctx, serial)); + receivedUpdate(ObjectObserverTopicName, serial); } virtual void objectAdded(const ObjectInfo& info, const Ice::Current& current) { + int serial; string failure; try { - _database->addOrUpdateObject(info); + _database->addOrUpdateObject(info, getSerials(current.ctx, serial)); } catch(const ObjectExistsException& ex) { @@ -170,16 +180,17 @@ public: os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity()); failure = os.str(); } - receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, serial, failure); } virtual void objectUpdated(const ObjectInfo& info, const Ice::Current& current) { + int serial; string failure; try { - _database->addOrUpdateObject(info); + _database->addOrUpdateObject(info, getSerials(current.ctx, serial)); } catch(const ObjectExistsException& ex) { @@ -194,16 +205,17 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, serial, failure); } virtual void objectRemoved(const Ice::Identity& id, const Ice::Current& current) { + int serial; string failure; try { - _database->removeObject(id); + _database->removeObject(id, getSerials(current.ctx, serial)); } catch(const DeploymentException& ex) { @@ -214,23 +226,37 @@ public: catch(const ObjectNotRegisteredException&) { } - receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); + receivedUpdate(ObjectObserverTopicName, serial, failure); } private: - int - getSerial(const Ice::Context& context) + Ice::Long + getSerials(const Ice::Context& context, int& serial) { Ice::Context::const_iterator p = context.find("serial"); if(p != context.end()) { - int serial; istringstream is(p->second); is >> serial; - return serial; } - return -1; + else + { + serial = -1; + } + + p = context.find("dbSerial"); + if(p != context.end()) + { + Ice::Long dbSerial; + istringstream is(p->second); + is >> dbSerial; + return dbSerial; + } + else + { + return -1; + } } void @@ -256,7 +282,7 @@ private: }; -ReplicaSessionManager::ReplicaSessionManager() +ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator) : SessionManager(communicator) { } @@ -267,15 +293,9 @@ ReplicaSessionManager::create(const string& name, const WellKnownObjectsManagerPtr& wellKnownObjects, const InternalRegistryPrx& internalRegistry) { - Ice::CommunicatorPtr comm = database->getCommunicator(); { Lock sync(*this); - Ice::ObjectPrx prx = comm->getDefaultLocator(); - Ice::Identity id = prx->ice_getIdentity(); - id.name = "InternalRegistry-Master"; - - _master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq())); _name = name; _info = info; _internalRegistry = internalRegistry; @@ -283,11 +303,6 @@ ReplicaSessionManager::create(const string& name, _wellKnownObjects = wellKnownObjects; _traceLevels = _database->getTraceLevels(); - // - // Initialize query objects from the default locator endpoints. - // - initQueryObjects(comm->getDefaultLocator()); - _thread = new Thread(*this, _master, _traceLevels->logger); _thread->start(); notifyAll(); @@ -344,16 +359,21 @@ ReplicaSessionManager::destroy() ThreadPtr thread; { Lock sync(*this); - if(!_thread) + if(!_communicator) { return; } thread = _thread; _thread = 0; + + _communicator = 0; } - thread->terminate(); - thread->getThreadControl().join(); + if(thread) + { + thread->terminate(); + thread->getThreadControl().join(); + } _database = 0; _wellKnownObjects = 0; @@ -388,6 +408,31 @@ ReplicaSessionManager::registerAllWellKnownObjects() } } +IceGrid::InternalRegistryPrx +ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id) +{ + vector<Ice::AsyncResultPtr> results; + vector<QueryPrx> queryObjects = findAllQueryObjects(); + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + { + results.push_back((*q)->begin_findObjectById(id)); + } + + for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + { + QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); + try + { + return InternalRegistryPrx::checkedCast(query->end_findObjectById(*p)); + } + catch(const Ice::Exception&) + { + } + } + + return 0; +} + bool ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) { @@ -433,7 +478,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim { session = createSessionImpl(registry, timeout); } - catch(const Ice::Exception& ex) + catch(const Ice::LocalException& ex) { exception.reset(ex.ice_clone()); used.insert(registry); @@ -470,7 +515,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim break; } } - catch(const Ice::Exception& ex) + catch(const Ice::LocalException& ex) { exception.reset(ex.ice_clone()); if(newRegistry) @@ -489,11 +534,19 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim } exception.reset(ex.ice_clone()); } + catch(const DeploymentException& ex) + { + if(_traceLevels) + { + _traceLevels->logger->error("database synchronization with master failed:\n" + ex.reason); + } + exception.reset(ex.ice_clone()); + } catch(const PermissionDeniedException& ex) { if(_traceLevels) { - _traceLevels->logger->error("connection to the the registry `" + _name + "' was denied:\n" + ex.reason); + _traceLevels->logger->error("connection to master was denied:\n" + ex.reason); } exception.reset(ex.ice_clone()); } @@ -560,7 +613,13 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic // DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session); _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant)); - session->setDatabaseObserver(_observer); + StringLongDict serials = _database->getSerials(); + IceUtil::Optional<StringLongDict> serialsOpt; + if(!serials.empty()) + { + serialsOpt = serials; // Don't provide serials parameter if serials aren't supported. + } + session->setDatabaseObserver(_observer, serialsOpt); return session; } catch(const Ice::Exception&) |