summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp253
-rw-r--r--cpp/src/IceGrid/AdminSessionI.h10
-rw-r--r--cpp/src/IceGrid/IceGridNode.cpp2
-rw-r--r--cpp/src/IceGrid/Internal.ice58
-rw-r--r--cpp/src/IceGrid/InternalRegistryI.cpp94
-rw-r--r--cpp/src/IceGrid/InternalRegistryI.h5
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp41
-rw-r--r--cpp/src/IceGrid/NodeI.cpp16
-rw-r--r--cpp/src/IceGrid/NodeI.h3
-rw-r--r--cpp/src/IceGrid/NodeSessionI.cpp142
-rw-r--r--cpp/src/IceGrid/NodeSessionI.h15
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp346
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.h27
-rw-r--r--cpp/src/IceGrid/PlatformInfo.cpp36
-rw-r--r--cpp/src/IceGrid/ReapThread.h57
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp32
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp70
-rw-r--r--cpp/src/IceGrid/ReplicaCache.h6
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.cpp122
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.h14
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp5
-rw-r--r--cpp/src/IceGrid/SessionI.cpp180
-rw-r--r--cpp/src/IceGrid/SessionI.h34
-rw-r--r--cpp/src/IceGrid/SessionManager.h17
24 files changed, 887 insertions, 698 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index e3b4b7a87de..ce9317907f4 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -59,10 +59,25 @@ AdminSessionI::~AdminSessionI()
{
}
-void
-AdminSessionI::setAdmin(const AdminPrx& admin)
+Ice::ObjectPrx
+AdminSessionI::registerWithServantLocator(const SessionServantLocatorIPtr& servantLoc,
+ const Ice::ConnectionPtr& con,
+ const RegistryIPtr& registry)
+{
+ Ice::ObjectPrx proxy = BaseSessionI::registerWithServantLocator(servantLoc, con);
+ _admin = AdminPrx::uncheckedCast(servantLoc->add(new AdminI(_database, registry, this), con));
+ return proxy;
+}
+
+Ice::ObjectPrx
+AdminSessionI::registerWithObjectAdapter(const Ice::ObjectAdapterPtr& adapter, const RegistryIPtr& registry)
{
- const_cast<AdminPrx&>(_admin) = admin;
+ Ice::ObjectPrx proxy = BaseSessionI::registerWithObjectAdapter(adapter);
+ Ice::Identity identity;
+ identity.category = _database->getInstanceName();
+ identity.name = IceUtil::generateUUID();
+ _admin = AdminPrx::uncheckedCast(adapter->add(new AdminI(_database, registry, this), identity));
+ return proxy;
}
AdminPrx
@@ -247,58 +262,140 @@ AdminSessionI::openRegistryStdErr(const std::string& name, const Ice::Current& c
}
void
-AdminSessionI::destroy(const Ice::Current& current)
+AdminSessionI::destroy(const Ice::Current&)
{
- BaseSessionI::destroy(current);
-
- try
+ destroyImpl(false);
+}
+
+void
+AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer)
+{
+ if(_observers[name] && _observers[name] != observer)
{
- _database->unlock(this);
+ _database->getObserverTopic(name)->unsubscribe(_observers[name]);
+ _observers[name] = 0;
}
- catch(AccessDeniedException&)
+
+ if(observer)
{
+ _observers[name] = observer;
+ _database->getObserverTopic(name)->subscribe(_observers[name]);
}
+}
- //
- // Unregister the admin servant from the session servant locator
- // or object adapter.
- //
+Ice::ObjectPrx
+AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection)
+{
+ return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id);
+}
+
+FileIteratorPrx
+AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, const Ice::Current& current)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ Ice::ObjectNotExistException ex(__FILE__, __LINE__);
+ ex.id = current.id;
+ throw ex;
+ }
+
+ Ice::ObjectPrx obj;
+ Ice::ObjectPtr servant = new FileIteratorI(this, reader, filename);
+ if(_servantLocator)
+ {
+ obj = _servantLocator->add(servant, current.con);
+ }
+ else
+ {
+ assert(_adapter);
+ obj = _adapter->addWithUUID(servant);
+ }
+ _iterators.insert(obj->ice_getIdentity());
+ return FileIteratorPrx::uncheckedCast(obj);
+}
+
+void
+AdminSessionI::removeFileIterator(const Ice::Identity& id, const Ice::Current& current)
+{
+ Lock sync(*this);
if(_servantLocator)
{
- _servantLocator->remove(_admin->ice_getIdentity());
+ _servantLocator->remove(id);
+ }
+ else
+ {
+ try
+ {
+ assert(_adapter);
+ _adapter->remove(id);
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ }
+ }
+ _iterators.erase(id);
+}
+
+void
+AdminSessionI::destroyImpl(bool shutdown)
+{
+ BaseSessionI::destroyImpl(shutdown);
+
+ try
+ {
+ _database->unlock(this);
}
- else if(current.adapter)
+ catch(AccessDeniedException&)
{
- current.adapter->remove(_admin->ice_getIdentity());
}
//
- // Unregister the iterators from the session servant locator or
- // object adapter.
+ // Unregister the admin servant from the session servant locator
+ // or object adapter.
//
- for(set<Ice::Identity>::const_iterator p = _iterators.begin(); p != _iterators.end(); ++p)
+ if(!shutdown)
{
if(_servantLocator)
{
- _servantLocator->remove(*p);
+ _servantLocator->remove(_admin->ice_getIdentity());
}
- else if(current.adapter)
+ else if(_adapter)
{
try
{
- current.adapter->remove(*p);
+ _adapter->remove(_admin->ice_getIdentity());
}
- catch(const Ice::LocalException&)
+ catch(const Ice::ObjectAdapterDeactivatedException&)
{
}
}
- }
- //
- // Unsubscribe from the topics.
- //
- if(current.adapter) // Not shutting down
- {
+ //
+ // Unregister the iterators from the session servant locator or
+ // object adapter.
+ //
+ for(set<Ice::Identity>::const_iterator p = _iterators.begin(); p != _iterators.end(); ++p)
+ {
+ if(_servantLocator)
+ {
+ _servantLocator->remove(*p);
+ }
+ else if(_adapter)
+ {
+ try
+ {
+ _adapter->remove(*p);
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ }
+ }
+ }
+
+ //
+ // Unsubscribe from the topics.
+ //
setupObserverSubscription(RegistryObserverTopicName, 0);
setupObserverSubscription(NodeObserverTopicName, 0);
setupObserverSubscription(ApplicationObserverTopicName, 0);
@@ -324,26 +421,17 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie
{
assert(_adapter);
- Ice::IdentitySeq ids; // Identities of the object the session is allowed to access.
-
- Ice::Identity id;
- id.category = _database->getInstanceName();
-
- // The per-session admin object.
- id.name = IceUtil::generateUUID();
AdminSessionIPtr session = createSessionServant(sessionId);
- AdminPrx admin = AdminPrx::uncheckedCast(_adapter->add(new AdminI(_database, _registry, session), id));
- session->setAdmin(admin);
- ids.push_back(id);
+ Ice::ObjectPrx proxy = session->registerWithObjectAdapter(_adapter, _registry);
- // The session admin object.
- id.name = IceUtil::generateUUID();
- Glacier2::SessionPrx s = Glacier2::SessionPrx::uncheckedCast(_adapter->add(session, id));
- ids.push_back(id);
+ Ice::Identity queryId;
+ queryId.category = _database->getInstanceName();
+ queryId.name = "Query";
- // The IceGrid::Query object
- id.name = "Query";
- ids.push_back(id);
+ Ice::IdentitySeq ids; // Identities of the object the session is allowed to access.
+ ids.push_back(queryId); // The IceGrid::Query object
+ ids.push_back(proxy->ice_getIdentity()); // The session object.
+ ids.push_back(session->getAdmin()->ice_getIdentity()); // The per-session admin object.
int timeout = 0;
if(ctl)
@@ -354,7 +442,7 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie
}
catch(const Ice::LocalException&)
{
- s->destroy();
+ session->destroy(Ice::Current());
return 0;
}
timeout = ctl->getSessionTimeout();
@@ -362,10 +450,10 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie
if(timeout > 0)
{
- _reaper->add(new SessionReapable(_adapter, session, s->ice_getIdentity()), timeout);
+ _reaper->add(new SessionReapable<AdminSessionI>(_database->getTraceLevels()->logger, session), timeout);
}
- return s;
+ return Glacier2::SessionPrx::uncheckedCast(proxy);
}
AdminSessionIPtr
@@ -419,70 +507,3 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info,
return _factory->createGlacier2Session(userDN, ctl);
}
-void
-AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer)
-{
- if(_observers[name] && _observers[name] != observer)
- {
- _database->getObserverTopic(name)->unsubscribe(_observers[name]);
- _observers[name] = 0;
- }
-
- if(observer)
- {
- _observers[name] = observer;
- _database->getObserverTopic(name)->subscribe(_observers[name]);
- }
-}
-
-Ice::ObjectPrx
-AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection)
-{
- return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id);
-}
-
-FileIteratorPrx
-AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, const Ice::Current& current)
-{
- Lock sync(*this);
- if(_destroyed)
- {
- Ice::ObjectNotExistException ex(__FILE__, __LINE__);
- ex.id = current.id;
- throw ex;
- }
-
- Ice::ObjectPrx obj;
- Ice::ObjectPtr servant = new FileIteratorI(this, reader, filename);
- if(_servantLocator)
- {
- obj = _servantLocator->add(servant, current.con);
- }
- else
- {
- obj = current.adapter->addWithUUID(servant);
- }
- _iterators.insert(obj->ice_getIdentity());
- return FileIteratorPrx::uncheckedCast(obj);
-}
-
-void
-AdminSessionI::removeFileIterator(const Ice::Identity& id, const Ice::Current& current)
-{
- Lock sync(*this);
- if(_servantLocator)
- {
- _servantLocator->remove(id);
- }
- else
- {
- try
- {
- current.adapter->remove(id);
- }
- catch(const Ice::LocalException&)
- {
- }
- }
- _iterators.erase(id);
-}
diff --git a/cpp/src/IceGrid/AdminSessionI.h b/cpp/src/IceGrid/AdminSessionI.h
index df7802cf960..dd6f388a041 100644
--- a/cpp/src/IceGrid/AdminSessionI.h
+++ b/cpp/src/IceGrid/AdminSessionI.h
@@ -31,11 +31,13 @@ public:
AdminSessionI(const std::string&, const DatabasePtr&, int, const std::string&);
virtual ~AdminSessionI();
- void setAdmin(const AdminPrx&);
+ virtual Ice::ObjectPrx registerWithServantLocator(const SessionServantLocatorIPtr&, const Ice::ConnectionPtr&,
+ const RegistryIPtr&);
+ virtual Ice::ObjectPrx registerWithObjectAdapter(const Ice::ObjectAdapterPtr&, const RegistryIPtr&);
virtual void keepAlive(const Ice::Current& current) { BaseSessionI::keepAlive(current); }
- virtual AdminPrx getAdmin(const Ice::Current&) const;
+ virtual AdminPrx getAdmin(const Ice::Current& = Ice::Current()) const;
virtual void setObservers(const RegistryObserverPrx&, const NodeObserverPrx&, const ApplicationObserverPrx&,
const AdapterObserverPrx&, const ObjectObserverPrx&, const Ice::Current&);
@@ -67,9 +69,11 @@ private:
Ice::ObjectPrx toProxy(const Ice::Identity&, const Ice::ConnectionPtr&);
FileIteratorPrx addFileIterator(const FileReaderPrx&, const std::string&, const Ice::Current&);
+ virtual void destroyImpl(bool);
+
const int _timeout;
- const AdminPrx _admin;
const std::string _replicaName;
+ AdminPrx _admin;
std::map<TopicName, Ice::ObjectPrx> _observers;
std::set<Ice::Identity> _iterators;
};
diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp
index 337db3a32ec..c23e68edc10 100644
--- a/cpp/src/IceGrid/IceGridNode.cpp
+++ b/cpp/src/IceGrid/IceGridNode.cpp
@@ -486,7 +486,7 @@ NodeService::start(int argc, char* argv[])
// Notify the node session manager that the node can start
// accepting incoming connections.
//
- _sessions.activated();
+ _sessions.activate();
string bundleName = properties->getProperty("IceGrid.Node.PrintServersReady");
if(!bundleName.empty() || !desc.empty())
diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice
index 9eac496d733..3eb99cd89e0 100644
--- a/cpp/src/IceGrid/Internal.ice
+++ b/cpp/src/IceGrid/Internal.ice
@@ -192,7 +192,28 @@ interface Server extends FileReader
interface InternalRegistry;
sequence<InternalRegistry*> InternalRegistryPrxSeq;
-interface Node extends FileReader
+interface ReplicaObserver
+{
+ void replicaInit(InternalRegistryPrxSeq replicas);
+
+ /**
+ *
+ * Notification that a replica has been added. The node should
+ * establish a session with this new replica.
+ *
+ **/
+ void replicaAdded(InternalRegistry* replica);
+
+ /**
+ *
+ * Notification that a replica has been removed. The node should
+ * destroy the session to this replica.
+ *
+ **/
+ void replicaRemoved(InternalRegistry* replica);
+};
+
+interface Node extends FileReader, ReplicaObserver
{
/**
*
@@ -238,22 +259,6 @@ interface Node extends FileReader
/**
*
- * Notification that a replica has been added. The node should
- * establish a session with this new replica.
- *
- **/
- void replicaAdded(InternalRegistry* replica);
-
- /**
- *
- * Notification that a replica has been removed. The node should
- * destroy the session to this replica.
- *
- **/
- void replicaRemoved(InternalRegistry* replica);
-
- /**
- *
* Get the node name.
*
**/
@@ -304,6 +309,13 @@ interface NodeSession
/**
*
+ * Set the replica observer.
+ *
+ **/
+ void setReplicaObserver(ReplicaObserver* observer);
+
+ /**
+ *
* Return the node session timeout.
*
**/
@@ -447,22 +459,20 @@ interface InternalRegistry extends FileReader
* is already registered, [registerNode] will overide the previous
* node only if it's not active.
*
- * @param name The name of the node to register.
- *
- * @param nd The proxy of the node.
- *
* @param info Some information on the node.
+ *
+ * @param prx The proxy of the node.
*
- * @return The name of the servers currently deployed on the node.
+ * @return The node session proxy.
*
* @throws NodeActiveException Raised if the node is already
* registered and currently active.
*
**/
- NodeSession* registerNode(string name, Node* nd, NodeInfo info)
+ NodeSession* registerNode(NodeInfo info, Node* prx)
throws NodeActiveException;
- ReplicaSession* registerReplica(string name, RegistryInfo info, InternalRegistry* prx)
+ ReplicaSession* registerReplica(RegistryInfo info, InternalRegistry* prx)
throws ReplicaActiveException;
void registerWithReplica(InternalRegistry* prx);
diff --git a/cpp/src/IceGrid/InternalRegistryI.cpp b/cpp/src/IceGrid/InternalRegistryI.cpp
index 4a6d55347fd..3172c52f648 100644
--- a/cpp/src/IceGrid/InternalRegistryI.cpp
+++ b/cpp/src/IceGrid/InternalRegistryI.cpp
@@ -23,73 +23,6 @@
using namespace std;
using namespace IceGrid;
-namespace IceGrid
-{
-
-template<class T>
-class SessionReapable : public Reapable
-{
- typedef IceUtil::Handle<T> TPtr;
-
-public:
-
- SessionReapable(const Ice::ObjectAdapterPtr& adapter, const TPtr& session, const Ice::ObjectPrx& proxy) :
- _adapter(adapter),
- _session(session),
- _proxy(proxy)
- {
- }
-
- virtual ~SessionReapable()
- {
- }
-
- virtual IceUtil::Time
- timestamp() const
- {
- return _session->timestamp();
- }
-
- virtual void
- destroy(bool destroy)
- {
- try
- {
- //
- // Invoke on the servant directly instead of the
- // proxy. Invoking on the proxy might not always work if the
- // communicator is being shutdown/destroyed. We have to create
- // a fake "current" because the session destroy methods needs
- // the adapter and object identity to unregister the servant
- // from the adapter.
- //
- Ice::Current current;
- if(!destroy)
- {
- current.adapter = _adapter;
- current.id = _proxy->ice_getIdentity();
- }
- _session->destroy(current);
- }
- catch(const Ice::ObjectNotExistException&)
- {
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_proxy->ice_getCommunicator()->getLogger());
- out << "unexpected exception while reaping session:\n" << ex;
- }
- }
-
-private:
-
- const Ice::ObjectAdapterPtr _adapter;
- const TPtr _session;
- const Ice::ObjectPrx _proxy;
-};
-
-}
-
InternalRegistryI::InternalRegistryI(const RegistryIPtr& registry,
const DatabasePtr& database,
const ReapThreadPtr& reaper,
@@ -112,17 +45,14 @@ InternalRegistryI::~InternalRegistryI()
}
NodeSessionPrx
-InternalRegistryI::registerNode(const std::string& name,
- const NodePrx& node,
- const NodeInfo& info,
- const Ice::Current& current)
+InternalRegistryI::registerNode(const NodeInfo& info, const NodePrx& node, const Ice::Current& current)
{
+ const Ice::LoggerPtr logger = _database->getTraceLevels()->logger;
try
{
- NodeSessionIPtr session = new NodeSessionI(_database, name, node, info, _nodeSessionTimeout);
- NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
- _reaper->add(new SessionReapable<NodeSessionI>(current.adapter, session, proxy), _nodeSessionTimeout);
- return proxy;
+ NodeSessionIPtr session = new NodeSessionI(_database, node, info, _nodeSessionTimeout);
+ _reaper->add(new SessionReapable<NodeSessionI>(logger, session), _nodeSessionTimeout);
+ return session->getProxy();
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
@@ -131,18 +61,16 @@ InternalRegistryI::registerNode(const std::string& name,
}
ReplicaSessionPrx
-InternalRegistryI::registerReplica(const std::string& name,
- const RegistryInfo& info,
- const InternalRegistryPrx& registry,
+InternalRegistryI::registerReplica(const RegistryInfo& info,
+ const InternalRegistryPrx& prx,
const Ice::Current& current)
{
+ const Ice::LoggerPtr logger = _database->getTraceLevels()->logger;
try
{
- ReplicaSessionIPtr session = new ReplicaSessionI(_database, _wellKnownObjects, name, info, registry,
- _replicaSessionTimeout);
- ReplicaSessionPrx proxy = ReplicaSessionPrx::uncheckedCast(current.adapter->addWithUUID(session));
- _reaper->add(new SessionReapable<ReplicaSessionI>(current.adapter, session, proxy), _replicaSessionTimeout);
- return proxy;
+ ReplicaSessionIPtr s = new ReplicaSessionI(_database, _wellKnownObjects, info, prx, _replicaSessionTimeout);
+ _reaper->add(new SessionReapable<ReplicaSessionI>(logger, s), _replicaSessionTimeout);
+ return s->getProxy();
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
diff --git a/cpp/src/IceGrid/InternalRegistryI.h b/cpp/src/IceGrid/InternalRegistryI.h
index a9585b3668d..27c470ab4e1 100644
--- a/cpp/src/IceGrid/InternalRegistryI.h
+++ b/cpp/src/IceGrid/InternalRegistryI.h
@@ -41,9 +41,8 @@ public:
const WellKnownObjectsManagerPtr&, ReplicaSessionManager&);
virtual ~InternalRegistryI();
- virtual NodeSessionPrx registerNode(const std::string&, const NodePrx&, const NodeInfo&, const Ice::Current&);
- virtual ReplicaSessionPrx registerReplica(const std::string&, const RegistryInfo&, const InternalRegistryPrx&,
- const Ice::Current&);
+ virtual NodeSessionPrx registerNode(const NodeInfo&, const NodePrx&, const Ice::Current&);
+ virtual ReplicaSessionPrx registerReplica(const RegistryInfo&, const InternalRegistryPrx&, const Ice::Current&);
virtual void registerWithReplica(const InternalRegistryPrx&, const Ice::Current&);
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index d0a8e000671..dfd740c97e5 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -258,20 +258,37 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
{
Lock sync(*this);
- if(session && _session)
+ if(session)
{
- if(_session->isDestroyed())
+ while(_session)
{
- // If the current session has just been destroyed, wait for the setSession(0) call.
- assert(session != _session);
- while(_session)
+ if(_session->isDestroyed())
{
+ // If the current session has just been destroyed, wait for the setSession(0) call.
+ assert(session != _session);
wait();
}
- }
- else
- {
- throw NodeActiveException();
+ else
+ {
+ NodeSessionIPtr session = _session;
+ sync.release();
+ try
+ {
+ session->getNode()->ice_ping();
+ throw NodeActiveException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ try
+ {
+ session->destroy();
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ }
+ }
+ sync.acquire();
+ }
}
}
else if(!session && !_session)
@@ -281,17 +298,11 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
if(!session && _session)
{
- _cache.getReplicaCache().nodeRemoved(_session->getNode());
}
_session = session;
notifyAll();
- if(_session)
- {
- _cache.getReplicaCache().nodeAdded(session->getNode());
- }
-
//
// Clear the saved proxy, the node has established a session
// so we won't need anymore to try to register it with this
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 81a29709767..e1ddd0cc112 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -493,9 +493,15 @@ NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Curren
}
void
+NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&)
+{
+ _sessions.replicaInit(replicas);
+}
+
+void
NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&)
{
- _sessions.replicaAdded(replica, false);
+ _sessions.replicaAdded(replica);
}
void
@@ -611,6 +617,12 @@ NodeI::getFileCache() const
return _fileCache;
}
+NodePrx
+NodeI::getProxy() const
+{
+ return _proxy;
+}
+
string
NodeI::getOutputDir() const
{
@@ -626,7 +638,7 @@ NodeI::getRedirectErrToOut() const
NodeSessionPrx
NodeI::registerWithRegistry(const InternalRegistryPrx& registry)
{
- return registry->registerNode(_name, _proxy, _platform.getNodeInfo());
+ return registry->registerNode(_platform.getNodeInfo(), _proxy);
}
void
diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h
index d1d1db2b3f6..72b08ab6c24 100644
--- a/cpp/src/IceGrid/NodeI.h
+++ b/cpp/src/IceGrid/NodeI.h
@@ -47,6 +47,8 @@ public:
const Ice::Current&);
virtual void registerWithReplica(const InternalRegistryPrx&, const Ice::Current&);
+
+ virtual void replicaInit(const InternalRegistryPrxSeq&, const Ice::Current&);
virtual void replicaAdded(const InternalRegistryPrx&, const Ice::Current&);
virtual void replicaRemoved(const InternalRegistryPrx&, const Ice::Current&);
@@ -67,6 +69,7 @@ public:
UserAccountMapperPrx getUserAccountMapper() const;
PlatformInfo& getPlatformInfo() const;
FileCachePtr getFileCache() const;
+ NodePrx getProxy() const;
std::string getOutputDir() const;
bool getRedirectErrToOut() const;
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp
index fa3f29903d4..c0e916684b9 100644
--- a/cpp/src/IceGrid/NodeSessionI.cpp
+++ b/cpp/src/IceGrid/NodeSessionI.cpp
@@ -17,13 +17,11 @@ using namespace std;
using namespace IceGrid;
NodeSessionI::NodeSessionI(const DatabasePtr& database,
- const string& name,
const NodePrx& node,
const NodeInfo& info,
int timeout) :
_database(database),
_traceLevels(database->getTraceLevels()),
- _name(name),
_node(NodePrx::uncheckedCast(node->ice_timeout(timeout * 1000))),
_info(info),
_timeout(timeout),
@@ -33,12 +31,14 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database,
__setNoDelete(true);
try
{
- _database->getNode(name, true)->setSession(this);
+ _database->getNode(info.name, true)->setSession(this);
ObjectInfo info;
info.type = Node::ice_staticId();
info.proxy = _node;
_database->addInternalObject(info, true); // Add or update previous node proxy.
+
+ _proxy = NodeSessionPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(this));
}
catch(...)
{
@@ -63,11 +63,29 @@ NodeSessionI::keepAlive(const LoadInfo& load, const Ice::Current& current)
if(_traceLevels->node > 2)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "node `" << _name << "' keep alive ";
+ out << "node `" << _info.name << "' keep alive ";
out << "(load = " << _load.avg1 << ", " << _load.avg5 << ", " << _load.avg15 << ")";
}
}
+void
+NodeSessionI::setReplicaObserver(const ReplicaObserverPrx& observer, const Ice::Current&)
+{
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return;
+ }
+ else if(_replicaObserver) // This might happen on activation of the node.
+ {
+ assert(_replicaObserver == observer);
+ return;
+ }
+
+ _replicaObserver = observer;
+ _database->getReplicaCache().subscribe(observer);
+}
+
int
NodeSessionI::getTimeout(const Ice::Current& current) const
{
@@ -86,7 +104,7 @@ NodeSessionI::loadServers(const Ice::Current& current) const
//
// Get the server proxies to load them on the node.
//
- Ice::StringSeq servers = _database->getNode(_name)->getServers();
+ Ice::StringSeq servers = _database->getNode(_info.name)->getServers();
for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
try
@@ -103,7 +121,7 @@ NodeSessionI::loadServers(const Ice::Current& current) const
Ice::StringSeq
NodeSessionI::getServers(const Ice::Current& current) const
{
- return _database->getNode(_name)->getServers();
+ return _database->getNode(_info.name)->getServers();
}
void
@@ -116,10 +134,63 @@ NodeSessionI::waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplic
}
void
-NodeSessionI::destroy(const Ice::Current& current)
+NodeSessionI::destroy(const Ice::Current&)
{
- const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp
+ destroyImpl(false);
+}
+IceUtil::Time
+NodeSessionI::timestamp() const
+{
+ Lock sync(*this);
+ if(_destroy)
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ return _timestamp;
+}
+
+void
+NodeSessionI::shutdown()
+{
+ destroyImpl(true);
+}
+
+const NodePrx&
+NodeSessionI::getNode() const
+{
+ return _node;
+}
+
+const NodeInfo&
+NodeSessionI::getInfo() const
+{
+ return _info;
+}
+
+const LoadInfo&
+NodeSessionI::getLoadInfo() const
+{
+ Lock sync(*this);
+ return _load;
+}
+
+NodeSessionPrx
+NodeSessionI::getProxy() const
+{
+ return _proxy;
+}
+
+bool
+NodeSessionI::isDestroyed() const
+{
+ Lock sync(*this);
+ return _destroy;
+}
+
+void
+NodeSessionI::destroyImpl(bool shutdown)
+{
{
Lock sync(*this);
if(_destroy)
@@ -129,7 +200,7 @@ NodeSessionI::destroy(const Ice::Current& current)
_destroy = true;
}
- Ice::StringSeq servers = _database->getNode(_name)->getServers();
+ Ice::StringSeq servers = _database->getNode(_info.name)->getServers();
for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
try
@@ -154,61 +225,32 @@ NodeSessionI::destroy(const Ice::Current& current)
//
// Next we notify the observer.
//
- NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_name);
-
+ NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_info.name);
+
+ //
+ // Unsubscribe the node replica observer.
+ //
+ if(_replicaObserver)
+ {
+ _database->getReplicaCache().unsubscribe(_replicaObserver);
+ _replicaObserver = 0;
+ }
//
// Finally, we clear the session, this must be done last. As soon
// as the node entry session is set to 0 another session might be
// created.
//
- _database->getNode(_name)->setSession(0);
+ _database->getNode(_info.name)->setSession(0);
if(!shutdown)
{
try
{
- current.adapter->remove(current.id);
+ _database->getInternalAdapter()->remove(_proxy->ice_getIdentity());
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
}
}
}
-
-const NodePrx&
-NodeSessionI::getNode() const
-{
- return _node;
-}
-
-const NodeInfo&
-NodeSessionI::getInfo() const
-{
- return _info;
-}
-
-const LoadInfo&
-NodeSessionI::getLoadInfo() const
-{
- Lock sync(*this);
- return _load;
-}
-
-IceUtil::Time
-NodeSessionI::timestamp() const
-{
- Lock sync(*this);
- if(_destroy)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
- return _timestamp;
-}
-
-bool
-NodeSessionI::isDestroyed() const
-{
- Lock sync(*this);
- return _destroy;
-}
diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h
index eeb66cf5bac..d5aaa028965 100644
--- a/cpp/src/IceGrid/NodeSessionI.h
+++ b/cpp/src/IceGrid/NodeSessionI.h
@@ -25,24 +25,31 @@ class NodeSessionI : public NodeSession, public IceUtil::Mutex
{
public:
- NodeSessionI(const DatabasePtr&, const std::string&, const NodePrx&, const NodeInfo&, int);
+ NodeSessionI(const DatabasePtr&, const NodePrx&, const NodeInfo&, int);
virtual void keepAlive(const LoadInfo&, const Ice::Current&);
+ virtual void setReplicaObserver(const ReplicaObserverPrx&, const Ice::Current&);
virtual int getTimeout(const Ice::Current& = Ice::Current()) const;
virtual NodeObserverPrx getObserver(const Ice::Current&) const;
virtual void loadServers(const Ice::Current&) const;
virtual Ice::StringSeq getServers(const Ice::Current&) const;
virtual void waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr&,
const std::string&, int, const Ice::Current&) const;
- virtual void destroy(const Ice::Current&);
+ virtual void destroy(const Ice::Current& = Ice::Current());
+ virtual IceUtil::Time timestamp() const;
+ virtual void shutdown();
+
const NodePrx& getNode() const;
const NodeInfo& getInfo() const;
const LoadInfo& getLoadInfo() const;
- virtual IceUtil::Time timestamp() const;
+ NodeSessionPrx getProxy() const;
+
bool isDestroyed() const;
private:
+
+ void destroyImpl(bool);
const DatabasePtr _database;
const TraceLevelsPtr _traceLevels;
@@ -50,6 +57,8 @@ private:
const NodePrx _node;
const NodeInfo _info;
const int _timeout;
+ NodeSessionPrx _proxy;
+ ReplicaObserverPrx _replicaObserver;
IceUtil::Time _timestamp;
LoadInfo _load;
bool _destroy;
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 6ee09c7fa3c..967a15dae5c 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -237,10 +237,10 @@ NodeSessionManager::create(const NodeIPtr& node)
_thread->start();
//
- // We can't wait for the session to be created here as the node
- // adapter isn't activated yet and the registry would hang trying
- // to load the servers on the node (when createSession invokes
- // loadServers() on the session).
+ // Try to create the session. It's important that we wait for the
+ // creation of the session as this will also try to create sessions
+ // with replicas (see createdSession below) and this must be done
+ // before the node is activated.
//
_thread->tryCreateSession(true);
}
@@ -258,17 +258,23 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
}
else
{
- replicaAdded(replica, true);
+ createReplicaSession(replica, true);
}
}
void
-NodeSessionManager::activated()
+NodeSessionManager::activate()
{
{
Lock sync(*this);
_activated = true;
}
+
+ //
+ // Get the master session, if it's not created, try to create it
+ // again and make sure that the servers are synchronized and the
+ // replica observer is set on the session.
+ //
NodeSessionPrx session = _thread->getSession();
if(!session)
{
@@ -277,7 +283,14 @@ NodeSessionManager::activated()
}
if(session)
{
- syncServers(session);
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
}
@@ -321,121 +334,119 @@ NodeSessionManager::destroy()
}
void
-NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica, bool waitTryCreateSession)
+NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
{
- Lock sync(*this);
- if(_destroyed)
{
- return;
- }
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
- ++_serial;
- NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
- NodeSessionKeepAliveThreadPtr thread;
- if(p != _sessions.end())
- {
- thread = p->second;
- thread->setRegistry(replica);
+ //
+ // Initialize the set of replicas known by the master.
+ //
+ _replicas.clear();
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ _replicas.insert((*p)->ice_getIdentity());
+ }
}
- else
+
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
- _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
- thread->start();
+ createReplicaSession(*p, false);
}
- thread->tryCreateSession(waitTryCreateSession);
}
void
-NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
+NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
{
- NodeSessionKeepAliveThreadPtr thread;
{
Lock sync(*this);
if(_destroyed)
{
return;
}
+ _replicas.insert(replica->ice_getIdentity());
+ }
+
+ createReplicaSession(replica, false);
+}
- ++_serial;
- NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity());
- if(p != _sessions.end())
+void
+NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica)
+{
+ {
+ Lock sync(*this);
+ if(_destroyed)
{
- thread = p->second;
- _sessions.erase(p);
+ return;
}
+ _replicas.erase(replica->ice_getIdentity());
}
- if(thread)
- {
- _node->removeObserver(thread->getSession()); // Needs to be done here because we don't destroy the session.
- thread->terminate(false); // Don't destroy the session, the replica is being shutdown!
- thread->getThreadControl().join();
- }
+
+ //
+ // We don't remove the session here. It will eventually be reaped
+ // by reapReplicas() if the session is dead.
+ //
}
void
-NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
-{
- NodeSessionMap sessions;
- _sessions.swap(sessions);
+NodeSessionManager::createReplicaSession(const InternalRegistryPrx& replica, bool waitTryCreateSession)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+ NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
NodeSessionKeepAliveThreadPtr thread;
- vector<NodeSessionKeepAliveThreadPtr> newSessions;
- for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ if(p != _sessions.end())
{
- if((*p)->ice_getIdentity() == _master->ice_getIdentity())
- {
- continue;
- }
- NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity());
- if(q != sessions.end())
- {
- thread = q->second;
- sessions.erase((*p)->ice_getIdentity());
- }
- else
- {
- thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects);
- thread->start();
- thread->tryCreateSession(false);
- newSessions.push_back(thread);
- }
- _sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
+ thread = p->second;
+ thread->setRegistry(replica);
}
+ else
+ {
+ thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
+ _sessions.insert(make_pair(replica->ice_getIdentity(), thread));
+ thread->start();
+ }
+ thread->tryCreateSession(waitTryCreateSession);
+}
- NodeSessionMap::iterator q = sessions.begin();
- while(q != sessions.end())
+void
+NodeSessionManager::reapReplicas()
+{
+ vector<NodeSessionKeepAliveThreadPtr> reap;
{
- if(q->second->getSession()) // Don't destroy sessions which are still alive!
+ Lock sync(*this);
+ if(_destroyed)
{
- _sessions.insert(make_pair(q->first, q->second));
- sessions.erase(q++);
+ return;
}
- else
+
+ NodeSessionMap::iterator q = _sessions.begin();
+ while(q != _sessions.end())
{
- ++q;
+ if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected())
+ {
+ _node->removeObserver(q->second->getSession());
+ reap.push_back(q->second);
+ _sessions.erase(q++);
+ }
+ else
+ {
+ ++q;
+ }
}
}
- for(q = sessions.begin(); q != sessions.end(); ++q)
- {
- q->second->terminate();
- }
- for(q = sessions.begin(); q != sessions.end(); ++q)
- {
- q->second->getThreadControl().join();
- }
- //
- // If the node is being started, we wait for the new sessions to
- // be created. This ensures that once the node is activated all
- // the known replicas are connected.
- //
- if(!_activated)
+ for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p)
{
- for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t)
- {
- (*t)->tryCreateSession(true);
- }
+ (*p)->getThreadControl().join();
}
}
@@ -454,99 +465,130 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session)
// established their session with the node.
//
assert(session);
- try
- {
- session->loadServers();
- _node->checkConsistency(session);
- }
- catch(const Ice::LocalException&)
- {
- }
+ session->loadServers();
+ _node->checkConsistency(session);
}
void
NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
+ bool activated;
+ {
+ Lock sync(*this);
+ activated = _activated;
+ }
+
//
- // Get the list of replicas (either with the master session or the
- // IceGrid::Query interface) and make sure we have sessions opened
- // to these replicas.
+ // Synchronize the servers if the session is active and if the
+ // node adapter has been activated (otherwise, the servers will be
+ // synced after the node adapter activation, see activate()).
//
- try
+ // We also set the replica observer to receive notifications of
+ // replica addition/removal.
+ //
+ if(session && activated)
+ {
+ try
+ {
+ session->setReplicaObserver(_node->getProxy());
+ syncServers(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ return;
+ }
+
+ //
+ // If there's no master session or if the node adapter isn't
+ // activated yet, we retrieve a list of the replicas either from
+ // the master or from the known replicas (the ones configured with
+ // Ice.Default.Locator) and we try to establish connections to
+ // each of the replicas.
+ //
+
+ InternalRegistryPrxSeq replicas;
+ if(session)
{
- unsigned long serial = 0;
- InternalRegistryPrxSeq replicas;
- while(true)
+ assert(!activated); // The node adapter isn't activated yet so
+ // we're not subscribed yet to the replica
+ // observer topic.
+ try
{
+ replicas = _thread->getRegistry()->getReplicas();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ else
+ {
+ map<Ice::Identity, Ice::ObjectPrx> proxies;
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ try
{
- Lock sync(*this);
- if(serial == _serial)
+ Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
{
- NodeSessionManager& self = const_cast<NodeSessionManager&>(*this);
- self._serial = 1;
- self.syncReplicas(replicas);
- break;
+ //
+ // NOTE: We might override a good proxy here! We could improve this to make
+ // sure that we don't override the proxy for replica N if that proxy was
+ // obtained from replica N.
+ //
+ proxies[(*q)->ice_getIdentity()] = *q;
}
- serial = _serial;
}
-
- if(session)
+ catch(const Ice::LocalException&)
{
- replicas = _thread->getRegistry()->getReplicas();
+ // IGNORE
}
- else
+ }
+
+ for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ }
+ }
+
+ {
+ Lock sync(*this);
+
+ //
+ // If the node adapter was activated since we last check, we don't need
+ // to initialize the replicas here, it will be done by replicaInit().
+ //
+ if(!session || !_activated)
+ {
+ _replicas.clear();
+ for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
- replicas.clear();
- map<Ice::Identity, Ice::ObjectPrx> proxies;
- for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
- {
- try
- {
- Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q)
- {
- //
- // NOTE: We might override a good proxy
- // here! We could improve this to make
- // sure that we don't override the proxy
- // for replica N if that proxy was
- // obtained from replica N.
- //
- proxies[(*q)->ice_getIdentity()] = *q;
- }
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
- }
- for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q)
+ if((*p)->ice_getIdentity() != _master->ice_getIdentity())
{
- replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second));
+ _replicas.insert((*p)->ice_getIdentity());
}
}
}
}
- catch(const Ice::LocalException&)
- {
- // IGNORE
- }
-
+
//
- // Synchronize the servers if the session is active and if the
- // node adapter has been activated (otherwise, the servers will be
- // synced after the node adapter activation, see activated()).
+ // Create the replica sessions and wait for the creation. It's
+ // important to wait to ensure that the replica sessions are
+ // created before the node adapter is activated.
//
- if(session)
+ InternalRegistryPrxSeq::const_iterator t;
+ for(t = replicas.begin(); t != replicas.end(); ++t)
{
- bool activated;
+ if((*t)->ice_getIdentity() != _master->ice_getIdentity())
{
- Lock sync(*this);
- activated = _activated;
+ createReplicaSession(*t, false);
}
- if(activated)
+ }
+ for(t = replicas.begin(); t != replicas.end(); ++t)
+ {
+ if((*t)->ice_getIdentity() != _master->ice_getIdentity())
{
- syncServers(session);
+ createReplicaSession(*t, true);
}
}
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h
index 5d28f060aca..ea5895e25d9 100644
--- a/cpp/src/IceGrid/NodeSessionManager.h
+++ b/cpp/src/IceGrid/NodeSessionManager.h
@@ -54,19 +54,23 @@ public:
void create(const NodeIPtr&);
void create(const InternalRegistryPrx&);
- void activated();
+ void activate();
bool waitForCreate();
void terminate();
void destroy();
- void replicaAdded(const InternalRegistryPrx&, bool);
+ void replicaInit(const InternalRegistryPrxSeq&);
+ void replicaAdded(const InternalRegistryPrx&);
void replicaRemoved(const InternalRegistryPrx&);
NodeSessionPrx getMasterNodeSession() const { return _thread->getSession(); }
private:
- void syncReplicas(const InternalRegistryPrxSeq&);
+ void createReplicaSession(const InternalRegistryPrx&, bool);
+
+ void reapReplicas();
+
void syncServers(const NodeSessionPrx&);
class Thread : public NodeSessionKeepAliveThread
@@ -84,9 +88,25 @@ private:
{
NodeSessionPrx session = NodeSessionKeepAliveThread::createSession(master, timeout);
_manager.createdSession(session);
+ _manager.reapReplicas();
return session;
}
+ virtual void
+ destroySession(const NodeSessionPrx& session)
+ {
+ NodeSessionKeepAliveThread::destroySession(session);
+ _manager.reapReplicas();
+ }
+
+ virtual bool
+ keepAlive(const NodeSessionPrx& session)
+ {
+ bool alive = NodeSessionKeepAliveThread::keepAlive(session);
+ _manager.reapReplicas();
+ return alive;
+ }
+
private:
NodeSessionManager& _manager;
@@ -106,6 +126,7 @@ private:
typedef std::map<Ice::Identity, NodeSessionKeepAliveThreadPtr> NodeSessionMap;
NodeSessionMap _sessions;
+ std::set<Ice::Identity> _replicas;
};
}
diff --git a/cpp/src/IceGrid/PlatformInfo.cpp b/cpp/src/IceGrid/PlatformInfo.cpp
index da02ec5f2d4..c5fc269efc6 100644
--- a/cpp/src/IceGrid/PlatformInfo.cpp
+++ b/cpp/src/IceGrid/PlatformInfo.cpp
@@ -42,28 +42,28 @@ using namespace IceGrid;
namespace IceGrid
{
-string
-getLocalizedPerfName(const map<string, string>& perfNames, const string& name)
-{
- unsigned long idx;
- map<string, string>::const_iterator p = perfNames.find(name);
- if(p == perfNames.end())
+ string
+ getLocalizedPerfName(const map<string, string>& perfNames, const string& name)
{
- return "";
- }
- istringstream is(p->second);
- is >> idx;
+ unsigned long idx;
+ map<string, string>::const_iterator p = perfNames.find(name);
+ if(p == perfNames.end())
+ {
+ return "";
+ }
+ istringstream is(p->second);
+ is >> idx;
- vector<char> localized;
- unsigned long size = 256;
- localized.resize(size);
- while(PdhLookupPerfNameByIndex(0, idx, &localized[0], &size) == PDH_MORE_DATA)
- {
- size += 256;
+ vector<char> localized;
+ unsigned long size = 256;
localized.resize(size);
+ while(PdhLookupPerfNameByIndex(0, idx, &localized[0], &size) == PDH_MORE_DATA)
+ {
+ size += 256;
+ localized.resize(size);
+ }
+ return string(&localized[0]);
}
- return string(&localized[0]);
-}
};
diff --git a/cpp/src/IceGrid/ReapThread.h b/cpp/src/IceGrid/ReapThread.h
index e39812e0128..164ffe40fc8 100644
--- a/cpp/src/IceGrid/ReapThread.h
+++ b/cpp/src/IceGrid/ReapThread.h
@@ -13,6 +13,11 @@
#include <IceUtil/Thread.h>
#include <IceUtil/Mutex.h>
#include <IceUtil/Monitor.h>
+
+#include <Ice/Logger.h>
+#include <Ice/LocalException.h>
+#include <Ice/LoggerUtil.h>
+
#include <list>
namespace IceGrid
@@ -30,6 +35,58 @@ public:
};
typedef IceUtil::Handle<Reapable> ReapablePtr;
+template<class T>
+class SessionReapable : public Reapable
+{
+ typedef IceUtil::Handle<T> TPtr;
+
+public:
+
+ SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) :
+ _logger(logger), _session(session)
+ {
+ }
+
+ virtual ~SessionReapable()
+ {
+ }
+
+ virtual IceUtil::Time
+ timestamp() const
+ {
+ return _session->timestamp();
+ }
+
+ virtual void
+ destroy(bool shutdown)
+ {
+ try
+ {
+ if(shutdown)
+ {
+ _session->shutdown();
+ }
+ else
+ {
+ _session->destroy(Ice::Current());
+ }
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ Ice::Warning out(_logger);
+ out << "unexpected exception while reaping session:\n" << ex;
+ }
+ }
+
+private:
+
+ const Ice::LoggerPtr _logger;
+ const TPtr _session;
+};
+
class ReapThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 9a857ced479..91724513ee4 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -704,13 +704,12 @@ RegistryI::createSession(const string& user, const string& password, const Curre
}
SessionIPtr session = _clientSessionFactory->createSessionServant(user, 0);
- session->setServantLocator(_sessionServantLocator);
- SessionPrx proxy = SessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con));
+ Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con);
if(_sessionTimeout > 0)
{
- _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout);
+ _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout);
}
- return proxy;
+ return SessionPrx::uncheckedCast(proxy);
}
AdminSessionPrx
@@ -758,15 +757,12 @@ RegistryI::createAdminSession(const string& user, const string& password, const
}
AdminSessionIPtr session = _adminSessionFactory->createSessionServant(user);
- ObjectPrx admin = _sessionServantLocator->add(new AdminI(_database, this, session), current.con);
- session->setAdmin(AdminPrx::uncheckedCast(admin));
- session->setServantLocator(_sessionServantLocator);
- AdminSessionPrx proxy = AdminSessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con));
+ Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con, this);
if(_sessionTimeout > 0)
{
- _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout);
+ _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout);
}
- return proxy;
+ return AdminSessionPrx::uncheckedCast(proxy);
}
SessionPrx
@@ -823,13 +819,12 @@ RegistryI::createSessionFromSecureConnection(const Current& current)
}
SessionIPtr session = _clientSessionFactory->createSessionServant(userDN, 0);
- session->setServantLocator(_sessionServantLocator);
- SessionPrx proxy = SessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con));
+ Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con);
if(_sessionTimeout > 0)
{
- _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout);
+ _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout);
}
- return proxy;
+ return SessionPrx::uncheckedCast(proxy);
}
AdminSessionPrx
@@ -875,15 +870,12 @@ RegistryI::createAdminSessionFromSecureConnection(const Current& current)
// We let the connection access the administrative interface.
//
AdminSessionIPtr session = _adminSessionFactory->createSessionServant(userDN);
- ObjectPrx admin = _sessionServantLocator->add(new AdminI(_database, this, session), current.con);
- session->setAdmin(AdminPrx::uncheckedCast(admin));
- session->setServantLocator(_sessionServantLocator);
- AdminSessionPrx proxy = AdminSessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con));
+ Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con, this);
if(_sessionTimeout > 0)
{
- _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout);
+ _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout);
}
- return proxy;
+ return AdminSessionPrx::uncheckedCast(proxy);
}
int
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index cb4ace333da..9e21cf98d5b 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -23,15 +23,15 @@ ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceSt
IceStorm::TopicPrx t;
try
{
- t = topicManager->create("NodeNotifier");
+ t = topicManager->create("ReplicaObserverTopic");
}
catch(const IceStorm::TopicExists&)
{
- t = topicManager->retrieve("NodeNotifier");
+ t = topicManager->retrieve("ReplicaObserverTopic");
}
- const_cast<IceStorm::TopicPrx&>(_topic) = t;
- const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher());
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
+ const_cast<ReplicaObserverPrx&>(_observers) = ReplicaObserverPrx::uncheckedCast(_topic->getPublisher());
}
ReplicaEntryPtr
@@ -39,22 +39,38 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
{
Lock sync(*this);
- while(true)
+ ReplicaEntryPtr entry;
+ while(entry = getImpl(name))
{
- ReplicaEntryPtr entry = getImpl(name);
- if(entry)
+ ReplicaSessionIPtr session = entry->getSession();
+ if(session->isDestroyed())
{
- if(entry->getSession()->isDestroyed())
+ wait(); // Wait for the session to be removed.
+ }
+ else
+ {
+ //
+ // Check if the replica is still reachable, if not, we
+ // destroy its session.
+ //
+ sync.release();
+ try
{
- wait();
- continue;
+ session->getInternalRegistry()->ice_ping();
+ throw ReplicaActiveException();
}
- else
+ catch(const Ice::LocalException&)
{
- throw ReplicaActiveException();
+ try
+ {
+ session->destroy();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
+ sync.acquire();
}
- break;
}
if(_traceLevels && _traceLevels->replica > 0)
@@ -65,7 +81,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
try
{
- _nodes->replicaAdded(session->getInternalRegistry());
+ _observers->replicaAdded(session->getInternalRegistry());
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -104,7 +120,7 @@ ReplicaCache::remove(const string& name, bool shutdown)
{
try
{
- _nodes->replicaRemoved(entry->getProxy());
+ _observers->replicaRemoved(entry->getProxy());
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -139,13 +155,21 @@ ReplicaCache::get(const string& name) const
}
void
-ReplicaCache::nodeAdded(const NodePrx& node)
+ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
try
{
- _topic->subscribe(qos, node);
+ Lock sync(*this);
+ InternalRegistryPrxSeq replicas;
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ replicas.push_back(p->second->getProxy());
+ }
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer);
+ ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -157,17 +181,17 @@ ReplicaCache::nodeAdded(const NodePrx& node)
if(traceLevels)
{
Ice::Warning out(traceLevels->logger);
- out << "unexpected exception while subscribing node from replica observer topic:\n" << ex;
+ out << "unexpected exception while subscribing observer from replica observer topic:\n" << ex;
}
}
}
void
-ReplicaCache::nodeRemoved(const NodePrx& node)
+ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer)
{
try
{
- _topic->unsubscribe(node);
+ _topic->unsubscribe(observer);
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -179,7 +203,7 @@ ReplicaCache::nodeRemoved(const NodePrx& node)
if(traceLevels)
{
Ice::Warning out(traceLevels->logger);
- out << "unexpected exception while unsubscribing node from replica observer topic:\n" << ex;
+ out << "unexpected exception while unsubscribing observer from replica observer topic:\n" << ex;
}
}
}
diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h
index 2a91e36a83e..9a13423bd44 100644
--- a/cpp/src/IceGrid/ReplicaCache.h
+++ b/cpp/src/IceGrid/ReplicaCache.h
@@ -53,8 +53,8 @@ public:
ReplicaEntryPtr remove(const std::string&, bool);
ReplicaEntryPtr get(const std::string&) const;
- void nodeAdded(const NodePrx&);
- void nodeRemoved(const NodePrx&);
+ void subscribe(const ReplicaObserverPrx&);
+ void unsubscribe(const ReplicaObserverPrx&);
Ice::ObjectPrx getEndpoints(const std::string&, const Ice::ObjectPrx&) const;
@@ -65,7 +65,7 @@ private:
const Ice::CommunicatorPtr _communicator;
const IceStorm::TopicPrx _topic;
- const NodePrx _nodes;
+ const ReplicaObserverPrx _observers;
InternalRegistryPrx _self; // This replica internal registry proxy.
};
diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp
index 6ba795b30b4..9b1696dd653 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionI.cpp
@@ -29,14 +29,12 @@ operator==(const ObjectInfo& info, const Ice::Identity& id)
ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database,
const WellKnownObjectsManagerPtr& wellKnownObjects,
- const string& name,
const RegistryInfo& info,
const InternalRegistryPrx& proxy,
int timeout) :
_database(database),
_wellKnownObjects(wellKnownObjects),
_traceLevels(database->getTraceLevels()),
- _name(name),
_internalRegistry(InternalRegistryPrx::uncheckedCast(proxy->ice_timeout(timeout * 1000))),
_info(info),
_timeout(timeout),
@@ -46,9 +44,11 @@ ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database,
__setNoDelete(true);
try
{
- _database->getReplicaCache().add(name, this);
+ _database->getReplicaCache().add(info.name, this);
ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName);
RegistryObserverTopicPtr::dynamicCast(obsv)->registryUp(_info);
+
+ _proxy = ReplicaSessionPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(this));
}
catch(...)
{
@@ -72,7 +72,7 @@ ReplicaSessionI::keepAlive(const Ice::Current& current)
if(_traceLevels->replica > 2)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
- out << "replica `" << _name << "' keep alive ";
+ out << "replica `" << _info.name << "' keep alive ";
}
}
@@ -91,9 +91,9 @@ ReplicaSessionI::setDatabaseObserver(const DatabaseObserverPrx& observer, const
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
_observer = observer;
- _database->getObserverTopic(ApplicationObserverTopicName)->subscribe(_observer, _name);
- _database->getObserverTopic(AdapterObserverTopicName)->subscribe(_observer, _name);
- _database->getObserverTopic(ObjectObserverTopicName)->subscribe(_observer, _name);
+ _database->getObserverTopic(ApplicationObserverTopicName)->subscribe(_observer, _info.name);
+ _database->getObserverTopic(AdapterObserverTopicName)->subscribe(_observer, _info.name);
+ _database->getObserverTopic(ObjectObserverTopicName)->subscribe(_observer, _info.name);
}
void
@@ -130,7 +130,7 @@ ReplicaSessionI::registerWellKnownObjects(const ObjectInfoSeq& objects, const Ic
// are correctly setup when the replica starts accepting requests
// from clients (if the replica is being started).
//
- _database->getObserverTopic(ObjectObserverTopicName)->waitForSyncedSubscribers(serial, _name);
+ _database->getObserverTopic(ObjectObserverTopicName)->waitForSyncedSubscribers(serial, _info.name);
}
void
@@ -152,15 +152,72 @@ ReplicaSessionI::receivedUpdate(TopicName topicName, int serial, const string& f
ObserverTopicPtr topic = _database->getObserverTopic(topicName);
if(topic)
{
- topic->receivedUpdate(_name, serial, failure);
+ topic->receivedUpdate(_info.name, serial, failure);
}
}
void
-ReplicaSessionI::destroy(const Ice::Current& current)
+ReplicaSessionI::destroy(const Ice::Current&)
+{
+ destroyImpl(false);
+}
+
+IceUtil::Time
+ReplicaSessionI::timestamp() const
+{
+ Lock sync(*this);
+ if(_destroy)
+ {
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ return _timestamp;
+}
+
+void
+ReplicaSessionI::shutdown()
+{
+ destroyImpl(true);
+}
+
+const InternalRegistryPrx&
+ReplicaSessionI::getInternalRegistry() const
{
- const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp
+ return _internalRegistry;
+}
+
+const RegistryInfo&
+ReplicaSessionI::getInfo() const
+{
+ return _info;
+}
+
+ReplicaSessionPrx
+ReplicaSessionI::getProxy() const
+{
+ return _proxy;
+}
+Ice::ObjectPrx
+ReplicaSessionI::getEndpoint(const std::string& name)
+{
+ Lock sync(*this);
+ if(_destroy)
+ {
+ return 0;
+ }
+ return _replicaEndpoints[name];
+}
+
+bool
+ReplicaSessionI::isDestroyed() const
+{
+ Lock sync(*this);
+ return _destroy;
+}
+
+void
+ReplicaSessionI::destroyImpl(bool shutdown)
+{
{
Lock sync(*this);
if(_destroy)
@@ -172,9 +229,9 @@ ReplicaSessionI::destroy(const Ice::Current& current)
if(_observer)
{
- _database->getObserverTopic(ApplicationObserverTopicName)->unsubscribe(_observer, _name);
- _database->getObserverTopic(AdapterObserverTopicName)->unsubscribe(_observer, _name);
- _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _name);
+ _database->getObserverTopic(ApplicationObserverTopicName)->unsubscribe(_observer, _info.name);
+ _database->getObserverTopic(AdapterObserverTopicName)->unsubscribe(_observer, _info.name);
+ _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _info.name);
}
if(!_replicaWellKnownObjects.empty())
@@ -200,52 +257,23 @@ ReplicaSessionI::destroy(const Ice::Current& current)
// Notify the observer that the registry is down.
//
ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName);
- RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_name);
+ RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_info.name);
//
// Remove the replica from the cache. This must be done last. As
// soon as the replica is removed another session might be
// created.
//
- _database->getReplicaCache().remove(_name, shutdown);
+ _database->getReplicaCache().remove(_info.name, shutdown);
- if(current.adapter)
+ if(!shutdown)
{
try
{
- current.adapter->remove(current.id);
+ _database->getInternalAdapter()->remove(_proxy->ice_getIdentity());
}
catch(const Ice::ObjectAdapterDeactivatedException&)
{
}
}
}
-
-IceUtil::Time
-ReplicaSessionI::timestamp() const
-{
- Lock sync(*this);
- if(_destroy)
- {
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
- return _timestamp;
-}
-
-Ice::ObjectPrx
-ReplicaSessionI::getEndpoint(const std::string& name)
-{
- Lock sync(*this);
- if(_destroy)
- {
- return 0;
- }
- return _replicaEndpoints[name];
-}
-
-bool
-ReplicaSessionI::isDestroyed() const
-{
- Lock sync(*this);
- return _destroy;
-}
diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h
index 25553f0843a..fa70707218e 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.h
+++ b/cpp/src/IceGrid/ReplicaSessionI.h
@@ -29,7 +29,7 @@ class ReplicaSessionI : public ReplicaSession, public IceUtil::Mutex
{
public:
- ReplicaSessionI(const DatabasePtr&, const WellKnownObjectsManagerPtr&, const std::string&, const RegistryInfo&,
+ ReplicaSessionI(const DatabasePtr&, const WellKnownObjectsManagerPtr&, const RegistryInfo&,
const InternalRegistryPrx&, int);
virtual void keepAlive(const Ice::Current&);
@@ -40,25 +40,29 @@ public:
virtual void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&,
const Ice::Current&);
virtual void receivedUpdate(TopicName, int, const std::string&, const Ice::Current&);
- virtual void destroy(const Ice::Current&);
+ virtual void destroy(const Ice::Current& = Ice::Current());
virtual IceUtil::Time timestamp() const;
+ virtual void shutdown();
- const InternalRegistryPrx& getInternalRegistry() const { return _internalRegistry; }
- const RegistryInfo& getInfo() const { return _info; }
+ const InternalRegistryPrx& getInternalRegistry() const;
+ const RegistryInfo& getInfo() const;
+ ReplicaSessionPrx getProxy() const;
Ice::ObjectPrx getEndpoint(const std::string&);
bool isDestroyed() const;
private:
+
+ void destroyImpl(bool);
const DatabasePtr _database;
const WellKnownObjectsManagerPtr _wellKnownObjects;
const TraceLevelsPtr _traceLevels;
- const std::string _name;
const InternalRegistryPrx _internalRegistry;
const RegistryInfo _info;
const int _timeout;
+ ReplicaSessionPrx _proxy;
DatabaseObserverPrx _observer;
ObjectInfoSeq _replicaWellKnownObjects;
StringObjectProxyDict _replicaEndpoints;
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index 31802da355c..fa99a035092 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -333,9 +333,10 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica)
NodePrxSeq
ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const
{
+ assert(_thread && _thread->getRegistry());
try
{
- return _master->getNodes();
+ return _thread->getRegistry()->getNodes();
}
catch(const Ice::LocalException&)
{
@@ -526,7 +527,7 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic
{
try
{
- ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry);
+ ReplicaSessionPrx session = registry->registerReplica(_info, _internalRegistry);
int t = session->getTimeout();
if(t > 0)
{
diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp
index 9f099595fa4..7eb1fc72c77 100644
--- a/cpp/src/IceGrid/SessionI.cpp
+++ b/cpp/src/IceGrid/SessionI.cpp
@@ -106,29 +106,32 @@ BaseSessionI::keepAlive(const Ice::Current& current)
}
void
-BaseSessionI::destroy(const Ice::Current& current)
+BaseSessionI::destroyImpl(bool shutdown)
{
Lock sync(*this);
if(_destroyed)
{
Ice::ObjectNotExistException ex(__FILE__, __LINE__);
- ex.id = current.id;
+ ex.id = _identity;
throw ex;
}
_destroyed = true;
- if(_servantLocator)
- {
- _servantLocator->remove(current.id);
- }
- else if(current.adapter)
+ if(!shutdown)
{
- try
+ if(_servantLocator)
{
- current.adapter->remove(current.id);
+ _servantLocator->remove(_identity);
}
- catch(const Ice::ObjectAdapterDeactivatedException&)
+ else if(_adapter)
{
+ try
+ {
+ _adapter->remove(_identity);
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ }
}
}
@@ -147,63 +150,33 @@ BaseSessionI::timestamp() const
}
void
-BaseSessionI::setServantLocator(const SessionServantLocatorIPtr& servantLocator)
+BaseSessionI::shutdown()
{
- //
- // This is supposed to be called after creation only.
- //
- const_cast<SessionServantLocatorIPtr&>(_servantLocator) = servantLocator;
+ destroyImpl(true);
}
-SessionReapable::SessionReapable(const Ice::ObjectAdapterPtr& adapter,
- const Ice::ObjectPtr& session,
- const Ice::Identity& id) :
- _adapter(adapter),
- _servant(session),
- _session(dynamic_cast<BaseSessionI*>(_servant.get())),
- _id(id)
+Ice::ObjectPrx
+BaseSessionI::registerWithServantLocator(const SessionServantLocatorIPtr& servantLoc, const Ice::ConnectionPtr& con)
{
+ //
+ // This is supposed to be called after creation only, no need to synchronize.
+ //
+ _servantLocator = servantLoc;
+ Ice::ObjectPrx proxy = servantLoc->add(this, con);
+ _identity = proxy->ice_getIdentity();
+ return proxy;
}
-SessionReapable::~SessionReapable()
-{
-}
-
-IceUtil::Time
-SessionReapable::timestamp() const
-{
- return _session->timestamp();
-}
-
-void
-SessionReapable::destroy(bool destroy)
+Ice::ObjectPrx
+BaseSessionI::registerWithObjectAdapter(const Ice::ObjectAdapterPtr& adapter)
{
- try
- {
- //
- // Invoke on the servant directly instead of the
- // proxy. Invoking on the proxy might not always work if the
- // communicator is being shutdown/destroyed. We have to create
- // a fake "current" because the session destroy methods needs
- // the adapter and object identity to unregister the servant
- // from the adapter.
- //
- Ice::Current current;
- if(!destroy)
- {
- current.adapter = _adapter;
- current.id = _id;
- }
- _session->destroy(current);
- }
- catch(const Ice::ObjectNotExistException&)
- {
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_adapter->getCommunicator()->getLogger());
- out << "unexpected exception while reaping node session:\n" << ex;
- }
+ //
+ // This is supposed to be called after creation only, no need to synchronize.
+ //
+ _adapter = adapter;
+ _identity.category = _database->getInstanceName();
+ _identity.name = IceUtil::generateUUID();
+ return _adapter->add(this, _identity);
}
SessionI::SessionI(const string& id,
@@ -251,33 +224,9 @@ SessionI::setAllocationTimeout(int timeout, const Ice::Current&)
}
void
-SessionI::destroy(const Ice::Current& current)
+SessionI::destroy(const Ice::Current&)
{
- BaseSessionI::destroy(current);
-
- //
- // NOTE: The _requests and _allocations attributes are immutable
- // once the session is destroyed so we don't need mutex protection
- // here to access them.
- //
-
- for(set<AllocationRequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- (*p)->cancel(AllocationException("session destroyed"));
- }
- _requests.clear();
-
- for(set<AllocatablePtr>::const_iterator q = _allocations.begin(); q != _allocations.end(); ++q)
- {
- try
- {
- (*q)->release(this);
- }
- catch(const AllocationException&)
- {
- }
- }
- _allocations.clear();
+ destroyImpl(false);
}
int
@@ -332,6 +281,36 @@ SessionI::removeAllocation(const AllocatablePtr& allocatable)
_allocations.erase(allocatable);
}
+void
+SessionI::destroyImpl(bool shutdown)
+{
+ BaseSessionI::destroyImpl(shutdown);
+
+ //
+ // NOTE: The _requests and _allocations attributes are immutable
+ // once the session is destroyed so we don't need mutex protection
+ // here to access them.
+ //
+
+ for(set<AllocationRequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ (*p)->cancel(AllocationException("session destroyed"));
+ }
+ _requests.clear();
+
+ for(set<AllocatablePtr>::const_iterator q = _allocations.begin(); q != _allocations.end(); ++q)
+ {
+ try
+ {
+ (*q)->release(this);
+ }
+ catch(const AllocationException&)
+ {
+ }
+ }
+ _allocations.clear();
+}
+
ClientSessionFactory::ClientSessionFactory(const Ice::ObjectAdapterPtr& adapter,
const DatabasePtr& database,
const WaitQueuePtr& waitQueue,
@@ -348,20 +327,16 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci
{
assert(_adapter);
- Ice::IdentitySeq ids; // Identities of the object the session is allowed to access.
-
- Ice::Identity id;
- id.category = _database->getInstanceName();
-
- // The session object
- id.name = IceUtil::generateUUID();
- ids.push_back(id);
SessionIPtr session = createSessionServant(sessionId, ctl);
- Glacier2::SessionPrx s = Glacier2::SessionPrx::uncheckedCast(_adapter->add(session, id));
+ Ice::ObjectPrx proxy = session->registerWithObjectAdapter(_adapter);
- // The IceGrid::Query object
- id.name = "Query";
- ids.push_back(id);
+ Ice::Identity queryId;
+ queryId.category = _database->getInstanceName();
+ queryId.name = "Query";
+
+ Ice::IdentitySeq ids; // Identities of the object the session is allowed to access.
+ ids.push_back(proxy->ice_getIdentity()); // Session object
+ ids.push_back(queryId);
int timeout = 0;
if(ctl)
@@ -372,7 +347,7 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci
}
catch(const Ice::LocalException&)
{
- s->destroy();
+ session->destroy(Ice::Current());
return 0;
}
timeout = ctl->getSessionTimeout();
@@ -380,10 +355,10 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci
if(timeout > 0)
{
- _reaper->add(new SessionReapable(_adapter, session, s->ice_getIdentity()), timeout);
+ _reaper->add(new SessionReapable<SessionI>(_database->getTraceLevels()->logger, session), timeout);
}
- return s;
+ return Glacier2::SessionPrx::uncheckedCast(proxy);
}
SessionIPtr
@@ -413,7 +388,8 @@ ClientSSLSessionManagerI::ClientSSLSessionManagerI(const ClientSessionFactoryPtr
}
Glacier2::SessionPrx
-ClientSSLSessionManagerI::create(const Glacier2::SSLInfo& info, const Glacier2::SessionControlPrx& ctl,
+ClientSSLSessionManagerI::create(const Glacier2::SSLInfo& info,
+ const Glacier2::SessionControlPrx& ctl,
const Ice::Current& current)
{
string userDN;
diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h
index 4391f2d414b..5ebdf00195f 100644
--- a/cpp/src/IceGrid/SessionI.h
+++ b/cpp/src/IceGrid/SessionI.h
@@ -36,29 +36,35 @@ typedef IceUtil::Handle<WaitQueue> WaitQueuePtr;
class SessionI;
typedef IceUtil::Handle<SessionI> SessionIPtr;
-class BaseSessionI : public IceUtil::Mutex
+class BaseSessionI : virtual Ice::Object, public IceUtil::Mutex
{
public:
virtual ~BaseSessionI();
virtual void keepAlive(const Ice::Current&);
- virtual void destroy(const Ice::Current&);
IceUtil::Time timestamp() const;
- void setServantLocator(const SessionServantLocatorIPtr&);
+ void shutdown();
+
+ virtual Ice::ObjectPrx registerWithServantLocator(const SessionServantLocatorIPtr&, const Ice::ConnectionPtr&);
+ virtual Ice::ObjectPrx registerWithObjectAdapter(const Ice::ObjectAdapterPtr&);
const std::string& getId() const { return _id; }
protected:
+ virtual void destroyImpl(bool);
+
BaseSessionI(const std::string&, const std::string&, const DatabasePtr&);
const std::string _id;
const std::string _prefix;
const TraceLevelsPtr _traceLevels;
const DatabasePtr _database;
- const SessionServantLocatorIPtr _servantLocator;
+ SessionServantLocatorIPtr _servantLocator;
+ Ice::ObjectAdapterPtr _adapter;
+ Ice::Identity _identity;
bool _destroyed;
IceUtil::Time _timestamp;
};
@@ -68,24 +74,6 @@ class SessionDestroyedException
{
};
-class SessionReapable : public Reapable
-{
-public:
-
- SessionReapable(const Ice::ObjectAdapterPtr&, const Ice::ObjectPtr&, const Ice::Identity&);
- virtual ~SessionReapable();
-
- virtual IceUtil::Time timestamp() const;
- virtual void destroy(bool);
-
-private:
-
- const Ice::ObjectAdapterPtr _adapter;
- const Ice::ObjectPtr _servant;
- BaseSessionI* _session;
- const Ice::Identity _id;
-};
-
class SessionI : public BaseSessionI, public Session
{
public:
@@ -115,6 +103,8 @@ public:
protected:
+ virtual void destroyImpl(bool);
+
const WaitQueuePtr _waitQueue;
const Glacier2::SessionControlPrx _sessionControl;
const Ice::ConnectionPtr _connection;
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index 7d659d593ea..84dafe96310 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -54,7 +54,7 @@ public:
{
TPrx session;
InternalRegistryPrx registry;
- IceUtil::Time timeout = IceUtil::Time::seconds(10);
+ IceUtil::Time timeout = IceUtil::Time::seconds(15);
Action action = Connect;
while(true)
@@ -215,6 +215,21 @@ public:
notifyAll();
}
+ bool
+ terminateIfDisconnected()
+ {
+ Lock sync(*this);
+ if(_state != Disconnected)
+ {
+ return false; // Nothing we can do for now.
+ }
+ assert(_state != Destroyed);
+ _state = Destroyed;
+ _nextAction = None;
+ notifyAll();
+ return true;
+ }
+
void
terminate(bool destroySession = true)
{