summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp136
1 files changed, 38 insertions, 98 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 512fe1c7640..134303b7ffc 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -17,6 +17,7 @@
#include <IceGrid/Util.h>
#include <IceGrid/WaitQueue.h>
#include <IceGrid/TraceLevels.h>
+#include <IceGrid/NodeSessionManager.h>
using namespace std;
using namespace IcePatch2;
@@ -173,6 +174,7 @@ private:
};
NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
+ NodeSessionManager& sessions,
const ActivatorPtr& activator,
const WaitQueuePtr& waitQueue,
const TraceLevelsPtr& traceLevels,
@@ -180,6 +182,7 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
const string& name,
const UserAccountMapperPrx& mapper) :
_adapter(adapter),
+ _sessions(sessions),
_activator(activator),
_waitQueue(waitQueue),
_traceLevels(traceLevels),
@@ -456,6 +459,18 @@ NodeI::patch(const string& application,
}
}
+void
+NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&)
+{
+ _sessions.replicaAdded(replica);
+}
+
+void
+NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&)
+{
+ _sessions.replicaRemoved(replica);
+}
+
std::string
NodeI::getName(const Ice::Current&) const
{
@@ -478,12 +493,16 @@ void
NodeI::shutdown(const Ice::Current&) const
{
_activator->shutdown();
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
- while(_session)
- {
- _sessionMonitor.wait();
- }
+ //
+ // TODO: XXX: Wait for the session to be down with the registry
+ // who invoked this call. Perhaps it's better to have the registry
+ // wait actually...
+ //
+// IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
+// while(_session)
+// {
+// _sessionMonitor.wait();
+// }
}
Ice::CommunicatorPtr
@@ -519,7 +538,7 @@ NodeI::getTraceLevels() const
NodeObserverPrx
NodeI::getObserver() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
+ IceUtil::Mutex::Lock sync(_observerMutex);
return _observer;
}
@@ -530,98 +549,21 @@ NodeI::getUserAccountMapper() const
}
NodeSessionPrx
-NodeI::getSession() const
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
- return _session;
-}
-
-void
-NodeI::setSession(const NodeSessionPrx& session, const NodeObserverPrx& observer)
+NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
- _session = session;
- _observer = observer;
- _sessionMonitor.notifyAll();
-}
-
-int
-NodeI::keepAlive()
-{
- NodeSessionPrx session = getSession();
- if(session)
- {
- try
- {
- session->keepAlive(_platform.getLoadInfo());
- }
- catch(const Ice::LocalException&)
- {
- setSession(0, 0);
- }
- }
- else
+ NodeSessionPrx session = registry->registerNode(_name, _proxy, _platform.getNodeInfo());
+ NodeObserverPrx observer = session->getObserver();
+ if(observer)
{
- try
- {
- Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instanceName + "/InternalRegistry");
- InternalRegistryPrx registry = InternalRegistryPrx::uncheckedCast(obj);
- NodeSessionPrx session = registry->registerNode(_name, _proxy, _platform.getNodeInfo());
- NodeObserverPrx observer;
- int timeout = session->getTimeoutAndObserver(observer);
- setSession(session, observer);
- checkConsistency();
- return timeout / 2;
- }
- catch(const NodeActiveException&)
- {
- _traceLevels->logger->error("a node with the same name is already registered and active");
- }
- catch(const Ice::LocalException& ex)
- {
- ostringstream os;
- os << "couldn't contact the IceGrid registry:\n" << ex;
- _traceLevels->logger->warning(os.str());
- }
- }
- return 0;
-}
-
-void
-NodeI::waitForSession()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor);
- while(!_session)
- {
- _sessionMonitor.timedWait(IceUtil::Time::seconds(_waitTime));
- }
-}
-
-void
-NodeI::stop()
-{
- Lock sync(_sessionMonitor);
- if(_session)
- {
- try
- {
- _session->destroy();
- }
- catch(const Ice::LocalException& ex)
- {
- ostringstream os;
- os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex;
- _traceLevels->logger->warning(os.str());
- }
-
- _session = 0;
- _observer = 0;
- _sessionMonitor.notifyAll();
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ _observer = observer;
}
+ checkConsistency(session);
+ return session;
}
void
-NodeI::checkConsistency()
+NodeI::checkConsistency(const NodeSessionPrx& session)
{
//
// We use a serial number to keep track of the concurrent changes
@@ -635,8 +577,7 @@ NodeI::checkConsistency()
//
unsigned long serial = 0;
Ice::StringSeq servers;
- NodeSessionPrx session;
- do
+ while(true)
{
{
Lock sync(*this);
@@ -649,11 +590,10 @@ NodeI::checkConsistency()
}
serial = _serial;
}
- session = getSession();
- servers = session ? session->getServers() : Ice::StringSeq();
+ assert(session);
+ servers = session->getServers();
sort(servers.begin(), servers.end());
}
- while(session);
}
void