diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 136 |
1 files changed, 38 insertions, 98 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 512fe1c7640..134303b7ffc 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -17,6 +17,7 @@ #include <IceGrid/Util.h> #include <IceGrid/WaitQueue.h> #include <IceGrid/TraceLevels.h> +#include <IceGrid/NodeSessionManager.h> using namespace std; using namespace IcePatch2; @@ -173,6 +174,7 @@ private: }; NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, + NodeSessionManager& sessions, const ActivatorPtr& activator, const WaitQueuePtr& waitQueue, const TraceLevelsPtr& traceLevels, @@ -180,6 +182,7 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, const string& name, const UserAccountMapperPrx& mapper) : _adapter(adapter), + _sessions(sessions), _activator(activator), _waitQueue(waitQueue), _traceLevels(traceLevels), @@ -456,6 +459,18 @@ NodeI::patch(const string& application, } } +void +NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&) +{ + _sessions.replicaAdded(replica); +} + +void +NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&) +{ + _sessions.replicaRemoved(replica); +} + std::string NodeI::getName(const Ice::Current&) const { @@ -478,12 +493,16 @@ void NodeI::shutdown(const Ice::Current&) const { _activator->shutdown(); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); - while(_session) - { - _sessionMonitor.wait(); - } + // + // TODO: XXX: Wait for the session to be down with the registry + // who invoked this call. Perhaps it's better to have the registry + // wait actually... + // +// IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); +// while(_session) +// { +// _sessionMonitor.wait(); +// } } Ice::CommunicatorPtr @@ -519,7 +538,7 @@ NodeI::getTraceLevels() const NodeObserverPrx NodeI::getObserver() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); + IceUtil::Mutex::Lock sync(_observerMutex); return _observer; } @@ -530,98 +549,21 @@ NodeI::getUserAccountMapper() const } NodeSessionPrx -NodeI::getSession() const -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); - return _session; -} - -void -NodeI::setSession(const NodeSessionPrx& session, const NodeObserverPrx& observer) +NodeI::registerWithRegistry(const InternalRegistryPrx& registry) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); - _session = session; - _observer = observer; - _sessionMonitor.notifyAll(); -} - -int -NodeI::keepAlive() -{ - NodeSessionPrx session = getSession(); - if(session) - { - try - { - session->keepAlive(_platform.getLoadInfo()); - } - catch(const Ice::LocalException&) - { - setSession(0, 0); - } - } - else + NodeSessionPrx session = registry->registerNode(_name, _proxy, _platform.getNodeInfo()); + NodeObserverPrx observer = session->getObserver(); + if(observer) { - try - { - Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instanceName + "/InternalRegistry"); - InternalRegistryPrx registry = InternalRegistryPrx::uncheckedCast(obj); - NodeSessionPrx session = registry->registerNode(_name, _proxy, _platform.getNodeInfo()); - NodeObserverPrx observer; - int timeout = session->getTimeoutAndObserver(observer); - setSession(session, observer); - checkConsistency(); - return timeout / 2; - } - catch(const NodeActiveException&) - { - _traceLevels->logger->error("a node with the same name is already registered and active"); - } - catch(const Ice::LocalException& ex) - { - ostringstream os; - os << "couldn't contact the IceGrid registry:\n" << ex; - _traceLevels->logger->warning(os.str()); - } - } - return 0; -} - -void -NodeI::waitForSession() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); - while(!_session) - { - _sessionMonitor.timedWait(IceUtil::Time::seconds(_waitTime)); - } -} - -void -NodeI::stop() -{ - Lock sync(_sessionMonitor); - if(_session) - { - try - { - _session->destroy(); - } - catch(const Ice::LocalException& ex) - { - ostringstream os; - os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex; - _traceLevels->logger->warning(os.str()); - } - - _session = 0; - _observer = 0; - _sessionMonitor.notifyAll(); + IceUtil::Mutex::Lock sync(_observerMutex); + _observer = observer; } + checkConsistency(session); + return session; } void -NodeI::checkConsistency() +NodeI::checkConsistency(const NodeSessionPrx& session) { // // We use a serial number to keep track of the concurrent changes @@ -635,8 +577,7 @@ NodeI::checkConsistency() // unsigned long serial = 0; Ice::StringSeq servers; - NodeSessionPrx session; - do + while(true) { { Lock sync(*this); @@ -649,11 +590,10 @@ NodeI::checkConsistency() } serial = _serial; } - session = getSession(); - servers = session ? session->getServers() : Ice::StringSeq(); + assert(session); + servers = session->getServers(); sort(servers.begin(), servers.end()); } - while(session); } void |