summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r--cpp/src/IceGrid/NodeI.cpp493
1 files changed, 168 insertions, 325 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index d098d71e83f..09c45027550 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -16,121 +16,37 @@
using namespace std;
using namespace IceGrid;
-namespace
+NodeI::Update::Update(UpdateFunction updateFunction, const shared_ptr<NodeI>& node,
+ const shared_ptr<NodeObserverPrx>& observer) : _func(move(updateFunction)),
+ _node(node), _observer(observer)
{
+}
-class NodeUp : public NodeI::Update
+bool
+NodeI::Update::send()
{
-public:
-
- NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) :
- NodeI::Update(node, observer), _info(info)
+ auto self = shared_from_this();
+ try
{
- }
+ _func([self] { self->_node->dequeueUpdate(self->_observer, self, false); },
+ [self](exception_ptr) { self->_node->dequeueUpdate(self->_observer, self, true); });
- virtual bool
- send()
- {
- try
- {
- _observer->begin_nodeUp(_info, newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed));
- }
- catch(const Ice::LocalException&)
- {
- return false;
- }
return true;
}
-
-private:
-
- NodeDynamicInfo _info;
-};
-
-class UpdateServer : public NodeI::Update
-{
-public:
-
- UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) :
- NodeI::Update(node, observer), _info(info)
- {
- }
-
- virtual bool
- send()
+ catch(const std::exception&)
{
- try
- {
- _observer->begin_updateServer(_node->getName(Ice::emptyCurrent),
- _info,
- newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed));
- }
- catch(const Ice::LocalException&)
- {
- return false;
- }
- return true;
- }
-
-private:
-
- ServerDynamicInfo _info;
-};
-
-class UpdateAdapter : public NodeI::Update
-{
-public:
-
- UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) :
- NodeI::Update(node, observer), _info(info)
- {
- }
-
- virtual bool
- send()
- {
- try
- {
- _observer->begin_updateAdapter(_node->getName(Ice::emptyCurrent),
- _info,
- newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed));
- }
- catch(const Ice::LocalException&)
- {
- return false;
- }
- return true;
+ return false;
}
-
-private:
-
- AdapterDynamicInfo _info;
-};
-
-}
-
-NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer)
-{
}
-NodeI::Update::~Update()
-{
-}
-
-void
-NodeI::Update::finished(bool success)
-{
- _node->dequeueUpdate(_observer, this, !success);
-}
-
-NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
+NodeI::NodeI(const shared_ptr<Ice::ObjectAdapter>& adapter,
NodeSessionManager& sessions,
- const ActivatorPtr& activator,
+ const shared_ptr<Activator>& activator,
const IceUtil::TimerPtr& timer,
- const TraceLevelsPtr& traceLevels,
- const NodePrx& proxy,
+ const shared_ptr<TraceLevels>& traceLevels,
+ const shared_ptr<NodePrx>& proxy,
const string& name,
- const UserAccountMapperPrx& mapper,
+ const shared_ptr<UserAccountMapperPrx>& mapper,
const string& instanceName) :
_communicator(adapter->getCommunicator()),
_adapter(adapter),
@@ -146,16 +62,16 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
_instanceName(instanceName),
_userAccountMapper(mapper),
_platform("IceGrid.Node", _communicator, _traceLevels),
- _fileCache(new FileCache(_communicator)),
+ _fileCache(make_shared<FileCache>(_communicator)),
_serial(1),
_consistencyCheckDone(false)
{
- Ice::PropertiesPtr props = _communicator->getProperties();
+ auto props = _communicator->getProperties();
const_cast<string&>(_dataDir) = _platform.getDataDir();
const_cast<string&>(_serversDir) = _dataDir + "/servers";
const_cast<string&>(_tmpDir) = _dataDir + "/tmp";
- const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60);
+ const_cast<int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60);
const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output");
const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0;
const_cast<bool&>(_allowEndpointsOverride) = props->getPropertyAsInt("IceGrid.Node.AllowEndpointsOverride") > 0;
@@ -174,156 +90,75 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
}
}
- Ice::PropertiesPtr p = Ice::createProperties();
+ auto p = Ice::createProperties();
p->parseCommandLineOptions("", overrides);
- Ice::PropertyDict propDict = p->getPropertiesForPrefix("");
- for(Ice::PropertyDict::const_iterator q = propDict.begin(); q != propDict.end(); ++q)
+ auto propDict = p->getPropertiesForPrefix("");
+ for(const auto& prop : propDict)
{
- _propertiesOverride.push_back(createProperty(q->first, q->second));
+ _propertiesOverride.push_back({ prop.first, prop.second });
}
}
}
void
-NodeI::Update::completed(const Ice::AsyncResultPtr& result)
-{
- try
- {
- result->throwLocalException();
- finished(true);
- }
- catch(const Ice::LocalException&)
- {
- finished(false);
- }
-}
-
-NodeI::~NodeI()
+NodeI::loadServerAsync(shared_ptr<InternalServerDescriptor> descriptor, string replicaName,
+ function<void(const shared_ptr<ServerPrx>&, const AdapterPrxDict&, int, int)> response,
+ function<void(exception_ptr)> exception, const Ice::Current& current)
{
+ loadServer(move(descriptor), move(replicaName), false, move(response), move(exception), current);
}
void
-NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
- const InternalServerDescriptorPtr& descriptor,
- const string& replicaName,
- const Ice::Current& current)
+NodeI::loadServerWithoutRestartAsync(shared_ptr<InternalServerDescriptor> descriptor,
+ string replicaName,
+ function<void(const shared_ptr<ServerPrx>&,
+ const AdapterPrxDict&, int, int)> response,
+ function<void(exception_ptr)> exception,
+ const Ice::Current& current)
{
- loadServer(amdCB, descriptor, replicaName, false, current);
-}
-
-void
-NodeI::loadServerWithoutRestart_async(const AMD_Node_loadServerWithoutRestartPtr& amdCB,
- const InternalServerDescriptorPtr& descriptor,
- const string& replicaName,
- const Ice::Current& current)
-{
- class LoadServerCB : public AMD_Node_loadServer
- {
- public:
-
- LoadServerCB(const AMD_Node_loadServerWithoutRestartPtr& cb) : _cb(cb)
- {
- }
-
- virtual void
- ice_response(const ServerPrx& server, const AdapterPrxDict& adapters, Ice::Int actTimeout, Ice::Int deacTimeout)
- {
- _cb->ice_response(server, adapters, actTimeout, deacTimeout);
- };
-
- virtual void
- ice_exception(const ::std::exception& ex)
- {
- _cb->ice_exception(ex);
- }
-
- virtual void
- ice_exception()
- {
- _cb->ice_exception();
- }
-
- private:
-
- const AMD_Node_loadServerWithoutRestartPtr _cb;
- };
- loadServer(new LoadServerCB(amdCB), descriptor, replicaName, true, current);
+ loadServer(move(descriptor), move(replicaName), true, move(response), move(exception), current);
}
void
-NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
- const string& serverId,
- const string& uuid,
- int revision,
- const string& replicaName,
- const Ice::Current& current)
+NodeI::destroyServerAsync(string serverId, string uuid, int revision, string replicaName,
+ function<void()> response, function<void(exception_ptr)> exception,
+ const Ice::Current& current)
{
- destroyServer(amdCB, serverId, uuid, revision, replicaName, false, current);
+ destroyServer(move(serverId), move(uuid), move(revision), move(replicaName), false,
+ move(response), move(exception), current);
}
void
-NodeI::destroyServerWithoutRestart_async(const AMD_Node_destroyServerWithoutRestartPtr& amdCB,
- const string& serverId,
- const string& uuid,
- int revision,
- const string& replicaName,
- const Ice::Current& current)
+NodeI::destroyServerWithoutRestartAsync(string serverId, string uuid, int revision, string replicaName,
+ function<void()> response, function<void(exception_ptr)> exception,
+ const Ice::Current& current)
{
- class DestroyServerCB : public AMD_Node_destroyServer
- {
- public:
-
- DestroyServerCB(const AMD_Node_destroyServerWithoutRestartPtr& cb) : _cb(cb)
- {
- }
-
- virtual void
- ice_response()
- {
- _cb->ice_response();
- };
-
- virtual void
- ice_exception(const ::std::exception& ex)
- {
- _cb->ice_exception(ex);
- }
-
- virtual void
- ice_exception()
- {
- _cb->ice_exception();
- }
-
- private:
-
- const AMD_Node_destroyServerWithoutRestartPtr _cb;
- };
- destroyServer(new DestroyServerCB(amdCB), serverId, uuid, revision, replicaName, true, current);
+ destroyServer(move(serverId), move(uuid), move(revision), move(replicaName), true,
+ move(response), move(exception), current);
}
void
-NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Current&)
+NodeI::registerWithReplica(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&)
{
- _sessions.create(replica);
+ _sessions.create(move(replica));
}
void
-NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&)
+NodeI::replicaInit(InternalRegistryPrxSeq replicas, const Ice::Current&)
{
- _sessions.replicaInit(replicas);
+ _sessions.replicaInit(move(replicas));
}
void
-NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&)
+NodeI::replicaAdded(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&)
{
- _sessions.replicaAdded(replica);
+ _sessions.replicaAdded(move(replica));
}
void
-NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&)
+NodeI::replicaRemoved(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&)
{
- _sessions.replicaRemoved(replica);
+ _sessions.replicaRemoved(move(replica));
}
std::string
@@ -356,14 +191,14 @@ NodeI::shutdown(const Ice::Current&) const
_activator->shutdown();
}
-Ice::Long
-NodeI::getOffsetFromEnd(const string& filename, int count, const Ice::Current&) const
+long long
+NodeI::getOffsetFromEnd(string filename, int count, const Ice::Current&) const
{
return _fileCache->getOffsetFromEnd(getFilePath(filename), count);
}
bool
-NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, Ice::StringSeq& lines,
+NodeI::read(string filename, long long pos, int size, long long& newPos, Ice::StringSeq& lines,
const Ice::Current&) const
{
return _fileCache->read(getFilePath(filename), pos, size, newPos, lines);
@@ -372,31 +207,30 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos,
void
NodeI::shutdown()
{
- IceUtil::Mutex::Lock sync(_serversLock);
- for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin();
- p != _serversByApplication.end(); ++p)
+ lock_guard lock(_serversMutex);
+ for(const auto& servers : _serversByApplication)
{
- for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q)
+ for(const auto& server : servers.second)
{
- (*q)->shutdown();
+ server->shutdown();
}
}
_serversByApplication.clear();
}
-Ice::CommunicatorPtr
+shared_ptr<Ice::Communicator>
NodeI::getCommunicator() const
{
return _communicator;
}
-Ice::ObjectAdapterPtr
+shared_ptr<Ice::ObjectAdapter>
NodeI::getAdapter() const
{
return _adapter;
}
-ActivatorPtr
+shared_ptr<Activator>
NodeI::getActivator() const
{
return _activator;
@@ -408,31 +242,31 @@ NodeI::getTimer() const
return _timer;
}
-TraceLevelsPtr
+shared_ptr<TraceLevels>
NodeI::getTraceLevels() const
{
return _traceLevels;
}
-UserAccountMapperPrx
+shared_ptr<UserAccountMapperPrx>
NodeI::getUserAccountMapper() const
{
return _userAccountMapper;
}
PlatformInfo&
-NodeI::getPlatformInfo() const
+NodeI::getPlatformInfo()
{
return _platform;
}
-FileCachePtr
+shared_ptr<FileCache>
NodeI::getFileCache() const
{
return _fileCache;
}
-NodePrx
+shared_ptr<NodePrx>
NodeI::getProxy() const
{
return _proxy;
@@ -468,14 +302,14 @@ NodeI::allowEndpointsOverride() const
return _allowEndpointsOverride;
}
-NodeSessionPrx
-NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
+shared_ptr<NodeSessionPrx>
+NodeI::registerWithRegistry(const shared_ptr<InternalRegistryPrx>& registry)
{
return registry->registerNode(_platform.getInternalNodeInfo(), _proxy, _platform.getLoadInfo());
}
void
-NodeI::checkConsistency(const NodeSessionPrx& session)
+NodeI::checkConsistency(const shared_ptr<NodeSessionPrx>& session)
{
//
// Only do the consistency check on the startup. This ensures that servers can't
@@ -499,11 +333,11 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
//
unsigned long serial = 0;
Ice::StringSeq servers;
- vector<ServerCommandPtr> commands;
+ vector<shared_ptr<ServerCommand>> commands;
while(true)
{
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(serial == _serial)
{
_serial = 1; // We can reset the serial number.
@@ -524,54 +358,56 @@ NodeI::checkConsistency(const NodeSessionPrx& session)
sort(servers.begin(), servers.end());
}
- for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute));
+ for(const auto& command : commands)
+ {
+ command->execute();
+ }
}
void
-NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observer)
+NodeI::addObserver(const shared_ptr<NodeSessionPrx>& session, const shared_ptr<NodeObserverPrx>& observer)
{
- IceUtil::Mutex::Lock sync(_observerMutex);
+ lock_guard observerLock(_observerMutex);
assert(_observers.find(session) == _observers.end());
- _observers.insert(make_pair(session, observer));
+ _observers.insert({ session, observer });
_observerUpdates.erase(observer); // Remove any updates from the previous session.
ServerDynamicInfoSeq serverInfos;
AdapterDynamicInfoSeq adapterInfos;
- for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin();
- p != _serversDynamicInfo.end(); ++p)
+ for(const auto& info : _serversDynamicInfo)
{
- assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled));
- serverInfos.push_back(p->second);
+ assert(info.second.state != ServerState::Destroyed &&
+ (info.second.state != ServerState::Inactive || !info.second.enabled));
+ serverInfos.push_back(info.second);
}
- for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin();
- q != _adaptersDynamicInfo.end(); ++q)
+ for(const auto& info : _adaptersDynamicInfo)
{
- assert(q->second.proxy);
- adapterInfos.push_back(q->second);
+ assert(info.second.proxy);
+ adapterInfos.push_back(info.second);
}
- NodeDynamicInfo info;
- info.info = _platform.getNodeInfo();
- info.servers = serverInfos;
- info.adapters = adapterInfos;
- queueUpdate(observer, new NodeUp(this, observer, info));
+ NodeDynamicInfo info = { _platform.getNodeInfo(), move(serverInfos), move(adapterInfos) };
+ queueUpdate(observer, [observer, info = move(info)] (auto&& response, auto&& exception)
+ {
+ observer->nodeUpAsync(info, move(response), move(exception));
+ });
}
void
-NodeI::removeObserver(const NodeSessionPrx& session)
+NodeI::removeObserver(const shared_ptr<NodeSessionPrx>& session)
{
- IceUtil::Mutex::Lock sync(_observerMutex);
+ lock_guard observerLock(_observerMutex);
_observers.erase(session);
}
void
NodeI::observerUpdateServer(const ServerDynamicInfo& info)
{
- IceUtil::Mutex::Lock sync(_observerMutex);
+ lock_guard observerLock(_observerMutex);
- if(info.state == Destroyed || (info.state == Inactive && info.enabled))
+ if(info.state == ServerState::Destroyed || (info.state == ServerState::Inactive && info.enabled))
{
_serversDynamicInfo.erase(info.id);
}
@@ -586,13 +422,20 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
// registered twice if a replica is removed and added right away
// after).
//
- set<NodeObserverPrx> sent;
- for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ set<shared_ptr<NodeObserverPrx>> sent;
+ for(const auto& observer : _observers)
{
- if(sent.find(p->second) == sent.end())
+ if(sent.find(observer.second) == sent.end())
{
- queueUpdate(p->second, new UpdateServer(this, p->second, info));
- sent.insert(p->second);
+
+ queueUpdate(observer.second,
+ [observer = observer.second, info, name = getName(Ice::emptyCurrent)](auto&& response,
+ auto&& exception)
+ {
+ observer->updateServerAsync(name, info, move(response), move(exception));
+ });
+
+ sent.insert(observer.second);
}
}
}
@@ -600,7 +443,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info)
void
NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
{
- IceUtil::Mutex::Lock sync(_observerMutex);
+ lock_guard observerLock(_observerMutex);
if(info.proxy)
{
@@ -617,22 +460,28 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info)
// registered twice if a replica is removed and added right away
// after).
//
- set<NodeObserverPrx> sent;
- for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p)
+ set<shared_ptr<NodeObserverPrx>> sent;
+ for(const auto& observer : _observers)
{
- if(sent.find(p->second) == sent.end())
+ if(sent.find(observer.second) == sent.end())
{
- queueUpdate(p->second, new UpdateAdapter(this, p->second, info));
- sent.insert(p->second);
+ queueUpdate(observer.second,
+ [observer = observer.second, info, name = getName(Ice::emptyCurrent)] (auto&& response,
+ auto&& exception)
+ {
+ observer->updateAdapterAsync(name, info, move(response), move(exception));
+ });
+ sent.insert(observer.second);
}
}
}
void
-NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
+NodeI::queueUpdate(const shared_ptr<NodeObserverPrx>& proxy, Update::UpdateFunction updateFunction)
{
- //Lock sync(*this); Called within the synchronization
- map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ // Must be called with mutex locked
+ auto update = make_shared<Update>(move(updateFunction), shared_from_this(), proxy);
+ auto p = _observerUpdates.find(proxy);
if(p == _observerUpdates.end())
{
if(update->send())
@@ -647,10 +496,10 @@ NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update)
}
void
-NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all)
+NodeI::dequeueUpdate(const shared_ptr<NodeObserverPrx>& proxy, const shared_ptr<Update>& update, bool all)
{
- IceUtil::Mutex::Lock sync(_observerMutex);
- map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy);
+ lock_guard observerLock(_observerMutex);
+ auto p = _observerUpdates.find(proxy);
if(p == _observerUpdates.end() || p->second.front().get() != update.get())
{
return;
@@ -670,23 +519,22 @@ NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool
}
void
-NodeI::addServer(const ServerIPtr& server, const string& application)
+NodeI::addServer(const shared_ptr<ServerI>& server, const string& application)
{
- IceUtil::Mutex::Lock sync(_serversLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ lock_guard serversLock(_serversMutex);
+ auto p = _serversByApplication.find(application);
if(p == _serversByApplication.end())
{
- map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>());
- p = _serversByApplication.insert(p, v);
+ p = _serversByApplication.insert(p, { application, {} });
}
p->second.insert(server);
}
void
-NodeI::removeServer(const ServerIPtr& server, const std::string& application)
+NodeI::removeServer(const shared_ptr<ServerI>& server, const std::string& application)
{
- IceUtil::Mutex::Lock sync(_serversLock);
- map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application);
+ lock_guard serversLock(_serversMutex);
+ auto p = _serversByApplication.find(application);
if(p != _serversByApplication.end())
{
p->second.erase(server);
@@ -700,10 +548,7 @@ NodeI::removeServer(const ServerIPtr& server, const std::string& application)
Ice::Identity
NodeI::createServerIdentity(const string& name) const
{
- Ice::Identity id;
- id.category = _instanceName + "-Server";
- id.name = name;
- return id;
+ return { name, _instanceName + "-Server" };
}
string
@@ -712,10 +557,10 @@ NodeI::getServerAdminCategory() const
return _instanceName + "-NodeServerAdminRouter";
}
-vector<ServerCommandPtr>
+vector<shared_ptr<ServerCommand>>
NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
{
- vector<ServerCommandPtr> commands;
+ vector<shared_ptr<ServerCommand>> commands;
//
// Check if the servers directory doesn't contain more servers
@@ -741,10 +586,10 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
//
try
{
- vector<string>::iterator p = remove.begin();
+ auto p = remove.begin();
while(p != remove.end())
{
- ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(*p)));
+ auto server = dynamic_pointer_cast<ServerI>(_adapter->find(createServerIdentity(*p)));
if(server)
{
//
@@ -752,7 +597,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
//
try
{
- ServerCommandPtr command = server->destroy(0, "", 0, "Master", false);
+ auto command = server->destroy("", 0, "Master", false, nullptr);
if(command)
{
commands.push_back(command);
@@ -812,7 +657,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers)
return commands;
}
-NodeSessionPrx
+shared_ptr<NodeSessionPrx>
NodeI::getMasterNodeSession() const
{
return _sessions.getMasterNodeSession();
@@ -900,12 +745,12 @@ NodeI::canRemoveServerDirectory(const string& name)
return true;
}
-set<ServerIPtr>
+set<shared_ptr<ServerI>>
NodeI::getApplicationServers(const string& application) const
{
- IceUtil::Mutex::Lock sync(_serversLock);
- set<ServerIPtr> servers;
- map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application);
+ lock_guard lock(_serversMutex);
+ set<shared_ptr<ServerI>> servers;
+ auto p = _serversByApplication.find(application);
if(p != _serversByApplication.end())
{
servers = p->second;
@@ -941,18 +786,17 @@ NodeI::getFilePath(const string& filename) const
}
void
-NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB,
- const InternalServerDescriptorPtr& descriptor,
- const string& replicaName,
- bool noRestart,
+NodeI::loadServer(shared_ptr<InternalServerDescriptor> descriptor, string replicaName, bool noRestart,
+ function<void(const shared_ptr<ServerPrx> &, const AdapterPrxDict &, int, int)>&& response,
+ function<void(exception_ptr)>&& exception,
const Ice::Current& current)
{
- ServerCommandPtr command;
+ shared_ptr<ServerCommand> command;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
++_serial;
- Ice::Identity id = createServerIdentity(descriptor->id);
+ auto id = createServerIdentity(descriptor->id);
//
// Check if we already have a servant for this server. If that's
@@ -962,14 +806,14 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB,
while(true)
{
bool added = false;
- ServerIPtr server;
+ shared_ptr<ServerI> server;
try
{
- server = ServerIPtr::dynamicCast(_adapter->find(id));
+ server = dynamic_pointer_cast<ServerI>(_adapter->find(id));
if(!server)
{
- ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
- server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime);
+ auto proxy = Ice::uncheckedCast<ServerPrx>(_adapter->createProxy(id));
+ server = make_shared<ServerI>(shared_from_this(), proxy, _serversDir, descriptor->id, _waitTime);
_adapter->add(server, id);
added = true;
}
@@ -989,7 +833,8 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB,
try
{
- command = server->load(amdCB, descriptor, replicaName, noRestart);
+ // Don't std::move response/exception as we may need to loop and call again load.
+ command = server->load(descriptor, replicaName, noRestart, response, exception);
}
catch(const Ice::ObjectNotExistException&)
{
@@ -1021,30 +866,27 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB,
}
void
-NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB,
- const string& serverId,
- const string& uuid,
- int revision,
- const string& replicaName,
- bool noRestart,
+NodeI::destroyServer(string serverId, string uuid, int revision, string replicaName, bool noRestart,
+ function<void()> response,
+ function<void(exception_ptr)>,
const Ice::Current& current)
{
- ServerCommandPtr command;
+ shared_ptr<ServerCommand> command;
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
++_serial;
- ServerIPtr server;
+ shared_ptr<ServerI> server;
try
{
- server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId)));
+ server = dynamic_pointer_cast<ServerI>(_adapter->find(createServerIdentity(serverId)));
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
//
// We throw an object not exist exception to avoid
// dispatch warnings. The registry will consider the node
- // has being unreachable upon receival of this exception
+ // has being unreachable upon receipt of this exception
// (like any other Ice::LocalException). We could also
// have disabled dispatch warnings but they can still
// useful to catch other issues.
@@ -1054,7 +896,7 @@ NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB,
if(!server)
{
- server = new ServerI(this, 0, _serversDir, serverId, _waitTime);
+ server = make_shared<ServerI>(shared_from_this(), nullptr, _serversDir, serverId, _waitTime);
}
//
@@ -1062,11 +904,12 @@ NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB,
//
try
{
- command = server->destroy(amdCB, uuid, revision, replicaName, noRestart);
+ // Don't std::move response as we may need to call it if there is an exception
+ command = server->destroy(uuid, revision, replicaName, noRestart, response);
}
catch(const Ice::ObjectNotExistException&)
{
- amdCB->ice_response();
+ response();
return;
}
}