summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2017-02-10 18:42:18 +0100
committerBenoit Foucher <benoit@zeroc.com>2017-02-10 18:42:18 +0100
commit3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11 (patch)
tree8a2475dda8b95d5b837bdf5415a1a968c57642c1
parentFix (ICE-7331) - IceGridGUI preference preservation (diff)
downloadice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.bz2
ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.tar.xz
ice-3b2960e7be53c931ffcf84a0a2fe3ca1e2b75e11.zip
Fixed ICE-7328 - IceGrid no longer returns proxies from disabled servers for ByType operations
-rw-r--r--CHANGELOG-3.7.md6
-rw-r--r--cpp/src/IceGrid/Allocatable.h1
-rw-r--r--cpp/src/IceGrid/AllocatableObjectCache.cpp27
-rw-r--r--cpp/src/IceGrid/AllocatableObjectCache.h9
-rw-r--r--cpp/src/IceGrid/Database.cpp17
-rw-r--r--cpp/src/IceGrid/NodeI.cpp2
-rw-r--r--cpp/src/IceGrid/ObjectCache.cpp27
-rw-r--r--cpp/src/IceGrid/ObjectCache.h17
-rw-r--r--cpp/src/IceGrid/QueryI.cpp2
-rw-r--r--cpp/src/IceGrid/ServerCache.cpp15
-rw-r--r--cpp/src/IceGrid/ServerCache.h11
-rw-r--r--cpp/src/IceGrid/Topics.cpp162
-rw-r--r--cpp/src/IceGrid/Topics.h5
-rw-r--r--cpp/test/IceGrid/activation/AllTests.cpp14
-rw-r--r--cpp/test/IceGrid/activation/application.xml14
-rw-r--r--cpp/test/IceGrid/allocation/AllTests.cpp23
-rw-r--r--cpp/test/IceGrid/allocation/application.xml4
17 files changed, 250 insertions, 106 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md
index efe1c7612b3..309367ac13d 100644
--- a/CHANGELOG-3.7.md
+++ b/CHANGELOG-3.7.md
@@ -17,6 +17,12 @@ These are the changes since Ice 3.6.3.
## General Changes
+- The `findObjectByType`, `findAllObjectsByType`, `findObjectByTypeOnLeastLoadedNode`
+ operations from the `IceGrid::Query` interface and the `allocateObjectByType`
+ operation from the `IceGrid::Session` interfaces now only returns proxies for
+ Ice objects from enabled servers. If a server is disabled, its well-known or
+ allocatable Ice objects won't be returned anymore to clients.
+
- A Slice enumeration (enum) creates now a new namespace scope for its
enumerators. In previous releases, the enumerators were in the same
namespace scope as the enumeration. For example:
diff --git a/cpp/src/IceGrid/Allocatable.h b/cpp/src/IceGrid/Allocatable.h
index 0f10b94f4ae..2bf31a4447c 100644
--- a/cpp/src/IceGrid/Allocatable.h
+++ b/cpp/src/IceGrid/Allocatable.h
@@ -85,6 +85,7 @@ public:
bool isAllocatable() const { return _allocatable; }
SessionIPtr getSession() const;
+ virtual bool isEnabled() const = 0;
virtual void allocated(const SessionIPtr&) = 0;
virtual void released(const SessionIPtr&) = 0;
virtual bool canTryAllocate() { return false; }
diff --git a/cpp/src/IceGrid/AllocatableObjectCache.cpp b/cpp/src/IceGrid/AllocatableObjectCache.cpp
index 3a9799e1eac..438add8b263 100644
--- a/cpp/src/IceGrid/AllocatableObjectCache.cpp
+++ b/cpp/src/IceGrid/AllocatableObjectCache.cpp
@@ -12,6 +12,7 @@
#include <Ice/LoggerUtil.h>
#include <Ice/LocalException.h>
#include <IceGrid/AllocatableObjectCache.h>
+#include <IceGrid/ServerCache.h>
#include <IceGrid/SessionI.h>
using namespace std;
@@ -126,7 +127,7 @@ AllocatableObjectCache::AllocatableObjectCache(const Ice::CommunicatorPtr& commu
}
void
-AllocatableObjectCache::add(const ObjectInfo& info, const AllocatablePtr& parent)
+AllocatableObjectCache::add(const ObjectInfo& info, const ServerEntryPtr& parent)
{
const Ice::Identity& id = info.proxy->ice_getIdentity();
@@ -217,13 +218,18 @@ AllocatableObjectCache::allocateByType(const string& type, const ObjectAllocatio
vector<AllocatableObjectEntryPtr> objects = p->second.getObjects();
RandomNumberGenerator rng;
random_shuffle(objects.begin(), objects.end(), rng); // TODO: OPTIMIZE
+ int allocatable = 0;
try
{
for(vector<AllocatableObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- if((*q)->tryAllocate(request))
+ if((*q)->isEnabled())
{
- return;
+ ++allocatable;
+ if((*q)->tryAllocate(request))
+ {
+ return;
+ }
}
}
}
@@ -231,7 +237,10 @@ AllocatableObjectCache::allocateByType(const string& type, const ObjectAllocatio
{
return; // The request has been answered already, no need to throw here.
}
-
+ if(allocatable == 0)
+ {
+ throw AllocationException("no allocatable objects with type `" + type + "' enabled");
+ }
p->second.addAllocationRequest(request);
}
@@ -252,12 +261,14 @@ AllocatableObjectCache::canTryAllocate(const AllocatableObjectEntryPtr& entry)
AllocatableObjectEntry::AllocatableObjectEntry(AllocatableObjectCache& cache,
const ObjectInfo& info,
- const AllocatablePtr& parent) :
+ const ServerEntryPtr& parent) :
Allocatable(true, parent),
_cache(cache),
_info(info),
+ _server(parent),
_destroyed(false)
{
+ assert(_server);
}
Ice::ObjectPrx
@@ -278,6 +289,12 @@ AllocatableObjectEntry::canRemove()
return true;
}
+bool
+AllocatableObjectEntry::isEnabled() const
+{
+ return _server->isEnabled();
+}
+
void
AllocatableObjectEntry::allocated(const SessionIPtr& session)
{
diff --git a/cpp/src/IceGrid/AllocatableObjectCache.h b/cpp/src/IceGrid/AllocatableObjectCache.h
index 382810c3653..9f2db6f3121 100644
--- a/cpp/src/IceGrid/AllocatableObjectCache.h
+++ b/cpp/src/IceGrid/AllocatableObjectCache.h
@@ -19,18 +19,22 @@
namespace IceGrid
{
+class ServerEntry;
+typedef IceUtil::Handle<ServerEntry> ServerEntryPtr;
+
class AllocatableObjectCache;
class AllocatableObjectEntry : public Allocatable
{
public:
- AllocatableObjectEntry(AllocatableObjectCache&, const ObjectInfo&, const AllocatablePtr&);
+ AllocatableObjectEntry(AllocatableObjectCache&, const ObjectInfo&, const ServerEntryPtr&);
Ice::ObjectPrx getProxy() const;
std::string getType() const;
bool canRemove();
+ virtual bool isEnabled() const;
virtual void allocated(const SessionIPtr&);
virtual void released(const SessionIPtr&);
virtual bool canTryAllocate();
@@ -42,6 +46,7 @@ private:
AllocatableObjectCache& _cache;
const ObjectInfo _info;
+ ServerEntryPtr _server;
bool _destroyed;
};
typedef IceUtil::Handle<AllocatableObjectEntry> AllocatableObjectEntryPtr;
@@ -77,7 +82,7 @@ public:
AllocatableObjectCache(const Ice::CommunicatorPtr&);
- void add(const ObjectInfo&, const AllocatablePtr&);
+ void add(const ObjectInfo&, const ServerEntryPtr&);
AllocatableObjectEntryPtr get(const Ice::Identity&) const;
void remove(const Ice::Identity&);
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index ce458e52936..e20d176cbe8 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -265,6 +265,8 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
_registryObserverTopic = new RegistryObserverTopic(_topicManager);
+ _serverCache.setNodeObserverTopic(_nodeObserverTopic);
+
// Set all serials to 1 if they have not yet been set.
Ice::Long serial;
if(!_serials.get(txn, applicationsDbName, serial))
@@ -1777,7 +1779,16 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample
Ice::ObjectProxySeq
Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
{
- Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
+ Ice::ObjectProxySeq proxies;
+
+ vector<ObjectEntryPtr> objects = _objectCache.getObjectsByType(type);
+ for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ {
+ if(_nodeObserverTopic->isServerEnabled((*q)->getServer())) // Only return proxies from enabled servers.
+ {
+ proxies.push_back((*q)->getProxy());
+ }
+ }
IceDB::ReadOnlyTxn txn(_env);
vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type);
@@ -2162,7 +2173,7 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
_adapterCache.addReplicaGroup(*r, application);
for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
{
- _objectCache.add(toObjectInfo(_communicator, *o, r->id), application);
+ _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
}
}
@@ -2305,7 +2316,7 @@ Database::reload(const ApplicationHelper& oldApp,
for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
{
- _objectCache.add(toObjectInfo(_communicator, *o, r->id), application);
+ _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
}
}
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index e83d6c3cdc3..294e20f393d 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -971,6 +971,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
if(sent.find(p->second) == sent.end())
{
queueUpdate(p->second, new UpdateServer(this, p->second, info));
+ sent.insert(p->second);
}
}
}
@@ -1001,6 +1002,7 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
if(sent.find(p->second) == sent.end())
{
queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
+ sent.insert(p->second);
}
}
}
diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp
index cc76cc2d9b7..8cd6aaedeb2 100644
--- a/cpp/src/IceGrid/ObjectCache.cpp
+++ b/cpp/src/IceGrid/ObjectCache.cpp
@@ -74,7 +74,7 @@ ObjectCache::ObjectCache(const Ice::CommunicatorPtr& communicator) : _communicat
}
void
-ObjectCache::add(const ObjectInfo& info, const string& application)
+ObjectCache::add(const ObjectInfo& info, const string& application, const string& server)
{
const Ice::Identity& id = info.proxy->ice_getIdentity();
@@ -86,7 +86,7 @@ ObjectCache::add(const ObjectInfo& info, const string& application)
return;
}
- ObjectEntryPtr entry = new ObjectEntry(info, application);
+ ObjectEntryPtr entry = new ObjectEntry(info, application, server);
addImpl(id, entry);
map<string, TypeEntry>::iterator p = _types.find(entry->getType());
@@ -142,22 +142,16 @@ ObjectCache::remove(const Ice::Identity& id)
}
}
-Ice::ObjectProxySeq
+vector<ObjectEntryPtr>
ObjectCache::getObjectsByType(const string& type)
{
Lock sync(*this);
- Ice::ObjectProxySeq proxies;
map<string, TypeEntry>::const_iterator p = _types.find(type);
if(p == _types.end())
{
- return proxies;
+ return vector<ObjectEntryPtr>();
}
- const vector<ObjectEntryPtr>& objects = p->second.getObjects();
- for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
- {
- proxies.push_back((*q)->getProxy());
- }
- return proxies;
+ return p->second.getObjects();
}
ObjectInfoSeq
@@ -194,9 +188,10 @@ ObjectCache::getAllByType(const string& type)
return infos;
}
-ObjectEntry::ObjectEntry(const ObjectInfo& info, const string& application) :
+ObjectEntry::ObjectEntry(const ObjectInfo& info, const string& application, const string& server) :
_info(info),
- _application(application)
+ _application(application),
+ _server(server)
{
}
@@ -218,6 +213,12 @@ ObjectEntry::getApplication() const
return _application;
}
+string
+ObjectEntry::getServer() const
+{
+ return _server;
+}
+
const ObjectInfo&
ObjectEntry::getObjectInfo() const
{
diff --git a/cpp/src/IceGrid/ObjectCache.h b/cpp/src/IceGrid/ObjectCache.h
index fba99917e1b..fbeb5aeb9f4 100644
--- a/cpp/src/IceGrid/ObjectCache.h
+++ b/cpp/src/IceGrid/ObjectCache.h
@@ -23,11 +23,12 @@ class ObjectCache;
class ObjectEntry : public IceUtil::Shared
{
public:
-
- ObjectEntry(const ObjectInfo&, const std::string&);
+
+ ObjectEntry(const ObjectInfo&, const std::string&, const std::string&);
Ice::ObjectPrx getProxy() const;
std::string getType() const;
std::string getApplication() const;
+ std::string getServer() const;
const ObjectInfo& getObjectInfo() const;
bool canRemove();
@@ -36,6 +37,7 @@ private:
const ObjectInfo _info;
const std::string _application;
+ const std::string _server;
};
typedef IceUtil::Handle<ObjectEntry> ObjectEntryPtr;
@@ -45,18 +47,19 @@ public:
ObjectCache(const Ice::CommunicatorPtr&);
- void add(const ObjectInfo&, const std::string&);
+ void add(const ObjectInfo&, const std::string&, const std::string&);
ObjectEntryPtr get(const Ice::Identity&) const;
void remove(const Ice::Identity&);
- Ice::ObjectProxySeq getObjectsByType(const std::string&);
+ std::vector<ObjectEntryPtr> getObjectsByType(const std::string&);
+
ObjectInfoSeq getAll(const std::string&);
ObjectInfoSeq getAllByType(const std::string&);
const Ice::CommunicatorPtr& getCommunicator() const { return _communicator; }
private:
-
+
class TypeEntry
{
public:
@@ -65,11 +68,11 @@ private:
void add(const ObjectEntryPtr&);
bool remove(const ObjectEntryPtr&);
-
+
const std::vector<ObjectEntryPtr>& getObjects() const { return _objects; }
private:
-
+
std::vector<ObjectEntryPtr> _objects;
};
diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp
index 366f5eb607d..713b197c36c 100644
--- a/cpp/src/IceGrid/QueryI.cpp
+++ b/cpp/src/IceGrid/QueryI.cpp
@@ -86,7 +86,7 @@ QueryI::findAllReplicas(const Ice::ObjectPrx& proxy, const Ice::Current& current
try
{
AdapterInfoSeq infos = _database->getFilteredAdapterInfo(prx->ice_getAdapterId(), current.con, current.ctx);
- if(infos.empty() || infos[0].replicaGroupId != prx->ice_getAdapterId())
+ if(infos.empty() || infos[0].replicaGroupId != prx->ice_getAdapterId())
{
// The adapter id doesn't refer to a replica group or the replica group is empty.
return Ice::ObjectProxySeq();
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp
index 03b4332f43b..bafafa42553 100644
--- a/cpp/src/IceGrid/ServerCache.cpp
+++ b/cpp/src/IceGrid/ServerCache.cpp
@@ -18,6 +18,7 @@
#include <IceGrid/AllocatableObjectCache.h>
#include <IceGrid/SessionI.h>
#include <IceGrid/DescriptorHelper.h>
+#include <IceGrid/Topics.h>
using namespace std;
using namespace IceGrid;
@@ -263,6 +264,12 @@ ServerCache::clear(const string& id)
}
void
+ServerCache::setNodeObserverTopic(const NodeObserverTopicPtr& nodeObserverTopic)
+{
+ _nodeObserverTopic = nodeObserverTopic;
+}
+
+void
ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc,
const CommunicatorDescriptorPtr& newDesc,
const ServerEntryPtr& server,
@@ -291,7 +298,7 @@ ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc,
for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
{
- _objectCache.add(toObjectInfo(_communicator, *r, q->id), application);
+ _objectCache.add(toObjectInfo(_communicator, *r, q->id), application, server->getId());
}
for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r)
{
@@ -1091,6 +1098,12 @@ ServerEntry::checkUpdate(const ServerInfo& info, bool noRestart)
return new CheckUpdateResult(_id, oldInfo.node, noRestart, desc, server->begin_checkUpdate(desc, noRestart));
}
+bool
+ServerEntry::isEnabled() const
+{
+ return _cache.getNodeObserverTopic()->isServerEnabled(_id);
+}
+
void
ServerEntry::allocated(const SessionIPtr& session)
{
diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h
index d604439caa8..8379fe4f8c3 100644
--- a/cpp/src/IceGrid/ServerCache.h
+++ b/cpp/src/IceGrid/ServerCache.h
@@ -34,6 +34,9 @@ typedef IceUtil::Handle<NodeEntry> NodeEntryPtr;
class CheckServerResult;
typedef IceUtil::Handle<CheckServerResult> CheckServerResultPtr;
+class NodeObserverTopic;
+typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr;
+
class CheckUpdateResult : public IceUtil::Shared
{
public:
@@ -97,6 +100,7 @@ public:
void destroyCallback();
void exception(const Ice::Exception&);
+ virtual bool isEnabled() const;
virtual void allocated(const SessionIPtr&);
virtual void allocatedNoSync(const SessionIPtr&);
virtual void released(const SessionIPtr&);
@@ -139,7 +143,8 @@ public:
using CacheByString<ServerEntry>::remove;
#endif
- ServerCache(const Ice::CommunicatorPtr&, const std::string&, NodeCache&, AdapterCache&, ObjectCache&, AllocatableObjectCache&);
+ ServerCache(const Ice::CommunicatorPtr&, const std::string&, NodeCache&, AdapterCache&, ObjectCache&,
+ AllocatableObjectCache&);
ServerEntryPtr add(const ServerInfo&);
ServerEntryPtr get(const std::string&) const;
@@ -155,6 +160,9 @@ public:
Ice::CommunicatorPtr getCommunicator() const { return _communicator; }
const std::string& getInstanceName() const { return _instanceName; }
+ const NodeObserverTopicPtr& getNodeObserverTopic() const { return _nodeObserverTopic; }
+ void setNodeObserverTopic(const NodeObserverTopicPtr&);
+
private:
void addCommunicator(const CommunicatorDescriptorPtr&, const CommunicatorDescriptorPtr&, const ServerEntryPtr&,
@@ -170,6 +178,7 @@ private:
AdapterCache& _adapterCache;
ObjectCache& _objectCache;
AllocatableObjectCache& _allocatableObjectCache;
+ NodeObserverTopicPtr _nodeObserverTopic;
};
};
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 578f688d0b4..140c2673140 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -139,7 +139,7 @@ ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
++p;
}
}
-
+
if(notifyMonitor)
{
notifyAll();
@@ -163,7 +163,7 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
if(p != _waitForUpdates.end())
{
p->second.erase(name);
-
+
if(!failure.empty())
{
map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
@@ -237,7 +237,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
if(q != _updateFailures.end())
{
map<string, string> failures = q->second;
- _updateFailures.erase(q);
+ _updateFailures.erase(q);
ostringstream os;
for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
{
@@ -288,7 +288,7 @@ ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
return context;
}
-RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
_publishers = getPublishers<RegistryObserverPrx>();
@@ -314,11 +314,11 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `registryUp' update:\n" << ex;
+ out << "unexpected exception while publishing `registryUp' update:\n" << ex;
}
}
-void
+void
RegistryObserverTopic::registryDown(const string& name)
{
Lock sync(*this);
@@ -344,7 +344,7 @@ RegistryObserverTopic::registryDown(const string& name)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `registryDown' update:\n" << ex;
+ out << "unexpected exception while publishing `registryDown' update:\n" << ex;
}
}
@@ -361,8 +361,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
observer->registryInit(registries, getContext(_serial));
}
-NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
- const Ice::ObjectAdapterPtr& adapter) :
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
+ const Ice::ObjectAdapterPtr& adapter) :
ObserverTopic(topicManager, "NodeObserver")
{
_publishers = getPublishers<NodeObserverPrx>();
@@ -391,6 +391,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
}
updateSerial();
_nodes.insert(make_pair(info.info.name, info));
+ for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p)
+ {
+ _serverStatus[p->id] = p->enabled;
+ }
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -401,17 +405,17 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
+ out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&)
{
assert(false);
}
-void
+void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
@@ -427,7 +431,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
//
return;
}
-
+
updateSerial();
ServerDynamicInfoSeq& servers = _nodes[node].servers;
@@ -453,6 +457,15 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
servers.push_back(server);
}
+ if(server.state != Destroyed)
+ {
+ _serverStatus[server.id] = server.enabled;
+ }
+ else
+ {
+ _serverStatus.erase(server.id);
+ }
+
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -463,11 +476,11 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `updateServer' update:\n" << ex;
+ out << "unexpected exception while publishing `updateServer' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
{
Lock sync(*this);
@@ -508,7 +521,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
{
adapters.push_back(adapter);
}
-
+
try
{
for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
@@ -519,11 +532,11 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
+ out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
}
}
-void
+void
NodeObserverTopic::nodeDown(const string& name)
{
Lock sync(*this);
@@ -534,22 +547,30 @@ NodeObserverTopic::nodeDown(const string& name)
updateSerial();
- if(_nodes.find(name) != _nodes.end())
+ if(_nodes.find(name) == _nodes.end())
{
- _nodes.erase(name);
- try
- {
- for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
- {
- (*p)->nodeDown(name);
- }
- }
- catch(const Ice::LocalException& ex)
+ return;
+ }
+
+ ServerDynamicInfoSeq& servers = _nodes[name].servers;
+ for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ {
+ _serverStatus.erase(p->id);
+ }
+
+ _nodes.erase(name);
+ try
+ {
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
{
- Ice::Warning out(_logger);
- out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
+ (*p)->nodeDown(name);
}
}
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
+ }
}
void
@@ -565,6 +586,25 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
observer->nodeInit(nodes, getContext(_serial));
}
+bool
+NodeObserverTopic::isServerEnabled(const string& server) const
+{
+ Lock sync(*this);
+ if(_topics.empty())
+ {
+ return false;
+ }
+ map<string, bool>::const_iterator p = _serverStatus.find(server);
+ if(p != _serverStatus.end())
+ {
+ return p->second;
+ }
+ else
+ {
+ return true; // Assume the server is enabled if we don't know its status.
+ }
+}
+
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
const map<string, ApplicationInfo>& applications, Ice::Long serial) :
ObserverTopic(topicManager, "ApplicationObserver", serial),
@@ -597,13 +637,13 @@ ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationI
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
{
Lock sync(*this);
@@ -624,13 +664,13 @@ ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const Application
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
{
Lock sync(*this);
@@ -650,13 +690,13 @@ ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& n
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
@@ -712,13 +752,13 @@ ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const Applicati
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-void
+void
ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
@@ -738,7 +778,7 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
_publishers = getPublishers<AdapterObserverPrx>();
}
-int
+int
AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
{
Lock sync(*this);
@@ -762,13 +802,13 @@ AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpt
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
@@ -788,13 +828,13 @@ AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
{
Lock sync(*this);
@@ -814,7 +854,7 @@ AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -840,13 +880,13 @@ AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-void
+void
AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
@@ -854,7 +894,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
adapters.push_back(p->second);
- }
+ }
observer->adapterInit(adapters, getContext(_serial, _dbSerial));
}
@@ -866,7 +906,7 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
_publishers = getPublishers<ObjectObserverPrx>();
}
-int
+int
ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
{
Lock sync(*this);
@@ -890,13 +930,13 @@ ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectInit' update:\n" << ex;
+ out << "unexpected exception while publishing `objectInit' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
@@ -916,13 +956,13 @@ ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
}
-int
+int
ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
{
Lock sync(*this);
@@ -942,7 +982,7 @@ ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -968,7 +1008,7 @@ ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
+ out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
return _serial;
@@ -1000,7 +1040,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
}
else
@@ -1016,7 +1056,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
+ out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
}
}
@@ -1024,7 +1064,7 @@ ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
//
// We don't wait for the update to be received by the replicas
// here. This operation is called by ReplicaSessionI.
- //
+ //
addExpectedUpdate(_serial);
//waitForSyncedSubscribersNoSync(_serial);
return _serial;
@@ -1053,21 +1093,21 @@ ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_logger);
- out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
+ out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
}
//
// We don't need to wait for the update to be received by the
- // replicas here. This operation is only called internaly by
+ // replicas here. This operation is only called internaly by
// IceGrid.
- //
+ //
addExpectedUpdate(_serial);
//waitForSyncedSubscribersNoSync(_serial);
return _serial;
}
-void
+void
ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index ceaf6f4ca38..44affbacec3 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -88,7 +88,7 @@ typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr;
class NodeObserverTopic : public ObserverTopic, public NodeObserver
{
public:
-
+
NodeObserverTopic(const IceStorm::TopicManagerPrx&, const Ice::ObjectAdapterPtr&);
virtual void nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&);
@@ -102,11 +102,14 @@ public:
void nodeDown(const std::string&);
virtual void initObserver(const Ice::ObjectPrx&);
+ bool isServerEnabled(const std::string&) const;
+
private:
const NodeObserverPrx _externalPublisher;
std::vector<NodeObserverPrx> _publishers;
std::map<std::string, NodeDynamicInfo> _nodes;
+ std::map<std::string, bool> _serverStatus;
};
typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr;
diff --git a/cpp/test/IceGrid/activation/AllTests.cpp b/cpp/test/IceGrid/activation/AllTests.cpp
index 3ae92ba6e95..7fba9e9975e 100644
--- a/cpp/test/IceGrid/activation/AllTests.cpp
+++ b/cpp/test/IceGrid/activation/AllTests.cpp
@@ -92,6 +92,10 @@ allTests(const Ice::CommunicatorPtr& communicator)
IceGrid::RegistryPrx registry = IceGrid::RegistryPrx::checkedCast(
communicator->stringToProxy(communicator->getDefaultLocator()->ice_getIdentity().category + "/Registry"));
test(registry);
+
+ IceGrid::QueryPrx query = IceGrid::QueryPrx::checkedCast(
+ communicator->stringToProxy(communicator->getDefaultLocator()->ice_getIdentity().category + "/Query"));
+
IceGrid::AdminSessionPrx session = registry->createAdminSession("foo", "bar");
session->ice_getConnection()->setACM(registry->getACMTimeout(), IceUtil::None, Ice::ICE_ENUM(ACMHeartbeat, HeartbeatAlways));
@@ -253,6 +257,8 @@ allTests(const Ice::CommunicatorPtr& communicator)
cout << "testing server disable... " << flush;
try
{
+ int count = query->findAllObjectsByType("Test").size();
+
test(admin->getServerState("server") == IceGrid::Inactive);
admin->enableServer("server", false);
try
@@ -263,6 +269,9 @@ allTests(const Ice::CommunicatorPtr& communicator)
catch(const Ice::NoEndpointException&)
{
}
+
+ test(query->findAllObjectsByType("Test").size() == count - 1);
+
try
{
admin->startServer("server");
@@ -292,6 +301,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
}
test(admin->getServerState("server-manual") == IceGrid::Inactive);
+ test(query->findAllObjectsByType("Test").size() == count - 2);
test(admin->getServerState("server-always") == IceGrid::Active);
admin->enableServer("server-always", false);
@@ -314,7 +324,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
}
test(admin->getServerState("server-always") == IceGrid::Inactive);
-
+ test(query->findAllObjectsByType("Test").size() == count - 3);
test(admin->getServerState("server") == IceGrid::Inactive);
admin->enableServer("server", true);
@@ -335,6 +345,8 @@ allTests(const Ice::CommunicatorPtr& communicator)
test(admin->getServerPid("server") == pid);
admin->stopServer("server");
test(admin->getServerState("server") == IceGrid::Inactive);
+
+ test(query->findAllObjectsByType("Test").size() == count - 2);
}
catch(const Ice::LocalException& ex)
{
diff --git a/cpp/test/IceGrid/activation/application.xml b/cpp/test/IceGrid/activation/application.xml
index 2b5ea2c2a72..a73b1cde237 100644
--- a/cpp/test/IceGrid/activation/application.xml
+++ b/cpp/test/IceGrid/activation/application.xml
@@ -14,9 +14,9 @@
activation-timeout="${activation-timeout}"
deactivation-timeout="${deactivation-timeout}">
<adapter name="TestAdapter" endpoints="default">
- <object identity="${server}" type="Test"/>
- <allocatable identity="${server}" type="Test"/>
- </adapter>
+ <object identity="${server}" type="Test"/>
+ <allocatable identity="${server}" type="Test"/>
+ </adapter>
<property name="ActivationDelay" value="${activation-delay}"/>
<property name="DeactivationDelay" value="${deactivation-delay}"/>
<property name="FailOnStartup" value="${fail-on-startup}"/>
@@ -53,13 +53,13 @@
<server id="invalid-exe" exe="server2" activation="on-demand">
<adapter name="TestAdapter" endpoints="default">
- <object identity="${server}" type="Test"/>
- </adapter>
+ <object identity="${server}" type="Test"/>
+ </adapter>
</server>
<server id="invalid-pwd" exe="./server" pwd="./bogus">
<adapter name="TestAdapter" endpoints="default">
- <object identity="${server}" type="Test"/>
- </adapter>
+ <object identity="${server}" type="Test"/>
+ </adapter>
</server>
<server id="invalid-exe-no-oa" exe="./server2">
<property name="Ice.Admin.Endpoints" value=""/>
diff --git a/cpp/test/IceGrid/allocation/AllTests.cpp b/cpp/test/IceGrid/allocation/AllTests.cpp
index 82b7131aa6e..e0c0871c4d5 100644
--- a/cpp/test/IceGrid/allocation/AllTests.cpp
+++ b/cpp/test/IceGrid/allocation/AllTests.cpp
@@ -480,7 +480,6 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->setAllocationTimeout(0);
session2->setAllocationTimeout(0);
-
try
{
obj = session1->allocateObjectByType("::Unknown");
@@ -595,6 +594,17 @@ allTests(const Ice::CommunicatorPtr& communicator)
session1->releaseObject(obj->ice_getIdentity());
+ admin->enableServer("ObjectAllocation", false);
+ try
+ {
+ session1->allocateObjectByType("::Test");
+ test(false);
+ }
+ catch(const AllocationException&)
+ {
+ }
+ admin->enableServer("ObjectAllocation", true);
+
cout << "ok" << endl;
cout << "testing object allocation timeout... " << flush;
@@ -817,6 +827,17 @@ allTests(const Ice::CommunicatorPtr& communicator)
session2->releaseObject(obj1->ice_getIdentity());
session2->releaseObject(obj2->ice_getIdentity());
+ admin->enableServer("ServerAllocation", false);
+ try
+ {
+ session1->allocateObjectByType("::TestServer1");
+ test(false);
+ }
+ catch(const AllocationException&)
+ {
+ }
+ admin->enableServer("ServerAllocation", true);
+
cout << "ok" << endl;
cout << "testing concurrent allocations... " << flush;
diff --git a/cpp/test/IceGrid/allocation/application.xml b/cpp/test/IceGrid/allocation/application.xml
index 27c8ad021f7..2855186151b 100644
--- a/cpp/test/IceGrid/allocation/application.xml
+++ b/cpp/test/IceGrid/allocation/application.xml
@@ -27,8 +27,8 @@
<server id="ObjectAllocation" exe="${server.dir}/server" activation="on-demand" pwd=".">
<adapter name="Server" endpoints="default">
- <allocatable identity="allocatable" type="::Test"/>
- <allocatable identity="allocatablebis" type="::TestBis"/>
+ <allocatable identity="allocatable" type="::Test"/>
+ <allocatable identity="allocatablebis" type="::TestBis"/>
<object identity="nonallocatable" type="::Test"/>
<object identity="nonallocatable2" type="::NotAllocatable"/>