diff options
Diffstat (limited to 'cpp/src')
24 files changed, 887 insertions, 698 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index e3b4b7a87de..ce9317907f4 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -59,10 +59,25 @@ AdminSessionI::~AdminSessionI() { } -void -AdminSessionI::setAdmin(const AdminPrx& admin) +Ice::ObjectPrx +AdminSessionI::registerWithServantLocator(const SessionServantLocatorIPtr& servantLoc, + const Ice::ConnectionPtr& con, + const RegistryIPtr& registry) +{ + Ice::ObjectPrx proxy = BaseSessionI::registerWithServantLocator(servantLoc, con); + _admin = AdminPrx::uncheckedCast(servantLoc->add(new AdminI(_database, registry, this), con)); + return proxy; +} + +Ice::ObjectPrx +AdminSessionI::registerWithObjectAdapter(const Ice::ObjectAdapterPtr& adapter, const RegistryIPtr& registry) { - const_cast<AdminPrx&>(_admin) = admin; + Ice::ObjectPrx proxy = BaseSessionI::registerWithObjectAdapter(adapter); + Ice::Identity identity; + identity.category = _database->getInstanceName(); + identity.name = IceUtil::generateUUID(); + _admin = AdminPrx::uncheckedCast(adapter->add(new AdminI(_database, registry, this), identity)); + return proxy; } AdminPrx @@ -247,58 +262,140 @@ AdminSessionI::openRegistryStdErr(const std::string& name, const Ice::Current& c } void -AdminSessionI::destroy(const Ice::Current& current) +AdminSessionI::destroy(const Ice::Current&) { - BaseSessionI::destroy(current); - - try + destroyImpl(false); +} + +void +AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer) +{ + if(_observers[name] && _observers[name] != observer) { - _database->unlock(this); + _database->getObserverTopic(name)->unsubscribe(_observers[name]); + _observers[name] = 0; } - catch(AccessDeniedException&) + + if(observer) { + _observers[name] = observer; + _database->getObserverTopic(name)->subscribe(_observers[name]); } +} - // - // Unregister the admin servant from the session servant locator - // or object adapter. - // +Ice::ObjectPrx +AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection) +{ + return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id); +} + +FileIteratorPrx +AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, const Ice::Current& current) +{ + Lock sync(*this); + if(_destroyed) + { + Ice::ObjectNotExistException ex(__FILE__, __LINE__); + ex.id = current.id; + throw ex; + } + + Ice::ObjectPrx obj; + Ice::ObjectPtr servant = new FileIteratorI(this, reader, filename); + if(_servantLocator) + { + obj = _servantLocator->add(servant, current.con); + } + else + { + assert(_adapter); + obj = _adapter->addWithUUID(servant); + } + _iterators.insert(obj->ice_getIdentity()); + return FileIteratorPrx::uncheckedCast(obj); +} + +void +AdminSessionI::removeFileIterator(const Ice::Identity& id, const Ice::Current& current) +{ + Lock sync(*this); if(_servantLocator) { - _servantLocator->remove(_admin->ice_getIdentity()); + _servantLocator->remove(id); + } + else + { + try + { + assert(_adapter); + _adapter->remove(id); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + } + } + _iterators.erase(id); +} + +void +AdminSessionI::destroyImpl(bool shutdown) +{ + BaseSessionI::destroyImpl(shutdown); + + try + { + _database->unlock(this); } - else if(current.adapter) + catch(AccessDeniedException&) { - current.adapter->remove(_admin->ice_getIdentity()); } // - // Unregister the iterators from the session servant locator or - // object adapter. + // Unregister the admin servant from the session servant locator + // or object adapter. // - for(set<Ice::Identity>::const_iterator p = _iterators.begin(); p != _iterators.end(); ++p) + if(!shutdown) { if(_servantLocator) { - _servantLocator->remove(*p); + _servantLocator->remove(_admin->ice_getIdentity()); } - else if(current.adapter) + else if(_adapter) { try { - current.adapter->remove(*p); + _adapter->remove(_admin->ice_getIdentity()); } - catch(const Ice::LocalException&) + catch(const Ice::ObjectAdapterDeactivatedException&) { } } - } - // - // Unsubscribe from the topics. - // - if(current.adapter) // Not shutting down - { + // + // Unregister the iterators from the session servant locator or + // object adapter. + // + for(set<Ice::Identity>::const_iterator p = _iterators.begin(); p != _iterators.end(); ++p) + { + if(_servantLocator) + { + _servantLocator->remove(*p); + } + else if(_adapter) + { + try + { + _adapter->remove(*p); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + } + } + } + + // + // Unsubscribe from the topics. + // setupObserverSubscription(RegistryObserverTopicName, 0); setupObserverSubscription(NodeObserverTopicName, 0); setupObserverSubscription(ApplicationObserverTopicName, 0); @@ -324,26 +421,17 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie { assert(_adapter); - Ice::IdentitySeq ids; // Identities of the object the session is allowed to access. - - Ice::Identity id; - id.category = _database->getInstanceName(); - - // The per-session admin object. - id.name = IceUtil::generateUUID(); AdminSessionIPtr session = createSessionServant(sessionId); - AdminPrx admin = AdminPrx::uncheckedCast(_adapter->add(new AdminI(_database, _registry, session), id)); - session->setAdmin(admin); - ids.push_back(id); + Ice::ObjectPrx proxy = session->registerWithObjectAdapter(_adapter, _registry); - // The session admin object. - id.name = IceUtil::generateUUID(); - Glacier2::SessionPrx s = Glacier2::SessionPrx::uncheckedCast(_adapter->add(session, id)); - ids.push_back(id); + Ice::Identity queryId; + queryId.category = _database->getInstanceName(); + queryId.name = "Query"; - // The IceGrid::Query object - id.name = "Query"; - ids.push_back(id); + Ice::IdentitySeq ids; // Identities of the object the session is allowed to access. + ids.push_back(queryId); // The IceGrid::Query object + ids.push_back(proxy->ice_getIdentity()); // The session object. + ids.push_back(session->getAdmin()->ice_getIdentity()); // The per-session admin object. int timeout = 0; if(ctl) @@ -354,7 +442,7 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie } catch(const Ice::LocalException&) { - s->destroy(); + session->destroy(Ice::Current()); return 0; } timeout = ctl->getSessionTimeout(); @@ -362,10 +450,10 @@ AdminSessionFactory::createGlacier2Session(const string& sessionId, const Glacie if(timeout > 0) { - _reaper->add(new SessionReapable(_adapter, session, s->ice_getIdentity()), timeout); + _reaper->add(new SessionReapable<AdminSessionI>(_database->getTraceLevels()->logger, session), timeout); } - return s; + return Glacier2::SessionPrx::uncheckedCast(proxy); } AdminSessionIPtr @@ -419,70 +507,3 @@ AdminSSLSessionManagerI::create(const Glacier2::SSLInfo& info, return _factory->createGlacier2Session(userDN, ctl); } -void -AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer) -{ - if(_observers[name] && _observers[name] != observer) - { - _database->getObserverTopic(name)->unsubscribe(_observers[name]); - _observers[name] = 0; - } - - if(observer) - { - _observers[name] = observer; - _database->getObserverTopic(name)->subscribe(_observers[name]); - } -} - -Ice::ObjectPrx -AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection) -{ - return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id); -} - -FileIteratorPrx -AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, const Ice::Current& current) -{ - Lock sync(*this); - if(_destroyed) - { - Ice::ObjectNotExistException ex(__FILE__, __LINE__); - ex.id = current.id; - throw ex; - } - - Ice::ObjectPrx obj; - Ice::ObjectPtr servant = new FileIteratorI(this, reader, filename); - if(_servantLocator) - { - obj = _servantLocator->add(servant, current.con); - } - else - { - obj = current.adapter->addWithUUID(servant); - } - _iterators.insert(obj->ice_getIdentity()); - return FileIteratorPrx::uncheckedCast(obj); -} - -void -AdminSessionI::removeFileIterator(const Ice::Identity& id, const Ice::Current& current) -{ - Lock sync(*this); - if(_servantLocator) - { - _servantLocator->remove(id); - } - else - { - try - { - current.adapter->remove(id); - } - catch(const Ice::LocalException&) - { - } - } - _iterators.erase(id); -} diff --git a/cpp/src/IceGrid/AdminSessionI.h b/cpp/src/IceGrid/AdminSessionI.h index df7802cf960..dd6f388a041 100644 --- a/cpp/src/IceGrid/AdminSessionI.h +++ b/cpp/src/IceGrid/AdminSessionI.h @@ -31,11 +31,13 @@ public: AdminSessionI(const std::string&, const DatabasePtr&, int, const std::string&); virtual ~AdminSessionI(); - void setAdmin(const AdminPrx&); + virtual Ice::ObjectPrx registerWithServantLocator(const SessionServantLocatorIPtr&, const Ice::ConnectionPtr&, + const RegistryIPtr&); + virtual Ice::ObjectPrx registerWithObjectAdapter(const Ice::ObjectAdapterPtr&, const RegistryIPtr&); virtual void keepAlive(const Ice::Current& current) { BaseSessionI::keepAlive(current); } - virtual AdminPrx getAdmin(const Ice::Current&) const; + virtual AdminPrx getAdmin(const Ice::Current& = Ice::Current()) const; virtual void setObservers(const RegistryObserverPrx&, const NodeObserverPrx&, const ApplicationObserverPrx&, const AdapterObserverPrx&, const ObjectObserverPrx&, const Ice::Current&); @@ -67,9 +69,11 @@ private: Ice::ObjectPrx toProxy(const Ice::Identity&, const Ice::ConnectionPtr&); FileIteratorPrx addFileIterator(const FileReaderPrx&, const std::string&, const Ice::Current&); + virtual void destroyImpl(bool); + const int _timeout; - const AdminPrx _admin; const std::string _replicaName; + AdminPrx _admin; std::map<TopicName, Ice::ObjectPrx> _observers; std::set<Ice::Identity> _iterators; }; diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 337db3a32ec..c23e68edc10 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -486,7 +486,7 @@ NodeService::start(int argc, char* argv[]) // Notify the node session manager that the node can start // accepting incoming connections. // - _sessions.activated(); + _sessions.activate(); string bundleName = properties->getProperty("IceGrid.Node.PrintServersReady"); if(!bundleName.empty() || !desc.empty()) diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index 9eac496d733..3eb99cd89e0 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -192,7 +192,28 @@ interface Server extends FileReader interface InternalRegistry; sequence<InternalRegistry*> InternalRegistryPrxSeq; -interface Node extends FileReader +interface ReplicaObserver +{ + void replicaInit(InternalRegistryPrxSeq replicas); + + /** + * + * Notification that a replica has been added. The node should + * establish a session with this new replica. + * + **/ + void replicaAdded(InternalRegistry* replica); + + /** + * + * Notification that a replica has been removed. The node should + * destroy the session to this replica. + * + **/ + void replicaRemoved(InternalRegistry* replica); +}; + +interface Node extends FileReader, ReplicaObserver { /** * @@ -238,22 +259,6 @@ interface Node extends FileReader /** * - * Notification that a replica has been added. The node should - * establish a session with this new replica. - * - **/ - void replicaAdded(InternalRegistry* replica); - - /** - * - * Notification that a replica has been removed. The node should - * destroy the session to this replica. - * - **/ - void replicaRemoved(InternalRegistry* replica); - - /** - * * Get the node name. * **/ @@ -304,6 +309,13 @@ interface NodeSession /** * + * Set the replica observer. + * + **/ + void setReplicaObserver(ReplicaObserver* observer); + + /** + * * Return the node session timeout. * **/ @@ -447,22 +459,20 @@ interface InternalRegistry extends FileReader * is already registered, [registerNode] will overide the previous * node only if it's not active. * - * @param name The name of the node to register. - * - * @param nd The proxy of the node. - * * @param info Some information on the node. + * + * @param prx The proxy of the node. * - * @return The name of the servers currently deployed on the node. + * @return The node session proxy. * * @throws NodeActiveException Raised if the node is already * registered and currently active. * **/ - NodeSession* registerNode(string name, Node* nd, NodeInfo info) + NodeSession* registerNode(NodeInfo info, Node* prx) throws NodeActiveException; - ReplicaSession* registerReplica(string name, RegistryInfo info, InternalRegistry* prx) + ReplicaSession* registerReplica(RegistryInfo info, InternalRegistry* prx) throws ReplicaActiveException; void registerWithReplica(InternalRegistry* prx); diff --git a/cpp/src/IceGrid/InternalRegistryI.cpp b/cpp/src/IceGrid/InternalRegistryI.cpp index 4a6d55347fd..3172c52f648 100644 --- a/cpp/src/IceGrid/InternalRegistryI.cpp +++ b/cpp/src/IceGrid/InternalRegistryI.cpp @@ -23,73 +23,6 @@ using namespace std; using namespace IceGrid; -namespace IceGrid -{ - -template<class T> -class SessionReapable : public Reapable -{ - typedef IceUtil::Handle<T> TPtr; - -public: - - SessionReapable(const Ice::ObjectAdapterPtr& adapter, const TPtr& session, const Ice::ObjectPrx& proxy) : - _adapter(adapter), - _session(session), - _proxy(proxy) - { - } - - virtual ~SessionReapable() - { - } - - virtual IceUtil::Time - timestamp() const - { - return _session->timestamp(); - } - - virtual void - destroy(bool destroy) - { - try - { - // - // Invoke on the servant directly instead of the - // proxy. Invoking on the proxy might not always work if the - // communicator is being shutdown/destroyed. We have to create - // a fake "current" because the session destroy methods needs - // the adapter and object identity to unregister the servant - // from the adapter. - // - Ice::Current current; - if(!destroy) - { - current.adapter = _adapter; - current.id = _proxy->ice_getIdentity(); - } - _session->destroy(current); - } - catch(const Ice::ObjectNotExistException&) - { - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_proxy->ice_getCommunicator()->getLogger()); - out << "unexpected exception while reaping session:\n" << ex; - } - } - -private: - - const Ice::ObjectAdapterPtr _adapter; - const TPtr _session; - const Ice::ObjectPrx _proxy; -}; - -} - InternalRegistryI::InternalRegistryI(const RegistryIPtr& registry, const DatabasePtr& database, const ReapThreadPtr& reaper, @@ -112,17 +45,14 @@ InternalRegistryI::~InternalRegistryI() } NodeSessionPrx -InternalRegistryI::registerNode(const std::string& name, - const NodePrx& node, - const NodeInfo& info, - const Ice::Current& current) +InternalRegistryI::registerNode(const NodeInfo& info, const NodePrx& node, const Ice::Current& current) { + const Ice::LoggerPtr logger = _database->getTraceLevels()->logger; try { - NodeSessionIPtr session = new NodeSessionI(_database, name, node, info, _nodeSessionTimeout); - NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); - _reaper->add(new SessionReapable<NodeSessionI>(current.adapter, session, proxy), _nodeSessionTimeout); - return proxy; + NodeSessionIPtr session = new NodeSessionI(_database, node, info, _nodeSessionTimeout); + _reaper->add(new SessionReapable<NodeSessionI>(logger, session), _nodeSessionTimeout); + return session->getProxy(); } catch(const Ice::ObjectAdapterDeactivatedException&) { @@ -131,18 +61,16 @@ InternalRegistryI::registerNode(const std::string& name, } ReplicaSessionPrx -InternalRegistryI::registerReplica(const std::string& name, - const RegistryInfo& info, - const InternalRegistryPrx& registry, +InternalRegistryI::registerReplica(const RegistryInfo& info, + const InternalRegistryPrx& prx, const Ice::Current& current) { + const Ice::LoggerPtr logger = _database->getTraceLevels()->logger; try { - ReplicaSessionIPtr session = new ReplicaSessionI(_database, _wellKnownObjects, name, info, registry, - _replicaSessionTimeout); - ReplicaSessionPrx proxy = ReplicaSessionPrx::uncheckedCast(current.adapter->addWithUUID(session)); - _reaper->add(new SessionReapable<ReplicaSessionI>(current.adapter, session, proxy), _replicaSessionTimeout); - return proxy; + ReplicaSessionIPtr s = new ReplicaSessionI(_database, _wellKnownObjects, info, prx, _replicaSessionTimeout); + _reaper->add(new SessionReapable<ReplicaSessionI>(logger, s), _replicaSessionTimeout); + return s->getProxy(); } catch(const Ice::ObjectAdapterDeactivatedException&) { diff --git a/cpp/src/IceGrid/InternalRegistryI.h b/cpp/src/IceGrid/InternalRegistryI.h index a9585b3668d..27c470ab4e1 100644 --- a/cpp/src/IceGrid/InternalRegistryI.h +++ b/cpp/src/IceGrid/InternalRegistryI.h @@ -41,9 +41,8 @@ public: const WellKnownObjectsManagerPtr&, ReplicaSessionManager&); virtual ~InternalRegistryI(); - virtual NodeSessionPrx registerNode(const std::string&, const NodePrx&, const NodeInfo&, const Ice::Current&); - virtual ReplicaSessionPrx registerReplica(const std::string&, const RegistryInfo&, const InternalRegistryPrx&, - const Ice::Current&); + virtual NodeSessionPrx registerNode(const NodeInfo&, const NodePrx&, const Ice::Current&); + virtual ReplicaSessionPrx registerReplica(const RegistryInfo&, const InternalRegistryPrx&, const Ice::Current&); virtual void registerWithReplica(const InternalRegistryPrx&, const Ice::Current&); diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index d0a8e000671..dfd740c97e5 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -258,20 +258,37 @@ NodeEntry::setSession(const NodeSessionIPtr& session) { Lock sync(*this); - if(session && _session) + if(session) { - if(_session->isDestroyed()) + while(_session) { - // If the current session has just been destroyed, wait for the setSession(0) call. - assert(session != _session); - while(_session) + if(_session->isDestroyed()) { + // If the current session has just been destroyed, wait for the setSession(0) call. + assert(session != _session); wait(); } - } - else - { - throw NodeActiveException(); + else + { + NodeSessionIPtr session = _session; + sync.release(); + try + { + session->getNode()->ice_ping(); + throw NodeActiveException(); + } + catch(const Ice::LocalException&) + { + try + { + session->destroy(); + } + catch(const Ice::ObjectNotExistException&) + { + } + } + sync.acquire(); + } } } else if(!session && !_session) @@ -281,17 +298,11 @@ NodeEntry::setSession(const NodeSessionIPtr& session) if(!session && _session) { - _cache.getReplicaCache().nodeRemoved(_session->getNode()); } _session = session; notifyAll(); - if(_session) - { - _cache.getReplicaCache().nodeAdded(session->getNode()); - } - // // Clear the saved proxy, the node has established a session // so we won't need anymore to try to register it with this diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 81a29709767..e1ddd0cc112 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -493,9 +493,15 @@ NodeI::registerWithReplica(const InternalRegistryPrx& replica, const Ice::Curren } void +NodeI::replicaInit(const InternalRegistryPrxSeq& replicas, const Ice::Current&) +{ + _sessions.replicaInit(replicas); +} + +void NodeI::replicaAdded(const InternalRegistryPrx& replica, const Ice::Current&) { - _sessions.replicaAdded(replica, false); + _sessions.replicaAdded(replica); } void @@ -611,6 +617,12 @@ NodeI::getFileCache() const return _fileCache; } +NodePrx +NodeI::getProxy() const +{ + return _proxy; +} + string NodeI::getOutputDir() const { @@ -626,7 +638,7 @@ NodeI::getRedirectErrToOut() const NodeSessionPrx NodeI::registerWithRegistry(const InternalRegistryPrx& registry) { - return registry->registerNode(_name, _proxy, _platform.getNodeInfo()); + return registry->registerNode(_platform.getNodeInfo(), _proxy); } void diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index d1d1db2b3f6..72b08ab6c24 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -47,6 +47,8 @@ public: const Ice::Current&); virtual void registerWithReplica(const InternalRegistryPrx&, const Ice::Current&); + + virtual void replicaInit(const InternalRegistryPrxSeq&, const Ice::Current&); virtual void replicaAdded(const InternalRegistryPrx&, const Ice::Current&); virtual void replicaRemoved(const InternalRegistryPrx&, const Ice::Current&); @@ -67,6 +69,7 @@ public: UserAccountMapperPrx getUserAccountMapper() const; PlatformInfo& getPlatformInfo() const; FileCachePtr getFileCache() const; + NodePrx getProxy() const; std::string getOutputDir() const; bool getRedirectErrToOut() const; diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index fa3f29903d4..c0e916684b9 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -17,13 +17,11 @@ using namespace std; using namespace IceGrid; NodeSessionI::NodeSessionI(const DatabasePtr& database, - const string& name, const NodePrx& node, const NodeInfo& info, int timeout) : _database(database), _traceLevels(database->getTraceLevels()), - _name(name), _node(NodePrx::uncheckedCast(node->ice_timeout(timeout * 1000))), _info(info), _timeout(timeout), @@ -33,12 +31,14 @@ NodeSessionI::NodeSessionI(const DatabasePtr& database, __setNoDelete(true); try { - _database->getNode(name, true)->setSession(this); + _database->getNode(info.name, true)->setSession(this); ObjectInfo info; info.type = Node::ice_staticId(); info.proxy = _node; _database->addInternalObject(info, true); // Add or update previous node proxy. + + _proxy = NodeSessionPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(this)); } catch(...) { @@ -63,11 +63,29 @@ NodeSessionI::keepAlive(const LoadInfo& load, const Ice::Current& current) if(_traceLevels->node > 2) { Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "node `" << _name << "' keep alive "; + out << "node `" << _info.name << "' keep alive "; out << "(load = " << _load.avg1 << ", " << _load.avg5 << ", " << _load.avg15 << ")"; } } +void +NodeSessionI::setReplicaObserver(const ReplicaObserverPrx& observer, const Ice::Current&) +{ + Lock sync(*this); + if(_destroy) + { + return; + } + else if(_replicaObserver) // This might happen on activation of the node. + { + assert(_replicaObserver == observer); + return; + } + + _replicaObserver = observer; + _database->getReplicaCache().subscribe(observer); +} + int NodeSessionI::getTimeout(const Ice::Current& current) const { @@ -86,7 +104,7 @@ NodeSessionI::loadServers(const Ice::Current& current) const // // Get the server proxies to load them on the node. // - Ice::StringSeq servers = _database->getNode(_name)->getServers(); + Ice::StringSeq servers = _database->getNode(_info.name)->getServers(); for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) { try @@ -103,7 +121,7 @@ NodeSessionI::loadServers(const Ice::Current& current) const Ice::StringSeq NodeSessionI::getServers(const Ice::Current& current) const { - return _database->getNode(_name)->getServers(); + return _database->getNode(_info.name)->getServers(); } void @@ -116,10 +134,63 @@ NodeSessionI::waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplic } void -NodeSessionI::destroy(const Ice::Current& current) +NodeSessionI::destroy(const Ice::Current&) { - const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp + destroyImpl(false); +} +IceUtil::Time +NodeSessionI::timestamp() const +{ + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + return _timestamp; +} + +void +NodeSessionI::shutdown() +{ + destroyImpl(true); +} + +const NodePrx& +NodeSessionI::getNode() const +{ + return _node; +} + +const NodeInfo& +NodeSessionI::getInfo() const +{ + return _info; +} + +const LoadInfo& +NodeSessionI::getLoadInfo() const +{ + Lock sync(*this); + return _load; +} + +NodeSessionPrx +NodeSessionI::getProxy() const +{ + return _proxy; +} + +bool +NodeSessionI::isDestroyed() const +{ + Lock sync(*this); + return _destroy; +} + +void +NodeSessionI::destroyImpl(bool shutdown) +{ { Lock sync(*this); if(_destroy) @@ -129,7 +200,7 @@ NodeSessionI::destroy(const Ice::Current& current) _destroy = true; } - Ice::StringSeq servers = _database->getNode(_name)->getServers(); + Ice::StringSeq servers = _database->getNode(_info.name)->getServers(); for(Ice::StringSeq::const_iterator p = servers.begin(); p != servers.end(); ++p) { try @@ -154,61 +225,32 @@ NodeSessionI::destroy(const Ice::Current& current) // // Next we notify the observer. // - NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_name); - + NodeObserverTopicPtr::dynamicCast(_database->getObserverTopic(NodeObserverTopicName))->nodeDown(_info.name); + + // + // Unsubscribe the node replica observer. + // + if(_replicaObserver) + { + _database->getReplicaCache().unsubscribe(_replicaObserver); + _replicaObserver = 0; + } // // Finally, we clear the session, this must be done last. As soon // as the node entry session is set to 0 another session might be // created. // - _database->getNode(_name)->setSession(0); + _database->getNode(_info.name)->setSession(0); if(!shutdown) { try { - current.adapter->remove(current.id); + _database->getInternalAdapter()->remove(_proxy->ice_getIdentity()); } catch(const Ice::ObjectAdapterDeactivatedException&) { } } } - -const NodePrx& -NodeSessionI::getNode() const -{ - return _node; -} - -const NodeInfo& -NodeSessionI::getInfo() const -{ - return _info; -} - -const LoadInfo& -NodeSessionI::getLoadInfo() const -{ - Lock sync(*this); - return _load; -} - -IceUtil::Time -NodeSessionI::timestamp() const -{ - Lock sync(*this); - if(_destroy) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - return _timestamp; -} - -bool -NodeSessionI::isDestroyed() const -{ - Lock sync(*this); - return _destroy; -} diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index eeb66cf5bac..d5aaa028965 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -25,24 +25,31 @@ class NodeSessionI : public NodeSession, public IceUtil::Mutex { public: - NodeSessionI(const DatabasePtr&, const std::string&, const NodePrx&, const NodeInfo&, int); + NodeSessionI(const DatabasePtr&, const NodePrx&, const NodeInfo&, int); virtual void keepAlive(const LoadInfo&, const Ice::Current&); + virtual void setReplicaObserver(const ReplicaObserverPrx&, const Ice::Current&); virtual int getTimeout(const Ice::Current& = Ice::Current()) const; virtual NodeObserverPrx getObserver(const Ice::Current&) const; virtual void loadServers(const Ice::Current&) const; virtual Ice::StringSeq getServers(const Ice::Current&) const; virtual void waitForApplicationUpdate_async(const AMD_NodeSession_waitForApplicationUpdatePtr&, const std::string&, int, const Ice::Current&) const; - virtual void destroy(const Ice::Current&); + virtual void destroy(const Ice::Current& = Ice::Current()); + virtual IceUtil::Time timestamp() const; + virtual void shutdown(); + const NodePrx& getNode() const; const NodeInfo& getInfo() const; const LoadInfo& getLoadInfo() const; - virtual IceUtil::Time timestamp() const; + NodeSessionPrx getProxy() const; + bool isDestroyed() const; private: + + void destroyImpl(bool); const DatabasePtr _database; const TraceLevelsPtr _traceLevels; @@ -50,6 +57,8 @@ private: const NodePrx _node; const NodeInfo _info; const int _timeout; + NodeSessionPrx _proxy; + ReplicaObserverPrx _replicaObserver; IceUtil::Time _timestamp; LoadInfo _load; bool _destroy; diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 6ee09c7fa3c..967a15dae5c 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -237,10 +237,10 @@ NodeSessionManager::create(const NodeIPtr& node) _thread->start(); // - // We can't wait for the session to be created here as the node - // adapter isn't activated yet and the registry would hang trying - // to load the servers on the node (when createSession invokes - // loadServers() on the session). + // Try to create the session. It's important that we wait for the + // creation of the session as this will also try to create sessions + // with replicas (see createdSession below) and this must be done + // before the node is activated. // _thread->tryCreateSession(true); } @@ -258,17 +258,23 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) } else { - replicaAdded(replica, true); + createReplicaSession(replica, true); } } void -NodeSessionManager::activated() +NodeSessionManager::activate() { { Lock sync(*this); _activated = true; } + + // + // Get the master session, if it's not created, try to create it + // again and make sure that the servers are synchronized and the + // replica observer is set on the session. + // NodeSessionPrx session = _thread->getSession(); if(!session) { @@ -277,7 +283,14 @@ NodeSessionManager::activated() } if(session) { - syncServers(session); + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } } } @@ -321,121 +334,119 @@ NodeSessionManager::destroy() } void -NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica, bool waitTryCreateSession) +NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) { - Lock sync(*this); - if(_destroyed) { - return; - } + Lock sync(*this); + if(_destroyed) + { + return; + } - ++_serial; - NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); - NodeSessionKeepAliveThreadPtr thread; - if(p != _sessions.end()) - { - thread = p->second; - thread->setRegistry(replica); + // + // Initialize the set of replicas known by the master. + // + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + _replicas.insert((*p)->ice_getIdentity()); + } } - else + + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); - _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); - thread->start(); + createReplicaSession(*p, false); } - thread->tryCreateSession(waitTryCreateSession); } void -NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) { - NodeSessionKeepAliveThreadPtr thread; { Lock sync(*this); if(_destroyed) { return; } + _replicas.insert(replica->ice_getIdentity()); + } + + createReplicaSession(replica, false); +} - ++_serial; - NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity()); - if(p != _sessions.end()) +void +NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) +{ + { + Lock sync(*this); + if(_destroyed) { - thread = p->second; - _sessions.erase(p); + return; } + _replicas.erase(replica->ice_getIdentity()); } - if(thread) - { - _node->removeObserver(thread->getSession()); // Needs to be done here because we don't destroy the session. - thread->terminate(false); // Don't destroy the session, the replica is being shutdown! - thread->getThreadControl().join(); - } + + // + // We don't remove the session here. It will eventually be reaped + // by reapReplicas() if the session is dead. + // } void -NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) -{ - NodeSessionMap sessions; - _sessions.swap(sessions); +NodeSessionManager::createReplicaSession(const InternalRegistryPrx& replica, bool waitTryCreateSession) +{ + Lock sync(*this); + if(_destroyed) + { + return; + } + NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); NodeSessionKeepAliveThreadPtr thread; - vector<NodeSessionKeepAliveThreadPtr> newSessions; - for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + if(p != _sessions.end()) { - if((*p)->ice_getIdentity() == _master->ice_getIdentity()) - { - continue; - } - NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity()); - if(q != sessions.end()) - { - thread = q->second; - sessions.erase((*p)->ice_getIdentity()); - } - else - { - thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects); - thread->start(); - thread->tryCreateSession(false); - newSessions.push_back(thread); - } - _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + thread = p->second; + thread->setRegistry(replica); } + else + { + thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); + _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); + thread->start(); + } + thread->tryCreateSession(waitTryCreateSession); +} - NodeSessionMap::iterator q = sessions.begin(); - while(q != sessions.end()) +void +NodeSessionManager::reapReplicas() +{ + vector<NodeSessionKeepAliveThreadPtr> reap; { - if(q->second->getSession()) // Don't destroy sessions which are still alive! + Lock sync(*this); + if(_destroyed) { - _sessions.insert(make_pair(q->first, q->second)); - sessions.erase(q++); + return; } - else + + NodeSessionMap::iterator q = _sessions.begin(); + while(q != _sessions.end()) { - ++q; + if(_replicas.find(q->first) == _replicas.end() && q->second->terminateIfDisconnected()) + { + _node->removeObserver(q->second->getSession()); + reap.push_back(q->second); + _sessions.erase(q++); + } + else + { + ++q; + } } } - for(q = sessions.begin(); q != sessions.end(); ++q) - { - q->second->terminate(); - } - for(q = sessions.begin(); q != sessions.end(); ++q) - { - q->second->getThreadControl().join(); - } - // - // If the node is being started, we wait for the new sessions to - // be created. This ensures that once the node is activated all - // the known replicas are connected. - // - if(!_activated) + for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator p = reap.begin(); p != reap.end(); ++p) { - for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t) - { - (*t)->tryCreateSession(true); - } + (*p)->getThreadControl().join(); } } @@ -454,99 +465,130 @@ NodeSessionManager::syncServers(const NodeSessionPrx& session) // established their session with the node. // assert(session); - try - { - session->loadServers(); - _node->checkConsistency(session); - } - catch(const Ice::LocalException&) - { - } + session->loadServers(); + _node->checkConsistency(session); } void NodeSessionManager::createdSession(const NodeSessionPrx& session) { + bool activated; + { + Lock sync(*this); + activated = _activated; + } + // - // Get the list of replicas (either with the master session or the - // IceGrid::Query interface) and make sure we have sessions opened - // to these replicas. + // Synchronize the servers if the session is active and if the + // node adapter has been activated (otherwise, the servers will be + // synced after the node adapter activation, see activate()). // - try + // We also set the replica observer to receive notifications of + // replica addition/removal. + // + if(session && activated) + { + try + { + session->setReplicaObserver(_node->getProxy()); + syncServers(session); + } + catch(const Ice::LocalException&) + { + } + return; + } + + // + // If there's no master session or if the node adapter isn't + // activated yet, we retrieve a list of the replicas either from + // the master or from the known replicas (the ones configured with + // Ice.Default.Locator) and we try to establish connections to + // each of the replicas. + // + + InternalRegistryPrxSeq replicas; + if(session) { - unsigned long serial = 0; - InternalRegistryPrxSeq replicas; - while(true) + assert(!activated); // The node adapter isn't activated yet so + // we're not subscribed yet to the replica + // observer topic. + try { + replicas = _thread->getRegistry()->getReplicas(); + } + catch(const Ice::LocalException&) + { + } + } + else + { + map<Ice::Identity, Ice::ObjectPrx> proxies; + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + try { - Lock sync(*this); - if(serial == _serial) + Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) { - NodeSessionManager& self = const_cast<NodeSessionManager&>(*this); - self._serial = 1; - self.syncReplicas(replicas); - break; + // + // NOTE: We might override a good proxy here! We could improve this to make + // sure that we don't override the proxy for replica N if that proxy was + // obtained from replica N. + // + proxies[(*q)->ice_getIdentity()] = *q; } - serial = _serial; } - - if(session) + catch(const Ice::LocalException&) { - replicas = _thread->getRegistry()->getReplicas(); + // IGNORE } - else + } + + for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + } + } + + { + Lock sync(*this); + + // + // If the node adapter was activated since we last check, we don't need + // to initialize the replicas here, it will be done by replicaInit(). + // + if(!session || !_activated) + { + _replicas.clear(); + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { - replicas.clear(); - map<Ice::Identity, Ice::ObjectPrx> proxies; - for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) - { - try - { - Ice::ObjectProxySeq prxs = (*p)->findAllObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator q = prxs.begin(); q != prxs.end(); ++q) - { - // - // NOTE: We might override a good proxy - // here! We could improve this to make - // sure that we don't override the proxy - // for replica N if that proxy was - // obtained from replica N. - // - proxies[(*q)->ice_getIdentity()] = *q; - } - } - catch(const Ice::LocalException&) - { - // IGNORE - } - } - for(map<Ice::Identity, Ice::ObjectPrx>::const_iterator q = proxies.begin(); q != proxies.end(); ++q) + if((*p)->ice_getIdentity() != _master->ice_getIdentity()) { - replicas.push_back(InternalRegistryPrx::uncheckedCast(q->second)); + _replicas.insert((*p)->ice_getIdentity()); } } } } - catch(const Ice::LocalException&) - { - // IGNORE - } - + // - // Synchronize the servers if the session is active and if the - // node adapter has been activated (otherwise, the servers will be - // synced after the node adapter activation, see activated()). + // Create the replica sessions and wait for the creation. It's + // important to wait to ensure that the replica sessions are + // created before the node adapter is activated. // - if(session) + InternalRegistryPrxSeq::const_iterator t; + for(t = replicas.begin(); t != replicas.end(); ++t) { - bool activated; + if((*t)->ice_getIdentity() != _master->ice_getIdentity()) { - Lock sync(*this); - activated = _activated; + createReplicaSession(*t, false); } - if(activated) + } + for(t = replicas.begin(); t != replicas.end(); ++t) + { + if((*t)->ice_getIdentity() != _master->ice_getIdentity()) { - syncServers(session); + createReplicaSession(*t, true); } } } diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index 5d28f060aca..ea5895e25d9 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -54,19 +54,23 @@ public: void create(const NodeIPtr&); void create(const InternalRegistryPrx&); - void activated(); + void activate(); bool waitForCreate(); void terminate(); void destroy(); - void replicaAdded(const InternalRegistryPrx&, bool); + void replicaInit(const InternalRegistryPrxSeq&); + void replicaAdded(const InternalRegistryPrx&); void replicaRemoved(const InternalRegistryPrx&); NodeSessionPrx getMasterNodeSession() const { return _thread->getSession(); } private: - void syncReplicas(const InternalRegistryPrxSeq&); + void createReplicaSession(const InternalRegistryPrx&, bool); + + void reapReplicas(); + void syncServers(const NodeSessionPrx&); class Thread : public NodeSessionKeepAliveThread @@ -84,9 +88,25 @@ private: { NodeSessionPrx session = NodeSessionKeepAliveThread::createSession(master, timeout); _manager.createdSession(session); + _manager.reapReplicas(); return session; } + virtual void + destroySession(const NodeSessionPrx& session) + { + NodeSessionKeepAliveThread::destroySession(session); + _manager.reapReplicas(); + } + + virtual bool + keepAlive(const NodeSessionPrx& session) + { + bool alive = NodeSessionKeepAliveThread::keepAlive(session); + _manager.reapReplicas(); + return alive; + } + private: NodeSessionManager& _manager; @@ -106,6 +126,7 @@ private: typedef std::map<Ice::Identity, NodeSessionKeepAliveThreadPtr> NodeSessionMap; NodeSessionMap _sessions; + std::set<Ice::Identity> _replicas; }; } diff --git a/cpp/src/IceGrid/PlatformInfo.cpp b/cpp/src/IceGrid/PlatformInfo.cpp index da02ec5f2d4..c5fc269efc6 100644 --- a/cpp/src/IceGrid/PlatformInfo.cpp +++ b/cpp/src/IceGrid/PlatformInfo.cpp @@ -42,28 +42,28 @@ using namespace IceGrid; namespace IceGrid { -string -getLocalizedPerfName(const map<string, string>& perfNames, const string& name) -{ - unsigned long idx; - map<string, string>::const_iterator p = perfNames.find(name); - if(p == perfNames.end()) + string + getLocalizedPerfName(const map<string, string>& perfNames, const string& name) { - return ""; - } - istringstream is(p->second); - is >> idx; + unsigned long idx; + map<string, string>::const_iterator p = perfNames.find(name); + if(p == perfNames.end()) + { + return ""; + } + istringstream is(p->second); + is >> idx; - vector<char> localized; - unsigned long size = 256; - localized.resize(size); - while(PdhLookupPerfNameByIndex(0, idx, &localized[0], &size) == PDH_MORE_DATA) - { - size += 256; + vector<char> localized; + unsigned long size = 256; localized.resize(size); + while(PdhLookupPerfNameByIndex(0, idx, &localized[0], &size) == PDH_MORE_DATA) + { + size += 256; + localized.resize(size); + } + return string(&localized[0]); } - return string(&localized[0]); -} }; diff --git a/cpp/src/IceGrid/ReapThread.h b/cpp/src/IceGrid/ReapThread.h index e39812e0128..164ffe40fc8 100644 --- a/cpp/src/IceGrid/ReapThread.h +++ b/cpp/src/IceGrid/ReapThread.h @@ -13,6 +13,11 @@ #include <IceUtil/Thread.h> #include <IceUtil/Mutex.h> #include <IceUtil/Monitor.h> + +#include <Ice/Logger.h> +#include <Ice/LocalException.h> +#include <Ice/LoggerUtil.h> + #include <list> namespace IceGrid @@ -30,6 +35,58 @@ public: }; typedef IceUtil::Handle<Reapable> ReapablePtr; +template<class T> +class SessionReapable : public Reapable +{ + typedef IceUtil::Handle<T> TPtr; + +public: + + SessionReapable(const Ice::LoggerPtr& logger, const TPtr& session) : + _logger(logger), _session(session) + { + } + + virtual ~SessionReapable() + { + } + + virtual IceUtil::Time + timestamp() const + { + return _session->timestamp(); + } + + virtual void + destroy(bool shutdown) + { + try + { + if(shutdown) + { + _session->shutdown(); + } + else + { + _session->destroy(Ice::Current()); + } + } + catch(const Ice::ObjectNotExistException&) + { + } + catch(const Ice::LocalException& ex) + { + Ice::Warning out(_logger); + out << "unexpected exception while reaping session:\n" << ex; + } + } + +private: + + const Ice::LoggerPtr _logger; + const TPtr _session; +}; + class ReapThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { public: diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 9a857ced479..91724513ee4 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -704,13 +704,12 @@ RegistryI::createSession(const string& user, const string& password, const Curre } SessionIPtr session = _clientSessionFactory->createSessionServant(user, 0); - session->setServantLocator(_sessionServantLocator); - SessionPrx proxy = SessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con)); + Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con); if(_sessionTimeout > 0) { - _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout); + _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout); } - return proxy; + return SessionPrx::uncheckedCast(proxy); } AdminSessionPrx @@ -758,15 +757,12 @@ RegistryI::createAdminSession(const string& user, const string& password, const } AdminSessionIPtr session = _adminSessionFactory->createSessionServant(user); - ObjectPrx admin = _sessionServantLocator->add(new AdminI(_database, this, session), current.con); - session->setAdmin(AdminPrx::uncheckedCast(admin)); - session->setServantLocator(_sessionServantLocator); - AdminSessionPrx proxy = AdminSessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con)); + Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con, this); if(_sessionTimeout > 0) { - _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout); + _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout); } - return proxy; + return AdminSessionPrx::uncheckedCast(proxy); } SessionPrx @@ -823,13 +819,12 @@ RegistryI::createSessionFromSecureConnection(const Current& current) } SessionIPtr session = _clientSessionFactory->createSessionServant(userDN, 0); - session->setServantLocator(_sessionServantLocator); - SessionPrx proxy = SessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con)); + Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con); if(_sessionTimeout > 0) { - _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout); + _reaper->add(new SessionReapable<SessionI>(_traceLevels->logger, session), _sessionTimeout); } - return proxy; + return SessionPrx::uncheckedCast(proxy); } AdminSessionPrx @@ -875,15 +870,12 @@ RegistryI::createAdminSessionFromSecureConnection(const Current& current) // We let the connection access the administrative interface. // AdminSessionIPtr session = _adminSessionFactory->createSessionServant(userDN); - ObjectPrx admin = _sessionServantLocator->add(new AdminI(_database, this, session), current.con); - session->setAdmin(AdminPrx::uncheckedCast(admin)); - session->setServantLocator(_sessionServantLocator); - AdminSessionPrx proxy = AdminSessionPrx::uncheckedCast(_sessionServantLocator->add(session, current.con)); + Ice::ObjectPrx proxy = session->registerWithServantLocator(_sessionServantLocator, current.con, this); if(_sessionTimeout > 0) { - _reaper->add(new SessionReapable(current.adapter, session, proxy->ice_getIdentity()), _sessionTimeout); + _reaper->add(new SessionReapable<AdminSessionI>(_traceLevels->logger, session), _sessionTimeout); } - return proxy; + return AdminSessionPrx::uncheckedCast(proxy); } int diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index cb4ace333da..9e21cf98d5b 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -23,15 +23,15 @@ ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceSt IceStorm::TopicPrx t; try { - t = topicManager->create("NodeNotifier"); + t = topicManager->create("ReplicaObserverTopic"); } catch(const IceStorm::TopicExists&) { - t = topicManager->retrieve("NodeNotifier"); + t = topicManager->retrieve("ReplicaObserverTopic"); } - const_cast<IceStorm::TopicPrx&>(_topic) = t; - const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher()); + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); + const_cast<ReplicaObserverPrx&>(_observers) = ReplicaObserverPrx::uncheckedCast(_topic->getPublisher()); } ReplicaEntryPtr @@ -39,22 +39,38 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { Lock sync(*this); - while(true) + ReplicaEntryPtr entry; + while(entry = getImpl(name)) { - ReplicaEntryPtr entry = getImpl(name); - if(entry) + ReplicaSessionIPtr session = entry->getSession(); + if(session->isDestroyed()) { - if(entry->getSession()->isDestroyed()) + wait(); // Wait for the session to be removed. + } + else + { + // + // Check if the replica is still reachable, if not, we + // destroy its session. + // + sync.release(); + try { - wait(); - continue; + session->getInternalRegistry()->ice_ping(); + throw ReplicaActiveException(); } - else + catch(const Ice::LocalException&) { - throw ReplicaActiveException(); + try + { + session->destroy(); + } + catch(const Ice::LocalException&) + { + } } + sync.acquire(); } - break; } if(_traceLevels && _traceLevels->replica > 0) @@ -65,7 +81,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) try { - _nodes->replicaAdded(session->getInternalRegistry()); + _observers->replicaAdded(session->getInternalRegistry()); } catch(const Ice::ConnectionRefusedException&) { @@ -104,7 +120,7 @@ ReplicaCache::remove(const string& name, bool shutdown) { try { - _nodes->replicaRemoved(entry->getProxy()); + _observers->replicaRemoved(entry->getProxy()); } catch(const Ice::ConnectionRefusedException&) { @@ -139,13 +155,21 @@ ReplicaCache::get(const string& name) const } void -ReplicaCache::nodeAdded(const NodePrx& node) +ReplicaCache::subscribe(const ReplicaObserverPrx& observer) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; try { - _topic->subscribe(qos, node); + Lock sync(*this); + InternalRegistryPrxSeq replicas; + for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p) + { + replicas.push_back(p->second->getProxy()); + } + + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer); + ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas); } catch(const Ice::ConnectionRefusedException&) { @@ -157,17 +181,17 @@ ReplicaCache::nodeAdded(const NodePrx& node) if(traceLevels) { Ice::Warning out(traceLevels->logger); - out << "unexpected exception while subscribing node from replica observer topic:\n" << ex; + out << "unexpected exception while subscribing observer from replica observer topic:\n" << ex; } } } void -ReplicaCache::nodeRemoved(const NodePrx& node) +ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer) { try { - _topic->unsubscribe(node); + _topic->unsubscribe(observer); } catch(const Ice::ConnectionRefusedException&) { @@ -179,7 +203,7 @@ ReplicaCache::nodeRemoved(const NodePrx& node) if(traceLevels) { Ice::Warning out(traceLevels->logger); - out << "unexpected exception while unsubscribing node from replica observer topic:\n" << ex; + out << "unexpected exception while unsubscribing observer from replica observer topic:\n" << ex; } } } diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h index 2a91e36a83e..9a13423bd44 100644 --- a/cpp/src/IceGrid/ReplicaCache.h +++ b/cpp/src/IceGrid/ReplicaCache.h @@ -53,8 +53,8 @@ public: ReplicaEntryPtr remove(const std::string&, bool); ReplicaEntryPtr get(const std::string&) const; - void nodeAdded(const NodePrx&); - void nodeRemoved(const NodePrx&); + void subscribe(const ReplicaObserverPrx&); + void unsubscribe(const ReplicaObserverPrx&); Ice::ObjectPrx getEndpoints(const std::string&, const Ice::ObjectPrx&) const; @@ -65,7 +65,7 @@ private: const Ice::CommunicatorPtr _communicator; const IceStorm::TopicPrx _topic; - const NodePrx _nodes; + const ReplicaObserverPrx _observers; InternalRegistryPrx _self; // This replica internal registry proxy. }; diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index 6ba795b30b4..9b1696dd653 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -29,14 +29,12 @@ operator==(const ObjectInfo& info, const Ice::Identity& id) ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database, const WellKnownObjectsManagerPtr& wellKnownObjects, - const string& name, const RegistryInfo& info, const InternalRegistryPrx& proxy, int timeout) : _database(database), _wellKnownObjects(wellKnownObjects), _traceLevels(database->getTraceLevels()), - _name(name), _internalRegistry(InternalRegistryPrx::uncheckedCast(proxy->ice_timeout(timeout * 1000))), _info(info), _timeout(timeout), @@ -46,9 +44,11 @@ ReplicaSessionI::ReplicaSessionI(const DatabasePtr& database, __setNoDelete(true); try { - _database->getReplicaCache().add(name, this); + _database->getReplicaCache().add(info.name, this); ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName); RegistryObserverTopicPtr::dynamicCast(obsv)->registryUp(_info); + + _proxy = ReplicaSessionPrx::uncheckedCast(_database->getInternalAdapter()->addWithUUID(this)); } catch(...) { @@ -72,7 +72,7 @@ ReplicaSessionI::keepAlive(const Ice::Current& current) if(_traceLevels->replica > 2) { Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); - out << "replica `" << _name << "' keep alive "; + out << "replica `" << _info.name << "' keep alive "; } } @@ -91,9 +91,9 @@ ReplicaSessionI::setDatabaseObserver(const DatabaseObserverPrx& observer, const throw Ice::ObjectNotExistException(__FILE__, __LINE__); } _observer = observer; - _database->getObserverTopic(ApplicationObserverTopicName)->subscribe(_observer, _name); - _database->getObserverTopic(AdapterObserverTopicName)->subscribe(_observer, _name); - _database->getObserverTopic(ObjectObserverTopicName)->subscribe(_observer, _name); + _database->getObserverTopic(ApplicationObserverTopicName)->subscribe(_observer, _info.name); + _database->getObserverTopic(AdapterObserverTopicName)->subscribe(_observer, _info.name); + _database->getObserverTopic(ObjectObserverTopicName)->subscribe(_observer, _info.name); } void @@ -130,7 +130,7 @@ ReplicaSessionI::registerWellKnownObjects(const ObjectInfoSeq& objects, const Ic // are correctly setup when the replica starts accepting requests // from clients (if the replica is being started). // - _database->getObserverTopic(ObjectObserverTopicName)->waitForSyncedSubscribers(serial, _name); + _database->getObserverTopic(ObjectObserverTopicName)->waitForSyncedSubscribers(serial, _info.name); } void @@ -152,15 +152,72 @@ ReplicaSessionI::receivedUpdate(TopicName topicName, int serial, const string& f ObserverTopicPtr topic = _database->getObserverTopic(topicName); if(topic) { - topic->receivedUpdate(_name, serial, failure); + topic->receivedUpdate(_info.name, serial, failure); } } void -ReplicaSessionI::destroy(const Ice::Current& current) +ReplicaSessionI::destroy(const Ice::Current&) +{ + destroyImpl(false); +} + +IceUtil::Time +ReplicaSessionI::timestamp() const +{ + Lock sync(*this); + if(_destroy) + { + throw Ice::ObjectNotExistException(__FILE__, __LINE__); + } + return _timestamp; +} + +void +ReplicaSessionI::shutdown() +{ + destroyImpl(true); +} + +const InternalRegistryPrx& +ReplicaSessionI::getInternalRegistry() const { - const bool shutdown = !current.adapter; // adapter is null if we're shutting down, see InternalRegistryI.cpp + return _internalRegistry; +} + +const RegistryInfo& +ReplicaSessionI::getInfo() const +{ + return _info; +} + +ReplicaSessionPrx +ReplicaSessionI::getProxy() const +{ + return _proxy; +} +Ice::ObjectPrx +ReplicaSessionI::getEndpoint(const std::string& name) +{ + Lock sync(*this); + if(_destroy) + { + return 0; + } + return _replicaEndpoints[name]; +} + +bool +ReplicaSessionI::isDestroyed() const +{ + Lock sync(*this); + return _destroy; +} + +void +ReplicaSessionI::destroyImpl(bool shutdown) +{ { Lock sync(*this); if(_destroy) @@ -172,9 +229,9 @@ ReplicaSessionI::destroy(const Ice::Current& current) if(_observer) { - _database->getObserverTopic(ApplicationObserverTopicName)->unsubscribe(_observer, _name); - _database->getObserverTopic(AdapterObserverTopicName)->unsubscribe(_observer, _name); - _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _name); + _database->getObserverTopic(ApplicationObserverTopicName)->unsubscribe(_observer, _info.name); + _database->getObserverTopic(AdapterObserverTopicName)->unsubscribe(_observer, _info.name); + _database->getObserverTopic(ObjectObserverTopicName)->unsubscribe(_observer, _info.name); } if(!_replicaWellKnownObjects.empty()) @@ -200,52 +257,23 @@ ReplicaSessionI::destroy(const Ice::Current& current) // Notify the observer that the registry is down. // ObserverTopicPtr obsv = _database->getObserverTopic(RegistryObserverTopicName); - RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_name); + RegistryObserverTopicPtr::dynamicCast(obsv)->registryDown(_info.name); // // Remove the replica from the cache. This must be done last. As // soon as the replica is removed another session might be // created. // - _database->getReplicaCache().remove(_name, shutdown); + _database->getReplicaCache().remove(_info.name, shutdown); - if(current.adapter) + if(!shutdown) { try { - current.adapter->remove(current.id); + _database->getInternalAdapter()->remove(_proxy->ice_getIdentity()); } catch(const Ice::ObjectAdapterDeactivatedException&) { } } } - -IceUtil::Time -ReplicaSessionI::timestamp() const -{ - Lock sync(*this); - if(_destroy) - { - throw Ice::ObjectNotExistException(__FILE__, __LINE__); - } - return _timestamp; -} - -Ice::ObjectPrx -ReplicaSessionI::getEndpoint(const std::string& name) -{ - Lock sync(*this); - if(_destroy) - { - return 0; - } - return _replicaEndpoints[name]; -} - -bool -ReplicaSessionI::isDestroyed() const -{ - Lock sync(*this); - return _destroy; -} diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h index 25553f0843a..fa70707218e 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.h +++ b/cpp/src/IceGrid/ReplicaSessionI.h @@ -29,7 +29,7 @@ class ReplicaSessionI : public ReplicaSession, public IceUtil::Mutex { public: - ReplicaSessionI(const DatabasePtr&, const WellKnownObjectsManagerPtr&, const std::string&, const RegistryInfo&, + ReplicaSessionI(const DatabasePtr&, const WellKnownObjectsManagerPtr&, const RegistryInfo&, const InternalRegistryPrx&, int); virtual void keepAlive(const Ice::Current&); @@ -40,25 +40,29 @@ public: virtual void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&, const Ice::Current&); virtual void receivedUpdate(TopicName, int, const std::string&, const Ice::Current&); - virtual void destroy(const Ice::Current&); + virtual void destroy(const Ice::Current& = Ice::Current()); virtual IceUtil::Time timestamp() const; + virtual void shutdown(); - const InternalRegistryPrx& getInternalRegistry() const { return _internalRegistry; } - const RegistryInfo& getInfo() const { return _info; } + const InternalRegistryPrx& getInternalRegistry() const; + const RegistryInfo& getInfo() const; + ReplicaSessionPrx getProxy() const; Ice::ObjectPrx getEndpoint(const std::string&); bool isDestroyed() const; private: + + void destroyImpl(bool); const DatabasePtr _database; const WellKnownObjectsManagerPtr _wellKnownObjects; const TraceLevelsPtr _traceLevels; - const std::string _name; const InternalRegistryPrx _internalRegistry; const RegistryInfo _info; const int _timeout; + ReplicaSessionPrx _proxy; DatabaseObserverPrx _observer; ObjectInfoSeq _replicaWellKnownObjects; StringObjectProxyDict _replicaEndpoints; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 31802da355c..fa99a035092 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -333,9 +333,10 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica) NodePrxSeq ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const { + assert(_thread && _thread->getRegistry()); try { - return _master->getNodes(); + return _thread->getRegistry()->getNodes(); } catch(const Ice::LocalException&) { @@ -526,7 +527,7 @@ ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, Ic { try { - ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry); + ReplicaSessionPrx session = registry->registerReplica(_info, _internalRegistry); int t = session->getTimeout(); if(t > 0) { diff --git a/cpp/src/IceGrid/SessionI.cpp b/cpp/src/IceGrid/SessionI.cpp index 9f099595fa4..7eb1fc72c77 100644 --- a/cpp/src/IceGrid/SessionI.cpp +++ b/cpp/src/IceGrid/SessionI.cpp @@ -106,29 +106,32 @@ BaseSessionI::keepAlive(const Ice::Current& current) } void -BaseSessionI::destroy(const Ice::Current& current) +BaseSessionI::destroyImpl(bool shutdown) { Lock sync(*this); if(_destroyed) { Ice::ObjectNotExistException ex(__FILE__, __LINE__); - ex.id = current.id; + ex.id = _identity; throw ex; } _destroyed = true; - if(_servantLocator) - { - _servantLocator->remove(current.id); - } - else if(current.adapter) + if(!shutdown) { - try + if(_servantLocator) { - current.adapter->remove(current.id); + _servantLocator->remove(_identity); } - catch(const Ice::ObjectAdapterDeactivatedException&) + else if(_adapter) { + try + { + _adapter->remove(_identity); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + } } } @@ -147,63 +150,33 @@ BaseSessionI::timestamp() const } void -BaseSessionI::setServantLocator(const SessionServantLocatorIPtr& servantLocator) +BaseSessionI::shutdown() { - // - // This is supposed to be called after creation only. - // - const_cast<SessionServantLocatorIPtr&>(_servantLocator) = servantLocator; + destroyImpl(true); } -SessionReapable::SessionReapable(const Ice::ObjectAdapterPtr& adapter, - const Ice::ObjectPtr& session, - const Ice::Identity& id) : - _adapter(adapter), - _servant(session), - _session(dynamic_cast<BaseSessionI*>(_servant.get())), - _id(id) +Ice::ObjectPrx +BaseSessionI::registerWithServantLocator(const SessionServantLocatorIPtr& servantLoc, const Ice::ConnectionPtr& con) { + // + // This is supposed to be called after creation only, no need to synchronize. + // + _servantLocator = servantLoc; + Ice::ObjectPrx proxy = servantLoc->add(this, con); + _identity = proxy->ice_getIdentity(); + return proxy; } -SessionReapable::~SessionReapable() -{ -} - -IceUtil::Time -SessionReapable::timestamp() const -{ - return _session->timestamp(); -} - -void -SessionReapable::destroy(bool destroy) +Ice::ObjectPrx +BaseSessionI::registerWithObjectAdapter(const Ice::ObjectAdapterPtr& adapter) { - try - { - // - // Invoke on the servant directly instead of the - // proxy. Invoking on the proxy might not always work if the - // communicator is being shutdown/destroyed. We have to create - // a fake "current" because the session destroy methods needs - // the adapter and object identity to unregister the servant - // from the adapter. - // - Ice::Current current; - if(!destroy) - { - current.adapter = _adapter; - current.id = _id; - } - _session->destroy(current); - } - catch(const Ice::ObjectNotExistException&) - { - } - catch(const Ice::LocalException& ex) - { - Ice::Warning out(_adapter->getCommunicator()->getLogger()); - out << "unexpected exception while reaping node session:\n" << ex; - } + // + // This is supposed to be called after creation only, no need to synchronize. + // + _adapter = adapter; + _identity.category = _database->getInstanceName(); + _identity.name = IceUtil::generateUUID(); + return _adapter->add(this, _identity); } SessionI::SessionI(const string& id, @@ -251,33 +224,9 @@ SessionI::setAllocationTimeout(int timeout, const Ice::Current&) } void -SessionI::destroy(const Ice::Current& current) +SessionI::destroy(const Ice::Current&) { - BaseSessionI::destroy(current); - - // - // NOTE: The _requests and _allocations attributes are immutable - // once the session is destroyed so we don't need mutex protection - // here to access them. - // - - for(set<AllocationRequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - (*p)->cancel(AllocationException("session destroyed")); - } - _requests.clear(); - - for(set<AllocatablePtr>::const_iterator q = _allocations.begin(); q != _allocations.end(); ++q) - { - try - { - (*q)->release(this); - } - catch(const AllocationException&) - { - } - } - _allocations.clear(); + destroyImpl(false); } int @@ -332,6 +281,36 @@ SessionI::removeAllocation(const AllocatablePtr& allocatable) _allocations.erase(allocatable); } +void +SessionI::destroyImpl(bool shutdown) +{ + BaseSessionI::destroyImpl(shutdown); + + // + // NOTE: The _requests and _allocations attributes are immutable + // once the session is destroyed so we don't need mutex protection + // here to access them. + // + + for(set<AllocationRequestPtr>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + { + (*p)->cancel(AllocationException("session destroyed")); + } + _requests.clear(); + + for(set<AllocatablePtr>::const_iterator q = _allocations.begin(); q != _allocations.end(); ++q) + { + try + { + (*q)->release(this); + } + catch(const AllocationException&) + { + } + } + _allocations.clear(); +} + ClientSessionFactory::ClientSessionFactory(const Ice::ObjectAdapterPtr& adapter, const DatabasePtr& database, const WaitQueuePtr& waitQueue, @@ -348,20 +327,16 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci { assert(_adapter); - Ice::IdentitySeq ids; // Identities of the object the session is allowed to access. - - Ice::Identity id; - id.category = _database->getInstanceName(); - - // The session object - id.name = IceUtil::generateUUID(); - ids.push_back(id); SessionIPtr session = createSessionServant(sessionId, ctl); - Glacier2::SessionPrx s = Glacier2::SessionPrx::uncheckedCast(_adapter->add(session, id)); + Ice::ObjectPrx proxy = session->registerWithObjectAdapter(_adapter); - // The IceGrid::Query object - id.name = "Query"; - ids.push_back(id); + Ice::Identity queryId; + queryId.category = _database->getInstanceName(); + queryId.name = "Query"; + + Ice::IdentitySeq ids; // Identities of the object the session is allowed to access. + ids.push_back(proxy->ice_getIdentity()); // Session object + ids.push_back(queryId); int timeout = 0; if(ctl) @@ -372,7 +347,7 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci } catch(const Ice::LocalException&) { - s->destroy(); + session->destroy(Ice::Current()); return 0; } timeout = ctl->getSessionTimeout(); @@ -380,10 +355,10 @@ ClientSessionFactory::createGlacier2Session(const string& sessionId, const Glaci if(timeout > 0) { - _reaper->add(new SessionReapable(_adapter, session, s->ice_getIdentity()), timeout); + _reaper->add(new SessionReapable<SessionI>(_database->getTraceLevels()->logger, session), timeout); } - return s; + return Glacier2::SessionPrx::uncheckedCast(proxy); } SessionIPtr @@ -413,7 +388,8 @@ ClientSSLSessionManagerI::ClientSSLSessionManagerI(const ClientSessionFactoryPtr } Glacier2::SessionPrx -ClientSSLSessionManagerI::create(const Glacier2::SSLInfo& info, const Glacier2::SessionControlPrx& ctl, +ClientSSLSessionManagerI::create(const Glacier2::SSLInfo& info, + const Glacier2::SessionControlPrx& ctl, const Ice::Current& current) { string userDN; diff --git a/cpp/src/IceGrid/SessionI.h b/cpp/src/IceGrid/SessionI.h index 4391f2d414b..5ebdf00195f 100644 --- a/cpp/src/IceGrid/SessionI.h +++ b/cpp/src/IceGrid/SessionI.h @@ -36,29 +36,35 @@ typedef IceUtil::Handle<WaitQueue> WaitQueuePtr; class SessionI; typedef IceUtil::Handle<SessionI> SessionIPtr; -class BaseSessionI : public IceUtil::Mutex +class BaseSessionI : virtual Ice::Object, public IceUtil::Mutex { public: virtual ~BaseSessionI(); virtual void keepAlive(const Ice::Current&); - virtual void destroy(const Ice::Current&); IceUtil::Time timestamp() const; - void setServantLocator(const SessionServantLocatorIPtr&); + void shutdown(); + + virtual Ice::ObjectPrx registerWithServantLocator(const SessionServantLocatorIPtr&, const Ice::ConnectionPtr&); + virtual Ice::ObjectPrx registerWithObjectAdapter(const Ice::ObjectAdapterPtr&); const std::string& getId() const { return _id; } protected: + virtual void destroyImpl(bool); + BaseSessionI(const std::string&, const std::string&, const DatabasePtr&); const std::string _id; const std::string _prefix; const TraceLevelsPtr _traceLevels; const DatabasePtr _database; - const SessionServantLocatorIPtr _servantLocator; + SessionServantLocatorIPtr _servantLocator; + Ice::ObjectAdapterPtr _adapter; + Ice::Identity _identity; bool _destroyed; IceUtil::Time _timestamp; }; @@ -68,24 +74,6 @@ class SessionDestroyedException { }; -class SessionReapable : public Reapable -{ -public: - - SessionReapable(const Ice::ObjectAdapterPtr&, const Ice::ObjectPtr&, const Ice::Identity&); - virtual ~SessionReapable(); - - virtual IceUtil::Time timestamp() const; - virtual void destroy(bool); - -private: - - const Ice::ObjectAdapterPtr _adapter; - const Ice::ObjectPtr _servant; - BaseSessionI* _session; - const Ice::Identity _id; -}; - class SessionI : public BaseSessionI, public Session { public: @@ -115,6 +103,8 @@ public: protected: + virtual void destroyImpl(bool); + const WaitQueuePtr _waitQueue; const Glacier2::SessionControlPrx _sessionControl; const Ice::ConnectionPtr _connection; diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index 7d659d593ea..84dafe96310 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -54,7 +54,7 @@ public: { TPrx session; InternalRegistryPrx registry; - IceUtil::Time timeout = IceUtil::Time::seconds(10); + IceUtil::Time timeout = IceUtil::Time::seconds(15); Action action = Connect; while(true) @@ -215,6 +215,21 @@ public: notifyAll(); } + bool + terminateIfDisconnected() + { + Lock sync(*this); + if(_state != Disconnected) + { + return false; // Nothing we can do for now. + } + assert(_state != Destroyed); + _state = Destroyed; + _nextAction = None; + notifyAll(); + return true; + } + void terminate(bool destroySession = true) { |