summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Database.cpp34
-rw-r--r--cpp/src/IceGrid/Database.h3
-rw-r--r--cpp/src/IceGrid/IceGridObserver.cpp32
-rw-r--r--cpp/src/IceGrid/NodeI.cpp2
-rw-r--r--cpp/src/IceGrid/NodeSessionI.cpp23
-rw-r--r--cpp/src/IceGrid/NodeSessionI.h3
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp4
-rw-r--r--cpp/src/IceGrid/Topics.cpp58
-rw-r--r--cpp/src/IceGrid/Topics.h9
9 files changed, 57 insertions, 111 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 39f4a200bde..75b9a7eab14 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -221,31 +221,26 @@ Database::~Database()
}
void
-Database::setRegistryObserver(const RegistryObserverPrx& observer)
+Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeObserverPrx& nodeObserver)
{
int serial;
ApplicationDescriptorSeq applications;
- Ice::StringSeq nodes;
{
Lock sync(*this);
- _registryObserver = observer;
+ _registryObserver = registryObserver;
+ _nodeObserver = nodeObserver;
serial = _serial;
for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
{
applications.push_back(p->second);
}
-
- for(map<string, NodeSessionIPtr>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
- {
- nodes.push_back(p->first);
- }
}
//
// Notify the observers.
//
- _registryObserver->init(serial, applications, nodes);
+ _registryObserver->init(serial, applications);
}
void
@@ -715,15 +710,26 @@ Database::getNode(const string& name) const
void
Database::removeNode(const string& name)
{
- Lock sync(*this);
- if(_nodes.erase(name) > 0)
{
- if(_traceLevels->node > 0)
+ Lock sync(*this);
+ if(_nodes.erase(name) > 0)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "removed node `" << name << "'";
+ if(_traceLevels->node > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
+ out << "removed node `" << name << "'";
+ }
}
}
+
+ try
+ {
+ _nodeObserver->nodeDown(name);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // TODO: Log a warning?
+ }
}
Ice::StringSeq
diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h
index c7c5cde3959..3e42d34b0e7 100644
--- a/cpp/src/IceGrid/Database.h
+++ b/cpp/src/IceGrid/Database.h
@@ -73,7 +73,7 @@ public:
Database(const Ice::ObjectAdapterPtr&, const std::string&, int, const TraceLevelsPtr&);
virtual ~Database();
- void setRegistryObserver(const RegistryObserverPrx&);
+ void setObservers(const RegistryObserverPrx&, const NodeObserverPrx&);
void lock(int serial, ObserverSessionI*, const std::string&);
void unlock(ObserverSessionI*);
@@ -144,6 +144,7 @@ private:
const int _nodeSessionTimeout;
const TraceLevelsPtr _traceLevels;
RegistryObserverPrx _registryObserver;
+ NodeObserverPrx _nodeObserver;
std::map<std::string, ServerEntryPtr> _servers;
std::map<std::string, ServerEntryPtr> _serversByAdapterId;
diff --git a/cpp/src/IceGrid/IceGridObserver.cpp b/cpp/src/IceGrid/IceGridObserver.cpp
index d0679587a2e..2cbb659c054 100644
--- a/cpp/src/IceGrid/IceGridObserver.cpp
+++ b/cpp/src/IceGrid/IceGridObserver.cpp
@@ -76,14 +76,8 @@ class RegistryObserverI : public RegistryObserver
public:
virtual void
- init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq& nodes, const Ice::Current&)
+ init(int, const ApplicationDescriptorSeq&, const Ice::Current&)
{
- cout << "active nodes: ";
- for(Ice::StringSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
- {
- cout << *p << " ";
- }
- cout << endl;
}
virtual void
@@ -109,18 +103,6 @@ public:
{
cout << "application `" << app->name << "' synced (serial = " << serial << ")" << endl;
}
-
- virtual void
- nodeUp(const string& name, const Ice::Current& current)
- {
- cout << "node `" << name << "' is up" << endl;
- }
-
- virtual void
- nodeDown(const string& name, const Ice::Current& current)
- {
- cout << "node `" << name << "' is down" << endl;
- }
};
class NodeObserverI : public NodeObserver
@@ -132,12 +114,12 @@ public:
{
for(NodeDynamicInfoSeq::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
{
- initNode(*p, c);
+ nodeUp(*p, c);
}
}
- virtual void
- initNode(const NodeDynamicInfo& node, const Ice::Current&)
+ virtual void
+ nodeUp(const NodeDynamicInfo& node, const Ice::Current& current)
{
cout << "node `" << node.name << "' servers: ";
for(ServerDynamicInfoSeq::const_iterator p = node.servers.begin(); p != node.servers.end(); ++p)
@@ -153,6 +135,12 @@ public:
}
virtual void
+ nodeDown(const string& name, const Ice::Current& current)
+ {
+ cout << "node `" << name << "' is down" << endl;
+ }
+
+ virtual void
updateServer(const string& node, const ServerDynamicInfo& info, const Ice::Current&)
{
cout << "node `" << node << "' server `" << info.name << "' updated: pid = " << info.pid << " state = `";
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index b5de9dcbbce..575c974202a 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -468,7 +468,7 @@ NodeI::initObserver(const Ice::StringSeq& servers)
info.name = _name;
info.servers = serverInfos;
info.adapters = adapterInfos;
- _observer->initNode(info);
+ _observer->nodeUp(info);
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp
index 838158e8719..98eaa052aee 100644
--- a/cpp/src/IceGrid/NodeSessionI.cpp
+++ b/cpp/src/IceGrid/NodeSessionI.cpp
@@ -14,10 +14,8 @@
using namespace std;
using namespace IceGrid;
-NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPrx& observer, const string& name,
- const NodePrx& node) :
+NodeSessionI::NodeSessionI(const DatabasePtr& database, const string& name, const NodePrx& node) :
_database(database),
- _observer(observer),
_name(name),
_node(node),
_startTime(IceUtil::Time::now()),
@@ -28,16 +26,6 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database, const RegistryObserverPr
try
{
_database->addNode(name, this);
-
- try
- {
- _observer->nodeUp(_name);
- }
- catch(const Ice::LocalException& ex)
- {
- // TODO: Log a warning?
- cerr << ex << endl;
- }
}
catch(...)
{
@@ -80,15 +68,6 @@ NodeSessionI::destroy(const Ice::Current& current)
try
{
- _observer->nodeDown(_name);
- }
- catch(const Ice::LocalException&)
- {
- // TODO: Log a warning?
- }
-
- try
- {
current.adapter->remove(current.id);
}
catch(const Ice::ObjectAdapterDeactivatedException&)
diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h
index b2d8e1c7662..84c06954ddf 100644
--- a/cpp/src/IceGrid/NodeSessionI.h
+++ b/cpp/src/IceGrid/NodeSessionI.h
@@ -23,7 +23,7 @@ class NodeSessionI : public NodeSession, public SessionI, public IceUtil::Mutex
{
public:
- NodeSessionI(const DatabasePtr&, const RegistryObserverPrx&, const std::string&, const NodePrx&);
+ NodeSessionI(const DatabasePtr&, const std::string&, const NodePrx&);
virtual void keepAlive(const Ice::Current&);
virtual Ice::StringSeq getServers(const Ice::Current&);
@@ -35,7 +35,6 @@ public:
private:
const DatabasePtr _database;
- const RegistryObserverPrx _observer;
const std::string _name;
const NodePrx _node;
const IceUtil::Time _startTime;
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index e4f4f0d1891..eab152a6957 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -355,7 +355,7 @@ RegistryI::start(bool nowarn)
obj = registryAdapter->addWithUUID(regTopic)->ice_collocationOptimization(false);
obj = obj->ice_locator(_communicator->getDefaultLocator());
_registryObserver = RegistryObserverPrx::uncheckedCast(obj);
- _database->setRegistryObserver(_registryObserver);
+ _database->setObservers(_registryObserver, _nodeObserver);
//
// Create the session manager.
@@ -403,7 +403,7 @@ NodeSessionPrx
RegistryI::registerNode(const std::string& name, const NodePrx& node, const Ice::Current& c)
{
NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout));
- NodeSessionIPtr session = new NodeSessionI(_database, _registryObserver, name, n);
+ NodeSessionIPtr session = new NodeSessionI(_database, name, n);
NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session));
_reaper->add(proxy, session);
return proxy;
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 2ee3b28361b..1d4372cb370 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -87,16 +87,28 @@ NodeObserverTopic::init(const NodeDynamicInfoSeq&, const Ice::Current&)
}
void
-NodeObserverTopic::initNode(const NodeDynamicInfo& info, const Ice::Current& current)
+NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& current)
{
Lock sync(*this);
_nodes.insert(make_pair(info.name, info));
- _publisher->initNode(info);
+ _publisher->nodeUp(info);
}
void
+NodeObserverTopic::nodeDown(const string& name, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ assert(_nodes.find(name) != _nodes.end());
+ _nodes.erase(name);
+
+ _publisher->nodeDown(name);
+}
+
+
+void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
@@ -202,13 +214,6 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer)
_topic->unsubscribe(observer);
}
-void
-NodeObserverTopic::removeNode(const string& name)
-{
- Lock sync(*this);
- _nodes.erase(name);
-}
-
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
const RegistryObserverPrx& publisher,
NodeObserverTopic& nodeObserver) :
@@ -217,18 +222,14 @@ RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
}
void
-RegistryObserverTopic::init(int serial,
- const ApplicationDescriptorSeq& apps,
- const Ice::StringSeq& nodes,
- const Ice::Current&)
+RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&)
{
Lock sync(*this);
_serial = serial;
_applications = apps;
- _nodes = nodes;
- _publisher->init(serial, apps, nodes);
+ _publisher->init(serial, apps);
}
void
@@ -308,31 +309,6 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes
}
void
-RegistryObserverTopic::nodeUp(const string& name, const Ice::Current&)
-{
- Lock sync(*this);
-
- assert(find(_nodes.begin(), _nodes.end(), name) == _nodes.end());
- _nodes.push_back(name);
-
- _publisher->nodeUp(name);
-}
-
-void
-RegistryObserverTopic::nodeDown(const string& name, const Ice::Current&)
-{
- Lock sync(*this);
-
- Ice::StringSeq::iterator p = find(_nodes.begin(), _nodes.end(), name);
- assert(p != _nodes.end());
- _nodes.erase(p);
-
- _nodeObserver.removeNode(name);
-
- _publisher->nodeDown(name);
-}
-
-void
RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
{
while(true)
@@ -341,7 +317,7 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial
{
Lock sync(*this);
assert(_serial != -1);
- observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications, _nodes);
+ observer->init_async(new RegistryInitCB(this, observer, _serial), _serial, _applications);
return;
}
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index f1a57d08292..fb18519e579 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -26,7 +26,8 @@ public:
NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&);
virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&);
- virtual void initNode(const NodeDynamicInfo&, const Ice::Current&);
+ virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&);
+ virtual void nodeDown(const std::string&, const Ice::Current&);
virtual void updateServer(const std::string&, const ServerDynamicInfo&, const Ice::Current&);
virtual void updateAdapter(const std::string&, const AdapterDynamicInfo&, const Ice::Current&);
@@ -50,16 +51,13 @@ class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<I
public:
RegistryObserverTopic(const IceStorm::TopicPrx&, const RegistryObserverPrx&, NodeObserverTopic&);
- virtual void init(int, const ApplicationDescriptorSeq&, const Ice::StringSeq&, const Ice::Current&);
+ virtual void init(int, const ApplicationDescriptorSeq&, const Ice::Current&);
virtual void applicationAdded(int, const ApplicationDescriptorPtr&, const Ice::Current&);
virtual void applicationRemoved(int, const std::string&, const Ice::Current&);
virtual void applicationSynced(int, const ApplicationDescriptorPtr&, const Ice::Current&);
virtual void applicationUpdated(int, const ApplicationUpdateDescriptor&, const Ice::Current&);
- virtual void nodeUp(const std::string&, const Ice::Current&);
- virtual void nodeDown(const std::string&, const Ice::Current&);
-
void subscribe(const RegistryObserverPrx&, int = -1);
void unsubscribe(const RegistryObserverPrx&);
@@ -73,7 +71,6 @@ private:
int _serial;
ApplicationDescriptorSeq _applications;
- Ice::StringSeq _nodes;
};
typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr;