summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeCache.cpp
diff options
context:
space:
mode:
authorJoe George <joe@zeroc.com>2021-01-28 16:26:44 -0500
committerJoe George <joe@zeroc.com>2021-02-01 16:59:30 -0500
commit92a6531e409f2691d82591e185a92299d415fc0f (patch)
tree60c79e2a8f327b8f0b6ebc06b06f48a2e8086f6a /cpp/src/IceGrid/NodeCache.cpp
parentPort Glacier2, IceBox, IceBridge, IceDB, IceXML, icegriddb (diff)
downloadice-92a6531e409f2691d82591e185a92299d415fc0f.tar.bz2
ice-92a6531e409f2691d82591e185a92299d415fc0f.tar.xz
ice-92a6531e409f2691d82591e185a92299d415fc0f.zip
IceGrid and IceStorm
Diffstat (limited to 'cpp/src/IceGrid/NodeCache.cpp')
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp826
1 files changed, 351 insertions, 475 deletions
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index b727edb4966..8d397255f72 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -2,7 +2,6 @@
// Copyright (c) ZeroC, Inc. All rights reserved.
//
-#include <IceUtil/Functional.h>
#include <Ice/Communicator.h>
#include <Ice/Properties.h>
#include <Ice/LoggerUtil.h>
@@ -16,361 +15,122 @@
using namespace std;
using namespace IceGrid;
-namespace IceGrid
+namespace
{
-struct ToInternalServerDescriptor : std::unary_function<CommunicatorDescriptorPtr&, void>
+PropertyDescriptor
+removeProperty(PropertyDescriptorSeq& properties, const string& name)
{
- ToInternalServerDescriptor(const InternalServerDescriptorPtr& descriptor, const InternalNodeInfoPtr& node,
- int iceVersion) :
- _desc(descriptor),
- _node(node),
- _iceVersion(iceVersion)
+ string value;
+ PropertyDescriptorSeq::iterator p = properties.begin();
+ while(p != properties.end())
{
- }
-
- void
- operator()(const CommunicatorDescriptorPtr& desc)
- {
- //
- // Figure out the configuration file name for the communicator
- // (if it's a service, it's "config_<service name>", if it's
- // the server, it's just "config").
- //
- string filename = "config";
- ServiceDescriptorPtr svc = ServiceDescriptorPtr::dynamicCast(desc);
- if(svc)
- {
- filename += "_" + svc->name;
- _desc->services->push_back(svc->name);
- }
-
- PropertyDescriptorSeq& props = _desc->properties[filename];
- PropertyDescriptorSeq communicatorProps = desc->propertySet.properties;
-
- //
- // If this is a service communicator and the IceBox server has Admin
- // enabled or Admin endpoints configured, we ignore the server-lifetime attributes
- // of the service object adapters and assume it's set to false.
- //
- bool ignoreServerLifetime = false;
- if(svc)
- {
- if(_iceVersion == 0 || _iceVersion >= 30300)
- {
- if(getPropertyAsInt(_desc->properties["config"], "Ice.Admin.Enabled") > 0 ||
- getProperty(_desc->properties["config"], "Ice.Admin.Endpoints") != "")
- {
- ignoreServerLifetime = true;
- }
- }
- }
- //
- // Add the adapters and their configuration.
- //
- for(AdapterDescriptorSeq::const_iterator q = desc->adapters.begin(); q != desc->adapters.end(); ++q)
- {
- _desc->adapters.push_back(new InternalAdapterDescriptor(q->id,
- ignoreServerLifetime ? false : q->serverLifetime));
-
- props.push_back(createProperty("# Object adapter " + q->name));
- PropertyDescriptor prop = removeProperty(communicatorProps, q->name + ".Endpoints");
- prop.name = q->name + ".Endpoints";
- props.push_back(prop);
- props.push_back(createProperty(q->name + ".AdapterId", q->id));
- if(!q->replicaGroupId.empty())
- {
- props.push_back(createProperty(q->name + ".ReplicaGroupId", q->replicaGroupId));
- }
-
- //
- // Ignore the register process attribute if the server is using Ice >= 3.3.0
- //
- if(_iceVersion != 0 && _iceVersion < 30300)
- {
- if(q->registerProcess)
- {
- props.push_back(createProperty(q->name + ".RegisterProcess", "1"));
- _desc->processRegistered = true;
- }
- }
- }
-
- _desc->logs.insert(_desc->logs.end(), desc->logs.begin(), desc->logs.end());
-
- const string dbsPath = _node->dataDir + "/servers/" + _desc->id + "/dbs/";
- for(DbEnvDescriptorSeq::const_iterator p = desc->dbEnvs.begin(); p != desc->dbEnvs.end(); ++p)
- {
- props.push_back(createProperty("# Database environment " + p->name));
- if(p->dbHome.empty())
- {
- _desc->dbEnvs.push_back(new InternalDbEnvDescriptor(p->name, p->properties));
- props.push_back(createProperty("Freeze.DbEnv." + p->name + ".DbHome", dbsPath + p->name));
- }
- else
- {
- props.push_back(createProperty("Freeze.DbEnv." + p->name + ".DbHome", p->dbHome));
- }
- }
-
- //
- // Copy the communicator descriptor properties.
- //
- if(!communicatorProps.empty())
- {
- if(svc)
- {
- props.push_back(createProperty("# Service descriptor properties"));
- }
- else
- {
- props.push_back(createProperty("# Server descriptor properties"));
- }
- copy(communicatorProps.begin(), communicatorProps.end(), back_inserter(props));
- }
-
- //
- // For Ice servers > 3.3.0 escape the properties.
- //
- if(_iceVersion == 0 || _iceVersion >= 30300)
- {
- for(PropertyDescriptorSeq::iterator p = props.begin(); p != props.end(); ++p)
- {
- if(p->name.find('#') != 0 || !p->value.empty())
- {
- p->name = escapeProperty(p->name, true);
- p->value = escapeProperty(p->value);
- }
- }
- }
- }
-
- PropertyDescriptor
- removeProperty(PropertyDescriptorSeq& properties, const string& name)
- {
- string value;
- PropertyDescriptorSeq::iterator p = properties.begin();
- while(p != properties.end())
- {
- if(p->name == name)
- {
- value = p->value;
- p = properties.erase(p);
- }
- else
- {
- ++p;
- }
- }
- PropertyDescriptor desc;
- desc.name = name;
- desc.value = value;
- return desc;
- }
-
- InternalServerDescriptorPtr _desc;
- InternalNodeInfoPtr _node;
- int _iceVersion;
-};
-
-class LoadCB : public virtual IceUtil::Shared
-{
-public:
-
- LoadCB(const TraceLevelsPtr& traceLevels, const ServerEntryPtr& server, const string& node, int timeout) :
- _traceLevels(traceLevels), _server(server), _id(server->getId()), _node(node), _timeout(timeout)
- {
- }
-
- void
- response(const ServerPrx& server, const AdapterPrxDict& adapters, int at, int dt)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "loaded `" << _id << "' on node `" << _node << "'";
- }
-
- //
- // Add the node session timeout on the proxies to ensure the
- // timeout is large enough.
- //
- _server->loadCallback(server, adapters, at + _timeout, dt + _timeout);
- }
-
- void
- exception(const Ice::Exception& lex)
- {
- try
- {
- lex.ice_throw();
- }
- catch(const DeploymentException& ex)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex.reason;
- }
-
- ostringstream os;
- os << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex.reason;
- _server->exception(DeploymentException(os.str()));
- }
- catch(const Ice::Exception& ex)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex;
- }
-
- ostringstream os;
- os << ex;
- _server->exception(NodeUnreachableException(_node, os.str()));
- }
- }
-
-private:
-
- const TraceLevelsPtr _traceLevels;
- const ServerEntryPtr _server;
- const string _id;
- const string _node;
- const int _timeout;
-};
-
-class DestroyCB : public virtual IceUtil::Shared
-{
-public:
-
- DestroyCB(const TraceLevelsPtr& traceLevels, const ServerEntryPtr& server, const string& node) :
- _traceLevels(traceLevels), _server(server), _id(server->getId()), _node(node)
- {
- }
-
- void
- response()
- {
- if(_traceLevels && _traceLevels->server > 1)
+ if(p->name == name)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "unloaded `" << _id << "' on node `" << _node << "'";
- }
- _server->destroyCallback();
- }
-
- void
- exception(const Ice::Exception& dex)
- {
- try
- {
- dex.ice_throw();
- }
- catch(const DeploymentException& ex)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex.reason;
- }
-
- ostringstream os;
- os << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex.reason;
- _server->exception(DeploymentException(os.str()));
+ value = p->value;
+ p = properties.erase(p);
}
- catch(const Ice::Exception& ex)
+ else
{
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex;
- }
- ostringstream os;
- os << ex;
- _server->exception(NodeUnreachableException(_node, os.str()));
+ ++p;
}
}
-
-private:
-
- const TraceLevelsPtr _traceLevels;
- const ServerEntryPtr _server;
- const string _id;
- const string _node;
-};
+ return { name, value };
+}
}
-NodeCache::NodeCache(const Ice::CommunicatorPtr& communicator, ReplicaCache& replicaCache, const string& replicaName) :
+NodeCache::NodeCache(const shared_ptr<Ice::Communicator>& communicator,
+ ReplicaCache& replicaCache, const string& replicaName) :
_communicator(communicator),
_replicaName(replicaName),
_replicaCache(replicaCache)
{
}
-NodeEntryPtr
+shared_ptr<NodeEntry>
NodeCache::get(const string& name, bool create) const
{
- Lock sync(*this);
- NodeEntryPtr entry = getImpl(name);
- if(!entry && create)
+ lock_guard lock(_mutex);
+
+ auto cacheEntry = getImpl(name);
+ if(!cacheEntry && create)
{
NodeCache& self = const_cast<NodeCache&>(*this);
- entry = new NodeEntry(self, name);
- self.addImpl(name, entry);
+ cacheEntry = make_shared<NodeEntry>(self, name);
+ self.addImpl(name, cacheEntry);
}
- if(!entry)
+ if(!cacheEntry)
{
throw NodeNotExistException(name);
}
+
+ // Get a self removing shared_ptr to the cached NodeEntry which will remove
+ // itself from the this cache upon destruction
+ auto entry = cacheEntry->_selfRemovingPtr.lock();
+
+ if (!entry)
+ {
+ // Create self removing shared_ptr of cacheEntry. The cacheEntry maintains a ref count for the case where
+ // the self removing shared_ptr has no more references but its deleter has yet to run (weak_ptr has expired)
+ // and at the same time another thread calls NodeCache::get which refreshes the self removing ptr before
+ // the cached entry can be removed.
+ entry = shared_ptr<NodeEntry>(const_cast<NodeEntry*>(cacheEntry.get()),
+ [cache = const_cast<NodeCache*>(this), name](NodeEntry* e)
+ {
+ lock_guard cacheLock(cache->_mutex);
+ if(--e->_selfRemovingRefCount == 0)
+ {
+ cache->removeImpl(name);
+ }
+ });
+ cacheEntry->_selfRemovingRefCount++;
+ cacheEntry->_selfRemovingPtr = entry;
+ }
+
return entry;
}
NodeEntry::NodeEntry(NodeCache& cache, const std::string& name) :
_cache(cache),
- _ref(0),
_name(name),
- _registering(false)
-{
-}
-
-NodeEntry::~NodeEntry()
+ _registering(false),
+ _selfRemovingRefCount(0)
{
}
void
NodeEntry::addDescriptor(const string& application, const NodeDescriptor& descriptor)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_descriptors.insert(make_pair(application, descriptor));
}
void
NodeEntry::removeDescriptor(const string& application)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_descriptors.erase(application);
}
void
-NodeEntry::addServer(const ServerEntryPtr& entry)
+NodeEntry::addServer(const shared_ptr<ServerEntry>& entry)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_servers.insert(make_pair(entry->getId(), entry));
}
void
-NodeEntry::removeServer(const ServerEntryPtr& entry)
+NodeEntry::removeServer(const shared_ptr<ServerEntry>& entry)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
_servers.erase(entry->getId());
}
void
-NodeEntry::setSession(const NodeSessionIPtr& session)
+NodeEntry::setSession(const shared_ptr<NodeSessionI>& session)
{
- Lock sync(*this);
+ unique_lock lock(_mutex);
if(session)
{
@@ -380,12 +140,12 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
{
// If the current session has just been destroyed, wait for the setSession(0) call.
assert(session != _session);
- wait();
+ _condVar.wait(lock);
}
else
{
- NodeSessionIPtr s = _session;
- sync.release();
+ auto s = _session;
+ lock.unlock();
try
{
s->getNode()->ice_ping();
@@ -401,7 +161,7 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
{
}
}
- sync.acquire();
+ lock.lock();
}
}
@@ -410,7 +170,7 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
// so we won't need anymore to try to register it with this
// registry.
//
- _proxy = 0;
+ _proxy = nullptr;
}
else
{
@@ -421,12 +181,12 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
}
_session = session;
- notifyAll();
+ _condVar.notify_all();
if(_registering)
{
_registering = false;
- notifyAll();
+ _condVar.notify_all();
}
if(session)
@@ -447,28 +207,28 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
}
}
-NodePrx
+shared_ptr<NodePrx>
NodeEntry::getProxy() const
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
return _session->getNode();
}
-InternalNodeInfoPtr
+shared_ptr<InternalNodeInfo>
NodeEntry::getInfo() const
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
return _session->getInfo();
}
ServerEntrySeq
NodeEntry::getServers() const
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
ServerEntrySeq entries;
- for(map<string, ServerEntryPtr>::const_iterator p = _servers.begin(); p != _servers.end(); ++p)
+ for(map<string, shared_ptr<ServerEntry>>::const_iterator p = _servers.begin(); p != _servers.end(); ++p)
{
entries.push_back(p->second);
}
@@ -478,8 +238,8 @@ NodeEntry::getServers() const
LoadInfo
NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor) const
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application);
if(p == _descriptors.end())
@@ -523,46 +283,45 @@ NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor
return _session->getLoadInfo();
}
-NodeSessionIPtr
+shared_ptr<NodeSessionI>
NodeEntry::getSession() const
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
return _session;
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
NodeEntry::getAdminProxy() const
{
- Ice::ObjectPrx prx = getProxy();
+ auto prx = getProxy();
assert(prx);
- Ice::Identity adminId;
- adminId.name = "NodeAdmin-" + _name ;
- adminId.category = prx->ice_getIdentity().category;
- return prx->ice_identity(adminId);
+ return prx->ice_identity({ "NodeAdmin-" + _name, prx->ice_getIdentity().category });
}
bool
NodeEntry::canRemove()
{
- Lock sync(*this);
- return _servers.empty() && !_session && _descriptors.empty();
+ lock_guard lock(_mutex);
+
+ // The cache mutex must be locked to acesss _selfRemovingRefCount
+ return _servers.empty() && !_session && _descriptors.empty() && _selfRemovingRefCount == 0;
}
void
-NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, const SessionIPtr& session, int timeout,
- bool noRestart)
+NodeEntry::loadServer(const shared_ptr<ServerEntry>& entry, const ServerInfo& server,
+ const shared_ptr<SessionI>& session, chrono::seconds timeout, bool noRestart)
{
try
{
- NodePrx node;
- int sessionTimeout;
- InternalServerDescriptorPtr desc;
+ shared_ptr<NodePrx> node;
+ chrono::seconds sessionTimeout;
+ shared_ptr<InternalServerDescriptor> desc;
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
node = _session->getNode();
- sessionTimeout = _session->getTimeout(Ice::emptyCurrent);
+ sessionTimeout = chrono::seconds(_session->getTimeout(Ice::emptyCurrent));
//
// Check if we should use a specific timeout (the load
@@ -570,9 +329,10 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con
// time to deactivate, up to "deactivation-timeout"
// seconds).
//
- if(timeout > 0)
+ if(timeout > 0s)
{
- node = NodePrx::uncheckedCast(node->ice_invocationTimeout(timeout * 1000));
+ auto timeoutInMilliseconds = secondsToInt(timeout) * 1000;
+ node = Ice::uncheckedCast<NodePrx>(node->ice_invocationTimeout(move(timeoutInMilliseconds)));
}
ServerInfo info = server;
@@ -603,38 +363,79 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con
}
}
+ auto response = [traceLevels = _cache.getTraceLevels(), entry, name = _name, sessionTimeout]
+ (shared_ptr<ServerPrx> serverPrx, AdapterPrxDict adapters, int at, int dt)
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "loaded `" << entry->getId() << "' on node `" << name << "'";
+ }
+
+ //
+ // Add the node session timeout on the proxies to ensure the
+ // timeout is large enough.
+ //
+ entry->loadCallback(move(serverPrx), move(adapters),
+ chrono::seconds(at) + sessionTimeout,
+ chrono::seconds(dt) + sessionTimeout);
+
+ };
+
+ auto exception = [traceLevels = _cache.getTraceLevels(), entry, name = _name](auto exptr)
+ {
+ try
+ {
+ rethrow_exception(exptr);
+ }
+ catch(const DeploymentException& ex)
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason;
+ }
+
+ ostringstream os;
+ os << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason;
+ entry->exception(make_exception_ptr(DeploymentException(os.str())));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "couldn't load `" << entry->getId() << "' on node `" << name << "':\n" << ex;
+ }
+
+ entry->exception(make_exception_ptr(NodeUnreachableException(name, ex.what())));
+ }
+ };
+
if(noRestart)
{
- node->begin_loadServerWithoutRestart(desc, _cache.getReplicaName(),
- newCallback_Node_loadServerWithoutRestart(
- new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout),
- &LoadCB::response,
- &LoadCB::exception));
+ node->loadServerWithoutRestartAsync(desc, _cache.getReplicaName(), move(response), move(exception));
}
else
{
- node->begin_loadServer(desc, _cache.getReplicaName(),
- newCallback_Node_loadServer(
- new LoadCB(_cache.getTraceLevels(), entry, _name, sessionTimeout),
- &LoadCB::response,
- &LoadCB::exception));
+ node->loadServerAsync(desc, _cache.getReplicaName(), move(response), move(exception));
}
}
- catch(const NodeUnreachableException& ex)
+ catch(const NodeUnreachableException&)
{
- entry->exception(ex);
+ entry->exception(current_exception());
}
}
void
-NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, int timeout, bool noRestart)
+NodeEntry::destroyServer(const shared_ptr<ServerEntry>& entry, const ServerInfo& info, chrono::seconds timeout, bool noRestart)
{
try
{
- NodePrx node;
+ shared_ptr<NodePrx> node;
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
node = _session->getNode();
//
@@ -643,9 +444,10 @@ NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, in
// time to deactivate, up to "deactivation-timeout"
// seconds).
//
- if(timeout > 0)
+ if(timeout > 0s)
{
- node = NodePrx::uncheckedCast(node->ice_invocationTimeout(timeout * 1000));
+ int timeoutInMilliseconds = secondsToInt(timeout) * 1000;
+ node = Ice::uncheckedCast<NodePrx>(node->ice_invocationTimeout(move(timeoutInMilliseconds)));
}
}
@@ -655,35 +457,67 @@ NodeEntry::destroyServer(const ServerEntryPtr& entry, const ServerInfo& info, in
out << "unloading `" << info.descriptor->id << "' on node `" << _name << "'";
}
+ auto response = [traceLevels = _cache.getTraceLevels(), entry, name = _name]
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "unloaded `" << entry->getId() << "' on node `" << name << "'";
+ }
+ entry->destroyCallback();
+ };
+
+ auto exception = [traceLevels = _cache.getTraceLevels(), entry, name = _name](auto exptr)
+ {
+ try
+ {
+ rethrow_exception(exptr);
+ }
+ catch(const DeploymentException& ex)
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason;
+ }
+
+ ostringstream os;
+ os << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex.reason;
+ entry->exception(make_exception_ptr(DeploymentException(os.str())));
+ }
+ catch(const Ice::Exception& ex)
+ {
+ if(traceLevels && traceLevels->server > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
+ out << "couldn't unload `" << entry->getId() << "' on node `" << name << "':\n" << ex;
+ }
+ entry->exception(make_exception_ptr(NodeUnreachableException(name, ex.what())));
+ }
+ };
+
if(noRestart)
{
- node->begin_destroyServerWithoutRestart(info.descriptor->id, info.uuid, info.revision,
- _cache.getReplicaName(),
- newCallback_Node_destroyServerWithoutRestart(
- new DestroyCB(_cache.getTraceLevels(), entry, _name),
- &DestroyCB::response,
- &DestroyCB::exception));
+ node->destroyServerWithoutRestartAsync(info.descriptor->id, info.uuid, info.revision,
+ _cache.getReplicaName(), move(response), move(exception));
}
else
{
- node->begin_destroyServer(info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName(),
- newCallback_Node_destroyServer(
- new DestroyCB(_cache.getTraceLevels(), entry, _name),
- &DestroyCB::response,
- &DestroyCB::exception));
+ node->destroyServerAsync(info.descriptor->id, info.uuid, info.revision, _cache.getReplicaName(),
+ move(response), move(exception));
}
}
- catch(const NodeUnreachableException& ex)
+ catch(const NodeUnreachableException&)
{
- entry->exception(ex);
+ entry->exception(current_exception());
}
}
ServerInfo
-NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session)
+NodeEntry::getServerInfo(const ServerInfo& server, const shared_ptr<SessionI>& session)
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
ServerInfo info = server;
info.descriptor = getServerDescriptor(server, session);
@@ -691,11 +525,11 @@ NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session)
return info;
}
-InternalServerDescriptorPtr
-NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const SessionIPtr& session)
+shared_ptr<InternalServerDescriptor>
+NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const shared_ptr<SessionI>& session)
{
- Lock sync(*this);
- checkSession();
+ unique_lock lock(_mutex);
+ checkSession(lock);
ServerInfo info = server;
try
@@ -715,54 +549,7 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& server, const SessionIP
}
void
-NodeEntry::__incRef()
-{
- Lock sync(*this);
- assert(_ref >= 0);
- ++_ref;
-}
-
-void
-NodeEntry::__decRef()
-{
- //
- // The node entry implements its own reference counting. If the
- // reference count drops to 1, this means that only the cache
- // holds a reference on the node entry. If that's the case, we
- // check if the node entry can be removed or not and if it can be
- // removed we remove it from the cache map.
- //
-
- bool doRemove = false;
- bool doDelete = false;
- {
- Lock sync(*this); // We use a recursive mutex so it's fine to
- // create Ptr with the mutex locked.
- assert(_ref > 0);
- --_ref;
-
- if(_ref == 1)
- {
- doRemove = canRemove();
- }
- else if(_ref == 0)
- {
- doDelete = true;
- }
- }
-
- if(doRemove)
- {
- _cache.remove(_name);
- }
- else if(doDelete)
- {
- delete this;
- }
-}
-
-void
-NodeEntry::checkSession() const
+NodeEntry::checkSession(unique_lock<mutex>& lock) const
{
if(_session)
{
@@ -796,22 +583,27 @@ NodeEntry::checkSession() const
// hang in the while loop.
//
_registering = true;
- NodeEntry* self = const_cast<NodeEntry*>(this);
- _proxy->begin_registerWithReplica(_cache.getReplicaCache().getInternalRegistry(),
- newCallback_Node_registerWithReplica(self,
- &NodeEntry::finishedRegistration,
- &NodeEntry::finishedRegistration));
- _proxy = 0; // Registration with the proxy is only attempted once.
- }
- while(_registering)
- {
- if(!timedWait(IceUtil::Time::seconds(10)))
- {
- break; // Consider the node down if it doesn't respond promptly.
- }
+ // 'this' is only ever accessed though the self removing pointer, ensuring _selfRemovingPtr is always
+ // valid and its access is thread safe
+ auto self = _selfRemovingPtr.lock();
+ assert(self);
+ _proxy->registerWithReplicaAsync(_cache.getReplicaCache().getInternalRegistry(),
+ [self]
+ {
+ self->finishedRegistration();
+ },
+ [self] (exception_ptr ex)
+ {
+ self->finishedRegistration(ex);
+ });
+ _proxy = nullptr; // Registration with the proxy is only attempted once.
+
}
+ // Consider the node down if it doesn't respond promptly.
+ _condVar.wait_for(lock, 10s, [this] { return !_registering; });
+
if(!_session || _session->isDestroyed())
{
throw NodeUnreachableException(_name, "the node is not active");
@@ -819,9 +611,9 @@ NodeEntry::checkSession() const
}
void
-NodeEntry::setProxy(const NodePrx& node)
+NodeEntry::setProxy(const shared_ptr<NodePrx>& node)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
//
// If the node has already established a session with the
@@ -838,7 +630,7 @@ NodeEntry::setProxy(const NodePrx& node)
void
NodeEntry::finishedRegistration()
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
{
Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
@@ -855,29 +647,36 @@ NodeEntry::finishedRegistration()
if(_registering)
{
_registering = false;
- notifyAll();
+ _condVar.notify_all();
}
}
void
-NodeEntry::finishedRegistration(const Ice::Exception& ex)
+NodeEntry::finishedRegistration(exception_ptr exptr)
{
- Lock sync(*this);
+ lock_guard lock(_mutex);
if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
{
- Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
- out << "node `" << _name << "' session creation failed:\n" << ex;
+ try
+ {
+ rethrow_exception(exptr);
+ }
+ catch(const std::exception& ex)
+ {
+ Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
+ out << "node `" << _name << "' session creation failed:\n" << ex.what();
+ }
}
if(_registering)
{
_registering = false;
- notifyAll();
+ _condVar.notify_all();
}
}
-ServerDescriptorPtr
-NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& session)
+shared_ptr<ServerDescriptor>
+NodeEntry::getServerDescriptor(const ServerInfo& server, const shared_ptr<SessionI>& session)
{
assert(_session);
@@ -891,7 +690,7 @@ NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& sess
resolve.setReserved("session.id", session->getId());
}
- IceBoxDescriptorPtr iceBox = IceBoxDescriptorPtr::dynamicCast(server.descriptor);
+ auto iceBox = dynamic_pointer_cast<IceBoxDescriptor>(server.descriptor);
if(iceBox)
{
return IceBoxHelper(iceBox).instantiate(resolve, PropertyDescriptorSeq(), PropertySetDescriptorDict());
@@ -903,7 +702,7 @@ NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& sess
}
}
-InternalServerDescriptorPtr
+shared_ptr<InternalServerDescriptor>
NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
{
//
@@ -911,7 +710,7 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
//
assert(_session);
- InternalServerDescriptorPtr server = new InternalServerDescriptor();
+ shared_ptr<InternalServerDescriptor> server = make_shared<InternalServerDescriptor>();
server->id = info.descriptor->id;
server->application = info.application;
server->uuid = info.uuid;
@@ -929,7 +728,6 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
// server->logs: assigned for each communicator (see below)
// server->adapters: assigned for each communicator (see below)
- // server->dbEnvs: assigned for each communicator (see below)
// server->properties: assigned for each communicator (see below)
//
@@ -966,7 +764,8 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
}
else
{
- props.push_back(createProperty("Ice.Admin.Endpoints", "tcp -h localhost"));
+ props.push_back(createProperty("Ice.Admin.Endpoints", "tcp -h 127.0.0.1"));
+ props.push_back(createProperty("Ice.Admin.ServerName", "127.0.0.1"));
server->processRegistered = true;
}
}
@@ -985,12 +784,12 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
// Add IceBox properties.
//
string servicesStr;
- IceBoxDescriptorPtr iceBox = IceBoxDescriptorPtr::dynamicCast(info.descriptor);
+ auto iceBox = dynamic_pointer_cast<IceBoxDescriptor>(info.descriptor);
if(iceBox)
{
- for(ServiceInstanceDescriptorSeq::const_iterator p = iceBox->services.begin(); p != iceBox->services.end();++p)
+ for(const auto& serviceInstance : iceBox->services)
{
- ServiceDescriptorPtr s = p->descriptor;
+ const auto& s = serviceInstance.descriptor;
const string path = _session->getInfo()->dataDir + "/servers/" + server->id + "/config/config_" + s->name;
//
@@ -1012,38 +811,115 @@ NodeEntry::getInternalServerDescriptor(const ServerInfo& info) const
}
props.push_back(createProperty("IceBox.LoadOrder", servicesStr));
+ }
- if(iceVersion != 0 && iceVersion < 30300)
+ //
+ // Now, for each communicator of the descriptor, add the necessary
+ // logs, adapters and properties to the internal server
+ // descriptor.
+ //
+ forEachCommunicator(info.descriptor,
+ [server, node = _session->getInfo(), iceVersion](const auto& desc)
{
- if(hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.RegisterProcess"))
+ //
+ // Figure out the configuration file name for the communicator
+ // (if it's a service, it's "config_<service name>", if it's
+ // the server, it's just "config").
+ //
+ string filename = "config";
+ auto svc = dynamic_pointer_cast<ServiceDescriptor>(desc);
+ if(svc)
{
- if(getProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.RegisterProcess") != "0")
+ filename += "_" + svc->name;
+ server->services->push_back(svc->name);
+ }
+
+ PropertyDescriptorSeq& serverProps = server->properties[filename];
+ PropertyDescriptorSeq communicatorProps = desc->propertySet.properties;
+
+ //
+ // If this is a service communicator and the IceBox server has Admin
+ // enabled or Admin endpoints configured, we ignore the server-lifetime attributes
+ // of the service object adapters and assume it's set to false.
+ //
+ bool ignoreServerLifetime = false;
+ if(svc)
+ {
+ if(iceVersion == 0 || iceVersion >= 30300)
{
- server->processRegistered = true;
+ if(getPropertyAsInt(server->properties["config"], "Ice.Admin.Enabled") > 0 ||
+ getProperty(server->properties["config"], "Ice.Admin.Endpoints") != "")
+ {
+ ignoreServerLifetime = true;
+ }
}
}
- else
+ //
+ // Add the adapters and their configuration.
+ //
+ for(const auto& adapter : desc->adapters)
{
- props.push_back(createProperty("IceBox.ServiceManager.RegisterProcess", "1"));
- server->processRegistered = true;
+ server->adapters.push_back(make_shared<InternalAdapterDescriptor>(adapter.id,
+ ignoreServerLifetime ? false :
+ adapter.serverLifetime));
+
+ serverProps.push_back(createProperty("# Object adapter " + adapter.name));
+
+ PropertyDescriptor prop = removeProperty(communicatorProps, adapter.name + ".Endpoints");
+ prop.name = adapter.name + ".Endpoints";
+ serverProps.push_back(prop);
+ serverProps.push_back(createProperty(adapter.name + ".AdapterId", adapter.id));
+
+ if(!adapter.replicaGroupId.empty())
+ {
+ serverProps.push_back(createProperty(adapter.name + ".ReplicaGroupId", adapter.replicaGroupId));
+ }
+
+ //
+ // Ignore the register process attribute if the server is using Ice >= 3.3.0
+ //
+ if(iceVersion != 0 && iceVersion < 30300)
+ {
+ if(adapter.registerProcess)
+ {
+ serverProps.push_back(createProperty(adapter.name + ".RegisterProcess", "1"));
+ server->processRegistered = true;
+ }
+ }
}
- if(!hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.Endpoints"))
+
+ server->logs.insert(server->logs.end(), desc->logs.begin(), desc->logs.end());
+
+ //
+ // Copy the communicator descriptor properties.
+ //
+ if(!communicatorProps.empty())
{
- props.push_back(createProperty("IceBox.ServiceManager.Endpoints", "tcp -h 127.0.0.1"));
+ if(svc)
+ {
+ serverProps.push_back(createProperty("# Service descriptor properties"));
+ }
+ else
+ {
+ serverProps.push_back(createProperty("# Server descriptor properties"));
+ }
+ copy(communicatorProps.begin(), communicatorProps.end(), back_inserter(serverProps));
}
- }
- if(!hasProperty(info.descriptor->propertySet.properties, "IceBox.InstanceName") &&
- hasProperty(iceBox->propertySet.properties, "IceBox.ServiceManager.Endpoints"))
- {
- props.push_back(createProperty("IceBox.InstanceName", server->id));
- }
- }
- //
- // Now, for each communicator of the descriptor, add the necessary
- // logs, adapters, db envs and properties to the internal server
- // descriptor.
- //
- forEachCommunicator(ToInternalServerDescriptor(server, _session->getInfo(), iceVersion))(info.descriptor);
+ //
+ // For Ice servers > 3.3.0 escape the properties.
+ //
+ if(iceVersion == 0 || iceVersion >= 30300)
+ {
+ for(PropertyDescriptorSeq::iterator p = serverProps.begin(); p != serverProps.end(); ++p)
+ {
+ if(p->name.find('#') != 0 || !p->value.empty())
+ {
+ p->name = escapeProperty(p->name, true);
+ p->value = escapeProperty(p->value);
+ }
+ }
+ }
+ });
return server;
}