summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp117
1 files changed, 73 insertions, 44 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index 6feb9f41403..705dd2e3eb9 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -23,9 +23,12 @@ class MasterDatabaseObserverI : public DatabaseObserver, public IceUtil::Mutex
{
public:
- MasterDatabaseObserverI(const DatabasePtr& database, ReplicaSessionManager& manager) :
+ MasterDatabaseObserverI(const ReplicaSessionManager::ThreadPtr& thread,
+ const DatabasePtr& database,
+ const ReplicaSessionPrx& session) :
+ _thread(thread),
_database(database),
- _manager(manager)
+ _session(session)
{
}
@@ -33,6 +36,7 @@ public:
applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current)
{
_database->syncApplications(applications);
+ receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx));
}
virtual void
@@ -49,7 +53,7 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -66,7 +70,7 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -89,13 +93,14 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current)
{
_database->syncAdapters(adapters);
+ receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx));
}
virtual void
@@ -106,7 +111,7 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -117,7 +122,7 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -128,13 +133,14 @@ public:
{
failure = "adapter `" + id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
objectInit(const ObjectInfoSeq& objects, const Ice::Current& current)
{
_database->syncObjects(objects);
+ receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx));
}
virtual void
@@ -158,7 +164,7 @@ public:
os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity());
failure = os.str();
}
- _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -181,7 +187,7 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -206,7 +212,7 @@ public:
catch(const ObjectNotRegisteredException&)
{
}
- _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
private:
@@ -225,8 +231,25 @@ private:
return -1;
}
+ void
+ receivedUpdate(TopicName name, int serial, const string& failure = string())
+ {
+ try
+ {
+ _session->receivedUpdate(name, serial, failure);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ if(!failure.empty())
+ {
+ _thread->destroyActiveSession();
+ }
+ }
+
+ const ReplicaSessionManager::ThreadPtr _thread;
const DatabasePtr _database;
- ReplicaSessionManager& _manager;
+ const ReplicaSessionPrx _session;
};
@@ -243,19 +266,13 @@ ReplicaSessionManager::create(const string& name,
const WellKnownObjectsManagerPtr& wellKnownObjects,
const InternalRegistryPrx& internalRegistry)
{
- Ice::CommunicatorPtr communicator = database->getCommunicator();
- string instanceName = communicator->getDefaultLocator()->ice_getIdentity().category;
+ Ice::CommunicatorPtr comm = database->getCommunicator();
+ string instName = comm->getDefaultLocator()->ice_getIdentity().category;
{
Lock sync(*this);
- _master =
- InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/InternalRegistry-Master"));
-
- Ice::ObjectPrx obsv =
- database->getInternalAdapter()->addWithUUID(new MasterDatabaseObserverI(database, *this));
-
- _observer = DatabaseObserverPrx::uncheckedCast(obsv);
+ _master = InternalRegistryPrx::uncheckedCast(comm->stringToProxy(instName + "/InternalRegistry-Master"));
_name = name;
_info = info;
@@ -269,7 +286,7 @@ ReplicaSessionManager::create(const string& name,
notifyAll();
}
- _thread->tryCreateSession(_master);
+ _thread->tryCreateSession(0);
}
void
@@ -318,27 +335,6 @@ ReplicaSessionManager::destroy()
_thread->terminate();
_thread->getThreadControl().join();
- _thread = 0;
-}
-
-void
-ReplicaSessionManager::receivedUpdate(TopicName name, int serial, const string& failure)
-{
- ReplicaSessionPrx session = _thread->getSession();
- if(session)
- {
- try
- {
- session->receivedUpdate(name, serial, failure);
- }
- catch(const Ice::LocalException&)
- {
- }
- }
- if(!failure.empty())
- {
- _thread->destroyActiveSession();
- }
}
void
@@ -403,12 +399,21 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
out << "trying to establish session with master replica";
}
- ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry, _observer);
+ ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry);
int t = session->getTimeout();
if(t > 0)
{
timeout = IceUtil::Time::seconds(t / 2);
}
+
+ //
+ // Create a new database observer servant and give its proxy
+ // to the session so that it can subscribe it. This call only
+ // returns once the observer is subscribed and initialized.
+ //
+ DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session);
+ _observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant));
+ session->setDatabaseObserver(_observer);
//
// Register all the well-known objects with the replica session.
@@ -433,6 +438,18 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
}
catch(const Ice::LocalException& ex)
{
+ if(_observer)
+ {
+ try
+ {
+ _database->getInternalAdapter()->remove(_observer->ice_getIdentity());
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ _observer = 0;
+ }
+
//
// Re-register all the well known objects with the local database.
//
@@ -468,5 +485,17 @@ ReplicaSessionManager::destroySession(const ReplicaSessionPrx& session)
out << "couldn't destroy master replica session:\n" << ex;
}
}
+
+ if(_observer)
+ {
+ try
+ {
+ _database->getInternalAdapter()->remove(_observer->ice_getIdentity());
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ _observer = 0;
+ }
}