diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/IceGrid/Allocatable.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 212 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.h | 24 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 14 | ||||
-rwxr-xr-x | cpp/test/IceGrid/replication/run.py | 2 |
8 files changed, 232 insertions, 37 deletions
diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp index e078564d5a5..666d7c84cb7 100644 --- a/cpp/src/IceGrid/Allocatable.cpp +++ b/cpp/src/IceGrid/Allocatable.cpp @@ -38,7 +38,14 @@ AllocationRequest::pending() if(_timeout > 0) { - _session->getTimer()->schedule(this, IceUtil::Time::milliSeconds(_timeout)); + try + { + _session->getTimer()->schedule(this, IceUtil::Time::milliSeconds(_timeout)); + } + catch(const IceUtil::Exception&) + { + // Ignore, timer is destroyed because of shutdown + } } _state = Pending; return true; diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index 812cb1977df..b9b8b478243 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -781,7 +781,10 @@ NodeEntry::checkSession() const while(_registering) { - wait(); + if(!timedWait(IceUtil::Time::seconds(5))) + { + break; // Consider the node down if it doesn't respond promptly. + } } if(!_session) diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h index f47ede772e3..2fb6fa9ee13 100644 --- a/cpp/src/IceGrid/NodeCache.h +++ b/cpp/src/IceGrid/NodeCache.h @@ -45,7 +45,6 @@ public: void addServer(const ServerEntryPtr&); void removeServer(const ServerEntryPtr&); void setSession(const NodeSessionIPtr&); - void setSavedProxy(const NodePrx&); NodePrx getProxy() const; InternalNodeInfoPtr getInfo() const; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 4328a4aef85..b721c553271 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -188,6 +188,137 @@ private: string _dest; }; +class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp +{ +public: + + NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) : + Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->nodeUp_async(this, _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + NodeDynamicInfo _info; +}; + +class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer +{ +public: + + UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->updateServer_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + ServerDynamicInfo _info; +}; + +class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter +{ +public: + + UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) : + NodeI::Update(node, observer), _info(info) + { + } + + virtual void + send() + { + try + { + _observer->updateAdapter_async(this, _node->getName(), _info); + } + catch(const Ice::LocalException&) + { + finished(false); + } + } + + virtual void + ice_response() + { + finished(true); + } + + virtual void + ice_exception(const Ice::Exception&) + { + finished(false); + } + +private: + + AdapterDynamicInfo _info; +}; + +} + +NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer) +{ +} + +NodeI::Update::~Update() +{ +} + +void +NodeI::Update::finished(bool success) +{ + _node->dequeueUpdate(_observer, this, !success); } NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, @@ -847,6 +978,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe assert(_observers.find(session) == _observers.end()); _observers.insert(make_pair(session, observer)); + _observerUpdates.erase(observer); // Remove any updates from the previous session. + ServerDynamicInfoSeq serverInfos; AdapterDynamicInfoSeq adapterInfos; for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); @@ -863,19 +996,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe adapterInfos.push_back(q->second); } - try - { - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - observer->nodeUp(info); - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_traceLevels->logger); - out << "unexpected observer exception:\n" << ex; - } + NodeDynamicInfo info; + info.info = _platform.getNodeInfo(); + info.servers = serverInfos; + info.adapters = adapterInfos; + queueUpdate(observer, new NodeUp(this, observer, info)); } void @@ -910,15 +1035,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateServer(_name, info); - sent.insert(p->second); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateServer(this, p->second, info)); } } } @@ -948,18 +1065,53 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) { if(sent.find(p->second) == sent.end()) { - try - { - p->second->updateAdapter(_name, info); - } - catch(const Ice::LocalException&) - { - // IGNORE - } + queueUpdate(p->second, new UpdateAdapter(this, p->second, info)); } } } +void +NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update) +{ + //Lock sync(*this); Called within the synchronization + deque<UpdatePtr>& queue = _observerUpdates[proxy]; + queue.push_back(update); + if(queue.size() == 1) + { + queue.front()->send(); + } +} + +void +NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + if(p == _observerUpdates.end() || p->second.front().get() != update.get()) + { + return; + } + + deque<UpdatePtr>& queue = p->second; + if(all) + { + queue.clear(); + } + else + { + queue.pop_front(); + } + + if(!queue.empty()) + { + queue.front()->send(); + } + else + { + _observerUpdates.erase(p); + } +} + void NodeI::addServer(const ServerIPtr& server, const string& application) { diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index 36ac0019c0e..6ecb9df70a8 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -35,9 +35,29 @@ typedef IceUtil::Handle<ServerCommand> ServerCommandPtr; class NodeSessionManager; +class NodeI; +typedef IceUtil::Handle<NodeI> NodeIPtr; + class NodeI : public Node, public IceUtil::Monitor<IceUtil::Mutex> { public: + class Update : virtual public IceUtil::Shared + { + public: + + Update(const NodeIPtr&, const NodeObserverPrx&); + virtual ~Update(); + + virtual void send() = 0; + + void finished(bool); + + protected: + + const NodeIPtr _node; + const NodeObserverPrx _observer; + }; + typedef IceUtil::Handle<Update> UpdatePtr; NodeI(const Ice::ObjectAdapterPtr&, NodeSessionManager&, const ActivatorPtr&, const IceUtil::TimerPtr&, const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&); @@ -97,6 +117,8 @@ public: void removeObserver(const NodeSessionPrx&); void observerUpdateServer(const ServerDynamicInfo&); void observerUpdateAdapter(const AdapterDynamicInfo&); + void queueUpdate(const NodeObserverPrx&, const UpdatePtr&); + void dequeueUpdate(const NodeObserverPrx&, const UpdatePtr&, bool); void addServer(const ServerIPtr&, const std::string&); void removeServer(const ServerIPtr&, const std::string&); @@ -143,6 +165,8 @@ private: std::map<std::string, ServerDynamicInfo> _serversDynamicInfo; std::map<std::string, AdapterDynamicInfo> _adaptersDynamicInfo; + std::map<NodeObserverPrx, std::deque<UpdatePtr> > _observerUpdates; + IceUtil::Mutex _serversLock; std::map<std::string, std::set<ServerIPtr> > _serversByApplication; std::set<std::string> _patchInProgress; diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 06ae7828d3f..066b144b0a9 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -581,7 +581,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) // for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p) { - (*p)->tryCreateSession(true); + (*p)->tryCreateSession(true, IceUtil::Time::seconds(5)); } } diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index 7760241e865..c598eadc2cd 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -189,7 +189,7 @@ public: } virtual void - tryCreateSession(bool waitForTry = true) + tryCreateSession(bool waitForTry = true, const IceUtil::Time& timeout = IceUtil::Time()) { { Lock sync(*this); @@ -215,7 +215,17 @@ public: // Wait until the action is executed and the state changes. while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress) { - wait(); + if(timeout == IceUtil::Time()) + { + wait(); + } + else + { + if(!timedWait(timeout)) + { + break; + } + } } } } diff --git a/cpp/test/IceGrid/replication/run.py b/cpp/test/IceGrid/replication/run.py index 1311d8675ff..5a13ccef323 100755 --- a/cpp/test/IceGrid/replication/run.py +++ b/cpp/test/IceGrid/replication/run.py @@ -23,4 +23,4 @@ from scripts import * TestUtil.addLdPath(os.getcwd()) IceGridAdmin.iceGridTest("application.xml", '--IceDir="%s" --TestDir="%s"' % (TestUtil.toplevel, os.getcwd()), - ' \'properties-override=\'%s\'' % TestUtil.getCommandLine("", TestUtil.DriverConfig("server")).replace("--", "")) + '\\"properties-override=%s\\"' % TestUtil.getCommandLine("", TestUtil.DriverConfig("server")).replace("--", "")) |