summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2009-01-23 17:07:21 -0500
committerBernard Normier <bernard@zeroc.com>2009-01-23 17:07:21 -0500
commit2380e089401d048490da23bc6d71e5687bafbe47 (patch)
tree6d97052d1f93bc2bafcd7fd1a9ebe103544b6cad /cpp/src/IceGrid/NodeI.cpp
parentFixed permissions (diff)
parent3.3.1 third-party updates (diff)
downloadice-2380e089401d048490da23bc6d71e5687bafbe47.tar.bz2
ice-2380e089401d048490da23bc6d71e5687bafbe47.tar.xz
ice-2380e089401d048490da23bc6d71e5687bafbe47.zip
Merge branch 'R3_3_branch' of cvs:/home/git/ice into R3_3_branch
Conflicts: java/resources/IceGridAdmin/icegridadmin_content_dyn.html java/resources/IceGridAdmin/icegridadmin_content_static.html
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp226
1 files changed, 194 insertions, 32 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 4328a4aef85..140e2e73d81 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003-2008 ZeroC, Inc. All rights reserved.
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -188,6 +188,140 @@ private:
string _dest;
};
+class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp
+{
+public:
+
+ NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual bool
+ send()
+ {
+ try
+ {
+ _observer->nodeUp_async(this, _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ NodeDynamicInfo _info;
+};
+
+class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer
+{
+public:
+
+ UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual bool
+ send()
+ {
+ try
+ {
+ _observer->updateServer_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ ServerDynamicInfo _info;
+};
+
+class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter
+{
+public:
+
+ UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual bool
+ send()
+ {
+ try
+ {
+ _observer->updateAdapter_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ AdapterDynamicInfo _info;
+};
+
+}
+
+NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
+{
+}
+
+NodeI::Update::~Update()
+{
+}
+
+void
+NodeI::Update::finished(bool success)
+{
+ _node->dequeueUpdate(_observer, this, !success);
}
NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
@@ -697,9 +831,17 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos,
}
void
-NodeI::destroy()
+NodeI::shutdown()
{
IceUtil::Mutex::Lock sync(_serversLock);
+ for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin();
+ p != _serversByApplication.end(); ++p)
+ {
+ for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ {
+ (*q)->shutdown();
+ }
+ }
_serversByApplication.clear();
}
@@ -847,6 +989,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
assert(_observers.find(session) == _observers.end());
_observers.insert(make_pair(session, observer));
+ _observerUpdates.erase(observer); // Remove any updates from the previous session.
+
ServerDynamicInfoSeq serverInfos;
AdapterDynamicInfoSeq adapterInfos;
for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
@@ -863,19 +1007,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
adapterInfos.push_back(q->second);
}
- try
- {
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
- observer->nodeUp(info);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_traceLevels->logger);
- out << "unexpected observer exception:\n" << ex;
- }
+ NodeDynamicInfo info;
+ info.info = _platform.getNodeInfo();
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ queueUpdate(observer, new NodeUp(this, observer, info));
}
void
@@ -910,15 +1046,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateServer(_name, info);
- sent.insert(p->second);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateServer(this, p->second, info));
}
}
}
@@ -948,16 +1076,50 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateAdapter(_name, info);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
+ }
+ }
+}
+
+void
+NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
+{
+ //Lock sync(*this); Called within the synchronization
+ map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ if(p == _observerUpdates.end())
+ {
+ if(update->send())
+ {
+ _observerUpdates[proxy].push_back(update);
}
}
+ else
+ {
+ p->second.push_back(update);
+ }
+}
+
+void
+NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ if(p == _observerUpdates.end() || p->second.front().get() != update.get())
+ {
+ return;
+ }
+
+ p->second.pop_front();
+
+ if(all || (!p->second.empty() && !p->second.front()->send()))
+ {
+ p->second.clear();
+ }
+
+ if(p->second.empty())
+ {
+ _observerUpdates.erase(p);
+ }
}
void