summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
committerBenoit Foucher <benoit@zeroc.com>2013-09-03 15:42:19 +0200
commit91f6ebb998532b36fc70187b641a5b7404060422 (patch)
treeac88e961c68e4b09eb819f4b57b9ecac56854567 /cpp/src/IceGrid/ReplicaSessionManager.cpp
parentICE-5378 - Remove slice35d.dll from Windows installer (diff)
downloadice-91f6ebb998532b36fc70187b641a5b7404060422.tar.bz2
ice-91f6ebb998532b36fc70187b641a5b7404060422.tar.xz
ice-91f6ebb998532b36fc70187b641a5b7404060422.zip
Fixed ICE-5358 - allow IceGrid replica to initialize its database from another replica
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp155
1 files changed, 107 insertions, 48 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index a706df8a4f4..88992d1f81f 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -35,17 +35,19 @@ public:
virtual void
applicationInit(int, const ApplicationInfoSeq& applications, const Ice::Current& current)
{
- _database->syncApplications(applications);
- receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx));
+ int serial;
+ _database->syncApplications(applications, getSerials(current.ctx, serial));
+ receivedUpdate(ApplicationObserverTopicName, serial);
}
virtual void
applicationAdded(int, const ApplicationInfo& application, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->addApplication(application);
+ _database->addApplication(application, 0, getSerials(current.ctx, serial));
}
catch(const DeploymentException& ex)
{
@@ -53,16 +55,17 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, serial, failure);
}
virtual void
applicationRemoved(int, const std::string& name, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->removeApplication(name);
+ _database->removeApplication(name, 0, getSerials(current.ctx, serial));
}
catch(const ApplicationNotExistException& ex)
{
@@ -70,16 +73,17 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, serial, failure);
}
virtual void
applicationUpdated(int, const ApplicationUpdateInfo& update, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->updateApplication(update, false);
+ _database->updateApplication(update, false, 0, getSerials(current.ctx, serial));
}
catch(const DeploymentException& ex)
{
@@ -93,75 +97,81 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ApplicationObserverTopicName, serial, failure);
}
virtual void
adapterInit(const AdapterInfoSeq& adapters, const Ice::Current& current)
{
- _database->syncAdapters(adapters);
- receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx));
+ int serial;
+ _database->syncAdapters(adapters, getSerials(current.ctx, serial));
+ receivedUpdate(AdapterObserverTopicName, serial);
}
virtual void
adapterAdded(const AdapterInfo& info, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy);
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, getSerials(current.ctx, serial));
}
catch(const AdapterExistsException&)
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, serial, failure);
}
virtual void
adapterUpdated(const AdapterInfo& info, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy);
+ _database->setAdapterDirectProxy(info.id, info.replicaGroupId, info.proxy, getSerials(current.ctx, serial));
}
catch(const AdapterExistsException&)
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, serial, failure);
}
virtual void
adapterRemoved(const std::string& id, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->setAdapterDirectProxy(id, "", 0);
+ _database->setAdapterDirectProxy(id, "", 0, getSerials(current.ctx, serial));
}
catch(const AdapterExistsException&)
{
failure = "adapter `" + id + "' already exists and belongs to an application";
}
- receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(AdapterObserverTopicName, serial, failure);
}
virtual void
objectInit(const ObjectInfoSeq& objects, const Ice::Current& current)
{
- _database->syncObjects(objects);
- receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx));
+ int serial;
+ _database->syncObjects(objects, getSerials(current.ctx, serial));
+ receivedUpdate(ObjectObserverTopicName, serial);
}
virtual void
objectAdded(const ObjectInfo& info, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->addOrUpdateObject(info);
+ _database->addOrUpdateObject(info, getSerials(current.ctx, serial));
}
catch(const ObjectExistsException& ex)
{
@@ -170,16 +180,17 @@ public:
os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity());
failure = os.str();
}
- receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, serial, failure);
}
virtual void
objectUpdated(const ObjectInfo& info, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->addOrUpdateObject(info);
+ _database->addOrUpdateObject(info, getSerials(current.ctx, serial));
}
catch(const ObjectExistsException& ex)
{
@@ -194,16 +205,17 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, serial, failure);
}
virtual void
objectRemoved(const Ice::Identity& id, const Ice::Current& current)
{
+ int serial;
string failure;
try
{
- _database->removeObject(id);
+ _database->removeObject(id, getSerials(current.ctx, serial));
}
catch(const DeploymentException& ex)
{
@@ -214,23 +226,37 @@ public:
catch(const ObjectNotRegisteredException&)
{
}
- receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
+ receivedUpdate(ObjectObserverTopicName, serial, failure);
}
private:
- int
- getSerial(const Ice::Context& context)
+ Ice::Long
+ getSerials(const Ice::Context& context, int& serial)
{
Ice::Context::const_iterator p = context.find("serial");
if(p != context.end())
{
- int serial;
istringstream is(p->second);
is >> serial;
- return serial;
}
- return -1;
+ else
+ {
+ serial = -1;
+ }
+
+ p = context.find("dbSerial");
+ if(p != context.end())
+ {
+ Ice::Long dbSerial;
+ istringstream is(p->second);
+ is >> dbSerial;
+ return dbSerial;
+ }
+ else
+ {
+ return -1;
+ }
}
void
@@ -256,7 +282,7 @@ private:
};
-ReplicaSessionManager::ReplicaSessionManager()
+ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator) : SessionManager(communicator)
{
}
@@ -267,15 +293,9 @@ ReplicaSessionManager::create(const string& name,
const WellKnownObjectsManagerPtr& wellKnownObjects,
const InternalRegistryPrx& internalRegistry)
{
- Ice::CommunicatorPtr comm = database->getCommunicator();
{
Lock sync(*this);
- Ice::ObjectPrx prx = comm->getDefaultLocator();
- Ice::Identity id = prx->ice_getIdentity();
- id.name = "InternalRegistry-Master";
-
- _master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()));
_name = name;
_info = info;
_internalRegistry = internalRegistry;
@@ -283,11 +303,6 @@ ReplicaSessionManager::create(const string& name,
_wellKnownObjects = wellKnownObjects;
_traceLevels = _database->getTraceLevels();
- //
- // Initialize query objects from the default locator endpoints.
- //
- initQueryObjects(comm->getDefaultLocator());
-
_thread = new Thread(*this, _master, _traceLevels->logger);
_thread->start();
notifyAll();
@@ -344,16 +359,21 @@ ReplicaSessionManager::destroy()
ThreadPtr thread;
{
Lock sync(*this);
- if(!_thread)
+ if(!_communicator)
{
return;
}
thread = _thread;
_thread = 0;
+
+ _communicator = 0;
}
- thread->terminate();
- thread->getThreadControl().join();
+ if(thread)
+ {
+ thread->terminate();
+ thread->getThreadControl().join();
+ }
_database = 0;
_wellKnownObjects = 0;
@@ -388,6 +408,31 @@ ReplicaSessionManager::registerAllWellKnownObjects()
}
}
+IceGrid::InternalRegistryPrx
+ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id)
+{
+ vector<Ice::AsyncResultPtr> results;
+ vector<QueryPrx> queryObjects = findAllQueryObjects();
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ {
+ results.push_back((*q)->begin_findObjectById(id));
+ }
+
+ for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ {
+ QueryPrx query = QueryPrx::uncheckedCast((*p)->getProxy());
+ try
+ {
+ return InternalRegistryPrx::checkedCast(query->end_findObjectById(*p));
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ }
+
+ return 0;
+}
+
bool
ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
{
@@ -433,7 +478,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
{
session = createSessionImpl(registry, timeout);
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::LocalException& ex)
{
exception.reset(ex.ice_clone());
used.insert(registry);
@@ -470,7 +515,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
break;
}
}
- catch(const Ice::Exception& ex)
+ catch(const Ice::LocalException& ex)
{
exception.reset(ex.ice_clone());
if(newRegistry)
@@ -489,11 +534,19 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
}
exception.reset(ex.ice_clone());
}
+ catch(const DeploymentException& ex)
+ {
+ if(_traceLevels)
+ {
+ _traceLevels->logger->error("database synchronization with master failed:\n" + ex.reason);
+ }
+ exception.reset(ex.ice_clone());
+ }
catch(const PermissionDeniedException& ex)
{
if(_traceLevels)
{
- _traceLevels->logger->error("connection to the the registry `" + _name + "' was denied:\n" + ex.reason);
+ _traceLevels->logger->error("connection to master was denied:\n" + ex.reason);
}
exception.reset(ex.ice_clone());
}
@@ -560,7 +613,13 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic
//
DatabaseObserverPtr servant = new MasterDatabaseObserverI(_thread, _database, session);
_observer = DatabaseObserverPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(servant));
- session->setDatabaseObserver(_observer);
+ StringLongDict serials = _database->getSerials();
+ IceUtil::Optional<StringLongDict> serialsOpt;
+ if(!serials.empty())
+ {
+ serialsOpt = serials; // Don't provide serials parameter if serials aren't supported.
+ }
+ session->setDatabaseObserver(_observer, serialsOpt);
return session;
}
catch(const Ice::Exception&)