summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp233
1 files changed, 111 insertions, 122 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index da6f5cb4983..73547a5a304 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -14,35 +14,35 @@ using namespace IceGrid;
namespace IceGrid
{
-class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex
+class MasterDatabaseObserverI : public DatabaseObserver
{
public:
- MasterDatabaseObserverI(const ReplicaSessionManager::ThreadPtr& thread,
- const DatabasePtr& database,
- const ReplicaSessionPrx& session) :
+ MasterDatabaseObserverI(const shared_ptr<ReplicaSessionManager::Thread>& thread,
+ const shared_ptr<Database>& database,
+ const shared_ptr<ReplicaSessionPrx>& session) :
_thread(thread),
_database(database),
_session(session)
{
}
- virtual void
- applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current)
+ void
+ applicationInit(int, ApplicationInfoSeq applications, const Ice::Current& current) override
{
int serial = 0;
- _database->syncApplications(applications, getSerials(current.ctx, serial));
- receivedUpdate(ApplicationObserverTopicName, serial);
+ _database->syncApplications(move(applications), getSerials(current.ctx, serial));
+ receivedUpdate(TopicName::ApplicationObserver, serial);
}
- virtual void
- applicationAdded(int, const ApplicationInfo& application, const Ice::Current& current)
+ void
+ applicationAdded(int, ApplicationInfo application, const Ice::Current& current) override
{
int serial = 0;
string failure;
try
{
- _database->addApplication(application, 0, getSerials(current.ctx, serial));
+ _database->addApplication(move(application), nullptr, getSerials(current.ctx, serial));
}
catch(const DeploymentException& ex)
{
@@ -50,17 +50,17 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ApplicationObserver, serial, failure);
}
- virtual void
- applicationRemoved(int, const std::string& name, const Ice::Current& current)
+ void
+ applicationRemoved(int, std::string name, const Ice::Current& current) override
{
int serial = 0;
string failure;
try
{
- _database->removeApplication(name, 0, getSerials(current.ctx, serial));
+ _database->removeApplication(name, nullptr, getSerials(current.ctx, serial));
}
catch(const ApplicationNotExistException& ex)
{
@@ -68,17 +68,17 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ApplicationObserver, serial, failure);
}
- virtual void
- applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current& current)
+ void
+ applicationUpdated(int, ApplicationUpdateInfo update, const Ice::Current& current) override
{
int serial = 0;
string failure;
try
{
- _database->updateApplication(update, false, 0, getSerials(current.ctx, serial));
+ _database->updateApplication(move(update), false, nullptr, getSerials(current.ctx, serial));
}
catch(const DeploymentException& ex)
{
@@ -92,19 +92,19 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ApplicationObserver, serial, failure);
}
- virtual void
- adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current)
+ void
+ adapterInit(AdapterInfoSeq adapters, const Ice::Current& current) override
{
int serial = 0;
_database->syncAdapters(adapters, getSerials(current.ctx, serial));
- receivedUpdate(AdapterObserverTopicName, serial);
+ receivedUpdate(TopicName::AdapterObserver, serial);
}
- virtual void
- adapterAdded(const AdapterInfo& info, const Ice::Current& current)
+ void
+ adapterAdded(AdapterInfo info, const Ice::Current& current) override
{
int serial = 0;
string failure;
@@ -116,11 +116,11 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::AdapterObserver, serial, failure);
}
- virtual void
- adapterUpdated(const AdapterInfo& info, const Ice::Current& current)
+ void
+ adapterUpdated(AdapterInfo info, const Ice::Current& current) override
{
int serial = 0;
string failure;
@@ -132,35 +132,35 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::AdapterObserver, serial, failure);
}
- virtual void
- adapterRemoved(const std::string& id, const Ice::Current& current)
+ void
+ adapterRemoved(std::string id, const Ice::Current& current) override
{
int serial = 0;
string failure;
try
{
- _database->setAdapterDirectProxy(id, "", 0, getSerials(current.ctx, serial));
+ _database->setAdapterDirectProxy(id, "", nullptr, getSerials(current.ctx, serial));
}
catch(const AdapterExistsException&)
{
failure = "adapter `" + id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::AdapterObserver, serial, failure);
}
- virtual void
- objectInit(const ObjectInfoSeq& objects, const Ice::Current& current)
+ void
+ objectInit(ObjectInfoSeq objects, const Ice::Current& current) override
{
int serial = 0;
- _database->syncObjects(objects, getSerials(current.ctx, serial));
- receivedUpdate(ObjectObserverTopicName, serial);
+ _database->syncObjects(move(objects), getSerials(current.ctx, serial));
+ receivedUpdate(TopicName::ObjectObserver, serial);
}
- virtual void
- objectAdded(const ObjectInfo& info, const Ice::Current& current)
+ void
+ objectAdded(ObjectInfo info, const Ice::Current& current) override
{
int serial = 0;
string failure;
@@ -175,11 +175,11 @@ public:
os << "id: " << _database->getCommunicator()->identityToString(info.proxy->ice_getIdentity());
failure = os.str();
}
- receivedUpdate(ObjectObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ObjectObserver, serial, failure);
}
- virtual void
- objectUpdated(const ObjectInfo& info, const Ice::Current& current)
+ void
+ objectUpdated(ObjectInfo info, const Ice::Current& current) override
{
int serial = 0;
string failure;
@@ -200,11 +200,11 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- receivedUpdate(ObjectObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ObjectObserver, serial, failure);
}
- virtual void
- objectRemoved(const Ice::Identity& id, const Ice::Current& current)
+ void
+ objectRemoved(Ice::Identity id, const Ice::Current& current) override
{
int serial = 0;
string failure;
@@ -221,15 +221,15 @@ public:
catch(const ObjectNotRegisteredException&)
{
}
- receivedUpdate(ObjectObserverTopicName, serial, failure);
+ receivedUpdate(TopicName::ObjectObserver, serial, failure);
}
private:
- Ice::Long
+ long long
getSerials(const Ice::Context& context, int& serial)
{
- Ice::Context::const_iterator p = context.find("serial");
+ auto p = context.find("serial");
if(p != context.end())
{
istringstream is(p->second);
@@ -243,7 +243,7 @@ private:
p = context.find("dbSerial");
if(p != context.end())
{
- Ice::Long dbSerial;
+ long long dbSerial;
istringstream is(p->second);
is >> dbSerial;
return dbSerial;
@@ -270,27 +270,22 @@ private:
}
}
- const ReplicaSessionManager::ThreadPtr _thread;
- const DatabasePtr _database;
- const ReplicaSessionPrx _session;
+ const shared_ptr<ReplicaSessionManager::Thread> _thread;
+ const shared_ptr<Database> _database;
+ const shared_ptr<ReplicaSessionPrx> _session;
};
};
-ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) :
- SessionManager(communicator, instanceName)
-{
-}
-
void
ReplicaSessionManager::create(const string& name,
- const InternalReplicaInfoPtr& info,
- const DatabasePtr& database,
- const WellKnownObjectsManagerPtr& wellKnownObjects,
- const InternalRegistryPrx& internalRegistry)
+ const shared_ptr<InternalReplicaInfo>& info,
+ const shared_ptr<Database>& database,
+ const shared_ptr<WellKnownObjectsManager>& wellKnownObjects,
+ const shared_ptr<InternalRegistryPrx>& internalRegistry)
{
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_name = name;
_info = info;
@@ -299,9 +294,8 @@ ReplicaSessionManager::create(const string& name,
_wellKnownObjects = wellKnownObjects;
_traceLevels = _database->getTraceLevels();
- _thread = new Thread(*this, _master, _traceLevels->logger);
- _thread->start();
- notifyAll();
+ _thread = make_shared<Thread>(*this, _master, _traceLevels->logger);
+ _condVar.notify_all();
}
_thread->tryCreateSession();
@@ -309,14 +303,13 @@ ReplicaSessionManager::create(const string& name,
}
void
-ReplicaSessionManager::create(const InternalRegistryPrx& replica)
+ReplicaSessionManager::create(const std::shared_ptr<InternalRegistryPrx>& replica)
{
{
- Lock sync(*this);
- while(!_master) // Wait to be initialized.
- {
- wait();
- }
+ unique_lock lock(_mutex);
+
+ // Wait to be initialized.
+ _condVar.wait(lock, [this] { return _master; });
}
if(replica->ice_getIdentity() != _master->ice_getIdentity())
@@ -354,27 +347,27 @@ ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const
void
ReplicaSessionManager::destroy()
{
- ThreadPtr thread;
+ shared_ptr<Thread> thread;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(!_communicator)
{
return;
}
thread = _thread;
- _thread = 0;
+ _thread = nullptr;
- _communicator = 0;
+ _communicator = nullptr;
}
if(thread)
{
thread->terminate();
- thread->getThreadControl().join();
+ thread->join();
}
- _database = 0;
- _wellKnownObjects = 0;
+ _database = nullptr;
+ _wellKnownObjects = nullptr;
}
void
@@ -392,7 +385,7 @@ ReplicaSessionManager::registerAllWellKnownObjects()
// If there's an active session, register the well-known objects
// with the session.
//
- ReplicaSessionPrx session = _thread->getSession();
+ auto session = _thread->getSession();
if(session)
{
try
@@ -406,33 +399,31 @@ ReplicaSessionManager::registerAllWellKnownObjects()
}
}
-IceGrid::InternalRegistryPrx
+shared_ptr<InternalRegistryPrx>
ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id)
{
- vector<Ice::AsyncResultPtr> results;
- vector<QueryPrx> queryObjects = findAllQueryObjects(true);
- for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ vector<future<shared_ptr<Ice::ObjectPrx>>> results;
+ for(const auto& obj : findAllQueryObjects(true))
{
- results.push_back((*q)->begin_findObjectById(id));
+ results.push_back(obj->findObjectByIdAsync(id));
}
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ for(auto& result : results)
{
- QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
try
{
- return InternalRegistryPrx::checkedCast(query->end_findObjectById(*p));
+ return Ice::checkedCast<InternalRegistryPrx>(result.get());
}
catch(const Ice::Exception&)
{
}
}
- return 0;
+ return nullptr;
}
bool
-ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
+ReplicaSessionManager::keepAlive(const shared_ptr<ReplicaSessionPrx>& session)
{
try
{
@@ -456,11 +447,11 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
}
}
-ReplicaSessionPrx
-ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Time& timeout)
+shared_ptr<ReplicaSessionPrx>
+ReplicaSessionManager::createSession(shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout)
{
- ReplicaSessionPrx session;
- IceInternal::UniquePtr<Ice::Exception> exception;
+ shared_ptr<ReplicaSessionPrx> session;
+ string exceptionMsg = "";
try
{
if(_traceLevels && _traceLevels->replica > 1)
@@ -469,7 +460,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
out << "trying to establish session with master replica";
}
- set<InternalRegistryPrx> used;
+ set<shared_ptr<InternalRegistryPrx>> used;
if(!registry->ice_getEndpoints().empty())
{
try
@@ -478,34 +469,32 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
}
catch(const Ice::LocalException& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
used.insert(registry);
- registry = InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()));
+ registry = Ice::uncheckedCast<InternalRegistryPrx>(registry->ice_endpoints({}));
}
}
if(!session)
{
- vector<Ice::AsyncResultPtr> results;
- vector<QueryPrx> queryObjects = findAllQueryObjects(false);
- for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ vector<future<shared_ptr<Ice::ObjectPrx>>> results;
+ for(const auto& obj : findAllQueryObjects(false))
{
- results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity()));
+ results.push_back(obj->findObjectByIdAsync(registry->ice_getIdentity()));
}
- for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ for(auto& result : results)
{
- QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
if(isDestroyed())
{
break;
}
- InternalRegistryPrx newRegistry;
+ shared_ptr<InternalRegistryPrx> newRegistry;
try
{
- Ice::ObjectPrx obj = query->end_findObjectById(*p);
- newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ auto obj = result.get();
+ newRegistry = Ice::uncheckedCast<InternalRegistryPrx>(obj);
if(newRegistry && used.find(newRegistry) == used.end())
{
session = createSessionImpl(newRegistry, timeout);
@@ -515,7 +504,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
}
catch(const Ice::LocalException& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
if(newRegistry)
{
used.insert(newRegistry);
@@ -530,7 +519,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
{
_traceLevels->logger->error("a replica with the same name is already registered and active");
}
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
}
catch(const DeploymentException& ex)
{
@@ -538,7 +527,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
{
_traceLevels->logger->error("database synchronization with master failed:\n" + ex.reason);
}
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
}
catch(const PermissionDeniedException& ex)
{
@@ -546,11 +535,11 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
{
_traceLevels->logger->error("connection to master was denied:\n" + ex.reason);
}
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
}
catch(const Ice::Exception& ex)
{
- exception.reset(ex.ice_clone());
+ exceptionMsg = ex.what();
}
if(session)
@@ -578,30 +567,30 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
else
{
out << "failed to establish session with master replica:\n";
- if(exception.get())
+ if(exceptionMsg.empty())
{
- out << *exception.get();
+ out << "failed to get replica proxy";
}
else
{
- out << "failed to get replica proxy";
+ out << exceptionMsg;
}
}
}
return session;
}
-ReplicaSessionPrx
-ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+shared_ptr<ReplicaSessionPrx>
+ReplicaSessionManager::createSessionImpl(const shared_ptr<InternalRegistryPrx>& registry, chrono::seconds& timeout)
{
- ReplicaSessionPrx session;
+ shared_ptr<ReplicaSessionPrx> session;
try
{
session = registry->registerReplica(_info, _internalRegistry);
- int t = session->getTimeout();
+ auto t = session->getTimeout();
if(t > 0)
{
- timeout = IceUtil::Time::seconds(t / 2);
+ timeout = chrono::seconds(t / 2);
}
//
@@ -609,8 +598,8 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic
// to the session so that it can subscribe it. This call only
// returns once the observer is subscribed and initialized.
//
- DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session);
- _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant));
+ auto servant = make_shared<MasterDatabaseObserverI>(_thread, _database, session);
+ _observer = Ice::uncheckedCast<DatabaseObserverPrx>(_database->getInternalAdapter()->addWithUUID(servant));
StringLongDict serials = _database->getSerials();
IceUtil::Optional<StringLongDict> serialsOpt;
if(!serials.empty())
@@ -628,7 +617,7 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic
}
void
-ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session)
+ReplicaSessionManager::destroySession(const shared_ptr<ReplicaSessionPrx>& session)
{
if(session)
{
@@ -661,6 +650,6 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session)
catch(const Ice::LocalException&)
{
}
- _observer = 0;
+ _observer = nullptr;
}
}