summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-09-26 14:04:54 +0200
committerBenoit Foucher <benoit@zeroc.com>2008-09-26 14:04:54 +0200
commita3c9dfeead519e87ba197e5e66b1013f39fa4366 (patch)
treea1bb69cdea16ca854173a2e3241b242aacf1b14e /cpp/src
parentFixed locator potential hang when resolving round-robin replica group which c... (diff)
downloadice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.tar.bz2
ice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.tar.xz
ice-a3c9dfeead519e87ba197e5e66b1013f39fa4366.zip
IceGrid fixes to ensure the registry/node don't wait too long if a replica becomes unreachable
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Allocatable.cpp9
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp5
-rw-r--r--cpp/src/IceGrid/NodeCache.h1
-rw-r--r--cpp/src/IceGrid/NodeI.cpp212
-rw-r--r--cpp/src/IceGrid/NodeI.h24
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp2
-rw-r--r--cpp/src/IceGrid/SessionManager.h14
7 files changed, 231 insertions, 36 deletions
diff --git a/cpp/src/IceGrid/Allocatable.cpp b/cpp/src/IceGrid/Allocatable.cpp
index e078564d5a5..666d7c84cb7 100644
--- a/cpp/src/IceGrid/Allocatable.cpp
+++ b/cpp/src/IceGrid/Allocatable.cpp
@@ -38,7 +38,14 @@ AllocationRequest::pending()
if(_timeout > 0)
{
- _session->getTimer()->schedule(this, IceUtil::Time::milliSeconds(_timeout));
+ try
+ {
+ _session->getTimer()->schedule(this, IceUtil::Time::milliSeconds(_timeout));
+ }
+ catch(const IceUtil::Exception&)
+ {
+ // Ignore, timer is destroyed because of shutdown
+ }
}
_state = Pending;
return true;
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index 812cb1977df..b9b8b478243 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -781,7 +781,10 @@ NodeEntry::checkSession() const
while(_registering)
{
- wait();
+ if(!timedWait(IceUtil::Time::seconds(5)))
+ {
+ break; // Consider the node down if it doesn't respond promptly.
+ }
}
if(!_session)
diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h
index f47ede772e3..2fb6fa9ee13 100644
--- a/cpp/src/IceGrid/NodeCache.h
+++ b/cpp/src/IceGrid/NodeCache.h
@@ -45,7 +45,6 @@ public:
void addServer(const ServerEntryPtr&);
void removeServer(const ServerEntryPtr&);
void setSession(const NodeSessionIPtr&);
- void setSavedProxy(const NodePrx&);
NodePrx getProxy() const;
InternalNodeInfoPtr getInfo() const;
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 4328a4aef85..b721c553271 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -188,6 +188,137 @@ private:
string _dest;
};
+class NodeUp : public NodeI::Update, public AMI_NodeObserver_nodeUp
+{
+public:
+
+ NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) :
+ Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->nodeUp_async(this, _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ NodeDynamicInfo _info;
+};
+
+class UpdateServer : public NodeI::Update, public AMI_NodeObserver_updateServer
+{
+public:
+
+ UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->updateServer_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ ServerDynamicInfo _info;
+};
+
+class UpdateAdapter : public NodeI::Update, public AMI_NodeObserver_updateAdapter
+{
+public:
+
+ UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) :
+ NodeI::Update(node, observer), _info(info)
+ {
+ }
+
+ virtual void
+ send()
+ {
+ try
+ {
+ _observer->updateAdapter_async(this, _node->getName(), _info);
+ }
+ catch(const Ice::LocalException&)
+ {
+ finished(false);
+ }
+ }
+
+ virtual void
+ ice_response()
+ {
+ finished(true);
+ }
+
+ virtual void
+ ice_exception(const Ice::Exception&)
+ {
+ finished(false);
+ }
+
+private:
+
+ AdapterDynamicInfo _info;
+};
+
+}
+
+NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
+{
+}
+
+NodeI::Update::~Update()
+{
+}
+
+void
+NodeI::Update::finished(bool success)
+{
+ _node->dequeueUpdate(_observer, this, !success);
}
NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
@@ -847,6 +978,8 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
assert(_observers.find(session) == _observers.end());
_observers.insert(make_pair(session, observer));
+ _observerUpdates.erase(observer); // Remove any updates from the previous session.
+
ServerDynamicInfoSeq serverInfos;
AdapterDynamicInfoSeq adapterInfos;
for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
@@ -863,19 +996,11 @@ NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observe
adapterInfos.push_back(q->second);
}
- try
- {
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
- observer->nodeUp(info);
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_traceLevels->logger);
- out << "unexpected observer exception:\n" << ex;
- }
+ NodeDynamicInfo info;
+ info.info = _platform.getNodeInfo();
+ info.servers = serverInfos;
+ info.adapters = adapterInfos;
+ queueUpdate(observer, new NodeUp(this, observer, info));
}
void
@@ -910,15 +1035,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateServer(_name, info);
- sent.insert(p->second);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateServer(this, p->second, info));
}
}
}
@@ -948,18 +1065,53 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
{
if(sent.find(p->second) == sent.end())
{
- try
- {
- p->second->updateAdapter(_name, info);
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
+ queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
}
}
}
+void
+NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
+{
+ //Lock sync(*this); Called within the synchronization
+ deque<UpdatePtr>& queue = _observerUpdates[proxy];
+ queue.push_back(update);
+ if(queue.size() == 1)
+ {
+ queue.front()->send();
+ }
+}
+
+void
+NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
+{
+ IceUtil::Mutex::Lock sync(_observerMutex);
+ map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ if(p == _observerUpdates.end() || p->second.front().get() != update.get())
+ {
+ return;
+ }
+
+ deque<UpdatePtr>& queue = p->second;
+ if(all)
+ {
+ queue.clear();
+ }
+ else
+ {
+ queue.pop_front();
+ }
+
+ if(!queue.empty())
+ {
+ queue.front()->send();
+ }
+ else
+ {
+ _observerUpdates.erase(p);
+ }
+}
+
void
NodeI::addServer(const ServerIPtr& server, const string& application)
{
diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h
index 36ac0019c0e..6ecb9df70a8 100644
--- a/cpp/src/IceGrid/NodeI.h
+++ b/cpp/src/IceGrid/NodeI.h
@@ -35,9 +35,29 @@ typedef IceUtil::Handle<ServerCommand> ServerCommandPtr;
class NodeSessionManager;
+class NodeI;
+typedef IceUtil::Handle<NodeI> NodeIPtr;
+
class NodeI : public Node, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
+ class Update : virtual public IceUtil::Shared
+ {
+ public:
+
+ Update(const NodeIPtr&, const NodeObserverPrx&);
+ virtual ~Update();
+
+ virtual void send() = 0;
+
+ void finished(bool);
+
+ protected:
+
+ const NodeIPtr _node;
+ const NodeObserverPrx _observer;
+ };
+ typedef IceUtil::Handle<Update> UpdatePtr;
NodeI(const Ice::ObjectAdapterPtr&, NodeSessionManager&, const ActivatorPtr&, const IceUtil::TimerPtr&,
const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&);
@@ -97,6 +117,8 @@ public:
void removeObserver(const NodeSessionPrx&);
void observerUpdateServer(const ServerDynamicInfo&);
void observerUpdateAdapter(const AdapterDynamicInfo&);
+ void queueUpdate(const NodeObserverPrx&, const UpdatePtr&);
+ void dequeueUpdate(const NodeObserverPrx&, const UpdatePtr&, bool);
void addServer(const ServerIPtr&, const std::string&);
void removeServer(const ServerIPtr&, const std::string&);
@@ -143,6 +165,8 @@ private:
std::map<std::string, ServerDynamicInfo> _serversDynamicInfo;
std::map<std::string, AdapterDynamicInfo> _adaptersDynamicInfo;
+ std::map<NodeObserverPrx, std::deque<UpdatePtr> > _observerUpdates;
+
IceUtil::Mutex _serversLock;
std::map<std::string, std::set<ServerIPtr> > _serversByApplication;
std::set<std::string> _patchInProgress;
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 06ae7828d3f..066b144b0a9 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -581,7 +581,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
//
for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = sessions.begin(); p != sessions.end(); ++p)
{
- (*p)->tryCreateSession(true);
+ (*p)->tryCreateSession(true, IceUtil::Time::seconds(5));
}
}
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index 7760241e865..c598eadc2cd 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -189,7 +189,7 @@ public:
}
virtual void
- tryCreateSession(bool waitForTry = true)
+ tryCreateSession(bool waitForTry = true, const IceUtil::Time& timeout = IceUtil::Time())
{
{
Lock sync(*this);
@@ -215,7 +215,17 @@ public:
// Wait until the action is executed and the state changes.
while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress)
{
- wait();
+ if(timeout == IceUtil::Time())
+ {
+ wait();
+ }
+ else
+ {
+ if(!timedWait(timeout))
+ {
+ break;
+ }
+ }
}
}
}