summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp212
1 files changed, 182 insertions, 30 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 4328a4aef85..b721c553271 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -188,6 +188,137 @@ private:
string _dest;
};
+class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp
+{
+public:
+
+ NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) :
+ Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->nodeUp_async(this, _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ NodeDynamicInfo _info;
+};
+
+class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer
+{
+public:
+
+ UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->updateServer_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ ServerDynamicInfo _info;
+};
+
+class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter
+{
+public:
+
+ UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->updateAdapter_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ AdapterDynamicInfo _info;
+};
+
+}
+
+NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
+{
+}
+
+NodeI::Update::~Update()
+{
+}
+
+void
+NodeI::Update::finished(bool success)
+{
+ _node->dequeueUpdate(_observer, this, !success);
}
NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
@@ -847,6 +978,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
assert(_observers.find(session) == _observers.end());
_observers.insert(make_pair(session, observer));
+ _observerUpdates.erase(observer); // Remove any updates from the previous session.
+
ServerDynamicInfoSeq serverInfos;
AdapterDynamicInfoSeq adapterInfos;
for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
@@ -863,19 +996,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
adapterInfos.push_back(q->second);
}
- try
- {
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
- observer->nodeUp(info);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_traceLevels->logger);
- out << "unexpected observer exception:\n" << ex;
- }
+ NodeDynamicInfo info;
+ info.info = _platform.getNodeInfo();
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ queueUpdate(observer, new NodeUp(this, observer, info));
}
void
@@ -910,15 +1035,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateServer(_name, info);
- sent.insert(p->second);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateServer(this, p->second, info));
}
}
}
@@ -948,18 +1065,53 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateAdapter(_name, info);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
}
}
}
+void
+NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
+{
+ //Lock sync(*this); Called within the synchronization
+ deque<UpdatePtr>& queue = _observerUpdates[proxy];
+ queue.push_back(update);
+ if(queue.size() == 1)
+ {
+ queue.front()->send();
+ }
+}
+
+void
+NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ if(p == _observerUpdates.end() || p->second.front().get() != update.get())
+ {
+ return;
+ }
+
+ deque<UpdatePtr>& queue = p->second;
+ if(all)
+ {
+ queue.clear();
+ }
+ else
+ {
+ queue.pop_front();
+ }
+
+ if(!queue.empty())
+ {
+ queue.front()->send();
+ }
+ else
+ {
+ _observerUpdates.erase(p);
+ }
+}
+
void
NodeI::addServer(const ServerIPtr& server, const string& application)
{