diff options
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 233 |
1 files changed, 111 insertions, 122 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index da6f5cb4983..73547a5a304 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -14,35 +14,35 @@ using namespace IceGrid; namespace IceGrid { -class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex +class MasterDatabaseObserverI : public DatabaseObserver { public: - MasterDatabaseObserverI(const ReplicaSessionManager::ThreadPtr& thread, - const DatabasePtr& database, - const ReplicaSessionPrx& session) : + MasterDatabaseObserverI(const shared_ptr<ReplicaSessionManager::Thread>& thread, + const shared_ptr<Database>& database, + const shared_ptr<ReplicaSessionPrx>& session) : _thread(thread), _database(database), _session(session) { } - virtual void - applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current) + void + applicationInit(int, ApplicationInfoSeq applications, const Ice::Current& current) override { int serial = 0; - _database->syncApplications(applications, getSerials(current.ctx, serial)); - receivedUpdate(ApplicationObserverTopicName, serial); + _database->syncApplications(move(applications), getSerials(current.ctx, serial)); + receivedUpdate(TopicName::ApplicationObserver, serial); } - virtual void - applicationAdded(int, const ApplicationInfo& application, const Ice::Current& current) + void + applicationAdded(int, ApplicationInfo application, const Ice::Current& current) override { int serial = 0; string failure; try { - _database->addApplication(application, 0, getSerials(current.ctx, serial)); + _database->addApplication(move(application), nullptr, getSerials(current.ctx, serial)); } catch(const DeploymentException& ex) { @@ -50,17 +50,17 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, serial, failure); + receivedUpdate(TopicName::ApplicationObserver, serial, failure); } - virtual void - applicationRemoved(int, const std::string& name, const Ice::Current& current) + void + applicationRemoved(int, std::string name, const Ice::Current& current) override { int serial = 0; string failure; try { - _database->removeApplication(name, 0, getSerials(current.ctx, serial)); + _database->removeApplication(name, nullptr, getSerials(current.ctx, serial)); } catch(const ApplicationNotExistException& ex) { @@ -68,17 +68,17 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, serial, failure); + receivedUpdate(TopicName::ApplicationObserver, serial, failure); } - virtual void - applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current& current) + void + applicationUpdated(int, ApplicationUpdateInfo update, const Ice::Current& current) override { int serial = 0; string failure; try { - _database->updateApplication(update, false, 0, getSerials(current.ctx, serial)); + _database->updateApplication(move(update), false, nullptr, getSerials(current.ctx, serial)); } catch(const DeploymentException& ex) { @@ -92,19 +92,19 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - receivedUpdate(ApplicationObserverTopicName, serial, failure); + receivedUpdate(TopicName::ApplicationObserver, serial, failure); } - virtual void - adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current) + void + adapterInit(AdapterInfoSeq adapters, const Ice::Current& current) override { int serial = 0; _database->syncAdapters(adapters, getSerials(current.ctx, serial)); - receivedUpdate(AdapterObserverTopicName, serial); + receivedUpdate(TopicName::AdapterObserver, serial); } - virtual void - adapterAdded(const AdapterInfo& info, const Ice::Current& current) + void + adapterAdded(AdapterInfo info, const Ice::Current& current) override { int serial = 0; string failure; @@ -116,11 +116,11 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, serial, failure); + receivedUpdate(TopicName::AdapterObserver, serial, failure); } - virtual void - adapterUpdated(const AdapterInfo& info, const Ice::Current& current) + void + adapterUpdated(AdapterInfo info, const Ice::Current& current) override { int serial = 0; string failure; @@ -132,35 +132,35 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, serial, failure); + receivedUpdate(TopicName::AdapterObserver, serial, failure); } - virtual void - adapterRemoved(const std::string& id, const Ice::Current& current) + void + adapterRemoved(std::string id, const Ice::Current& current) override { int serial = 0; string failure; try { - _database->setAdapterDirectProxy(id, "", 0, getSerials(current.ctx, serial)); + _database->setAdapterDirectProxy(id, "", nullptr, getSerials(current.ctx, serial)); } catch(const AdapterExistsException&) { failure = "adapter `" + id + "' already exists and belongs to an application"; } - receivedUpdate(AdapterObserverTopicName, serial, failure); + receivedUpdate(TopicName::AdapterObserver, serial, failure); } - virtual void - objectInit(const ObjectInfoSeq& objects, const Ice::Current& current) + void + objectInit(ObjectInfoSeq objects, const Ice::Current& current) override { int serial = 0; - _database->syncObjects(objects, getSerials(current.ctx, serial)); - receivedUpdate(ObjectObserverTopicName, serial); + _database->syncObjects(move(objects), getSerials(current.ctx, serial)); + receivedUpdate(TopicName::ObjectObserver, serial); } - virtual void - objectAdded(const ObjectInfo& info, const Ice::Current& current) + void + objectAdded(ObjectInfo info, const Ice::Current& current) override { int serial = 0; string failure; @@ -175,11 +175,11 @@ public: os << "id: " << _database->getCommunicator()->identityToString(info.proxy->ice_getIdentity()); failure = os.str(); } - receivedUpdate(ObjectObserverTopicName, serial, failure); + receivedUpdate(TopicName::ObjectObserver, serial, failure); } - virtual void - objectUpdated(const ObjectInfo& info, const Ice::Current& current) + void + objectUpdated(ObjectInfo info, const Ice::Current& current) override { int serial = 0; string failure; @@ -200,11 +200,11 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - receivedUpdate(ObjectObserverTopicName, serial, failure); + receivedUpdate(TopicName::ObjectObserver, serial, failure); } - virtual void - objectRemoved(const Ice::Identity& id, const Ice::Current& current) + void + objectRemoved(Ice::Identity id, const Ice::Current& current) override { int serial = 0; string failure; @@ -221,15 +221,15 @@ public: catch(const ObjectNotRegisteredException&) { } - receivedUpdate(ObjectObserverTopicName, serial, failure); + receivedUpdate(TopicName::ObjectObserver, serial, failure); } private: - Ice::Long + long long getSerials(const Ice::Context& context, int& serial) { - Ice::Context::const_iterator p = context.find("serial"); + auto p = context.find("serial"); if(p != context.end()) { istringstream is(p->second); @@ -243,7 +243,7 @@ private: p = context.find("dbSerial"); if(p != context.end()) { - Ice::Long dbSerial; + long long dbSerial; istringstream is(p->second); is >> dbSerial; return dbSerial; @@ -270,27 +270,22 @@ private: } } - const ReplicaSessionManager::ThreadPtr _thread; - const DatabasePtr _database; - const ReplicaSessionPrx _session; + const shared_ptr<ReplicaSessionManager::Thread> _thread; + const shared_ptr<Database> _database; + const shared_ptr<ReplicaSessionPrx> _session; }; }; -ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) : - SessionManager(communicator, instanceName) -{ -} - void ReplicaSessionManager::create(const string& name, - const InternalReplicaInfoPtr& info, - const DatabasePtr& database, - const WellKnownObjectsManagerPtr& wellKnownObjects, - const InternalRegistryPrx& internalRegistry) + const shared_ptr<InternalReplicaInfo>& info, + const shared_ptr<Database>& database, + const shared_ptr<WellKnownObjectsManager>& wellKnownObjects, + const shared_ptr<InternalRegistryPrx>& internalRegistry) { { - Lock sync(*this); + lock_guard lock(_mutex); _name = name; _info = info; @@ -299,9 +294,8 @@ ReplicaSessionManager::create(const string& name, _wellKnownObjects = wellKnownObjects; _traceLevels = _database->getTraceLevels(); - _thread = new Thread(*this, _master, _traceLevels->logger); - _thread->start(); - notifyAll(); + _thread = make_shared<Thread>(*this, _master, _traceLevels->logger); + _condVar.notify_all(); } _thread->tryCreateSession(); @@ -309,14 +303,13 @@ ReplicaSessionManager::create(const string& name, } void -ReplicaSessionManager::create(const InternalRegistryPrx& replica) +ReplicaSessionManager::create(const std::shared_ptr<InternalRegistryPrx>& replica) { { - Lock sync(*this); - while(!_master) // Wait to be initialized. - { - wait(); - } + unique_lock lock(_mutex); + + // Wait to be initialized. + _condVar.wait(lock, [this] { return _master; }); } if(replica->ice_getIdentity() != _master->ice_getIdentity()) @@ -354,27 +347,27 @@ ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const void ReplicaSessionManager::destroy() { - ThreadPtr thread; + shared_ptr<Thread> thread; { - Lock sync(*this); + lock_guard lock(_mutex); if(!_communicator) { return; } thread = _thread; - _thread = 0; + _thread = nullptr; - _communicator = 0; + _communicator = nullptr; } if(thread) { thread->terminate(); - thread->getThreadControl().join(); + thread->join(); } - _database = 0; - _wellKnownObjects = 0; + _database = nullptr; + _wellKnownObjects = nullptr; } void @@ -392,7 +385,7 @@ ReplicaSessionManager::registerAllWellKnownObjects() // If there's an active session, register the well-known objects // with the session. // - ReplicaSessionPrx session = _thread->getSession(); + auto session = _thread->getSession(); if(session) { try @@ -406,33 +399,31 @@ ReplicaSessionManager::registerAllWellKnownObjects() } } -IceGrid::InternalRegistryPrx +shared_ptr<InternalRegistryPrx> ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id) { - vector<Ice::AsyncResultPtr> results; - vector<QueryPrx> queryObjects = findAllQueryObjects(true); - for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + vector<future<shared_ptr<Ice::ObjectPrx>>> results; + for(const auto& obj : findAllQueryObjects(true)) { - results.push_back((*q)->begin_findObjectById(id)); + results.push_back(obj->findObjectByIdAsync(id)); } - for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + for(auto& result : results) { - QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); try { - return InternalRegistryPrx::checkedCast(query->end_findObjectById(*p)); + return Ice::checkedCast<InternalRegistryPrx>(result.get()); } catch(const Ice::Exception&) { } } - return 0; + return nullptr; } bool -ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) +ReplicaSessionManager::keepAlive(const shared_ptr<ReplicaSessionPrx>& session) { try { @@ -456,11 +447,11 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) } } -ReplicaSessionPrx -ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Time& timeout) +shared_ptr<ReplicaSessionPrx> +ReplicaSessionManager::createSession(shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout) { - ReplicaSessionPrx session; - IceInternal::UniquePtr<Ice::Exception> exception; + shared_ptr<ReplicaSessionPrx> session; + string exceptionMsg = ""; try { if(_traceLevels && _traceLevels->replica > 1) @@ -469,7 +460,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim out << "trying to establish session with master replica"; } - set<InternalRegistryPrx> used; + set<shared_ptr<InternalRegistryPrx>> used; if(!registry->ice_getEndpoints().empty()) { try @@ -478,34 +469,32 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim } catch(const Ice::LocalException& ex) { - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); used.insert(registry); - registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())); + registry = Ice::uncheckedCast<InternalRegistryPrx>(registry->ice_endpoints({})); } } if(!session) { - vector<Ice::AsyncResultPtr> results; - vector<QueryPrx> queryObjects = findAllQueryObjects(false); - for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + vector<future<shared_ptr<Ice::ObjectPrx>>> results; + for(const auto& obj : findAllQueryObjects(false)) { - results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); + results.push_back(obj->findObjectByIdAsync(registry->ice_getIdentity())); } - for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + for(auto& result : results) { - QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy()); if(isDestroyed()) { break; } - InternalRegistryPrx newRegistry; + shared_ptr<InternalRegistryPrx> newRegistry; try { - Ice::ObjectPrx obj = query->end_findObjectById(*p); - newRegistry = InternalRegistryPrx::uncheckedCast(obj); + auto obj = result.get(); + newRegistry = Ice::uncheckedCast<InternalRegistryPrx>(obj); if(newRegistry && used.find(newRegistry) == used.end()) { session = createSessionImpl(newRegistry, timeout); @@ -515,7 +504,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim } catch(const Ice::LocalException& ex) { - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); if(newRegistry) { used.insert(newRegistry); @@ -530,7 +519,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim { _traceLevels->logger->error("a replica with the same name is already registered and active"); } - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); } catch(const DeploymentException& ex) { @@ -538,7 +527,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim { _traceLevels->logger->error("database synchronization with master failed:\n" + ex.reason); } - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); } catch(const PermissionDeniedException& ex) { @@ -546,11 +535,11 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim { _traceLevels->logger->error("connection to master was denied:\n" + ex.reason); } - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); } catch(const Ice::Exception& ex) { - exception.reset(ex.ice_clone()); + exceptionMsg = ex.what(); } if(session) @@ -578,30 +567,30 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim else { out << "failed to establish session with master replica:\n"; - if(exception.get()) + if(exceptionMsg.empty()) { - out << *exception.get(); + out << "failed to get replica proxy"; } else { - out << "failed to get replica proxy"; + out << exceptionMsg; } } } return session; } -ReplicaSessionPrx -ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +shared_ptr<ReplicaSessionPrx> +ReplicaSessionManager::createSessionImpl(const shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout) { - ReplicaSessionPrx session; + shared_ptr<ReplicaSessionPrx> session; try { session = registry->registerReplica(_info, _internalRegistry); - int t = session->getTimeout(); + auto t = session->getTimeout(); if(t > 0) { - timeout = IceUtil::Time::seconds(t / 2); + timeout = chrono::seconds(t / 2); } // @@ -609,8 +598,8 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic // to the session so that it can subscribe it. This call only // returns once the observer is subscribed and initialized. // - DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session); - _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant)); + auto servant = make_shared<MasterDatabaseObserverI>(_thread, _database, session); + _observer = Ice::uncheckedCast<DatabaseObserverPrx>(_database->getInternalAdapter()->addWithUUID(servant)); StringLongDict serials = _database->getSerials(); IceUtil::Optional<StringLongDict> serialsOpt; if(!serials.empty()) @@ -628,7 +617,7 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic } void -ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) +ReplicaSessionManager::destroySession(const shared_ptr<ReplicaSessionPrx>& session) { if(session) { @@ -661,6 +650,6 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session) catch(const Ice::LocalException&) { } - _observer = 0; + _observer = nullptr; } } |