diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-07-25 17:25:58 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-07-25 17:25:58 +0000 |
commit | 95779a66eb32286140dec13a1641f5723d322168 (patch) | |
tree | dd86f141a0460bfa7746bc563cde037eedd7cd84 /cpp/src | |
parent | http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1189 (diff) | |
download | ice-95779a66eb32286140dec13a1641f5723d322168.tar.bz2 ice-95779a66eb32286140dec13a1641f5723d322168.tar.xz ice-95779a66eb32286140dec13a1641f5723d322168.zip |
More replication work.
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/PropertyNames.cpp | 5 | ||||
-rw-r--r-- | cpp/src/Ice/PropertyNames.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 153 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 29 | ||||
-rw-r--r-- | cpp/src/IceGrid/Internal.ice | 10 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 20 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 8 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 465 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.h | 41 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 46 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 157 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerAdapterI.cpp | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 14 |
20 files changed, 697 insertions, 283 deletions
diff --git a/cpp/src/Ice/PropertyNames.cpp b/cpp/src/Ice/PropertyNames.cpp index 5a62d274ed2..83d31a9ac2f 100644 --- a/cpp/src/Ice/PropertyNames.cpp +++ b/cpp/src/Ice/PropertyNames.cpp @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../config/PropertyNames.def', Fri Jul 21 11:02:34 2006 +// Generated by makeprops.py from file `../config/PropertyNames.def', Mon Jul 24 16:49:43 2006 // IMPORTANT: Do not edit this file -- any edits made here will be lost! @@ -119,8 +119,9 @@ const char* IceInternal::PropertyNames::IceGridProps[] = "IceGrid.Node.ThreadPool.StackSize", "IceGrid.Node.Trace.Activator", "IceGrid.Node.Trace.Adapter", - "IceGrid.Node.Trace.Server", "IceGrid.Node.Trace.Patch", + "IceGrid.Node.Trace.Replica", + "IceGrid.Node.Trace.Server", "IceGrid.Node.UserAccounts", "IceGrid.Node.UserAccountMapper", "IceGrid.Node.WaitTime", diff --git a/cpp/src/Ice/PropertyNames.h b/cpp/src/Ice/PropertyNames.h index 94946126e04..5a9cb0859c2 100644 --- a/cpp/src/Ice/PropertyNames.h +++ b/cpp/src/Ice/PropertyNames.h @@ -7,7 +7,7 @@ // // ********************************************************************** -// Generated by makeprops.py from file `../config/PropertyNames.def', Fri Jul 21 11:02:34 2006 +// Generated by makeprops.py from file `../config/PropertyNames.def', Mon Jul 24 16:49:43 2006 // IMPORTANT: Do not edit this file -- any edits made here will be lost! diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 4b680e1ca77..7fee927b7cd 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -139,6 +139,20 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter, _lock(0), _serial(-1) { + ServerEntrySeq entries; + for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) + { + try + { + load(ApplicationHelper(_communicator, p->second), entries); + } + catch(const DeploymentException& ex) + { + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->first << "':\n" << ex.reason; + } + } + // // Register a default servant to manage manually registered object adapters. // @@ -243,69 +257,37 @@ Database::unlock(AdminSessionI* session) } void -Database::init(int serial) +Database::initMaster() { - ApplicationDescriptorSeq applications; - AdapterInfoSeq adapters; - ObjectInfoSeq objects; + Lock sync(*this); - // - // Cache the servers & adapters. - // - ServerEntrySeq entries; + _serverCache.setTraceLevels(_traceLevels); + _nodeCache.setTraceLevels(_traceLevels); + _replicaCache.setTraceLevels(_traceLevels); + _adapterCache.setTraceLevels(_traceLevels); + _objectCache.setTraceLevels(_traceLevels); + _allocatableObjectCache.setTraceLevels(_traceLevels); + + _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager); + _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager); + _serial = 0; + + ApplicationDescriptorSeq applications; for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p) { applications.push_back(p->second); - try - { - load(ApplicationHelper(_communicator, p->second), entries); - } - catch(const DeploymentException& ex) - { - Ice::Warning warn(_traceLevels->logger); - warn << "invalid application `" << p->first << "':\n" << ex.reason; - } - } - + } + AdapterInfoSeq adapters; for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q) { adapters.push_back(q->second); - if(adapters.back().id.empty()) - { - adapters.back().id = q->first; - } } - + ObjectInfoSeq objects; for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r) { objects.push_back(r->second); } - - _serverCache.setTraceLevels(_traceLevels); - _nodeCache.setTraceLevels(_traceLevels); - _replicaCache.setTraceLevels(_traceLevels); - _adapterCache.setTraceLevels(_traceLevels); - _objectCache.setTraceLevels(_traceLevels); - _allocatableObjectCache.setTraceLevels(_traceLevels); - - _serial = serial; - - if(_registryObserverTopic) - { - // - // Initialize the topic cache. - // - _registryObserverTopic->getPublisher()->init(_serial, applications, adapters, objects); - } -} - -void -Database::initMaster() -{ - Lock sync(*this); - _nodeObserverTopic = new NodeObserverTopic(_internalAdapter, _topicManager); - _registryObserverTopic = new RegistryObserverTopic(_internalAdapter, _topicManager); - init(0); + _registryObserverTopic->getPublisher()->init(_serial, applications, adapters, objects); } void @@ -315,11 +297,58 @@ Database::initReplica(int masterSerial, const ObjectInfoSeq& objects) { Lock sync(*this); - - _descriptors.clear(); + + if(_serial < 0) + { + _serverCache.setTraceLevels(_traceLevels); + _nodeCache.setTraceLevels(_traceLevels); + _replicaCache.setTraceLevels(_traceLevels); + _adapterCache.setTraceLevels(_traceLevels); + _objectCache.setTraceLevels(_traceLevels); + _allocatableObjectCache.setTraceLevels(_traceLevels); + } + else + { +// assert(_serial <= masterSerial); // TODO: Master might have been restarted. + } + _serial = masterSerial; + + ServerEntrySeq entries; + set<string> names; for(ApplicationDescriptorSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) { + try + { + StringApplicationDescriptorDict::const_iterator s = _descriptors.find(p->name); + if(s != _descriptors.end()) + { + reload(ApplicationHelper(_communicator, s->second), ApplicationHelper(_communicator, *p), entries); + } + else + { + load(ApplicationHelper(_communicator, *p), entries); + } + } + catch(const DeploymentException& ex) + { + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->name << "':\n" << ex.reason; + } _descriptors.put(StringApplicationDescriptorDict::value_type(p->name, *p)); + names.insert(p->name); + } + StringApplicationDescriptorDict::iterator s = _descriptors.begin(); + while(s != _descriptors.end()) + { + if(names.find(s->first) == names.end()) + { + unload(ApplicationHelper(_communicator, s->second), entries); + _descriptors.erase(s++); + } + else + { + ++s; + } } _objects.clear(); @@ -333,9 +362,6 @@ Database::initReplica(int masterSerial, { _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); } - - init(masterSerial); - notifyAll(); } void @@ -613,17 +639,6 @@ Database::getAllApplications(const string& expression) void Database::addNode(const string& name, const NodeSessionIPtr& session) { - { - // - // Wait for the database to be initialized before to add a - // node. - // - Lock sync(*this); - while(_serial < 0) - { - wait(); - } - } _nodeCache.get(name, true)->setSession(session); } @@ -661,6 +676,12 @@ Database::addReplica(const string& name, const ReplicaSessionIPtr& session) _replicaCache.add(name, session, this); } +InternalRegistryPrxSeq +Database::getReplicas() const +{ + return _replicaCache.getAll(); +} + void Database::removeReplica(const string& name) { diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index d412024a519..9a965fc005a 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -94,6 +94,7 @@ public: Ice::StringSeq getAllNodes(const std::string& = std::string()); void addReplica(const std::string&, const ReplicaSessionIPtr&); + InternalRegistryPrxSeq getReplicas() const; void removeReplica(const std::string&); ServerInfo getServerInfo(const std::string&, bool = false); diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 51e168fdff6..219b70a4457 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -253,20 +253,19 @@ NodeService::start(int argc, char* argv[]) } // - // TODO: XXX: set communicator->setDefaultLocator() here?! I - // believe this is necessary for the NodeSessionManager. - // - - - // - // Set the Ice.Default.Locator property to point to the - // collocated locator (this property is passed by the - // activator to each activated server). + // Set the default locator property to point to the collocated + // locator (this property is passed by the activator to each + // activated server). The default locator is also needed by + // the node session manager. // - const string instanceNameProperty = "IceGrid.InstanceName"; - const string locatorId = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid") + "/Locator"; - string locatorPrx = locatorId + ":" + properties->getProperty("IceGrid.Registry.Client.Endpoints"); - properties->setProperty("Ice.Default.Locator", locatorPrx); + if(properties->getProperty("Ice.Default.Locator").empty()) + { + const string instanceNameProperty = "IceGrid.InstanceName"; + const string locatorId = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid") + "/Locator"; + string locatorPrx = locatorId + ":" + properties->getProperty("IceGrid.Registry.Client.Endpoints"); + communicator()->setDefaultLocator(LocatorPrx::uncheckedCast(communicator()->stringToProxy(locatorPrx))); + properties->setProperty("Ice.Default.Locator", locatorPrx); + } } else if(properties->getProperty("Ice.Default.Locator").empty()) { @@ -455,8 +454,8 @@ NodeService::start(int argc, char* argv[]) AdminPrx admin; try { - const string instanceNameProperty = "IceGrid.InstanceName"; - const string adminId = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid") + "/Admin"; + const string instanceName = communicator()->getDefaultLocator()->ice_getIdentity().category; + const string adminId = instanceName + "/Admin"; admin = AdminPrx::checkedCast(communicator()->stringToProxy(adminId)); } catch(const LocalException& ex) diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index 7b4f90a87e9..c07eff5de17 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -176,6 +176,7 @@ interface Server }; interface InternalRegistry; +sequence<InternalRegistry*> InternalRegistryPrxSeq; interface Node { @@ -295,7 +296,14 @@ interface NodeSession * Get the name of the servers deployed on the node. * **/ - Ice::StringSeq getServers(); + nonmutating Ice::StringSeq getServers(); + + /** + * + * Get the replicas of the IceGrid registry. + * + **/ + nonmutating InternalRegistryPrxSeq getReplicas(); /** * diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 134303b7ffc..b6ee2e13b88 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -198,8 +198,7 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, _tmpDir = _dataDir + "/tmp"; Ice::PropertiesPtr properties = getCommunicator()->getProperties(); - const string instanceNameProperty = "IceGrid.InstanceName"; - const_cast<string&>(_instanceName) = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid"); + const_cast<string&>(_instanceName) = getCommunicator()->getDefaultLocator()->ice_getIdentity().category; const_cast<Ice::Int&>(_waitTime) = properties->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60); } @@ -551,15 +550,14 @@ NodeI::getUserAccountMapper() const NodeSessionPrx NodeI::registerWithRegistry(const InternalRegistryPrx& registry) { - NodeSessionPrx session = registry->registerNode(_name, _proxy, _platform.getNodeInfo()); - NodeObserverPrx observer = session->getObserver(); - if(observer) - { - IceUtil::Mutex::Lock sync(_observerMutex); - _observer = observer; - } - checkConsistency(session); - return session; + return registry->registerNode(_name, _proxy, _platform.getNodeInfo()); +} + +void +NodeI::setObserver(const NodeObserverPrx& observer) +{ + IceUtil::Mutex::Lock sync(_observerMutex); + _observer = observer; } void diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index d72887d2aa2..2dafc898c9f 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -63,10 +63,11 @@ public: PlatformInfo& getPlatformInfo() const { return _platform; } NodeSessionPrx registerWithRegistry(const InternalRegistryPrx&); + void setObserver(const NodeObserverPrx&); + void checkConsistency(const NodeSessionPrx&); private: - void checkConsistency(const NodeSessionPrx&); void checkConsistencyNoSync(const Ice::StringSeq&); bool canRemoveServerDirectory(const std::string&); void initObserver(const Ice::StringSeq&); diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index 756d10d6635..349f7c0abf9 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -82,11 +82,17 @@ NodeSessionI::getObserver(const Ice::Current& current) const } Ice::StringSeq -NodeSessionI::getServers(const Ice::Current& current) +NodeSessionI::getServers(const Ice::Current& current) const { return _database->getAllNodeServers(_name); } +InternalRegistryPrxSeq +NodeSessionI::getReplicas(const Ice::Current& current) const +{ + return _database->getReplicas(); +} + void NodeSessionI::destroy(const Ice::Current& current) { diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index d7b221d11f8..e178e2b594e 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -30,7 +30,8 @@ public: virtual void keepAlive(const LoadInfo&, const Ice::Current&); virtual int getTimeout(const Ice::Current&) const; virtual NodeObserverPrx getObserver(const Ice::Current&) const; - virtual Ice::StringSeq getServers(const Ice::Current&); + virtual Ice::StringSeq getServers(const Ice::Current&) const; + virtual InternalRegistryPrxSeq getReplicas(const Ice::Current&) const; virtual void destroy(const Ice::Current&); const NodePrx& getNode() const; diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 4e6381c38c9..6c4d4cefc90 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -16,13 +16,23 @@ using namespace std; using namespace IceGrid; -NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, - const NodeIPtr& node) : - _registry(InternalRegistryPrx::uncheckedCast(registry->ice_adapterId(""))), +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) : + _registry(registry), _node(node), - _timeout(IceUtil::Time::seconds(5)), _shutdown(false) { + string name = _registry->ice_getIdentity().name; + const string prefix("InternalRegistry-"); + string::size_type pos = name.find(prefix); + if(pos != string::npos) + { + name = name.substr(prefix.size()); + } + const_cast<string&>(_name) = name; + const_cast<InternalRegistryPrx&>(_registry) = + InternalRegistryPrx::uncheckedCast( + _registry->ice_getCommunicator()->stringToProxy( + _registry->ice_getCommunicator()->identityToString(_registry->ice_getIdentity()))); } void @@ -32,27 +42,102 @@ NodeSessionKeepAliveThread::run() // Keep alive the session. // NodeSessionPrx session; + IceUtil::Time timeout = IceUtil::Time::seconds(5); + TraceLevelsPtr traceLevels = _node->getTraceLevels(); while(true) { - keepAlive(session); + // + // Send a keep alive message to the session. + // + if(session) + { + if(traceLevels && traceLevels->replica > 2) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "sending keep alive message to replica `" << _name << "'"; + } + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "lost session with replica `" << _name << "':\n" << ex; + } + session = 0; + } + } + + // + // If the session isn't established yet, try to create a new + // session. + // + if(!session) { - Lock sync(*this); + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with replica `" << _name << "'"; + } - session = _session; + session = _node->registerWithRegistry(_registry); + { + Lock sync(*this); + _session = session; + notifyAll(); + } - if(!_shutdown) + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with replica `" << _name << "'"; + } + } + catch(const NodeActiveException&) + { + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already registered and active"); + } + } + catch(const Ice::LocalException& ex) { - timedWait(_timeout); + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with replica `" << _name << "':\n" << ex; + } } + } + // + // Wait for the configured timeout duration. + // + { + Lock sync(*this); + if(!_shutdown) + { + timedWait(timeout); + } if(_shutdown) { break; } - } + } } - + // // Destroy the session. // @@ -61,27 +146,33 @@ NodeSessionKeepAliveThread::run() try { session->destroy(); + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "destroyed replica `" << _name << "' session"; + } } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { - // - // TODO: XXX: TRACE? - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex; -// _node->getTraceLevels()->logger->warning(os.str()); + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "couldn't destroy replica `" << _name << "' session:\n" << ex; + } } } } -void +bool NodeSessionKeepAliveThread::waitForCreate() { Lock sync(*this); - while(!_session) + while(!_session && !_shutdown) { wait(); } + return !_shutdown; } void @@ -92,100 +183,260 @@ NodeSessionKeepAliveThread::terminate() notifyAll(); } +NodeSessionManager::NodeSessionManager() : + _serial(1), + _destroyed(false) +{ +} + void -NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) +NodeSessionManager::create(const NodeIPtr& node) { - if(session) - { - try - { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - return; // We're done! - } - catch(const Ice::LocalException&) - { - } - } + assert(!_node); + + const_cast<NodeIPtr&>(_node) = node; + + Ice::CommunicatorPtr communicator = _node->getCommunicator(); + assert(communicator->getDefaultLocator()); + Ice::Identity id = communicator->getDefaultLocator()->ice_getIdentity(); - try + id.name = "Query"; + _query = QueryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + + id.name = "InternalRegistry"; + _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); + + _thread = new Thread(*this); + _thread->start(); +} + +void +NodeSessionManager::run() +{ + NodeSessionPrx session; + TraceLevelsPtr traceLevels = _node->getTraceLevels(); + IceUtil::Time timeout = IceUtil::Time::seconds(5); + while(true) { - NodeSessionPrx newSession = _node->registerWithRegistry(_registry); - int timeout = newSession->getTimeout(); + if(session) { - Lock sync(*this); - if(timeout > 0) + if(traceLevels && traceLevels->replica > 2) { - _timeout = IceUtil::Time::seconds(timeout); + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "sending keep alive message to master replica"; + } + + try + { + session->keepAlive(_node->getPlatformInfo().getLoadInfo()); + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "lost session with master replica:\n" << ex; + } + session = 0; + _node->setObserver(0); + { + Lock sync(*this); + _masterSession = session; + notifyAll(); + } } - _session = newSession; - notifyAll(); } - } - catch(const NodeActiveException&) - { - _node->getTraceLevels()->logger->error("a node with the same name is already registered and active"); - } - catch(const Ice::LocalException&) - { - // - // TODO: FIX THIS SHOULD BE A TRACE - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry:\n" << ex; -// _node->getTraceLevels()->logger->warning(os.str()); - } -} -NodeSessionManager::NodeSessionManager() -{ -} + if(!session) + { + // + // Establish a session with the master IceGrid registry. + // + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with master replica"; + } -void -NodeSessionManager::create(const NodeIPtr& node) -{ - assert(!_node); - const_cast<NodeIPtr&>(_node) = node; + session = _node->registerWithRegistry(_master); - Ice::CommunicatorPtr communicator = _node->getCommunicator(); - Ice::PropertiesPtr properties = communicator->getProperties(); + _node->setObserver(session->getObserver()); - Ice::LocatorPrx locator = communicator->getDefaultLocator(); - assert(locator); - string instanceName = locator->ice_getIdentity().category; + // + // We only check consistency with the master registry. + // + _node->checkConsistency(session); + + { + Lock sync(*this); + _masterSession = session; + notifyAll(); + } - QueryPrx query = QueryPrx::uncheckedCast(communicator->stringToProxy(instanceName + "/Query")); - Ice::ObjectProxySeq proxies = query->findAllObjectsByType(InternalRegistry::ice_staticId()); - NodeSessionKeepAliveThreadPtr thread; + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } - Lock sync(*this); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with master replica"; + } + } + catch(const NodeActiveException&) + { + if(traceLevels) + { + traceLevels->logger->error("a node with the same name is already registered and active"); + } + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with master replica:\n" << ex; + } + } + + // + // Get the list of replicas (either with the master + // session or the IceGrid::Query interface) and make sure + // we have sessions opened to these replicas. + // + try + { + unsigned long serial = 0; + InternalRegistryPrxSeq replicas; + while(true) + { + { + Lock sync(*this); + if(serial == _serial) + { + _serial = 1; + syncReplicas(replicas); + break; + } + serial = _serial; + } + + if(session) + { + replicas = session->getReplicas(); + } + else + { + replicas.clear(); + Ice::ObjectProxySeq proxies = _query->findAllObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + } + } + } + } + catch(const Ice::LocalException& ex) + { + // IGNORE + } + } + + { + Lock sync(*this); + if(!_destroyed) + { + timedWait(timeout); + } + if(_destroyed) + { + break; + } + } + } + + // + // Destroy the session. + // + if(session) { - thread = new NodeSessionKeepAliveThread(InternalRegistryPrx::uncheckedCast(*p), _node); - thread->start(); - _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + try + { + session->destroy(); + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "destroyed master replica session"; + } + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "couldn't destroy master replica session:\n" << ex; + } + } } } void NodeSessionManager::waitForCreate() { - Lock sync(*this); - for(NodeSessionMap::const_iterator p = _sessions.begin(); p != _sessions.end(); ++p) + // + // Wait for the node to establish the session with the master or + // at least one replica registry. + // + while(true) { - p->second->waitForCreate(); + NodeSessionKeepAliveThreadPtr thread; + { + Lock sync(*this); + while(!_masterSession || _sessions.empty() || !_destroyed) + { + wait(); + } + + if(_masterSession || _destroyed) + { + return; + } + else + { + thread = _sessions.begin()->second; + } + } + if(thread->waitForCreate()) + { + break; + } } } void NodeSessionManager::destroy() { - Lock sync(*this); + NodeSessionMap sessions; + { + Lock sync(*this); + _destroyed = true; + _sessions.swap(sessions); + notifyAll(); + } + NodeSessionMap::const_iterator p; - for(p = _sessions.begin(); p != _sessions.end(); ++p) + for(p = sessions.begin(); p != sessions.end(); ++p) { p->second->terminate(); } - for(p = _sessions.begin(); p != _sessions.end(); ++p) + + _thread->getThreadControl().join(); + for(p = sessions.begin(); p != sessions.end(); ++p) { p->second->getThreadControl().join(); } @@ -195,6 +446,12 @@ void NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) { Lock sync(*this); + if(_destroyed) + { + return; + } + + ++_serial; if(_sessions.find(replica->ice_getIdentity()) != _sessions.end()) { return; @@ -211,6 +468,12 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) NodeSessionKeepAliveThreadPtr thread; { Lock sync(*this); + if(_destroyed) + { + return; + } + + --_serial; NodeSessionMap::iterator p = _sessions.find(replica->ice_getIdentity()); if(p != _sessions.end()) { @@ -225,3 +488,41 @@ NodeSessionManager::replicaRemoved(const InternalRegistryPrx& replica) } } +void +NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) +{ + NodeSessionMap sessions; + _sessions.swap(sessions); + + NodeSessionKeepAliveThreadPtr thread; + for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + if((*p)->ice_getIdentity() == _master->ice_getIdentity()) + { + continue; + } + NodeSessionMap::const_iterator q = sessions.find((*p)->ice_getIdentity()); + if(q != sessions.end()) + { + thread = q->second; + sessions.erase((*p)->ice_getIdentity()); + } + else + { + thread = new NodeSessionKeepAliveThread(*p, _node); + thread->start(); + } + _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); + } + + NodeSessionMap::const_iterator q; + for(q = sessions.begin(); q != sessions.end(); ++q) + { + q->second->terminate(); + } + for(q = sessions.begin(); q != sessions.end(); ++q) + { + q->second->getThreadControl().join(); + } +} + diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index 9b655e4a422..8304a5047d8 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -15,6 +15,7 @@ #include <IceUtil/Monitor.h> #include <IceUtil/Thread.h> +#include <IceGrid/Query.h> #include <IceGrid/Internal.h> namespace IceGrid @@ -31,22 +32,20 @@ public: virtual void run(); - void waitForCreate(); + bool waitForCreate(); void terminate(); private: - void keepAlive(const NodeSessionPrx&); - const InternalRegistryPrx _registry; const NodeIPtr _node; - IceUtil::Time _timeout; + const std::string _name; NodeSessionPrx _session; bool _shutdown; }; typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr; -class NodeSessionManager : public IceUtil::Mutex +class NodeSessionManager : public IceUtil::Monitor<IceUtil::Mutex> { public: @@ -56,12 +55,44 @@ public: void waitForCreate(); void destroy(); + void run(); + void replicaAdded(const InternalRegistryPrx&); void replicaRemoved(const InternalRegistryPrx&); private: + void syncReplicas(const InternalRegistryPrxSeq&); + + class Thread : public IceUtil::Thread + { + public: + + Thread(NodeSessionManager& manager) : _manager(manager) + { + } + + virtual void + run() + { + _manager.run(); + } + + private: + + NodeSessionManager& _manager; + }; + + const NodeIPtr _node; + IceUtil::ThreadPtr _thread; + QueryPrx _query; + InternalRegistryPrx _master; + NodeSessionPrx _masterSession; + unsigned long _serial; + bool _destroyed; + IceUtil::Time _timeout; + typedef std::map<Ice::Identity, NodeSessionKeepAliveThreadPtr> NodeSessionMap; NodeSessionMap _sessions; }; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 1c16687c2c5..6a4fe6c2b53 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -262,8 +262,23 @@ RegistryI::start(bool nowarn) // // Get the instance name // - const string instanceNameProperty = "IceGrid.InstanceName"; - _instanceName = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid"); + if(replicaName.empty()) + { + const string instanceNameProperty = "IceGrid.InstanceName"; + _instanceName = properties->getPropertyWithDefault(instanceNameProperty, "IceGrid"); + } + else + { + if(properties->getProperty("Ice.Default.Locator").empty()) + { + Error out(_communicator->getLogger()); + out << "property `Ice.Default.Locator' is not set"; + return false; + } + _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category; + } + + // // Add a default servant locator to the client object adapter. The @@ -326,13 +341,12 @@ RegistryI::start(bool nowarn) intSessionTimeout, _traceLevels); InternalRegistryPrx internalRegistry; - QueryPrx query; if(replicaName.empty()) { _database->initMaster(); LocatorPrx internalLocator = setupLocator(clientAdapter, serverAdapter, registryAdapter); - query = setupQuery(clientAdapter); + setupQuery(clientAdapter); setupAdmin(adminAdapter); setupRegistry(clientAdapter); internalRegistry = setupInternalRegistry(registryAdapter, replicaName); @@ -353,24 +367,12 @@ RegistryI::start(bool nowarn) // the master. // - if(properties->getProperty("Ice.Default.Locator").empty()) - { - Error out(_communicator->getLogger()); - out << "property `Ice.Default.Locator' is not set"; - return false; - } - setupLocator(clientAdapter, serverAdapter, 0); setupQuery(clientAdapter); internalRegistry = setupInternalRegistry(registryAdapter, replicaName); } - if(replicaName.empty()) - { - addWellKnownObject(internalRegistry, InternalRegistry::ice_staticId()); - addWellKnownObject(query, Query::ice_staticId()); - } - else + if(!replicaName.empty()) { _session.create(replicaName, _database, internalRegistry, clientAdapter, serverAdapter); } @@ -413,11 +415,12 @@ RegistryI::setupLocator(const Ice::ObjectAdapterPtr& clientAdapter, } } -QueryPrx +void RegistryI::setupQuery(const Ice::ObjectAdapterPtr& clientAdapter) { Identity queryId = _communicator->stringToIdentity(_instanceName + "/Query"); - return QueryPrx::uncheckedCast(clientAdapter->add(new QueryI(_communicator, _database), queryId)); + clientAdapter->add(new QueryI(_communicator, _database), queryId); + addWellKnownObject(clientAdapter->createProxy(queryId), Query::ice_staticId()); } void @@ -446,8 +449,9 @@ RegistryI::setupInternalRegistry(const Ice::ObjectAdapterPtr& registryAdapter, c internalRegistryId.name += "-" + replicaName; } ObjectPtr internalRegistry = new InternalRegistryI(_database, _internalReaper); - registryAdapter->add(internalRegistry, internalRegistryId); - return InternalRegistryPrx::uncheckedCast(registryAdapter->createProxy(internalRegistryId)); + Ice::ObjectPrx proxy = registryAdapter->add(internalRegistry, internalRegistryId); + addWellKnownObject(proxy, InternalRegistry::ice_staticId()); + return InternalRegistryPrx::uncheckedCast(proxy); } void diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h index 19edca16270..18846a4cdde 100644 --- a/cpp/src/IceGrid/RegistryI.h +++ b/cpp/src/IceGrid/RegistryI.h @@ -65,7 +65,7 @@ private: Ice::LocatorPrx setupLocator(const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr&, const Ice::ObjectAdapterPtr&); - QueryPrx setupQuery(const Ice::ObjectAdapterPtr&); + void setupQuery(const Ice::ObjectAdapterPtr&); void setupAdmin(const Ice::ObjectAdapterPtr&); void setupRegistry(const Ice::ObjectAdapterPtr&); InternalRegistryPrx setupInternalRegistry(const Ice::ObjectAdapterPtr&, const std::string&); diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index 91c5652ae11..248bdd1b6c4 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -164,6 +164,19 @@ ReplicaCache::nodeRemoved(const NodePrx& node) } } +InternalRegistryPrxSeq +ReplicaCache::getAll() const +{ + Lock sync(*this); + InternalRegistryPrxSeq replicas; + replicas.reserve(_entries.size()); + for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p) + { + replicas.push_back(p->second->getSession()->getProxy()); + } + return replicas; +} + Ice::ObjectPrx ReplicaCache::getClientProxy() const { diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h index 44bce1c24fc..5712cbc001a 100644 --- a/cpp/src/IceGrid/ReplicaCache.h +++ b/cpp/src/IceGrid/ReplicaCache.h @@ -57,6 +57,8 @@ public: void nodeAdded(const NodePrx&); void nodeRemoved(const NodePrx&); + InternalRegistryPrxSeq getAll() const; + private: Ice::ObjectPrx getClientProxy() const; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 486cba6f700..51f57cabf03 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -102,7 +102,6 @@ ReplicaSessionKeepAliveThread::ReplicaSessionKeepAliveThread(const std::string& _replica(replica), _info(info), _database(database), - _timeout(IceUtil::Time::seconds(5)), _shutdown(false) { } @@ -114,20 +113,91 @@ ReplicaSessionKeepAliveThread::run() // Keep alive the session. // ReplicaSessionPrx session; + IceUtil::Time timeout = IceUtil::Time::seconds(5); + TraceLevelsPtr traceLevels = _database->getTraceLevels(); while(true) { - keepAlive(session); - + if(session) { - Lock sync(*this); + try + { + if(traceLevels && traceLevels->replica > 2) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "sending keep alive message to master replica"; + } - session = _session; + session->keepAlive(); + } + catch(const Ice::LocalException& ex) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "lost session with master replica:\n" << ex; + } + session = 0; + } + } - if(!_shutdown) + if(!session) + { + try + { + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "trying to establish session with master replica"; + } + + session = _master->registerReplica(_name, _replica, _info); + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t); + } + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with master replica"; + } + } + catch(const ReplicaActiveException&) { - timedWait(_timeout); + if(traceLevels) + { + traceLevels->logger->error("a replica with the same name is already registered and active"); + } } + catch(const Ice::LocalException& ex) + { + ObjectInfo info; + info.type = InternalRegistry::ice_staticId(); + info.proxy = _replica; + _database->addObject(info, true); + + Ice::Identity id; + id.category = _replica->ice_getIdentity().category; + id.name = "Query"; + info.type = Query::ice_staticId(); + info.proxy = _info.clientProxy->ice_identity(id); + _database->addObject(info, true); + + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "failed to establish session with master replica:\n" << ex; + } + } + } + { + Lock sync(*this); + if(!_shutdown) + { + timedWait(timeout); + } if(_shutdown) { break; @@ -143,30 +213,25 @@ ReplicaSessionKeepAliveThread::run() try { session->destroy(); + + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "destroyed master replica session"; + } } - catch(const Ice::LocalException&) + catch(const Ice::LocalException& ex) { - // - // TODO: XXX: TRACE? - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry to destroy the node session:\n" << ex; -// _database->getTraceLevels()->logger->warning(os.str()); + if(traceLevels && traceLevels->replica > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "couldn't destroy master replica session:\n" << ex; + } } } } void -ReplicaSessionKeepAliveThread::waitForCreate() -{ - Lock sync(*this); - while(!_session) - { - wait(); - } -} - -void ReplicaSessionKeepAliveThread::terminate() { Lock sync(*this); @@ -174,50 +239,6 @@ ReplicaSessionKeepAliveThread::terminate() notifyAll(); } -void -ReplicaSessionKeepAliveThread::keepAlive(const ReplicaSessionPrx& session) -{ - if(session) - { - try - { - session->keepAlive(); - return; // We're done! - } - catch(const Ice::LocalException&) - { - } - } - - try - { - ReplicaSessionPrx newSession = _master->registerReplica(_name, _replica, _info); - int timeout = newSession->getTimeout(); - { - Lock sync(*this); - if(timeout > 0) - { - _timeout = IceUtil::Time::seconds(timeout); - } - _session = newSession; - notifyAll(); - } - } - catch(const ReplicaActiveException&) - { - _database->getTraceLevels()->logger->error("a replica with the same name is already registered and active"); - } - catch(const Ice::LocalException&) - { - // - // TODO: FIX THIS SHOULD BE A TRACE - // -// ostringstream os; -// os << "couldn't contact the IceGrid registry:\n" << ex; -// _database->getTraceLevels()->logger->warning(os.str()); - } -} - ReplicaSessionManager::ReplicaSessionManager() { } diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index 3abbb96b049..7de7b519360 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -32,7 +32,6 @@ public: virtual void run(); - void waitForCreate(); void terminate(); private: @@ -44,8 +43,6 @@ private: const InternalRegistryPrx _replica; const ReplicaInfo _info; const DatabasePtr _database; - IceUtil::Time _timeout; - ReplicaSessionPrx _session; bool _shutdown; }; typedef IceUtil::Handle<ReplicaSessionKeepAliveThread> ReplicaSessionKeepAliveThreadPtr; diff --git a/cpp/src/IceGrid/ServerAdapterI.cpp b/cpp/src/IceGrid/ServerAdapterI.cpp index 5ae4619e327..d00f721cdd9 100644 --- a/cpp/src/IceGrid/ServerAdapterI.cpp +++ b/cpp/src/IceGrid/ServerAdapterI.cpp @@ -151,6 +151,9 @@ ServerAdapterI::setDirectProxy(const Ice::ObjectPrx& prx, const Ice::Current&) } catch(const Ice::LocalException&) { + // + // Expected if the master IceGrid registry is down. + // } } } diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index a313e1f9388..2c0ea9ce4ff 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -742,8 +742,11 @@ ServerI::setEnabled(bool enabled, const ::Ice::Current&) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_node->getTraceLevels()->logger); - out << "unexpected observer exception:\n" << ex; + // + // Expected if the master IceGrid registry is down. + // + //Ice::Warning out(_node->getTraceLevels()->logger); + //out << "unexpected observer exception:\n" << ex; } } } @@ -2168,8 +2171,11 @@ ServerI::setStateNoSync(InternalServerState st, const std::string& reason) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_node->getTraceLevels()->logger); - out << "unexpected observer exception:\n" << ex; + // + // Expected if the master IceGrid registry is down. + // + //Ice::Warning out(_node->getTraceLevels()->logger); + //out << "unexpected observer exception:\n" << ex; } } } |