summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Database.cpp16
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp34
-rw-r--r--cpp/src/IceGrid/NodeCache.h5
-rw-r--r--cpp/src/IceGrid/NodeSessionI.cpp36
-rw-r--r--cpp/src/IceGrid/ServerCache.cpp125
-rw-r--r--cpp/src/IceGrid/ServerCache.h11
-rw-r--r--cpp/src/IceGrid/ServerI.cpp25
7 files changed, 144 insertions, 108 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index b7f26779a04..09e77531310 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -301,7 +301,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
try
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
}
catch(const DeploymentException& ex)
{
@@ -516,14 +516,8 @@ Database::removeApplication(const string& name, AdminSessionI* session)
if(_master)
{
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load));
- }
- catch(const DeploymentException&)
- {
- // Ignore, this is traced by the node cache.
- }
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow));
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
@@ -1468,7 +1462,7 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
//
try
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
}
catch(const DeploymentException& ex)
{
@@ -1485,7 +1479,7 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
try
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::load));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
}
catch(const DeploymentException& ex)
{
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index dfd740c97e5..08fb3348763 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -350,16 +350,16 @@ NodeEntry::getInfo() const
return _session->getInfo();
}
-Ice::StringSeq
+ServerEntrySeq
NodeEntry::getServers() const
{
Lock sync(*this);
- Ice::StringSeq names;
+ ServerEntrySeq entries;
for(map<string, ServerEntryPtr>::const_iterator p = _servers.begin(); p != _servers.end(); ++p)
{
- names.push_back(p->second->getId());
+ entries.push_back(p->second);
}
- return names;
+ return entries;
}
LoadInfo
@@ -439,7 +439,7 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con
//
if(timeout > 0 && timeout != sessionTimeout)
{
- node = NodePrx::uncheckedCast(node->ice_timeout(sessionTimeout));
+ node = NodePrx::uncheckedCast(node->ice_timeout(timeout + sessionTimeout));
}
try
@@ -486,17 +486,37 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con
}
void
-NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info)
+NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, int timeout)
{
try
{
+ NodePrx node;
+ {
+ Lock sync(*this);
+ checkSession();
+ node = _session->getNode();
+ int sessionTimeout = _session->getTimeout();
+
+ //
+ // Check if we should use a specific timeout (the load
+ // call can deactivate the server and it can take some
+ // time to deactivate, up to "deactivation-timeout"
+ // seconds).
+ //
+ if(timeout > 0 && timeout != sessionTimeout)
+ {
+ node = NodePrx::uncheckedCast(node->ice_timeout(timeout + sessionTimeout));
+ }
+ }
+
if(_cache.getTraceLevels() && _cache.getTraceLevels()->server > 2)
{
Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->serverCat);
out << "unloading `" << info.descriptor->id << "' on node `" << _name << "'";
}
+
AMI_Node_destroyServerPtr amiCB = new DestroyCB(_cache.getTraceLevels(), entry, _name);
- getProxy()->destroyServer_async(amiCB, info.descriptor->id, info.uuid, info.revision);
+ node->destroyServer_async(amiCB, info.descriptor->id, info.uuid, info.revision);
}
catch(const NodeUnreachableException& ex)
{
diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h
index c61c1363ea3..8493921cb7e 100644
--- a/cpp/src/IceGrid/NodeCache.h
+++ b/cpp/src/IceGrid/NodeCache.h
@@ -28,6 +28,7 @@ typedef IceUtil::Handle<NodeSessionI> NodeSessionIPtr;
class ServerEntry;
typedef IceUtil::Handle<ServerEntry> ServerEntryPtr;
+typedef std::vector<ServerEntryPtr> ServerEntrySeq;
class ReplicaCache;
@@ -48,13 +49,13 @@ public:
NodePrx getProxy() const;
NodeInfo getInfo() const;
- Ice::StringSeq getServers() const;
+ ServerEntrySeq getServers() const;
LoadInfo getLoadInfoAndLoadFactor(const std::string&, float&) const;
bool canRemove();
void loadServer(const ServerEntryPtr&, const ServerInfo&, const SessionIPtr&, int);
- void destroyServer(const ServerEntryPtr&, const ServerInfo&);
+ void destroyServer(const ServerEntryPtr&, const ServerInfo&, int);
ServerInfo getServerInfo(const ServerInfo&, const SessionIPtr&);
void __incRef();
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp
index ee87d6ac16c..a86c04600df 100644
--- a/cpp/src/IceGrid/NodeSessionI.cpp
+++ b/cpp/src/IceGrid/NodeSessionI.cpp
@@ -112,24 +112,20 @@ NodeSessionI::loadServers_async(const AMD_NodeSession_loadServersPtr& amdCB, con
//
// Get the server proxies to load them on the node.
//
- Ice::StringSeq servers = _database->getNode(_info.name)->getServers();
- for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
- {
- try
- {
- _database->getServer(*p)->load();
- }
- catch(const Ice::UserException&)
- {
- // Ignore.
- }
- }
+ ServerEntrySeq servers = _database->getNode(_info.name)->getServers();
+ for_each(servers.begin(), servers.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
Ice::StringSeq
NodeSessionI::getServers(const Ice::Current& current) const
{
- return _database->getNode(_info.name)->getServers();
+ ServerEntrySeq servers = _database->getNode(_info.name)->getServers();
+ Ice::StringSeq names;
+ for(ServerEntrySeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
+ {
+ names.push_back((*p)->getId());
+ }
+ return names;
}
void
@@ -208,18 +204,8 @@ NodeSessionI::destroyImpl(bool shutdown)
_destroy = true;
}
- Ice::StringSeq servers = _database->getNode(_info.name)->getServers();
- for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
- {
- try
- {
- _database->getServer(*p)->unload();
- }
- catch(const Ice::UserException&)
- {
- // Ignore.
- }
- }
+ ServerEntrySeq servers = _database->getNode(_info.name)->getServers();
+ for_each(servers.begin(), servers.end(), IceUtil::voidMemFun(&ServerEntry::unsync));
//
// If the registry isn't being shutdown we remove the node
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp
index ba6a18f73a2..f8ddf356417 100644
--- a/cpp/src/IceGrid/ServerCache.cpp
+++ b/cpp/src/IceGrid/ServerCache.cpp
@@ -211,20 +211,42 @@ ServerEntry::ServerEntry(ServerCache& cache, const string& id) :
}
void
-ServerEntry::load()
+ServerEntry::sync()
{
+ syncImpl();
+}
+
+void
+ServerEntry::syncAndWait()
+{
+ syncImpl();
try
{
- syncImpl(true);
+ waitImpl();
}
catch(const NodeUnreachableException&)
{
- // Ignore
+ //
+ // The node being unreachable isn't considered as a failure to
+ // synchronize the server.
+ //
+ }
+}
+
+void
+ServerEntry::waitNoThrow()
+{
+ try
+ {
+ waitImpl();
+ }
+ catch(const Ice::Exception&)
+ {
}
}
void
-ServerEntry::unload()
+ServerEntry::unsync()
{
Lock sync(*this);
if(_loaded.get())
@@ -356,7 +378,12 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string&
while(true)
{
- syncImpl(true);
+ //
+ // Note that we don't call syncAndWait() because we want
+ // NodeUnreachableException exceptions to go through.
+ //
+ syncImpl();
+ waitImpl();
{
Lock sync(*this);
@@ -402,7 +429,12 @@ ServerEntry::getAdapter(const string& id, bool upToDate)
while(true)
{
- syncImpl(true);
+ //
+ // Note that we don't call syncAndWait() because we want
+ // NodeUnreachableException exceptions to go through.
+ //
+ syncImpl();
+ waitImpl();
{
Lock sync(*this);
@@ -470,7 +502,7 @@ ServerEntry::getLoad(LoadSample sample) const
}
void
-ServerEntry::syncImpl(bool waitForUpdate)
+ServerEntry::syncImpl()
{
ServerInfo load;
SessionIPtr session;
@@ -481,17 +513,7 @@ ServerEntry::syncImpl(bool waitForUpdate)
Lock sync(*this);
if(_synchronizing)
{
- if(waitForUpdate)
- {
- while(_synchronizing)
- {
- wait();
- }
- }
- else
- {
- return;
- }
+ return;
}
if(!_load.get() && !_destroy.get())
@@ -505,6 +527,7 @@ ServerEntry::syncImpl(bool waitForUpdate)
if(_destroy.get())
{
destroy = *_destroy;
+ timeout = _deactivationTimeout;
}
else if(_load.get())
{
@@ -524,7 +547,7 @@ ServerEntry::syncImpl(bool waitForUpdate)
{
try
{
- _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy);
+ _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout);
}
catch(NodeNotExistException&)
{
@@ -541,39 +564,47 @@ ServerEntry::syncImpl(bool waitForUpdate)
{
exception(NodeUnreachableException(load.node, "node is not active"));
}
+ }
+}
+
+void
+ServerEntry::waitImpl()
+{
+ Lock sync(*this);
+ while(_synchronizing)
+ {
+ wait();
}
-
- if(waitForUpdate)
+
+ if(_exception.get())
{
- Lock sync(*this);
- while(_synchronizing)
+ try
{
- wait();
+ _exception->ice_throw();
}
- if(_exception.get())
+ catch(const DeploymentException&)
{
- try
- {
- _exception->ice_throw();
- }
- catch(const DeploymentException&)
- {
- throw;
- }
- catch(const NodeUnreachableException&)
- {
- throw;
- }
- catch(const Ice::Exception& ex)
+ throw;
+ }
+ catch(const NodeUnreachableException&)
+ {
+ throw;
+ }
+ catch(const Ice::Exception& ex) // This shouln't happen.
+ {
+ ostringstream os;
+ os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex;
+ TraceLevelsPtr traceLevels = _cache.getTraceLevels();
+ if(traceLevels)
{
- ostringstream os;
- os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex;
- throw DeploymentException(os.str());
+ Ice::Error err(traceLevels->logger);
+ err << os.str();
}
+ throw DeploymentException(os.str());
}
}
}
-
+
void
ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, int at, int dt)
{
@@ -626,7 +657,7 @@ ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, i
{
try
{
- _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy);
+ _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout);
}
catch(NodeNotExistException&)
{
@@ -842,7 +873,8 @@ ServerEntry::allocatedNoSync(const SessionIPtr& session)
}
}
- syncImpl(true); // We sync here to ensure the "session" server will be activated.
+ sync();
+ waitNoThrow();
}
void
@@ -933,6 +965,7 @@ ServerEntry::releasedNoSync(const SessionIPtr& session)
return;
}
}
-
- syncImpl(true); // We sync here to ensure the "session" server will be shutdown.
+
+ sync();
+ waitNoThrow();
}
diff --git a/cpp/src/IceGrid/ServerCache.h b/cpp/src/IceGrid/ServerCache.h
index 892e89d140a..1e934430c30 100644
--- a/cpp/src/IceGrid/ServerCache.h
+++ b/cpp/src/IceGrid/ServerCache.h
@@ -36,8 +36,10 @@ public:
ServerEntry(ServerCache&, const std::string&);
- void load();
- void unload();
+ void sync();
+ void syncAndWait();
+ void waitNoThrow();
+ void unsync();
void update(const ServerInfo&);
void destroy();
@@ -64,8 +66,9 @@ public:
private:
- void syncImpl(bool);
-
+ void syncImpl();
+ void waitImpl();
+
ServerCache& _cache;
const std::string _id;
std::auto_ptr<ServerInfo> _loaded;
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index a3e5871237e..daeef08ca53 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -363,13 +363,7 @@ DestroyCommand::DestroyCommand(const ServerIPtr& server, bool kill) :
bool
DestroyCommand::canExecute(ServerI::InternalServerState state)
{
- return
- state == ServerI::Inactive ||
- state == ServerI::WaitForActivation ||
- state == ServerI::Active ||
- state == ServerI::ActivationTimeout ||
- state == ServerI::Deactivating ||
- state == ServerI::DeactivatingWaitForProcess;
+ return state == ServerI::Inactive;
}
ServerI::InternalServerState
@@ -1051,9 +1045,14 @@ ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB, const string& uuid, int
}
}
+ if(!StopCommand::isStopped(_state) && !_stop)
+ {
+ _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout);
+ }
if(!_destroy)
{
- _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching);
+ //_destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching);
+ _destroy = new DestroyCommand(this, false);
}
if(amdCB)
{
@@ -2097,13 +2096,13 @@ ServerCommandPtr
ServerI::nextCommand()
{
ServerCommandPtr command;
- if(_destroy && _destroy->canExecute(_state))
+ if(_stop && _stop->canExecute(_state))
{
- command = _destroy;
+ command = _stop;
}
- else if(_stop && _stop->canExecute(_state))
+ else if(_destroy && _destroy->canExecute(_state))
{
- command = _stop;
+ command = _destroy;
}
else if(_load && _load->canExecute(_state))
{
@@ -2159,7 +2158,7 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason)
assert(_state == Deactivating);
break;
case Destroying:
- assert(_destroy && _destroy->canExecute(_state));
+ assert(_state == Inactive && _destroy && _destroy->canExecute(_state));
break;
case Destroyed:
assert(_destroy);