diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
commit | 872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch) | |
tree | c071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src/IceGrid/Database.cpp | |
parent | adding timeout test (diff) | |
download | ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2 ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip |
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 330 |
1 files changed, 189 insertions, 141 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 8a62f3c0ef0..65f963b25f8 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -182,6 +182,8 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb { int serial; ApplicationDescriptorSeq applications; + AdapterInfoSeq adapters; + ObjectInfoSeq objects; { Lock sync(*this); _registryObserver = registryObserver; @@ -192,12 +194,26 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb { applications.push_back(p->second); } + + for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + adapters.push_back(p->second); + if(adapters.back().id.empty()) + { + adapters.back().id = p->first; + } + } + + for(IdentityObjectInfoDict::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + objects.push_back(p->second); + } } // // Notify the observers. // - _registryObserver->init(serial, applications); + _registryObserver->init(serial, applications, adapters, objects); } void @@ -563,9 +579,9 @@ Database::getAllNodeServers(const string& node) bool Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, _adapterDbName); - if(proxy) + AdapterInfo info; + int serial; + bool updated = false; { Lock sync(*this); if(_adapterCache.has(adapterId)) @@ -573,65 +589,64 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr return false; } - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p != adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + if(proxy) { - AdapterInfo info = p->second; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - p.set(info); - if(_traceLevels->adapter > 0) + if(p != _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "updated adapter `" << adapterId << "'"; - if(!replicaGroupId.empty()) - { - out << " with replica group `" << replicaGroupId << "'"; - } + info = p->second; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + p.set(info); + updated = true; } + else + { + info.id = adapterId; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + } } else { - AdapterInfo info; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); - if(_traceLevels->adapter > 0) + if(p == _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "added adapter `" << adapterId << "'"; - if(!replicaGroupId.empty()) - { - out << " with replica group `" << replicaGroupId << "'"; - } + return true; } - } - return true; + _adapters.erase(p); + } + + serial = ++_serial; } - else + + + if(_traceLevels->adapter > 0) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'"; + if(!replicaGroupId.empty()) { - return false; - } + out << " with replica group `" << replicaGroupId << "'"; + } + } - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p == adapters.end()) + if(proxy) + { + if(updated) { - return true; + _registryObserver->adapterUpdated(serial, info); } - AdapterInfo info = p->second; - adapters.erase(p); - - if(_traceLevels->adapter > 0) + else { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + _registryObserver->adapterAdded(serial, info); } - - return true; } + else + { + _registryObserver->adapterRemoved(serial, adapterId); + } + return true; } Ice::ObjectPrx @@ -650,62 +665,73 @@ Database::getAdapterDirectProxy(const string& adapterId) void Database::removeAdapter(const string& adapterId) { - try - { - AdapterEntryPtr adpt = _adapterCache.get(adapterId); - DeploymentException ex; - ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; - ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; - throw ex; - } - catch(const AdapterNotExistException&) + AdapterInfoSeq infos; + int serial; { - } + Lock sync(*this); + if(_adapterCache.has(adapterId)) + { + AdapterEntryPtr adpt = _adapterCache.get(adapterId); + DeploymentException ex; + ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; + ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + Freeze::TransactionHolder txHolder(_connection); - { - StringAdapterInfoDict adapters(connection, _adapterDbName); - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p != adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + if(p != _adapters.end()) { - AdapterInfo info = p->second; - adapters.erase(p); - - if(_traceLevels->adapter > 0) + _adapters.erase(p); + } + else + { + p = _adapters.findByReplicaGroupId(adapterId, true); + if(p == _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + throw AdapterNotExistException(adapterId); + } + + while(p != _adapters.end()) + { + AdapterInfo info = p->second; + info.replicaGroupId = ""; + infos.push_back(info); + _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); + ++p; } - return; } - } - { - Freeze::TransactionHolder txHolder(connection); - StringAdapterInfoDict adapters(connection, _adapterDbName); + txHolder.commit(); - StringAdapterInfoDict::iterator p = adapters.findByReplicaGroupId(adapterId, true); - if(p == adapters.end()) + if(infos.empty()) { - throw AdapterNotExistException(adapterId); + serial = ++_serial; } - - while(p != adapters.end()) + else { - AdapterInfo info = p->second; - info.replicaGroupId = ""; - adapters.put(StringAdapterInfoDict::value_type(p->first, info)); - ++p; + serial = _serial; + _serial += infos.size(); } - - txHolder.commit(); } if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed replica group `" << adapterId << "'"; + out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; + } + + if(infos.empty()) + { + _registryObserver->adapterRemoved(serial, adapterId); + } + else + { + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + _registryObserver->adapterUpdated(++serial, *p); + } } } @@ -744,7 +770,7 @@ Database::getAdapters(const string& id, bool allRegistered, int& endpointCount) Ice::Identity identity; identity.category = "IceGridAdapter"; identity.name = id; - Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity)->ice_collocationOptimization(false); + Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity); adpts.push_back(make_pair(id, AdapterPrx::uncheckedCast(adpt))); endpointCount = 1; return adpts; @@ -809,21 +835,28 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info) { - Lock sync(*this); - + int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); - if(_objectCache.has(id)) { - throw ObjectExistsException(id); - } - - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - if(objects.find(id) != objects.end()) - { - throw ObjectExistsException(id); + Lock sync(*this); + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + + if(_objects.find(id) != _objects.end()) + { + throw ObjectExistsException(id); + } + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + + serial = ++_serial; } - objects.put(IdentityObjectInfoDict::value_type(id, info)); + + // + // Notify the observers. + // + _registryObserver->objectAdded(serial, info); if(_traceLevels->object > 0) { @@ -835,33 +868,40 @@ Database::addObject(const ObjectInfo& info) void Database::removeObject(const Ice::Identity& id) { - try - { - ObjectEntryPtr obj = _objectCache.get(id); - DeploymentException ex; - ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'"; - throw ex; - } - catch(const ObjectNotRegisteredException&) + int serial; { - } + Lock sync(*this); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - if(objects.find(id) == objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - if(objects.erase(id) > 0) - { - if(_traceLevels->object > 0) + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "removed object `" << Ice::identityToString(id) << "'"; + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + _objects.erase(p); + + serial = ++_serial; + } + + // + // Notify the observers. + // + _registryObserver->objectRemoved(serial, id); + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "removed object `" << Ice::identityToString(id) << "'"; } } @@ -869,31 +909,39 @@ void Database::updateObject(const Ice::ObjectPrx& proxy) { const Ice::Identity id = proxy->ice_getIdentity(); - try - { - ObjectEntryPtr obj = _objectCache.get(id); - DeploymentException ex; - ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'"; - throw ex; - } - catch(const ObjectNotRegisteredException&) + int serial; + ObjectInfo info; { - } + Lock sync(*this); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - IdentityObjectInfoDict::iterator p = objects.find(id); - if(p == objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + + info = p->second; + info.proxy = proxy; + p.set(info); + + serial = ++_serial; } - ObjectInfo info = p->second; - info.proxy = proxy; - p.set(info); + // + // Notify the observers. + // + _registryObserver->objectUpdated(serial, info); if(_traceLevels->object > 0) { |