summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp190
1 files changed, 121 insertions, 69 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 463296597ad..1fbf58643ec 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -540,13 +540,6 @@ NodeI::getTraceLevels() const
return _traceLevels;
}
-NodeObserverPrx
-NodeI::getObserver() const
-{
- IceUtil::Mutex::Lock sync(_observerMutex);
- return _observer;
-}
-
UserAccountMapperPrx
NodeI::getUserAccountMapper() const
{
@@ -560,13 +553,6 @@ NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
}
void
-NodeI::setObserver(const NodeObserverPrx& observer)
-{
- IceUtil::Mutex::Lock sync(_observerMutex);
- _observer = observer;
-}
-
-void
NodeI::checkConsistency(const NodeSessionPrx& session)
{
//
@@ -589,7 +575,6 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
{
_serial = 1; // We can reset the serial number.
checkConsistencyNoSync(servers);
- initObserver(servers);
break;
}
serial = _serial;
@@ -608,29 +593,135 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
}
void
-NodeI::addServer(const string& application, const ServerIPtr& server)
+NodeI::addObserver(const string& replicaName, const NodeObserverPrx& observer)
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
- if(p == _serversByApplication.end())
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ _observers.insert(make_pair(replicaName, observer));
+
+ ServerDynamicInfoSeq serverInfos;
+ AdapterDynamicInfoSeq adapterInfos;
+ for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
+ p != _serversDynamicInfo.end(); ++p)
+ {
+ assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled));
+ serverInfos.push_back(p->second);
+ }
+
+ for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin();
+ q != _adaptersDynamicInfo.end(); ++q)
{
- map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
- p = _serversByApplication.insert(p, v);
+ assert(q->second.proxy);
+ adapterInfos.push_back(q->second);
+ }
+
+ try
+ {
+ NodeDynamicInfo info;
+ info.info = _platform.getNodeInfo();
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ observer->nodeUp(info);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_traceLevels->logger);
+ out << "unexpected observer exception:\n" << ex;
}
- p->second.insert(server);
}
void
-NodeI::removeServer(const std::string& application, const ServerIPtr& server)
+NodeI::removeObserver(const string& replicaName)
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
- if(p != _serversByApplication.end())
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ _observers.erase(replicaName);
+}
+
+void
+NodeI::observerUpdateServer(const ServerDynamicInfo& info)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+
+ if(info.state == Destroyed || info.state == Inactive && info.enabled)
{
- p->second.erase(server);
- if(p->second.empty())
+ _serversDynamicInfo.erase(info.id);
+ }
+ else
+ {
+ _serversDynamicInfo[info.id] = info;
+ }
+
+ for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ try
+ {
+ p->second->updateServer(_name, info);
+ }
+ catch(const Ice::LocalException&)
{
- _serversByApplication.erase(p);
+ // IGNORE
+ }
+ }
+}
+
+void
+NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+
+ if(info.proxy)
+ {
+ _adaptersDynamicInfo[info.id] = info;
+ }
+ else
+ {
+ _adaptersDynamicInfo.erase(info.id);
+ }
+
+ for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ try
+ {
+ p->second->updateAdapter(_name, info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // IGNORE
+ }
+ }
+}
+
+void
+NodeI::addServer(const ServerIPtr& server, const string& application, bool dependsOnApplicationDistrib)
+{
+ IceUtil::Mutex::Lock sync(_serversLock);
+
+ if(dependsOnApplicationDistrib)
+ {
+ map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ if(p == _serversByApplication.end())
+ {
+ map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
+ p = _serversByApplication.insert(p, v);
+ }
+ p->second.insert(server);
+ }
+}
+
+void
+NodeI::removeServer(const ServerIPtr& server, const std::string& application, bool dependsOnApplicationDistrib)
+{
+ IceUtil::Mutex::Lock sync(_serversLock);
+
+ if(dependsOnApplicationDistrib)
+ {
+ map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ if(p != _serversByApplication.end())
+ {
+ p->second.erase(server);
+ if(p->second.empty())
+ {
+ _serversByApplication.erase(p);
+ }
}
}
}
@@ -820,45 +911,6 @@ NodeI::canRemoveServerDirectory(const string& name)
}
void
-NodeI::initObserver(const Ice::StringSeq& servers)
-{
- ServerDynamicInfoSeq serverInfos;
- AdapterDynamicInfoSeq adapterInfos;
-
- for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
- {
- Ice::Identity id = createServerIdentity(*p);
- ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id));
- if(server)
- {
- try
- {
- server->getDynamicInfo(serverInfos, adapterInfos);
- }
- catch(const Ice::ObjectNotExistException&)
- {
- }
- }
- }
-
- try
- {
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
-
- assert(_observer); // No synchronization needed here, this is called by a single thread.
- _observer->nodeUp(info);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_traceLevels->logger);
- out << "unexpected observer exception:\n" << ex;
- }
-}
-
-void
NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories)
{
PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest);
@@ -898,7 +950,7 @@ NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<str
set<ServerIPtr>
NodeI::getApplicationServers(const string& application) const
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
+ IceUtil::Mutex::Lock sync(_serversLock);
set<ServerIPtr> servers;
map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application);
if(p != _serversByApplication.end())