diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 190 |
1 files changed, 121 insertions, 69 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 463296597ad..1fbf58643ec 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -540,13 +540,6 @@ NodeI::getTraceLevels() const return _traceLevels; } -NodeObserverPrx -NodeI::getObserver() const -{ - IceUtil::Mutex::Lock sync(_observerMutex); - return _observer; -} - UserAccountMapperPrx NodeI::getUserAccountMapper() const { @@ -560,13 +553,6 @@ NodeI::registerWithRegistry(const InternalRegistryPrx& registry) } void -NodeI::setObserver(const NodeObserverPrx& observer) -{ - IceUtil::Mutex::Lock sync(_observerMutex); - _observer = observer; -} - -void NodeI::checkConsistency(const NodeSessionPrx& session) { // @@ -589,7 +575,6 @@ NodeI::checkConsistency(const NodeSessionPrx& session) { _serial = 1; // We can reset the serial number. checkConsistencyNoSync(servers); - initObserver(servers); break; } serial = _serial; @@ -608,29 +593,135 @@ NodeI::checkConsistency(const NodeSessionPrx& session) } void -NodeI::addServer(const string& application, const ServerIPtr& server) +NodeI::addObserver(const string& replicaName, const NodeObserverPrx& observer) { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); - if(p == _serversByApplication.end()) + IceUtil::Mutex::Lock sync(_observerMutex); + _observers.insert(make_pair(replicaName, observer)); + + ServerDynamicInfoSeq serverInfos; + AdapterDynamicInfoSeq adapterInfos; + for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); + p != _serversDynamicInfo.end(); ++p) + { + assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled)); + serverInfos.push_back(p->second); + } + + for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin(); + q != _adaptersDynamicInfo.end(); ++q) { - map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>()); - p = _serversByApplication.insert(p, v); + assert(q->second.proxy); + adapterInfos.push_back(q->second); + } + + try + { + NodeDynamicInfo info; + info.info = _platform.getNodeInfo(); + info.servers = serverInfos; + info.adapters = adapterInfos; + observer->nodeUp(info); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_traceLevels->logger); + out << "unexpected observer exception:\n" << ex; } - p->second.insert(server); } void -NodeI::removeServer(const std::string& application, const ServerIPtr& server) +NodeI::removeObserver(const string& replicaName) { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); - if(p != _serversByApplication.end()) + IceUtil::Mutex::Lock sync(_observerMutex); + _observers.erase(replicaName); +} + +void +NodeI::observerUpdateServer(const ServerDynamicInfo& info) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + + if(info.state == Destroyed || info.state == Inactive && info.enabled) { - p->second.erase(server); - if(p->second.empty()) + _serversDynamicInfo.erase(info.id); + } + else + { + _serversDynamicInfo[info.id] = info; + } + + for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + { + try + { + p->second->updateServer(_name, info); + } + catch(const Ice::LocalException&) { - _serversByApplication.erase(p); + // IGNORE + } + } +} + +void +NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + + if(info.proxy) + { + _adaptersDynamicInfo[info.id] = info; + } + else + { + _adaptersDynamicInfo.erase(info.id); + } + + for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + { + try + { + p->second->updateAdapter(_name, info); + } + catch(const Ice::LocalException&) + { + // IGNORE + } + } +} + +void +NodeI::addServer(const ServerIPtr& server, const string& application, bool dependsOnApplicationDistrib) +{ + IceUtil::Mutex::Lock sync(_serversLock); + + if(dependsOnApplicationDistrib) + { + map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + if(p == _serversByApplication.end()) + { + map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>()); + p = _serversByApplication.insert(p, v); + } + p->second.insert(server); + } +} + +void +NodeI::removeServer(const ServerIPtr& server, const std::string& application, bool dependsOnApplicationDistrib) +{ + IceUtil::Mutex::Lock sync(_serversLock); + + if(dependsOnApplicationDistrib) + { + map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + if(p != _serversByApplication.end()) + { + p->second.erase(server); + if(p->second.empty()) + { + _serversByApplication.erase(p); + } } } } @@ -820,45 +911,6 @@ NodeI::canRemoveServerDirectory(const string& name) } void -NodeI::initObserver(const Ice::StringSeq& servers) -{ - ServerDynamicInfoSeq serverInfos; - AdapterDynamicInfoSeq adapterInfos; - - for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) - { - Ice::Identity id = createServerIdentity(*p); - ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id)); - if(server) - { - try - { - server->getDynamicInfo(serverInfos, adapterInfos); - } - catch(const Ice::ObjectNotExistException&) - { - } - } - } - - try - { - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - - assert(_observer); // No synchronization needed here, this is called by a single thread. - _observer->nodeUp(info); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "unexpected observer exception:\n" << ex; - } -} - -void NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories) { PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest); @@ -898,7 +950,7 @@ NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<str set<ServerIPtr> NodeI::getApplicationServers(const string& application) const { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); + IceUtil::Mutex::Lock sync(_serversLock); set<ServerIPtr> servers; map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application); if(p != _serversByApplication.end()) |