summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp2
-rw-r--r--cpp/src/IceGrid/NodeI.cpp190
-rw-r--r--cpp/src/IceGrid/NodeI.h18
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp8
-rw-r--r--cpp/src/IceGrid/ServerAdapterI.cpp21
-rw-r--r--cpp/src/IceGrid/ServerI.cpp42
-rw-r--r--cpp/src/IceGrid/Topics.cpp22
-rw-r--r--cpp/src/IceGrid/Topics.h1
8 files changed, 167 insertions, 137 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index dd7c1f59237..cfe57adc0db 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -278,7 +278,7 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info,
void
AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer)
{
- if(_observers[name] != observer)
+ if(_observers[name] && _observers[name] != observer)
{
_database->getObserverTopic(name)->unsubscribe(_observers[name]);
_observers[name] = 0;
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 463296597ad..1fbf58643ec 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -540,13 +540,6 @@ NodeI::getTraceLevels() const
return _traceLevels;
}
-NodeObserverPrx
-NodeI::getObserver() const
-{
- IceUtil::Mutex::Lock sync(_observerMutex);
- return _observer;
-}
-
UserAccountMapperPrx
NodeI::getUserAccountMapper() const
{
@@ -560,13 +553,6 @@ NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
}
void
-NodeI::setObserver(const NodeObserverPrx& observer)
-{
- IceUtil::Mutex::Lock sync(_observerMutex);
- _observer = observer;
-}
-
-void
NodeI::checkConsistency(const NodeSessionPrx& session)
{
//
@@ -589,7 +575,6 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
{
_serial = 1; // We can reset the serial number.
checkConsistencyNoSync(servers);
- initObserver(servers);
break;
}
serial = _serial;
@@ -608,29 +593,135 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
}
void
-NodeI::addServer(const string& application, const ServerIPtr& server)
+NodeI::addObserver(const string& replicaName, const NodeObserverPrx& observer)
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
- if(p == _serversByApplication.end())
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ _observers.insert(make_pair(replicaName, observer));
+
+ ServerDynamicInfoSeq serverInfos;
+ AdapterDynamicInfoSeq adapterInfos;
+ for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
+ p != _serversDynamicInfo.end(); ++p)
+ {
+ assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled));
+ serverInfos.push_back(p->second);
+ }
+
+ for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin();
+ q != _adaptersDynamicInfo.end(); ++q)
{
- map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
- p = _serversByApplication.insert(p, v);
+ assert(q->second.proxy);
+ adapterInfos.push_back(q->second);
+ }
+
+ try
+ {
+ NodeDynamicInfo info;
+ info.info = _platform.getNodeInfo();
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ observer->nodeUp(info);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_traceLevels->logger);
+ out << "unexpected observer exception:\n" << ex;
}
- p->second.insert(server);
}
void
-NodeI::removeServer(const std::string& application, const ServerIPtr& server)
+NodeI::removeObserver(const string& replicaName)
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
- if(p != _serversByApplication.end())
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ _observers.erase(replicaName);
+}
+
+void
+NodeI::observerUpdateServer(const ServerDynamicInfo& info)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+
+ if(info.state == Destroyed || info.state == Inactive && info.enabled)
{
- p->second.erase(server);
- if(p->second.empty())
+ _serversDynamicInfo.erase(info.id);
+ }
+ else
+ {
+ _serversDynamicInfo[info.id] = info;
+ }
+
+ for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ try
+ {
+ p->second->updateServer(_name, info);
+ }
+ catch(const Ice::LocalException&)
{
- _serversByApplication.erase(p);
+ // IGNORE
+ }
+ }
+}
+
+void
+NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+
+ if(info.proxy)
+ {
+ _adaptersDynamicInfo[info.id] = info;
+ }
+ else
+ {
+ _adaptersDynamicInfo.erase(info.id);
+ }
+
+ for(map<string, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ {
+ try
+ {
+ p->second->updateAdapter(_name, info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // IGNORE
+ }
+ }
+}
+
+void
+NodeI::addServer(const ServerIPtr& server, const string& application, bool dependsOnApplicationDistrib)
+{
+ IceUtil::Mutex::Lock sync(_serversLock);
+
+ if(dependsOnApplicationDistrib)
+ {
+ map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ if(p == _serversByApplication.end())
+ {
+ map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
+ p = _serversByApplication.insert(p, v);
+ }
+ p->second.insert(server);
+ }
+}
+
+void
+NodeI::removeServer(const ServerIPtr& server, const std::string& application, bool dependsOnApplicationDistrib)
+{
+ IceUtil::Mutex::Lock sync(_serversLock);
+
+ if(dependsOnApplicationDistrib)
+ {
+ map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ if(p != _serversByApplication.end())
+ {
+ p->second.erase(server);
+ if(p->second.empty())
+ {
+ _serversByApplication.erase(p);
+ }
}
}
}
@@ -820,45 +911,6 @@ NodeI::canRemoveServerDirectory(const string& name)
}
void
-NodeI::initObserver(const Ice::StringSeq& servers)
-{
- ServerDynamicInfoSeq serverInfos;
- AdapterDynamicInfoSeq adapterInfos;
-
- for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
- {
- Ice::Identity id = createServerIdentity(*p);
- ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id));
- if(server)
- {
- try
- {
- server->getDynamicInfo(serverInfos, adapterInfos);
- }
- catch(const Ice::ObjectNotExistException&)
- {
- }
- }
- }
-
- try
- {
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
-
- assert(_observer); // No synchronization needed here, this is called by a single thread.
- _observer->nodeUp(info);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_traceLevels->logger);
- out << "unexpected observer exception:\n" << ex;
- }
-}
-
-void
NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<string>& directories)
{
PatcherFeedbackPtr feedback = new LogPatcherFeedback(_traceLevels, dest);
@@ -898,7 +950,7 @@ NodeI::patch(const FileServerPrx& icepatch, const string& dest, const vector<str
set<ServerIPtr>
NodeI::getApplicationServers(const string& application) const
{
- IceUtil::Mutex::Lock sync(_serversByApplicationLock);
+ IceUtil::Mutex::Lock sync(_serversLock);
set<ServerIPtr> servers;
map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application);
if(p != _serversByApplication.end())
diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h
index 322cb69b76d..46df88855ad 100644
--- a/cpp/src/IceGrid/NodeI.h
+++ b/cpp/src/IceGrid/NodeI.h
@@ -59,23 +59,25 @@ public:
Ice::ObjectAdapterPtr getAdapter() const;
ActivatorPtr getActivator() const;
TraceLevelsPtr getTraceLevels() const;
- NodeObserverPrx getObserver() const;
UserAccountMapperPrx getUserAccountMapper() const;
PlatformInfo& getPlatformInfo() const { return _platform; }
NodeSessionPrx registerWithRegistry(const InternalRegistryPrx&);
- void setObserver(const NodeObserverPrx&);
void checkConsistency(const NodeSessionPrx&);
NodeSessionPrx getMasterNodeSession() const;
- void addServer(const std::string&, const ServerIPtr&);
- void removeServer(const std::string&, const ServerIPtr&);
+ void addObserver(const std::string&, const NodeObserverPrx&);
+ void removeObserver(const std::string&);
+ void observerUpdateServer(const ServerDynamicInfo&);
+ void observerUpdateAdapter(const AdapterDynamicInfo&);
+
+ void addServer(const ServerIPtr&, const std::string&, bool);
+ void removeServer(const ServerIPtr&, const std::string&, bool);
private:
void checkConsistencyNoSync(const Ice::StringSeq&);
bool canRemoveServerDirectory(const std::string&);
- void initObserver(const Ice::StringSeq&);
void patch(const IcePatch2::FileServerPrx&, const std::string&, const std::vector<std::string>&);
std::set<ServerIPtr> getApplicationServers(const std::string&) const;
@@ -97,10 +99,12 @@ private:
std::string _tmpDir;
unsigned long _serial;
IceUtil::Mutex _observerMutex;
- NodeObserverPrx _observer;
+ std::map<std::string, NodeObserverPrx> _observers;
+ std::map<std::string, ServerDynamicInfo> _serversDynamicInfo;
+ std::map<std::string, AdapterDynamicInfo> _adaptersDynamicInfo;
mutable PlatformInfo _platform;
- IceUtil::Mutex _serversByApplicationLock;
+ IceUtil::Mutex _serversLock;
std::map<std::string, std::set<ServerIPtr> > _serversByApplication;
std::set<std::string> _patchInProgress;
};
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 5823e73a379..7bae899a9bf 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -140,12 +140,15 @@ NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registr
{
timeout = IceUtil::Time::seconds(t / 2);
}
+ _node->addObserver(_name, session->getObserver());
return session;
}
void
NodeSessionKeepAliveThread::destroySession(const NodeSessionPrx& session)
{
+ _node->removeObserver(_name);
+
try
{
session->destroy();
@@ -473,13 +476,8 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
if(session)
{
session->loadServers();
- _node->setObserver(session->getObserver());
_node->checkConsistency(session);
}
- else
- {
- _node->setObserver(0);
- }
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/src/IceGrid/ServerAdapterI.cpp b/cpp/src/IceGrid/ServerAdapterI.cpp
index d00f721cdd9..eef9228ec94 100644
--- a/cpp/src/IceGrid/ServerAdapterI.cpp
+++ b/cpp/src/IceGrid/ServerAdapterI.cpp
@@ -139,23 +139,10 @@ ServerAdapterI::setDirectProxy(const Ice::ObjectPrx& prx, const Ice::Current&)
if(updated)
{
- NodeObserverPrx observer = _node->getObserver();
- if(observer)
- {
- AdapterDynamicInfo info;
- info.id = _id;
- info.proxy = _proxy;
- try
- {
- observer->updateAdapter(_node->getName(), info);
- }
- catch(const Ice::LocalException&)
- {
- //
- // Expected if the master IceGrid registry is down.
- //
- }
- }
+ AdapterDynamicInfo info;
+ info.id = _id;
+ info.proxy = _proxy;
+ _node->observerUpdateAdapter(info);
}
if(_proxy)
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index c64bdf1e3ba..131a6c8bf09 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -720,6 +720,8 @@ ServerI::setEnabled(bool enabled, const ::Ice::Current&)
{
return; // Nothing to change!
}
+
+ _node->observerUpdateServer(getDynamicInfo());
}
if(activate)
@@ -738,19 +740,6 @@ ServerI::setEnabled(bool enabled, const ::Ice::Current&)
{
}
}
-
- NodeObserverPrx observer = _node->getObserver();
- if(observer)
- {
- try
- {
- observer->updateServer(_node->getName(), getDynamicInfo());
- }
- catch(const Ice::LocalException&)
- {
- // Expected if the IceGrid registry is down.
- }
- }
}
bool
@@ -1509,10 +1498,7 @@ ServerI::destroy()
adpts = _adapters;
}
- if(_info.descriptor->applicationDistrib)
- {
- _node->removeServer(_info.application, this);
- }
+ _node->removeServer(this, _info.application, _info.descriptor->applicationDistrib);
try
{
@@ -1688,14 +1674,11 @@ ServerI::update()
throw DeploymentException(msg);
}
- if(oldInfo.descriptor && oldInfo.descriptor->applicationDistrib)
- {
- _node->removeServer(oldInfo.application, this);
- }
- if(_info.descriptor->applicationDistrib)
+ if(oldInfo.descriptor)
{
- _node->addServer(_info.application, this);
+ _node->removeServer(this, oldInfo.application, oldInfo.descriptor->applicationDistrib);
}
+ _node->addServer(this, _info.application, _info.descriptor->applicationDistrib);
AdapterPrxDict adapters;
for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
@@ -2329,18 +2312,7 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason)
if(toServerState(previous) != toServerState(_state) &&
!(previous == Inactive && _state == Deactivating))
{
- NodeObserverPrx observer = _node->getObserver();
- if(observer)
- {
- try
- {
- observer->updateServer(_node->getName(), getDynamicInfo());
- }
- catch(const Ice::LocalException&)
- {
- // Expected if the IceGrid registry is down.
- }
- }
+ _node->observerUpdateServer(getDynamicInfo());
}
if(_node->getTraceLevels()->server > 1)
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 20159fe4ec6..582a5f5da5b 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -50,11 +50,24 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
return;
}
-
+
+ //
+ // We need to ensure that this observer isn't already registered
+ // as IceStorm might otherwise replace a previous subscriber
+ // without any notification.
+ //
+ assert(obsv);
+ if(_subscribers.find(obsv->ice_getIdentity()) != _subscribers.end())
+ {
+ throw ObserverAlreadyRegisteredException(obsv->ice_getIdentity());
+ }
+
IceStorm::QoS qos;
qos["reliability"] = "twoway ordered";
initObserver(_topic->subscribe(qos, obsv));
+ _subscribers.insert(obsv->ice_getIdentity());
+
if(!name.empty())
{
assert(_syncSubscribers.find(name) == _syncSubscribers.end());
@@ -72,6 +85,9 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
_topic->unsubscribe(observer);
}
+ assert(observer);
+ _subscribers.erase(observer->ice_getIdentity());
+
if(!name.empty())
{
assert(_syncSubscribers.find(name) != _syncSubscribers.end());
@@ -358,7 +374,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
{
if(p->id == server.id)
{
- if(server.state == Destroyed || server.state == Inactive)
+ if(server.state == Destroyed || (server.state == Inactive && server.enabled))
{
servers.erase(p);
}
@@ -370,7 +386,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
}
++p;
}
- if(server.state != Destroyed && server.state != Inactive && p == servers.end())
+ if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end())
{
servers.push_back(server);
}
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index 13a75970b56..dc480fdc5d1 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -49,6 +49,7 @@ protected:
Ice::ObjectPrx _basePublisher;
int _serial;
+ std::set<Ice::Identity> _subscribers;
std::set<std::string> _syncSubscribers;
std::map<int, std::set<std::string> > _waitForUpdates;
std::map<int, std::map<std::string, std::string> > _updateFailures;