diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-10-20 14:11:37 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-10-20 14:11:37 +0000 |
commit | 12d2c541faa3f6a4db3ba58e7f9c8f9c03f947e8 (patch) | |
tree | 342dbb435f37e4f64fd1c515fc14fa17cf0e4639 /cpp | |
parent | Fix (diff) | |
download | ice-12d2c541faa3f6a4db3ba58e7f9c8f9c03f947e8.tar.bz2 ice-12d2c541faa3f6a4db3ba58e7f9c8f9c03f947e8.tar.xz ice-12d2c541faa3f6a4db3ba58e7f9c8f9c03f947e8.zip |
Observer fixes
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/slice/IceGrid/Admin.ice | 12 | ||||
-rw-r--r-- | cpp/slice/IceGrid/Exception.ice | 20 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 190 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.h | 18 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerAdapterI.cpp | 21 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 42 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 22 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 1 |
10 files changed, 197 insertions, 139 deletions
diff --git a/cpp/slice/IceGrid/Admin.ice b/cpp/slice/IceGrid/Admin.ice index cf43960cad2..dd23d91bc4d 100644 --- a/cpp/slice/IceGrid/Admin.ice +++ b/cpp/slice/IceGrid/Admin.ice @@ -1111,9 +1111,13 @@ interface AdminSession extends Glacier2::Session * * @param objObs The object observer. * + * @throws ObserverAlreadyRegisteredException Raised if an + * observer is already registered with this registry. + * **/ idempotent void setObservers(RegistryObserver* registryObs, NodeObserver* nodeObs, ApplicationObserver* appObs, - AdapterObserver* adptObs, ObjectObserver* objObs); + AdapterObserver* adptObs, ObjectObserver* objObs) + throws ObserverAlreadyRegisteredException; /** * @@ -1133,9 +1137,13 @@ interface AdminSession extends Glacier2::Session * * @param objObs The object observer. * + * @throws ObserverAlreadyRegisteredException Raised if an + * observer is already registered with this registry. + * **/ idempotent void setObserversByIdentity(Ice::Identity registryObs, Ice::Identity nodeObs, Ice::Identity appObs, - Ice::Identity adptObs, Ice::Identity objObs); + Ice::Identity adptObs, Ice::Identity objObs) + throws ObserverAlreadyRegisteredException; /** * diff --git a/cpp/slice/IceGrid/Exception.ice b/cpp/slice/IceGrid/Exception.ice index 138c2685756..4302940f8f7 100644 --- a/cpp/slice/IceGrid/Exception.ice +++ b/cpp/slice/IceGrid/Exception.ice @@ -231,6 +231,26 @@ exception PermissionDeniedException string reason; }; +/** + * + * This exception is raised if an observer is already registered with + * the registry. + * + * @see AdminSession::setObservers + * @see AdminSession::setObserversByIdentity + * + **/ +exception ObserverAlreadyRegisteredException +{ + /** + * + * The identity of the observer. + * + **/ + Ice::Identity id; +}; + + }; #endif diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index dd7c1f59237..cfe57adc0db 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -278,7 +278,7 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info, void AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer) { - if(_observers[name] != observer) + if(_observers[name] && _observers[name] != observer) { _database->getObserverTopic(name)->unsubscribe(_observers[name]); _observers[name] = 0; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 463296597ad..1fbf58643ec 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -540,13 +540,6 @@ NodeI::getTraceLevels() const return _traceLevels; } -NodeObserverPrx -NodeI::getObserver() const -{ - IceUtil::Mutex::Lock sync(_observerMutex); - return _observer; -} - UserAccountMapperPrx NodeI::getUserAccountMapper() const { @@ -560,13 +553,6 @@ NodeI::registerWithRegistry(const InternalRegistryPrx& registry) } void -NodeI::setObserver(const NodeObserverPrx& observer) -{ - IceUtil::Mutex::Lock sync(_observerMutex); - _observer = observer; -} - -void NodeI::checkConsistency(const NodeSessionPrx& session) { // @@ -589,7 +575,6 @@ NodeI::checkConsistency(const NodeSessionPrx& session) { _serial = 1; // We can reset the serial number. checkConsistencyNoSync(servers); - initObserver(servers); break; } serial = _serial; @@ -608,29 +593,135 @@ NodeI::checkConsistency(const NodeSessionPrx& session) } void -NodeI::addServer(const string& application, const ServerIPtr& server) +NodeI::addObserver(const string& replicaName, const NodeObserverPrx& observer) { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); - if(p == _serversByApplication.end()) + IceUtil::Mutex::Lock sync(_observerMutex); + _observers.insert(make_pair(replicaName, observer)); + + ServerDynamicInfoSeq serverInfos; + AdapterDynamicInfoSeq adapterInfos; + for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); + p != _serversDynamicInfo.end(); ++p) + { + assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled)); + serverInfos.push_back(p->second); + } + + for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin(); + q != _adaptersDynamicInfo.end(); ++q) { - map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>()); - p = _serversByApplication.insert(p, v); + assert(q->second.proxy); + adapterInfos.push_back(q->second); + } + + try + { + NodeDynamicInfo info; + info.info = _platform.getNodeInfo(); + info.servers = serverInfos; + info.adapters = adapterInfos; + observer->nodeUp(info); + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_traceLevels->logger); + out << "unexpected observer exception:\n" << ex; } - p->second.insert(server); } void -NodeI::removeServer(const std::string& application, const ServerIPtr& server) +NodeI::removeObserver(const string& replicaName) { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); - if(p != _serversByApplication.end()) + IceUtil::Mutex::Lock sync(_observerMutex); + _observers.erase(replicaName); +} + +void +NodeI::observerUpdateServer(const ServerDynamicInfo& info) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + + if(info.state == Destroyed || info.state == Inactive && info.enabled) { - p->second.erase(server); - if(p->second.empty()) + _serversDynamicInfo.erase(info.id); + } + else + { + _serversDynamicInfo[info.id] = info; + } + + for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + { + try + { + p->second->updateServer(_name, info); + } + catch(const Ice::LocalException&) { - _serversByApplication.erase(p); + // IGNORE + } + } +} + +void +NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + + if(info.proxy) + { + _adaptersDynamicInfo[info.id] = info; + } + else + { + _adaptersDynamicInfo.erase(info.id); + } + + for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + { + try + { + p->second->updateAdapter(_name, info); + } + catch(const Ice::LocalException&) + { + // IGNORE + } + } +} + +void +NodeI::addServer(const ServerIPtr& server, const string& application, bool dependsOnApplicationDistrib) +{ + IceUtil::Mutex::Lock sync(_serversLock); + + if(dependsOnApplicationDistrib) + { + map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + if(p == _serversByApplication.end()) + { + map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>()); + p = _serversByApplication.insert(p, v); + } + p->second.insert(server); + } +} + +void +NodeI::removeServer(const ServerIPtr& server, const std::string& application, bool dependsOnApplicationDistrib) +{ + IceUtil::Mutex::Lock sync(_serversLock); + + if(dependsOnApplicationDistrib) + { + map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + if(p != _serversByApplication.end()) + { + p->second.erase(server); + if(p->second.empty()) + { + _serversByApplication.erase(p); + } } } } @@ -820,45 +911,6 @@ NodeI::canRemoveServerDirectory(const string& name) } void -NodeI::initObserver(const Ice::StringSeq& servers) -{ - ServerDynamicInfoSeq serverInfos; - AdapterDynamicInfoSeq adapterInfos; - - for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) - { - Ice::Identity id = createServerIdentity(*p); - ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id)); - if(server) - { - try - { - server->getDynamicInfo(serverInfos, adapterInfos); - } - catch(const Ice::ObjectNotExistException&) - { - } - } - } - - try - { - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - - assert(_observer); // No synchronization needed here, this is called by a single thread. - _observer->nodeUp(info); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "unexpected observer exception:\n" << ex; - } -} - -void NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories) { PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest); @@ -898,7 +950,7 @@ NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<str set<ServerIPtr> NodeI::getApplicationServers(const string& application) const { - IceUtil::Mutex::Lock sync(_serversByApplicationLock); + IceUtil::Mutex::Lock sync(_serversLock); set<ServerIPtr> servers; map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application); if(p != _serversByApplication.end()) diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index 322cb69b76d..46df88855ad 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -59,23 +59,25 @@ public: Ice::ObjectAdapterPtr getAdapter() const; ActivatorPtr getActivator() const; TraceLevelsPtr getTraceLevels() const; - NodeObserverPrx getObserver() const; UserAccountMapperPrx getUserAccountMapper() const; PlatformInfo& getPlatformInfo() const { return _platform; } NodeSessionPrx registerWithRegistry(const InternalRegistryPrx&); - void setObserver(const NodeObserverPrx&); void checkConsistency(const NodeSessionPrx&); NodeSessionPrx getMasterNodeSession() const; - void addServer(const std::string&, const ServerIPtr&); - void removeServer(const std::string&, const ServerIPtr&); + void addObserver(const std::string&, const NodeObserverPrx&); + void removeObserver(const std::string&); + void observerUpdateServer(const ServerDynamicInfo&); + void observerUpdateAdapter(const AdapterDynamicInfo&); + + void addServer(const ServerIPtr&, const std::string&, bool); + void removeServer(const ServerIPtr&, const std::string&, bool); private: void checkConsistencyNoSync(const Ice::StringSeq&); bool canRemoveServerDirectory(const std::string&); - void initObserver(const Ice::StringSeq&); void patch(const IcePatch2::FileServerPrx&, const std::string&, const std::vector<std::string>&); std::set<ServerIPtr> getApplicationServers(const std::string&) const; @@ -97,10 +99,12 @@ private: std::string _tmpDir; unsigned long _serial; IceUtil::Mutex _observerMutex; - NodeObserverPrx _observer; + std::map<std::string, NodeObserverPrx> _observers; + std::map<std::string, ServerDynamicInfo> _serversDynamicInfo; + std::map<std::string, AdapterDynamicInfo> _adaptersDynamicInfo; mutable PlatformInfo _platform; - IceUtil::Mutex _serversByApplicationLock; + IceUtil::Mutex _serversLock; std::map<std::string, std::set<ServerIPtr> > _serversByApplication; std::set<std::string> _patchInProgress; }; diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 5823e73a379..7bae899a9bf 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -140,12 +140,15 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr { timeout = IceUtil::Time::seconds(t / 2); } + _node->addObserver(_name, session->getObserver()); return session; } void NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session) { + _node->removeObserver(_name); + try { session->destroy(); @@ -473,13 +476,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) if(session) { session->loadServers(); - _node->setObserver(session->getObserver()); _node->checkConsistency(session); } - else - { - _node->setObserver(0); - } } catch(const Ice::LocalException&) { diff --git a/cpp/src/IceGrid/ServerAdapterI.cpp b/cpp/src/IceGrid/ServerAdapterI.cpp index d00f721cdd9..eef9228ec94 100644 --- a/cpp/src/IceGrid/ServerAdapterI.cpp +++ b/cpp/src/IceGrid/ServerAdapterI.cpp @@ -139,23 +139,10 @@ ServerAdapterI::setDirectProxy(const Ice::ObjectPrx& prx, const Ice::Current&) if(updated) { - NodeObserverPrx observer = _node->getObserver(); - if(observer) - { - AdapterDynamicInfo info; - info.id = _id; - info.proxy = _proxy; - try - { - observer->updateAdapter(_node->getName(), info); - } - catch(const Ice::LocalException&) - { - // - // Expected if the master IceGrid registry is down. - // - } - } + AdapterDynamicInfo info; + info.id = _id; + info.proxy = _proxy; + _node->observerUpdateAdapter(info); } if(_proxy) diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index c64bdf1e3ba..131a6c8bf09 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -720,6 +720,8 @@ ServerI::setEnabled(bool enabled, const ::Ice::Current&) { return; // Nothing to change! } + + _node->observerUpdateServer(getDynamicInfo()); } if(activate) @@ -738,19 +740,6 @@ ServerI::setEnabled(bool enabled, const ::Ice::Current&) { } } - - NodeObserverPrx observer = _node->getObserver(); - if(observer) - { - try - { - observer->updateServer(_node->getName(), getDynamicInfo()); - } - catch(const Ice::LocalException&) - { - // Expected if the IceGrid registry is down. - } - } } bool @@ -1509,10 +1498,7 @@ ServerI::destroy() adpts = _adapters; } - if(_info.descriptor->applicationDistrib) - { - _node->removeServer(_info.application, this); - } + _node->removeServer(this, _info.application, _info.descriptor->applicationDistrib); try { @@ -1688,14 +1674,11 @@ ServerI::update() throw DeploymentException(msg); } - if(oldInfo.descriptor && oldInfo.descriptor->applicationDistrib) - { - _node->removeServer(oldInfo.application, this); - } - if(_info.descriptor->applicationDistrib) + if(oldInfo.descriptor) { - _node->addServer(_info.application, this); + _node->removeServer(this, oldInfo.application, oldInfo.descriptor->applicationDistrib); } + _node->addServer(this, _info.application, _info.descriptor->applicationDistrib); AdapterPrxDict adapters; for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) @@ -2329,18 +2312,7 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason) if(toServerState(previous) != toServerState(_state) && !(previous == Inactive && _state == Deactivating)) { - NodeObserverPrx observer = _node->getObserver(); - if(observer) - { - try - { - observer->updateServer(_node->getName(), getDynamicInfo()); - } - catch(const Ice::LocalException&) - { - // Expected if the IceGrid registry is down. - } - } + _node->observerUpdateServer(getDynamicInfo()); } if(_node->getTraceLevels()->server > 1) diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 20159fe4ec6..582a5f5da5b 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -50,11 +50,24 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { return; } - + + // + // We need to ensure that this observer isn't already registered + // as IceStorm might otherwise replace a previous subscriber + // without any notification. + // + assert(obsv); + if(_subscribers.find(obsv->ice_getIdentity()) != _subscribers.end()) + { + throw ObserverAlreadyRegisteredException(obsv->ice_getIdentity()); + } + IceStorm::QoS qos; qos["reliability"] = "twoway ordered"; initObserver(_topic->subscribe(qos, obsv)); + _subscribers.insert(obsv->ice_getIdentity()); + if(!name.empty()) { assert(_syncSubscribers.find(name) == _syncSubscribers.end()); @@ -72,6 +85,9 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) _topic->unsubscribe(observer); } + assert(observer); + _subscribers.erase(observer->ice_getIdentity()); + if(!name.empty()) { assert(_syncSubscribers.find(name) != _syncSubscribers.end()); @@ -358,7 +374,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser { if(p->id == server.id) { - if(server.state == Destroyed || server.state == Inactive) + if(server.state == Destroyed || (server.state == Inactive && server.enabled)) { servers.erase(p); } @@ -370,7 +386,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser } ++p; } - if(server.state != Destroyed && server.state != Inactive && p == servers.end()) + if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end()) { servers.push_back(server); } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index 13a75970b56..dc480fdc5d1 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -49,6 +49,7 @@ protected: Ice::ObjectPrx _basePublisher; int _serial; + std::set<Ice::Identity> _subscribers; std::set<std::string> _syncSubscribers; std::map<int, std::set<std::string> > _waitForUpdates; std::map<int, std::map<std::string, std::string> > _updateFailures; |