diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-06-02 16:11:51 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-06-02 16:11:51 +0000 |
commit | 8d90c43bd49c728f3c553a4e4d2ad7d25367a872 (patch) | |
tree | 078cd0193fcdfa12936eae5a6372ae750ecb79f7 /cpp/src/IceGrid/Database.cpp | |
parent | warm up JIT compiler before measuring latency (diff) | |
download | ice-8d90c43bd49c728f3c553a4e4d2ad7d25367a872.tar.bz2 ice-8d90c43bd49c728f3c553a4e4d2ad7d25367a872.tar.xz ice-8d90c43bd49c728f3c553a4e4d2ad7d25367a872.zip |
Bug fixes
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 328 |
1 files changed, 196 insertions, 132 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 7ca95468137..7a3f32bd760 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -185,31 +185,29 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb ApplicationDescriptorSeq applications; AdapterInfoSeq adapters; ObjectInfoSeq objects; - { - Lock sync(*this); - _registryObserver = registryObserver; - _nodeObserver = nodeObserver; - serial = _serial; - - for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) - { - applications.push_back(p->second); - } - for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) - { - adapters.push_back(q->second); - if(adapters.back().id.empty()) - { - adapters.back().id = q->first; - } - } - - for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) + _registryObserver = registryObserver; + _nodeObserver = nodeObserver; + serial = _serial; + + for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) + { + applications.push_back(p->second); + } + + for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) + { + adapters.push_back(q->second); + if(adapters.back().id.empty()) { - objects.push_back(r->second); + adapters.back().id = q->first; } } + + for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) + { + objects.push_back(r->second); + } // // Notify the observers. @@ -264,43 +262,64 @@ void Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc) { ServerEntrySeq entries; - int serial; - DistributionDescriptor appDistrib; - map<string, DistributionDescriptorDict> nodeDistrib; - { Lock sync(*this); - checkSessionLock(session); - // - // We first ensure that the application doesn't already exist - // and that the application components don't already exist. - // + while(_updating.find(desc.name) != _updating.end()) + { + wait(); + } + if(_descriptors.find(desc.name) != _descriptors.end()) { throw DeploymentException("application `" + desc.name + "' already exists"); - } + } ApplicationHelper helper(_communicator, desc); - - // - // Ensure that the application servers, adapters and objects - // aren't already registered. - // checkForAddition(helper); - - // - // Register the application servers, adapters, objects. - // load(helper, entries); - - // - // Save the application descriptor. - // - _descriptors.put(StringApplicationDescriptorDict::value_type(desc.name, desc)); - - serial = ++_serial; + _updating.insert(desc.name); + } + + // + // Synchronize the servers on the nodes. If a server couldn't be + // deployed we unload the application and throw. + // + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException& ex) + { + { + Lock sync(*this); + entries.clear(); + unload(ApplicationHelper(_communicator, desc), entries); + _updating.erase(desc.name); + notifyAll(); + } + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException& ex) + { + // TODO: warning? + } + throw ex; + } + + // + // Save the application descriptor. + // + int serial; + { + Lock sync(*this); + _descriptors.put(StringApplicationDescriptorDict::value_type(desc.name, desc)); + serial = ++_serial; + _updating.erase(desc.name); + notifyAll(); } // @@ -313,22 +332,23 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "added application `" << desc.name << "'"; } - - // - // Synchronize the servers on the nodes. - // - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } void Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update) { ServerEntrySeq entries; - int serial; + ApplicationDescriptor oldDesc; + ApplicationDescriptor newDesc; { Lock sync(*this); checkSessionLock(session); + while(_updating.find(update.name) != _updating.end()) + { + wait(); + } + StringApplicationDescriptorDict::const_iterator p = _descriptors.find(update.name); if(p == _descriptors.end()) { @@ -337,40 +357,34 @@ Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationU ApplicationHelper previous(_communicator, p->second); ApplicationHelper helper(_communicator, previous.update(update)); - + checkForUpdate(previous, helper); - reload(previous, helper, entries); - - _descriptors.put(StringApplicationDescriptorDict::value_type(update.name, helper.getDefinition())); - - serial = ++_serial; - } - // - // Notify the observers. - // - _registryObserver->applicationUpdated(serial, update); + oldDesc = previous.getDefinition(); + newDesc = helper.getDefinition(); - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "updated application `" << update.name << "'"; + _updating.insert(update.name); } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + finishUpdate(entries, update, oldDesc, newDesc); } void Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& newDesc) { ServerEntrySeq entries; - int serial; ApplicationUpdateDescriptor update; + ApplicationDescriptor oldDesc; { Lock sync(*this); checkSessionLock(session); + while(_updating.find(update.name) != _updating.end()) + { + wait(); + } + StringApplicationDescriptorDict::const_iterator p = _descriptors.find(newDesc.name); if(p == _descriptors.end()) { @@ -381,27 +395,56 @@ Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDes ApplicationHelper helper(_communicator, newDesc); update = helper.diff(previous); - checkForUpdate(previous, helper); - - reload(previous, helper, entries); - - _descriptors.put(StringApplicationDescriptorDict::value_type(newDesc.name, newDesc)); + checkForUpdate(previous, helper); + reload(previous, helper, entries); - serial = ++_serial; + oldDesc = previous.getDefinition(); + + _updating.insert(update.name); } - // - // Notify the observers. - // - _registryObserver->applicationUpdated(serial, update); + finishUpdate(entries, update, oldDesc, newDesc); +} - if(_traceLevels->application > 0) +void +Database::instantiateServer(AdminSessionI* session, + const string& application, + const string& node, + const ServerInstanceDescriptor& instance) +{ + ServerEntrySeq entries; + ApplicationUpdateDescriptor update; + ApplicationDescriptor oldDesc; + ApplicationDescriptor newDesc; { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "synced application `" << newDesc.name << "'"; + Lock sync(*this); + checkSessionLock(session); + + while(_updating.find(update.name) != _updating.end()) + { + wait(); + } + + StringApplicationDescriptorDict::const_iterator p = _descriptors.find(application); + if(p == _descriptors.end()) + { + throw ApplicationNotExistException(application); + } + + ApplicationHelper previous(_communicator, p->second); + ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance)); + update = helper.diff(previous); + + checkForUpdate(previous, helper); + reload(previous, helper, entries); + + oldDesc = previous.getDefinition(); + newDesc = helper.getDefinition(); + + _updating.insert(update.name); } - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + finishUpdate(entries, update, oldDesc, newDesc); } void @@ -413,6 +456,11 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& Lock sync(*this); checkSessionLock(session); + while(_updating.find(name) != _updating.end()) + { + wait(); + } + StringApplicationDescriptorDict::iterator p = _descriptors.find(name); if(p == _descriptors.end()) { @@ -452,52 +500,6 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); } -void -Database::instantiateServer(AdminSessionI* session, - const string& application, - const string& node, - const ServerInstanceDescriptor& instance) -{ - ServerEntrySeq entries; - int serial; - ApplicationUpdateDescriptor update; - { - Lock sync(*this); - checkSessionLock(session); - - StringApplicationDescriptorDict::const_iterator p = _descriptors.find(application); - if(p == _descriptors.end()) - { - throw ApplicationNotExistException(application); - } - - ApplicationHelper previous(_communicator, p->second); - ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance)); - update = helper.diff(previous); - - checkForUpdate(previous, helper); - - reload(previous, helper, entries); - - _descriptors.put(StringApplicationDescriptorDict::value_type(application, helper.getDefinition())); - - serial = ++_serial; - } - - // - // Notify the observers. - // - _registryObserver->applicationUpdated(serial, update); - - if(_traceLevels->application > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "updated application `" << update.name << "'"; - } - - for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); -} - ApplicationDescriptor Database::getApplicationDescriptor(const std::string& name) { @@ -633,7 +635,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr serial = ++_serial; } - if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); @@ -935,7 +936,7 @@ Database::removeObject(const Ice::Identity& id) { int serial; { - Lock sync(*this); + Lock sync(*this); if(_objectCache.has(id)) { DeploymentException ex; @@ -1405,3 +1406,66 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp entries.push_back(_serverCache.add(*q)); } } + +void +Database::finishUpdate(ServerEntrySeq& entries, + const ApplicationUpdateDescriptor& update, + const ApplicationDescriptor& oldDesc, + const ApplicationDescriptor& newDesc) +{ + + // + // Synchronize the servers on the nodes. If a server couldn't be + // deployed we unload the application and throw. + // + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException& ex) + { + { + Lock sync(*this); + entries.clear(); + ApplicationHelper previous(_communicator, newDesc); + ApplicationHelper helper(_communicator, oldDesc); + reload(previous, helper, entries); + _updating.erase(newDesc.name); + notifyAll(); + } + try + { + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync)); + } + catch(const DeploymentException& ex) + { + // TODO: warning? + } + throw ex; + } + + // + // Save the application descriptor. + // + int serial; + { + Lock sync(*this); + _descriptors.put(StringApplicationDescriptorDict::value_type(update.name, newDesc)); + serial = ++_serial; + _updating.erase(update.name); + notifyAll(); + } + + // + // Notify the observers. + // + _registryObserver->applicationUpdated(serial, update); + + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "updated application `" << update.name << "'"; + } + +} + |