summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-06-17 15:25:02 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-06-17 15:25:02 +0000
commit2bc0f2b81423bb51c8649fefdeae23c602a56558 (patch)
treeda5dc69eda441fe23345267d1606da5266413f94 /cpp/src
parentbuild fix (diff)
downloadice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.bz2
ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.tar.xz
ice-2bc0f2b81423bb51c8649fefdeae23c602a56558.zip
Fixed bugs + added "update" test suite
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Database.cpp3
-rw-r--r--cpp/src/IceGrid/DescriptorHelper.cpp17
-rw-r--r--cpp/src/IceGrid/IceGridRegistry.cpp6
-rw-r--r--cpp/src/IceGrid/NodeI.cpp6
-rw-r--r--cpp/src/IceGrid/ObserverSessionI.cpp96
-rw-r--r--cpp/src/IceGrid/ObserverSessionI.h3
-rw-r--r--cpp/src/IceGrid/Topics.cpp162
-rw-r--r--cpp/src/IceGrid/Topics.h15
8 files changed, 241 insertions, 67 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index b91f4ea8206..39f4a200bde 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -743,9 +743,6 @@ Database::getServerDescriptor(const std::string& name)
{
ApplicationDescriptorPtr app = getApplicationDescriptor(getServerApplication(name));
- //
- // TODO: Is it really safe to read the application descriptor outside the lock!?
- //
for(ServerInstanceDescriptorSeq::const_iterator p = app->servers.begin(); p != app->servers.end(); ++p)
{
if(p->descriptor->name == name)
diff --git a/cpp/src/IceGrid/DescriptorHelper.cpp b/cpp/src/IceGrid/DescriptorHelper.cpp
index 6366a584fa9..14525b4fcd2 100644
--- a/cpp/src/IceGrid/DescriptorHelper.cpp
+++ b/cpp/src/IceGrid/DescriptorHelper.cpp
@@ -669,9 +669,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update)
newApp->variables.erase(*p);
}
- //
- // TODO: Check if the new templates are valid?
- //
newApp->serverTemplates = newUpdate.serverTemplates;
newApp->serverTemplates.insert(oldApp->serverTemplates.begin(), oldApp->serverTemplates.end());
for(p = newUpdate.removeServerTemplates.begin(); p != newUpdate.removeServerTemplates.end(); ++p)
@@ -679,9 +676,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update)
newApp->serverTemplates.erase(*p);
}
- //
- // TODO: Check if the new templates are valid?
- //
newApp->serviceTemplates = newUpdate.serviceTemplates;
newApp->serviceTemplates.insert(oldApp->serviceTemplates.begin(), oldApp->serviceTemplates.end());
for(p = newUpdate.removeServiceTemplates.begin(); p != newUpdate.removeServiceTemplates.end(); ++p)
@@ -689,9 +683,6 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update)
newApp->serviceTemplates.erase(*p);
}
- //
- // Update the node descriptors.
- //
newApp->nodes = newUpdate.nodes;
for(NodeDescriptorSeq::const_iterator q = oldApp->nodes.begin(); q != oldApp->nodes.end(); ++q)
{
@@ -721,7 +712,7 @@ ApplicationDescriptorHelper::update(const ApplicationUpdateDescriptor& update)
for_each(newApp->servers.begin(), newApp->servers.end(), AddServerName(updated));
for(ServerInstanceDescriptorSeq::const_iterator q = oldApp->servers.begin(); q != oldApp->servers.end(); ++q)
{
- ServerInstanceDescriptor inst = instantiate(*q); // Re-instantiate old server.
+ ServerInstanceDescriptor inst = instantiate(*q); // Re-instantiate old servers.
if(updated.find(inst.descriptor->name) == updated.end() && remove.find(inst.descriptor->name) == remove.end())
{
if(q->node != inst.node ||
@@ -1123,10 +1114,8 @@ ServerDescriptorHelper::operator==(const ServerDescriptorHelper& helper) const
return false;
}
- if(set<string>(_descriptor->interpreterOptions.begin(),
- _descriptor->interpreterOptions.end()) !=
- set<string>(helper._descriptor->interpreterOptions.begin(),
- helper._descriptor->interpreterOptions.end()))
+ if(set<string>(_descriptor->interpreterOptions.begin(), _descriptor->interpreterOptions.end()) !=
+ set<string>(helper._descriptor->interpreterOptions.begin(), helper._descriptor->interpreterOptions.end()))
{
return false;
}
diff --git a/cpp/src/IceGrid/IceGridRegistry.cpp b/cpp/src/IceGrid/IceGridRegistry.cpp
index c3f4d36f1d0..1e0434daf0e 100644
--- a/cpp/src/IceGrid/IceGridRegistry.cpp
+++ b/cpp/src/IceGrid/IceGridRegistry.cpp
@@ -84,6 +84,12 @@ RegistryService::start(int argc, char* argv[])
return false;
}
+ PropertiesPtr properties = communicator()->getProperties();
+ if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 5) <= 5)
+ {
+ properties->setProperty("Ice.ThreadPool.Server.Size", "5");
+ }
+
_registry = new RegistryI(communicator());
if(!_registry->start(nowarn))
{
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index fefcc49ce9c..b5de9dcbbce 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -464,7 +464,11 @@ NodeI::initObserver(const Ice::StringSeq& servers)
try
{
- _observer->init(_name, serverInfos, adapterInfos);
+ NodeDynamicInfo info;
+ info.name = _name;
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ _observer->initNode(info);
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/src/IceGrid/ObserverSessionI.cpp b/cpp/src/IceGrid/ObserverSessionI.cpp
index ae4a7c8e977..f2c4e8760a2 100644
--- a/cpp/src/IceGrid/ObserverSessionI.cpp
+++ b/cpp/src/IceGrid/ObserverSessionI.cpp
@@ -73,17 +73,49 @@ ObserverSessionI::setObserversByIdentity(const Ice::Identity& registryObserver,
}
void
-ObserverSessionI::startUpdate(int serial, const Ice::Current&)
+ObserverSessionI::startUpdate(int serial, const Ice::Current& current)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
_database->lock(serial, this, _userId);
_updating = true;
}
void
-ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, const Ice::Current&)
+ObserverSessionI::addApplication(const ApplicationDescriptorPtr& app, const Ice::Current& current)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
+ if(!_updating)
+ {
+ throw AccessDenied();
+ }
+ _database->addApplicationDescriptor(this, app);
+}
+
+void
+ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, const Ice::Current& current)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
if(!_updating)
{
throw AccessDenied();
@@ -92,20 +124,76 @@ ObserverSessionI::updateApplication(const ApplicationUpdateDescriptor& update, c
}
void
-ObserverSessionI::finishUpdate(const Ice::Current&)
+ObserverSessionI::syncApplication(const ApplicationDescriptorPtr& app, const Ice::Current& current)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
+ if(!_updating)
+ {
+ throw AccessDenied();
+ }
+ _database->syncApplicationDescriptor(this, app);
+}
+
+void
+ObserverSessionI::removeApplication(const string& name, const Ice::Current& current)
{
Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
if(!_updating)
{
throw AccessDenied();
}
+ _database->removeApplicationDescriptor(this, name);
+}
+void
+ObserverSessionI::finishUpdate(const Ice::Current& current)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
+ if(!_updating)
+ {
+ throw AccessDenied();
+ }
_database->unlock(this);
+ _updating = false;
}
void
-ObserverSessionI::destroy(const Ice::Current&)
+ObserverSessionI::destroy(const Ice::Current& current)
{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+ if(_updating)
+ {
+ _database->unlock(this);
+ _updating = false;
+ }
+
//
// Unsubscribe from the topics.
//
diff --git a/cpp/src/IceGrid/ObserverSessionI.h b/cpp/src/IceGrid/ObserverSessionI.h
index b8ec32c55bc..7eeab821895 100644
--- a/cpp/src/IceGrid/ObserverSessionI.h
+++ b/cpp/src/IceGrid/ObserverSessionI.h
@@ -32,7 +32,10 @@ public:
virtual void setObserversByIdentity(const Ice::Identity&, const Ice::Identity&, const Ice::Current&);
virtual void startUpdate(int, const Ice::Current&);
+ virtual void addApplication(const ApplicationDescriptorPtr&, const Ice::Current&);
+ virtual void syncApplication(const ApplicationDescriptorPtr&, const Ice::Current&);
virtual void updateApplication(const ApplicationUpdateDescriptor&, const Ice::Current&);
+ virtual void removeApplication(const std::string&, const Ice::Current&);
virtual void finishUpdate(const Ice::Current&);
virtual void destroy(const Ice::Current&);
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index b92a14de1b4..41427244449 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -14,24 +14,86 @@
using namespace std;
using namespace IceGrid;
+class RegistryInitCB : public AMI_RegistryObserver_init
+{
+public:
+
+ RegistryInitCB(const RegistryObserverTopicPtr& topic, const RegistryObserverPrx& observer, int serial) :
+ _topic(topic),
+ _observer(observer),
+ _serial(serial)
+ {
+ }
+
+ void
+ ice_response()
+ {
+ _topic->subscribe(_observer, _serial);
+ }
+
+ void
+ ice_exception(const Ice::Exception&)
+ {
+ // Ignore
+ }
+
+private:
+
+ const RegistryObserverTopicPtr _topic;
+ const RegistryObserverPrx _observer;
+ const int _serial;
+};
+
+class NodeInitCB : public AMI_NodeObserver_init
+{
+public:
+
+ NodeInitCB(const NodeObserverTopicPtr& topic, const NodeObserverPrx& observer, int serial) :
+ _topic(topic),
+ _observer(observer),
+ _serial(serial)
+ {
+ }
+
+ void
+ ice_response()
+ {
+ _topic->subscribe(_observer, _serial);
+ }
+
+ void
+ ice_exception(const Ice::Exception&)
+ {
+ // Ignore
+ }
+
+private:
+
+ const NodeObserverTopicPtr _topic;
+ const NodeObserverPrx _observer;
+ const int _serial;
+};
+
+
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) :
- _topic(topic), _publisher(publisher)
+ _topic(topic), _publisher(publisher), _serial(0)
{
}
void
-NodeObserverTopic::init(const string& node,
- const ServerDynamicInfoSeq& servers,
- const AdapterDynamicInfoSeq& adapters,
- const Ice::Current& current)
+NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&)
+{
+ assert(false);
+}
+
+void
+NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current)
{
Lock sync(*this);
- _nodes.insert(node);
- _servers[node] = servers;
- _adapters[node] = adapters;
+ _nodes.insert(make_pair(info.name, info));
- _publisher->init(node, servers, adapters);
+ _publisher->initNode(info);
}
void
@@ -40,7 +102,9 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
Lock sync(*this);
assert(_nodes.find(node) != _nodes.end());
- ServerDynamicInfoSeq& servers = _servers[node];
+ ++_serial;
+
+ ServerDynamicInfoSeq& servers = _nodes[node].servers;
ServerDynamicInfoSeq::iterator p = servers.begin();
while(p != servers.end())
{
@@ -65,7 +129,9 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
Lock sync(*this);
assert(_nodes.find(node) != _nodes.end());
- AdapterDynamicInfoSeq& adapters = _adapters[node];
+ ++_serial;
+
+ AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
AdapterDynamicInfoSeq::iterator p = adapters.begin();
while(p != adapters.end())
{
@@ -85,24 +151,34 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
}
void
-NodeObserverTopic::subscribe(const NodeObserverPrx& observer)
+NodeObserverTopic::subscribe(const NodeObserverPrx& observer, int serial)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
-
- Lock sync(*this);
- _topic->subscribe(qos, observer);
- for(set<string>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ while(true)
{
- try
+ if(serial == -1)
{
- observer->init(*p, _servers[*p], _adapters[*p]);
+ Lock sync(*this);
+ NodeDynamicInfoSeq nodes;
+ nodes.reserve(_nodes.size());
+ for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
+ {
+ nodes.push_back(p->second);
+ }
+ observer->init_async(new NodeInitCB(this, observer, _serial), nodes);
+ return;
}
- catch(const Ice::LocalException& ex)
+
+ Lock sync(*this);
+ if(serial != _serial)
{
- cerr << ex << endl;
- break;
+ serial = -1;
+ continue;
}
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ _topic->subscribe(qos, observer);
+ break;
}
}
@@ -117,11 +193,8 @@ NodeObserverTopic::removeNode(const string& name)
{
Lock sync(*this);
_nodes.erase(name);
- _servers.erase(name);
- _adapters.erase(name);
}
-
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
const RegistryObserverPrx& publisher,
NodeObserverTopic& nodeObserver) :
@@ -246,21 +319,34 @@ RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&)
}
void
-RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer)
+RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
+ while(true)
+ {
+ if(serial == -1)
+ {
+ Lock sync(*this);
+ assert(_serial != -1);
+ observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes);
+ return;
+ }
- Lock sync(*this);
- _topic->subscribe(qos, observer);
+ //
+ // If the registry cache changed since we've send the init()
+ // call we need to do it again. Otherwise, we can subscribe to
+ // the IceStorm topic.
+ //
+ Lock sync(*this);
+ if(serial != _serial)
+ {
+ serial = -1;
+ continue;
+ }
- try
- {
- observer->init(_serial, _applications, _nodes);
- }
- catch(const Ice::LocalException& ex)
- {
- cerr << ex << endl;
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ _topic->subscribe(qos, observer);
+ break;
}
}
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index e62cd9d2a1e..f1a57d08292 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -25,12 +25,12 @@ public:
NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&);
- virtual void init(const std::string&, const ServerDynamicInfoSeq&, const AdapterDynamicInfoSeq&,
- const Ice::Current&);
+ virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&);
+ virtual void initNode(const NodeDynamicInfo&, const Ice::Current&);
virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&);
virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&);
- void subscribe(const NodeObserverPrx&);
+ void subscribe(const NodeObserverPrx&, int serial = -1);
void unsubscribe(const NodeObserverPrx&);
void removeNode(const std::string&);
@@ -40,10 +40,10 @@ private:
const IceStorm::TopicPrx _topic;
const NodeObserverPrx _publisher;
- std::set<std::string> _nodes;
- std::map<std::string, ServerDynamicInfoSeq> _servers;
- std::map<std::string, AdapterDynamicInfoSeq> _adapters;
+ int _serial;
+ std::map<std::string, NodeDynamicInfo> _nodes;
};
+typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr;
class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<IceUtil::Mutex>
{
@@ -60,7 +60,7 @@ public:
virtual void nodeUp(const std::string&, const Ice::Current&);
virtual void nodeDown(const std::string&, const Ice::Current&);
- void subscribe(const RegistryObserverPrx&);
+ void subscribe(const RegistryObserverPrx&, int = -1);
void unsubscribe(const RegistryObserverPrx&);
private:
@@ -75,6 +75,7 @@ private:
ApplicationDescriptorSeq _applications;
Ice::StringSeq _nodes;
};
+typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr;
};