diff options
Diffstat (limited to 'cpp/src/IceGrid/NodeI.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 493 |
1 files changed, 168 insertions, 325 deletions
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index d098d71e83f..09c45027550 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -16,121 +16,37 @@ using namespace std; using namespace IceGrid; -namespace +NodeI::Update::Update(UpdateFunction updateFunction, const shared_ptr<NodeI>& node, + const shared_ptr<NodeObserverPrx>& observer) : _func(move(updateFunction)), + _node(node), _observer(observer) { +} -class NodeUp : public NodeI::Update +bool +NodeI::Update::send() { -public: - - NodeUp(const NodeIPtr& node, const NodeObserverPrx& observer, NodeDynamicInfo info) : - NodeI::Update(node, observer), _info(info) + auto self = shared_from_this(); + try { - } + _func([self] { self->_node->dequeueUpdate(self->_observer, self, false); }, + [self](exception_ptr) { self->_node->dequeueUpdate(self->_observer, self, true); }); - virtual bool - send() - { - try - { - _observer->begin_nodeUp(_info, newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); - } - catch(const Ice::LocalException&) - { - return false; - } return true; } - -private: - - NodeDynamicInfo _info; -}; - -class UpdateServer : public NodeI::Update -{ -public: - - UpdateServer(const NodeIPtr& node, const NodeObserverPrx& observer, ServerDynamicInfo info) : - NodeI::Update(node, observer), _info(info) - { - } - - virtual bool - send() + catch(const std::exception&) { - try - { - _observer->begin_updateServer(_node->getName(Ice::emptyCurrent), - _info, - newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); - } - catch(const Ice::LocalException&) - { - return false; - } - return true; - } - -private: - - ServerDynamicInfo _info; -}; - -class UpdateAdapter : public NodeI::Update -{ -public: - - UpdateAdapter(const NodeIPtr& node, const NodeObserverPrx& observer, AdapterDynamicInfo info) : - NodeI::Update(node, observer), _info(info) - { - } - - virtual bool - send() - { - try - { - _observer->begin_updateAdapter(_node->getName(Ice::emptyCurrent), - _info, - newCallback(static_cast<NodeI::Update*>(this), &NodeI::Update::completed)); - } - catch(const Ice::LocalException&) - { - return false; - } - return true; + return false; } - -private: - - AdapterDynamicInfo _info; -}; - -} - -NodeI::Update::Update(const NodeIPtr& node, const NodeObserverPrx& observer) : _node(node), _observer(observer) -{ } -NodeI::Update::~Update() -{ -} - -void -NodeI::Update::finished(bool success) -{ - _node->dequeueUpdate(_observer, this, !success); -} - -NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, +NodeI::NodeI(const shared_ptr<Ice::ObjectAdapter>& adapter, NodeSessionManager& sessions, - const ActivatorPtr& activator, + const shared_ptr<Activator>& activator, const IceUtil::TimerPtr& timer, - const TraceLevelsPtr& traceLevels, - const NodePrx& proxy, + const shared_ptr<TraceLevels>& traceLevels, + const shared_ptr<NodePrx>& proxy, const string& name, - const UserAccountMapperPrx& mapper, + const shared_ptr<UserAccountMapperPrx>& mapper, const string& instanceName) : _communicator(adapter->getCommunicator()), _adapter(adapter), @@ -146,16 +62,16 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, _instanceName(instanceName), _userAccountMapper(mapper), _platform("IceGrid.Node", _communicator, _traceLevels), - _fileCache(new FileCache(_communicator)), + _fileCache(make_shared<FileCache>(_communicator)), _serial(1), _consistencyCheckDone(false) { - Ice::PropertiesPtr props = _communicator->getProperties(); + auto props = _communicator->getProperties(); const_cast<string&>(_dataDir) = _platform.getDataDir(); const_cast<string&>(_serversDir) = _dataDir + "/servers"; const_cast<string&>(_tmpDir) = _dataDir + "/tmp"; - const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60); + const_cast<int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60); const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output"); const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0; const_cast<bool&>(_allowEndpointsOverride) = props->getPropertyAsInt("IceGrid.Node.AllowEndpointsOverride") > 0; @@ -174,156 +90,75 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, } } - Ice::PropertiesPtr p = Ice::createProperties(); + auto p = Ice::createProperties(); p->parseCommandLineOptions("", overrides); - Ice::PropertyDict propDict = p->getPropertiesForPrefix(""); - for(Ice::PropertyDict::const_iterator q = propDict.begin(); q != propDict.end(); ++q) + auto propDict = p->getPropertiesForPrefix(""); + for(const auto& prop : propDict) { - _propertiesOverride.push_back(createProperty(q->first, q->second)); + _propertiesOverride.push_back({ prop.first, prop.second }); } } } void -NodeI::Update::completed(const Ice::AsyncResultPtr& result) -{ - try - { - result->throwLocalException(); - finished(true); - } - catch(const Ice::LocalException&) - { - finished(false); - } -} - -NodeI::~NodeI() +NodeI::loadServerAsync(shared_ptr<InternalServerDescriptor> descriptor, string replicaName, + function<void(const shared_ptr<ServerPrx>&, const AdapterPrxDict&, int, int)> response, + function<void(exception_ptr)> exception, const Ice::Current& current) { + loadServer(move(descriptor), move(replicaName), false, move(response), move(exception), current); } void -NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB, - const InternalServerDescriptorPtr& descriptor, - const string& replicaName, - const Ice::Current& current) +NodeI::loadServerWithoutRestartAsync(shared_ptr<InternalServerDescriptor> descriptor, + string replicaName, + function<void(const shared_ptr<ServerPrx>&, + const AdapterPrxDict&, int, int)> response, + function<void(exception_ptr)> exception, + const Ice::Current& current) { - loadServer(amdCB, descriptor, replicaName, false, current); -} - -void -NodeI::loadServerWithoutRestart_async(const AMD_Node_loadServerWithoutRestartPtr& amdCB, - const InternalServerDescriptorPtr& descriptor, - const string& replicaName, - const Ice::Current& current) -{ - class LoadServerCB : public AMD_Node_loadServer - { - public: - - LoadServerCB(const AMD_Node_loadServerWithoutRestartPtr& cb) : _cb(cb) - { - } - - virtual void - ice_response(const ServerPrx& server, const AdapterPrxDict& adapters, Ice::Int actTimeout, Ice::Int deacTimeout) - { - _cb->ice_response(server, adapters, actTimeout, deacTimeout); - }; - - virtual void - ice_exception(const ::std::exception& ex) - { - _cb->ice_exception(ex); - } - - virtual void - ice_exception() - { - _cb->ice_exception(); - } - - private: - - const AMD_Node_loadServerWithoutRestartPtr _cb; - }; - loadServer(new LoadServerCB(amdCB), descriptor, replicaName, true, current); + loadServer(move(descriptor), move(replicaName), true, move(response), move(exception), current); } void -NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB, - const string& serverId, - const string& uuid, - int revision, - const string& replicaName, - const Ice::Current& current) +NodeI::destroyServerAsync(string serverId, string uuid, int revision, string replicaName, + function<void()> response, function<void(exception_ptr)> exception, + const Ice::Current& current) { - destroyServer(amdCB, serverId, uuid, revision, replicaName, false, current); + destroyServer(move(serverId), move(uuid), move(revision), move(replicaName), false, + move(response), move(exception), current); } void -NodeI::destroyServerWithoutRestart_async(const AMD_Node_destroyServerWithoutRestartPtr& amdCB, - const string& serverId, - const string& uuid, - int revision, - const string& replicaName, - const Ice::Current& current) +NodeI::destroyServerWithoutRestartAsync(string serverId, string uuid, int revision, string replicaName, + function<void()> response, function<void(exception_ptr)> exception, + const Ice::Current& current) { - class DestroyServerCB : public AMD_Node_destroyServer - { - public: - - DestroyServerCB(const AMD_Node_destroyServerWithoutRestartPtr& cb) : _cb(cb) - { - } - - virtual void - ice_response() - { - _cb->ice_response(); - }; - - virtual void - ice_exception(const ::std::exception& ex) - { - _cb->ice_exception(ex); - } - - virtual void - ice_exception() - { - _cb->ice_exception(); - } - - private: - - const AMD_Node_destroyServerWithoutRestartPtr _cb; - }; - destroyServer(new DestroyServerCB(amdCB), serverId, uuid, revision, replicaName, true, current); + destroyServer(move(serverId), move(uuid), move(revision), move(replicaName), true, + move(response), move(exception), current); } void -NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Current&) +NodeI::registerWithReplica(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&) { - _sessions.create(replica); + _sessions.create(move(replica)); } void -NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&) +NodeI::replicaInit(InternalRegistryPrxSeq replicas, const Ice::Current&) { - _sessions.replicaInit(replicas); + _sessions.replicaInit(move(replicas)); } void -NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&) +NodeI::replicaAdded(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&) { - _sessions.replicaAdded(replica); + _sessions.replicaAdded(move(replica)); } void -NodeI::replicaRemoved(const InternalRegistryPrx& replica, const Ice::Current&) +NodeI::replicaRemoved(shared_ptr<InternalRegistryPrx> replica, const Ice::Current&) { - _sessions.replicaRemoved(replica); + _sessions.replicaRemoved(move(replica)); } std::string @@ -356,14 +191,14 @@ NodeI::shutdown(const Ice::Current&) const _activator->shutdown(); } -Ice::Long -NodeI::getOffsetFromEnd(const string& filename, int count, const Ice::Current&) const +long long +NodeI::getOffsetFromEnd(string filename, int count, const Ice::Current&) const { return _fileCache->getOffsetFromEnd(getFilePath(filename), count); } bool -NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, Ice::StringSeq& lines, +NodeI::read(string filename, long long pos, int size, long long& newPos, Ice::StringSeq& lines, const Ice::Current&) const { return _fileCache->read(getFilePath(filename), pos, size, newPos, lines); @@ -372,31 +207,30 @@ NodeI::read(const string& filename, Ice::Long pos, int size, Ice::Long& newPos, void NodeI::shutdown() { - IceUtil::Mutex::Lock sync(_serversLock); - for(map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.begin(); - p != _serversByApplication.end(); ++p) + lock_guard lock(_serversMutex); + for(const auto& servers : _serversByApplication) { - for(set<ServerIPtr>::const_iterator q = p->second.begin(); q != p->second.end(); ++q) + for(const auto& server : servers.second) { - (*q)->shutdown(); + server->shutdown(); } } _serversByApplication.clear(); } -Ice::CommunicatorPtr +shared_ptr<Ice::Communicator> NodeI::getCommunicator() const { return _communicator; } -Ice::ObjectAdapterPtr +shared_ptr<Ice::ObjectAdapter> NodeI::getAdapter() const { return _adapter; } -ActivatorPtr +shared_ptr<Activator> NodeI::getActivator() const { return _activator; @@ -408,31 +242,31 @@ NodeI::getTimer() const return _timer; } -TraceLevelsPtr +shared_ptr<TraceLevels> NodeI::getTraceLevels() const { return _traceLevels; } -UserAccountMapperPrx +shared_ptr<UserAccountMapperPrx> NodeI::getUserAccountMapper() const { return _userAccountMapper; } PlatformInfo& -NodeI::getPlatformInfo() const +NodeI::getPlatformInfo() { return _platform; } -FileCachePtr +shared_ptr<FileCache> NodeI::getFileCache() const { return _fileCache; } -NodePrx +shared_ptr<NodePrx> NodeI::getProxy() const { return _proxy; @@ -468,14 +302,14 @@ NodeI::allowEndpointsOverride() const return _allowEndpointsOverride; } -NodeSessionPrx -NodeI::registerWithRegistry(const InternalRegistryPrx& registry) +shared_ptr<NodeSessionPrx> +NodeI::registerWithRegistry(const shared_ptr<InternalRegistryPrx>& registry) { return registry->registerNode(_platform.getInternalNodeInfo(), _proxy, _platform.getLoadInfo()); } void -NodeI::checkConsistency(const NodeSessionPrx& session) +NodeI::checkConsistency(const shared_ptr<NodeSessionPrx>& session) { // // Only do the consistency check on the startup. This ensures that servers can't @@ -499,11 +333,11 @@ NodeI::checkConsistency(const NodeSessionPrx& session) // unsigned long serial = 0; Ice::StringSeq servers; - vector<ServerCommandPtr> commands; + vector<shared_ptr<ServerCommand>> commands; while(true) { { - Lock sync(*this); + lock_guard lock(_mutex); if(serial == _serial) { _serial = 1; // We can reset the serial number. @@ -524,54 +358,56 @@ NodeI::checkConsistency(const NodeSessionPrx& session) sort(servers.begin(), servers.end()); } - for_each(commands.begin(), commands.end(), IceUtil::voidMemFun(&ServerCommand::execute)); + for(const auto& command : commands) + { + command->execute(); + } } void -NodeI::addObserver(const NodeSessionPrx& session, const NodeObserverPrx& observer) +NodeI::addObserver(const shared_ptr<NodeSessionPrx>& session, const shared_ptr<NodeObserverPrx>& observer) { - IceUtil::Mutex::Lock sync(_observerMutex); + lock_guard observerLock(_observerMutex); assert(_observers.find(session) == _observers.end()); - _observers.insert(make_pair(session, observer)); + _observers.insert({ session, observer }); _observerUpdates.erase(observer); // Remove any updates from the previous session. ServerDynamicInfoSeq serverInfos; AdapterDynamicInfoSeq adapterInfos; - for(map<string, ServerDynamicInfo>::const_iterator p = _serversDynamicInfo.begin(); - p != _serversDynamicInfo.end(); ++p) + for(const auto& info : _serversDynamicInfo) { - assert(p->second.state != Destroyed && (p->second.state != Inactive || !p->second.enabled)); - serverInfos.push_back(p->second); + assert(info.second.state != ServerState::Destroyed && + (info.second.state != ServerState::Inactive || !info.second.enabled)); + serverInfos.push_back(info.second); } - for(map<string, AdapterDynamicInfo>::const_iterator q = _adaptersDynamicInfo.begin(); - q != _adaptersDynamicInfo.end(); ++q) + for(const auto& info : _adaptersDynamicInfo) { - assert(q->second.proxy); - adapterInfos.push_back(q->second); + assert(info.second.proxy); + adapterInfos.push_back(info.second); } - NodeDynamicInfo info; - info.info = _platform.getNodeInfo(); - info.servers = serverInfos; - info.adapters = adapterInfos; - queueUpdate(observer, new NodeUp(this, observer, info)); + NodeDynamicInfo info = { _platform.getNodeInfo(), move(serverInfos), move(adapterInfos) }; + queueUpdate(observer, [observer, info = move(info)] (auto&& response, auto&& exception) + { + observer->nodeUpAsync(info, move(response), move(exception)); + }); } void -NodeI::removeObserver(const NodeSessionPrx& session) +NodeI::removeObserver(const shared_ptr<NodeSessionPrx>& session) { - IceUtil::Mutex::Lock sync(_observerMutex); + lock_guard observerLock(_observerMutex); _observers.erase(session); } void NodeI::observerUpdateServer(const ServerDynamicInfo& info) { - IceUtil::Mutex::Lock sync(_observerMutex); + lock_guard observerLock(_observerMutex); - if(info.state == Destroyed || (info.state == Inactive && info.enabled)) + if(info.state == ServerState::Destroyed || (info.state == ServerState::Inactive && info.enabled)) { _serversDynamicInfo.erase(info.id); } @@ -586,13 +422,20 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) // registered twice if a replica is removed and added right away // after). // - set<NodeObserverPrx> sent; - for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + set<shared_ptr<NodeObserverPrx>> sent; + for(const auto& observer : _observers) { - if(sent.find(p->second) == sent.end()) + if(sent.find(observer.second) == sent.end()) { - queueUpdate(p->second, new UpdateServer(this, p->second, info)); - sent.insert(p->second); + + queueUpdate(observer.second, + [observer = observer.second, info, name = getName(Ice::emptyCurrent)](auto&& response, + auto&& exception) + { + observer->updateServerAsync(name, info, move(response), move(exception)); + }); + + sent.insert(observer.second); } } } @@ -600,7 +443,7 @@ NodeI::observerUpdateServer(const ServerDynamicInfo& info) void NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) { - IceUtil::Mutex::Lock sync(_observerMutex); + lock_guard observerLock(_observerMutex); if(info.proxy) { @@ -617,22 +460,28 @@ NodeI::observerUpdateAdapter(const AdapterDynamicInfo& info) // registered twice if a replica is removed and added right away // after). // - set<NodeObserverPrx> sent; - for(map<NodeSessionPrx, NodeObserverPrx>::const_iterator p = _observers.begin(); p != _observers.end(); ++p) + set<shared_ptr<NodeObserverPrx>> sent; + for(const auto& observer : _observers) { - if(sent.find(p->second) == sent.end()) + if(sent.find(observer.second) == sent.end()) { - queueUpdate(p->second, new UpdateAdapter(this, p->second, info)); - sent.insert(p->second); + queueUpdate(observer.second, + [observer = observer.second, info, name = getName(Ice::emptyCurrent)] (auto&& response, + auto&& exception) + { + observer->updateAdapterAsync(name, info, move(response), move(exception)); + }); + sent.insert(observer.second); } } } void -NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update) +NodeI::queueUpdate(const shared_ptr<NodeObserverPrx>& proxy, Update::UpdateFunction updateFunction) { - //Lock sync(*this); Called within the synchronization - map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + // Must be called with mutex locked + auto update = make_shared<Update>(move(updateFunction), shared_from_this(), proxy); + auto p = _observerUpdates.find(proxy); if(p == _observerUpdates.end()) { if(update->send()) @@ -647,10 +496,10 @@ NodeI::queueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update) } void -NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool all) +NodeI::dequeueUpdate(const shared_ptr<NodeObserverPrx>& proxy, const shared_ptr<Update>& update, bool all) { - IceUtil::Mutex::Lock sync(_observerMutex); - map<NodeObserverPrx, deque<UpdatePtr> >::iterator p = _observerUpdates.find(proxy); + lock_guard observerLock(_observerMutex); + auto p = _observerUpdates.find(proxy); if(p == _observerUpdates.end() || p->second.front().get() != update.get()) { return; @@ -670,23 +519,22 @@ NodeI::dequeueUpdate(const NodeObserverPrx& proxy, const UpdatePtr& update, bool } void -NodeI::addServer(const ServerIPtr& server, const string& application) +NodeI::addServer(const shared_ptr<ServerI>& server, const string& application) { - IceUtil::Mutex::Lock sync(_serversLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + lock_guard serversLock(_serversMutex); + auto p = _serversByApplication.find(application); if(p == _serversByApplication.end()) { - map<string, set<ServerIPtr> >::value_type v(application, set<ServerIPtr>()); - p = _serversByApplication.insert(p, v); + p = _serversByApplication.insert(p, { application, {} }); } p->second.insert(server); } void -NodeI::removeServer(const ServerIPtr& server, const std::string& application) +NodeI::removeServer(const shared_ptr<ServerI>& server, const std::string& application) { - IceUtil::Mutex::Lock sync(_serversLock); - map<string, set<ServerIPtr> >::iterator p = _serversByApplication.find(application); + lock_guard serversLock(_serversMutex); + auto p = _serversByApplication.find(application); if(p != _serversByApplication.end()) { p->second.erase(server); @@ -700,10 +548,7 @@ NodeI::removeServer(const ServerIPtr& server, const std::string& application) Ice::Identity NodeI::createServerIdentity(const string& name) const { - Ice::Identity id; - id.category = _instanceName + "-Server"; - id.name = name; - return id; + return { name, _instanceName + "-Server" }; } string @@ -712,10 +557,10 @@ NodeI::getServerAdminCategory() const return _instanceName + "-NodeServerAdminRouter"; } -vector<ServerCommandPtr> +vector<shared_ptr<ServerCommand>> NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) { - vector<ServerCommandPtr> commands; + vector<shared_ptr<ServerCommand>> commands; // // Check if the servers directory doesn't contain more servers @@ -741,10 +586,10 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) // try { - vector<string>::iterator p = remove.begin(); + auto p = remove.begin(); while(p != remove.end()) { - ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(*p))); + auto server = dynamic_pointer_cast<ServerI>(_adapter->find(createServerIdentity(*p))); if(server) { // @@ -752,7 +597,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) // try { - ServerCommandPtr command = server->destroy(0, "", 0, "Master", false); + auto command = server->destroy("", 0, "Master", false, nullptr); if(command) { commands.push_back(command); @@ -812,7 +657,7 @@ NodeI::checkConsistencyNoSync(const Ice::StringSeq& servers) return commands; } -NodeSessionPrx +shared_ptr<NodeSessionPrx> NodeI::getMasterNodeSession() const { return _sessions.getMasterNodeSession(); @@ -900,12 +745,12 @@ NodeI::canRemoveServerDirectory(const string& name) return true; } -set<ServerIPtr> +set<shared_ptr<ServerI>> NodeI::getApplicationServers(const string& application) const { - IceUtil::Mutex::Lock sync(_serversLock); - set<ServerIPtr> servers; - map<string, set<ServerIPtr> >::const_iterator p = _serversByApplication.find(application); + lock_guard lock(_serversMutex); + set<shared_ptr<ServerI>> servers; + auto p = _serversByApplication.find(application); if(p != _serversByApplication.end()) { servers = p->second; @@ -941,18 +786,17 @@ NodeI::getFilePath(const string& filename) const } void -NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB, - const InternalServerDescriptorPtr& descriptor, - const string& replicaName, - bool noRestart, +NodeI::loadServer(shared_ptr<InternalServerDescriptor> descriptor, string replicaName, bool noRestart, + function<void(const shared_ptr<ServerPrx> &, const AdapterPrxDict &, int, int)>&& response, + function<void(exception_ptr)>&& exception, const Ice::Current& current) { - ServerCommandPtr command; + shared_ptr<ServerCommand> command; { - Lock sync(*this); + lock_guard lock(_mutex); ++_serial; - Ice::Identity id = createServerIdentity(descriptor->id); + auto id = createServerIdentity(descriptor->id); // // Check if we already have a servant for this server. If that's @@ -962,14 +806,14 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB, while(true) { bool added = false; - ServerIPtr server; + shared_ptr<ServerI> server; try { - server = ServerIPtr::dynamicCast(_adapter->find(id)); + server = dynamic_pointer_cast<ServerI>(_adapter->find(id)); if(!server) { - ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id)); - server = new ServerI(this, proxy, _serversDir, descriptor->id, _waitTime); + auto proxy = Ice::uncheckedCast<ServerPrx>(_adapter->createProxy(id)); + server = make_shared<ServerI>(shared_from_this(), proxy, _serversDir, descriptor->id, _waitTime); _adapter->add(server, id); added = true; } @@ -989,7 +833,8 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB, try { - command = server->load(amdCB, descriptor, replicaName, noRestart); + // Don't std::move response/exception as we may need to loop and call again load. + command = server->load(descriptor, replicaName, noRestart, response, exception); } catch(const Ice::ObjectNotExistException&) { @@ -1021,30 +866,27 @@ NodeI::loadServer(const AMD_Node_loadServerPtr& amdCB, } void -NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB, - const string& serverId, - const string& uuid, - int revision, - const string& replicaName, - bool noRestart, +NodeI::destroyServer(string serverId, string uuid, int revision, string replicaName, bool noRestart, + function<void()> response, + function<void(exception_ptr)>, const Ice::Current& current) { - ServerCommandPtr command; + shared_ptr<ServerCommand> command; { - Lock sync(*this); + lock_guard lock(_mutex); ++_serial; - ServerIPtr server; + shared_ptr<ServerI> server; try { - server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId))); + server = dynamic_pointer_cast<ServerI>(_adapter->find(createServerIdentity(serverId))); } catch(const Ice::ObjectAdapterDeactivatedException&) { // // We throw an object not exist exception to avoid // dispatch warnings. The registry will consider the node - // has being unreachable upon receival of this exception + // has being unreachable upon receipt of this exception // (like any other Ice::LocalException). We could also // have disabled dispatch warnings but they can still // useful to catch other issues. @@ -1054,7 +896,7 @@ NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB, if(!server) { - server = new ServerI(this, 0, _serversDir, serverId, _waitTime); + server = make_shared<ServerI>(shared_from_this(), nullptr, _serversDir, serverId, _waitTime); } // @@ -1062,11 +904,12 @@ NodeI::destroyServer(const AMD_Node_destroyServerPtr& amdCB, // try { - command = server->destroy(amdCB, uuid, revision, replicaName, noRestart); + // Don't std::move response as we may need to call it if there is an exception + command = server->destroy(uuid, revision, replicaName, noRestart, response); } catch(const Ice::ObjectNotExistException&) { - amdCB->ice_response(); + response(); return; } } |