diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 10 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdminI.cpp | 15 | ||||
-rw-r--r-- | cpp/src/IceGrid/AllocatableObjectCache.cpp | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/Cache.h | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 42 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 5 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 134 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeCache.h | 10 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 10 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/ObjectCache.cpp | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 27 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.h | 1 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 23 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 84 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 1 |
19 files changed, 219 insertions, 183 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index 583ac2663fc..449e03b5c6a 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -135,8 +135,9 @@ AdapterCache::removeServerAdapter(const string& id) { Lock sync(*this); - ServerAdapterEntryPtr entry = ServerAdapterEntryPtr::dynamicCast(removeImpl(id)); + ServerAdapterEntryPtr entry = ServerAdapterEntryPtr::dynamicCast(getImpl(id)); assert(entry); + removeImpl(id); string replicaGroupId = entry->getReplicaGroupId(); if(!replicaGroupId.empty()) @@ -151,8 +152,7 @@ void AdapterCache::removeReplicaGroup(const string& id) { Lock sync(*this); - ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(removeImpl(id)); - assert(entry); + removeImpl(id); } AdapterEntryPtr @@ -166,7 +166,7 @@ AdapterCache::addImpl(const string& id, const AdapterEntryPtr& entry) return Cache<string, AdapterEntry>::addImpl(id, entry); } -AdapterEntryPtr +void AdapterCache::removeImpl(const string& id) { if(_traceLevels && _traceLevels->adapter > 0) @@ -174,7 +174,7 @@ AdapterCache::removeImpl(const string& id) Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); out << "removed adapter `" << id << "'"; } - return Cache<string, AdapterEntry>::removeImpl(id); + Cache<string, AdapterEntry>::removeImpl(id); } AdapterEntry::AdapterEntry(AdapterCache& cache, const string& id) : diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index b5fdc7958b2..94a19e1fe40 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -118,7 +118,7 @@ public: protected: virtual AdapterEntryPtr addImpl(const std::string&, const AdapterEntryPtr&); - virtual AdapterEntryPtr removeImpl(const std::string&); + virtual void removeImpl(const std::string&); }; diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp index 1294bf18bb2..e1a380b73c3 100644 --- a/cpp/src/IceGrid/AdminI.cpp +++ b/cpp/src/IceGrid/AdminI.cpp @@ -655,8 +655,9 @@ AdminI::updateObject(const Ice::ObjectPrx& proxy, const ::Ice::Current& current) const Ice::Identity id = proxy->ice_getIdentity(); if(id.category == _database->getInstanceName()) { - throw DeploymentException("updating object `" + current.adapter->getCommunicator()->identityToString(id) + - "' is not allowed"); + DeploymentException ex; + ex.reason ="updating object `" + _database->getCommunicator()->identityToString(id) + "' is not allowed"; + throw ex; } _database->updateObject(proxy); } @@ -668,8 +669,9 @@ AdminI::addObjectWithType(const Ice::ObjectPrx& proxy, const string& type, const const Ice::Identity id = proxy->ice_getIdentity(); if(id.category == _database->getInstanceName()) { - throw DeploymentException("adding object `" + current.adapter->getCommunicator()->identityToString(id) + - "' is not allowed"); + DeploymentException ex; + ex.reason = "adding object `" + _database->getCommunicator()->identityToString(id) + "' is not allowed"; + throw ex; } ObjectInfo info; @@ -684,8 +686,9 @@ AdminI::removeObject(const Ice::Identity& id, const Ice::Current& current) checkIsMaster(); if(id.category == _database->getInstanceName()) { - throw DeploymentException("removing object `" + current.adapter->getCommunicator()->identityToString(id) + - "' is not allowed"); + DeploymentException ex; + ex.reason = "removing object `" + _database->getCommunicator()->identityToString(id) + "' is not allowed"; + throw ex; } _database->removeObject(id); } diff --git a/cpp/src/IceGrid/AllocatableObjectCache.cpp b/cpp/src/IceGrid/AllocatableObjectCache.cpp index 341d110a56b..8be86c60f7e 100644 --- a/cpp/src/IceGrid/AllocatableObjectCache.cpp +++ b/cpp/src/IceGrid/AllocatableObjectCache.cpp @@ -166,8 +166,9 @@ AllocatableObjectEntryPtr AllocatableObjectCache::remove(const Ice::Identity& id) { Lock sync(*this); - AllocatableObjectEntryPtr entry = removeImpl(id); + AllocatableObjectEntryPtr entry = getImpl(id); assert(entry); + removeImpl(id); map<string, TypeEntry>::iterator p = _types.find(entry->getType()); assert(p != _types.end()); diff --git a/cpp/src/IceGrid/Cache.h b/cpp/src/IceGrid/Cache.h index 65b246a2be7..403fc6cffc9 100644 --- a/cpp/src/IceGrid/Cache.h +++ b/cpp/src/IceGrid/Cache.h @@ -42,11 +42,11 @@ public: return getImpl(key); } - ValuePtr + void remove(const Key& key) { Lock sync(*this); - return removeImpl(key); + removeImpl(key); } void @@ -95,7 +95,7 @@ protected: return entry; } - virtual ValuePtr + virtual void removeImpl(const Key& key) { typename ValueMap::iterator p = _entries.end(); @@ -115,15 +115,12 @@ protected: assert(p != _entries.end()); if(p->second->canRemove()) { - ValuePtr entry = p->second; _entries.erase(p); _entriesHint = _entries.end(); - return entry; } else { _entriesHint = p; - return p->second; } } diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 308264781cc..dfb2d839d90 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -357,7 +357,13 @@ Database::syncObjects(const ObjectInfoSeq& objects) _objects.clear(); for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + const Ice::Identity& id = q->proxy->ice_getIdentity(); + if(id.category != _instanceName || id.name.find("Node-") != 0) + { + // Don't replicate node well-known objects. These objects are + // maintained by each replica with each node session. + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + } } serial = ++_objectSerial; } @@ -618,7 +624,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) out << "removed application `" << name << "'"; } - if(session) + if(_master) { try { @@ -629,10 +635,6 @@ Database::removeApplication(const string& name, AdminSessionI* session) // Ignore, this is traced by the node cache. } } - else - { - // TODO: XXX: synchronize the servers - } } ApplicationInfo @@ -663,18 +665,10 @@ Database::addNode(const string& name, const NodeSessionIPtr& session) { _nodeCache.get(name, true)->setSession(session); -// // -// // Only the master adds the node well-known proxy to its -// // database. The well-known proxy will be transmitted to the -// // replicas through the replication of the database. -// // -// if(_master) -// { - ObjectInfo info; - info.type = Node::ice_staticId(); - info.proxy = session->getNode(); - addObject(info, true); -// } + ObjectInfo info; + info.type = Node::ice_staticId(); + info.proxy = session->getNode(); + addObject(info, true); } NodePrx @@ -695,9 +689,9 @@ Database::removeNode(const string& name, const NodeSessionIPtr& session, bool sh // // If the registry isn't being shutdown and this registry is the // master we remove the node well-known proxy from the object - // adapter. Replicas will be notified through the replication. + // adapter. Replicas will be notified through replication. // - if(!shutdown && _master) + if(!shutdown) { removeObject(session->getNode()->ice_getIdentity()); } @@ -709,6 +703,10 @@ Database::removeNode(const string& name, const NodeSessionIPtr& session, bool sh // _nodeObserverTopic->nodeDown(name); + // + // Clear the node session. Once this is called, the node can + // create a new session. + // _nodeCache.get(name)->setSession(0); } @@ -1128,8 +1126,8 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { - int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); + int serial; bool update = false; { Lock sync(*this); @@ -1263,7 +1261,6 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects) vector<bool> updated; { Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { @@ -1310,6 +1307,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects) { int serial; { + Lock sync(*this); Freeze::TransactionHolder txHolder(_connection); for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p) { diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 2e278db93b2..a4b2d5be8d7 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -540,10 +540,9 @@ NodeService::start(int argc, char* argv[]) // We wait for the node to be registered with the registry // before to claim it's ready. // - // TODO: XXX: That's not correct. The node can't be - // interrupted if we wait here... - // + enableInterrupt(); _sessions.waitForCreate(); + disableInterrupt(); print(bundleName + " ready"); } diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index bb65d422d93..da37fba58a1 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -208,10 +208,15 @@ NodeCache::get(const string& name, bool create) const NodeEntry::NodeEntry(NodeCache& cache, const std::string& name) : _cache(cache), + _ref(0), _name(name) { } +NodeEntry::~NodeEntry() +{ +} + void NodeEntry::addDescriptor(const string& application, const NodeDescriptor& descriptor) { @@ -222,16 +227,8 @@ NodeEntry::addDescriptor(const string& application, const NodeDescriptor& descri void NodeEntry::removeDescriptor(const string& application) { - bool remove = false; - { - Lock sync(*this); - _descriptors.erase(application); - remove = _servers.empty() && !_session && _descriptors.empty(); - } - if(remove) - { - _cache.remove(_name); - } + Lock sync(*this); + _descriptors.erase(application); } void @@ -244,45 +241,45 @@ NodeEntry::addServer(const ServerEntryPtr& entry) void NodeEntry::removeServer(const ServerEntryPtr& entry) { - bool remove = false; - { - Lock sync(*this); - _servers.erase(entry->getId()); - remove = _servers.empty() && !_session && _descriptors.empty(); - } - if(remove) - { - _cache.remove(_name); - } + Lock sync(*this); + _servers.erase(entry->getId()); } void NodeEntry::setSession(const NodeSessionIPtr& session) { - bool remove = false; - { - Lock sync(*this); - if(session && _session) - { - throw NodeActiveException(); - } - else if(!session && !_session) - { - return; - } + Lock sync(*this); - if(!session && _session) + if(session) + { + // If the current session has just been destroyed, wait for the setSession(0) call. + assert(session != _session); + while(_session && _session->isDestroyed()) { - _cache.getReplicaCache().nodeRemoved(_session->getNode()); + wait(); } + } + + if(session && _session) + { + throw NodeActiveException(); + } + else if(!session && !_session) + { + return; + } + + if(!session && _session) + { + _cache.getReplicaCache().nodeRemoved(_session->getNode()); + } + + _session = session; + notifyAll(); - _session = session; - remove = _servers.empty() && !_session && _descriptors.empty(); - - if(_session) - { - _cache.getReplicaCache().nodeAdded(session->getNode()); - } + if(_session) + { + _cache.getReplicaCache().nodeAdded(session->getNode()); } if(session) @@ -301,15 +298,6 @@ NodeEntry::setSession(const NodeSessionIPtr& session) out << "node `" << _name << "' down"; } } - - // - // NOTE: this needs to be the last thing to do as this will - // destroy this entry. - // - if(remove) - { - _cache.remove(_name); - } } NodePrx @@ -393,7 +381,7 @@ bool NodeEntry::canRemove() { Lock sync(*this); - return !_session && _servers.empty() && _descriptors.empty(); + return _servers.empty() && !_session && _descriptors.empty(); } void @@ -520,3 +508,49 @@ NodeEntry::getServerDescriptor(const ServerInfo& server, const SessionIPtr& sess return ServerHelper(_cache.getCommunicator(), server.descriptor).instantiate(resolve, PropertyDescriptorSeq()); } } + +void +NodeEntry::__incRef() +{ + Lock sync(*this); + assert(_ref >= 0); + ++_ref; +} + +void +NodeEntry::__decRef() +{ + // + // The node entry implements its own reference counting. If the + // reference count drops to 1, this means that only the cache + // holds a reference on the node entry. If that's the case, we + // check if the node entry can be removed or not and if it can be + // removed we remove it from the cache map. + // + + bool doRemove = false; + bool doDelete = false; + { + Lock sync(*this); + assert(_ref > 0); + --_ref; + + if(_ref == 1) + { + doRemove = _servers.empty() && !_session && _descriptors.empty(); + } + else if(_ref == 0) + { + doDelete = true; + } + } + + if(doRemove) + { + _cache.remove(_name); + } + else if(doDelete) + { + delete this; + } +} diff --git a/cpp/src/IceGrid/NodeCache.h b/cpp/src/IceGrid/NodeCache.h index 4ebec4f9a19..cfedcea35b6 100644 --- a/cpp/src/IceGrid/NodeCache.h +++ b/cpp/src/IceGrid/NodeCache.h @@ -31,12 +31,13 @@ typedef IceUtil::Handle<ServerEntry> ServerEntryPtr; class ReplicaCache; -class NodeEntry : public IceUtil::Shared, public IceUtil::Mutex +class NodeEntry : public IceUtil::Monitor<IceUtil::Mutex> { public: NodeEntry(NodeCache&, const std::string&); - + virtual ~NodeEntry(); + void addDescriptor(const std::string&, const NodeDescriptor&); void removeDescriptor(const std::string&); @@ -55,11 +56,16 @@ public: void destroyServer(const ServerEntryPtr&, const ServerInfo&); ServerInfo getServerInfo(const ServerInfo&, const SessionIPtr&); + void __incRef(); + void __decRef(); + private: ServerDescriptorPtr getServerDescriptor(const ServerInfo&, const SessionIPtr&); NodeCache& _cache; + IceUtil::Mutex _refMutex; + int _ref; const std::string _name; NodeSessionIPtr _session; std::map<std::string, ServerEntryPtr> _servers; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 1eed1b8bbf6..4eb61cc7294 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -485,16 +485,6 @@ void NodeI::shutdown(const Ice::Current&) const { _activator->shutdown(); - // - // TODO: XXX: Wait for the session to be down with the registry - // who invoked this call. Perhaps it's better to have the registry - // wait actually... - // -// IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_sessionMonitor); -// while(_session) -// { -// _sessionMonitor.wait(); -// } } Ice::CommunicatorPtr diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index e5ca8dfef6d..747fbcec915 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -119,12 +119,6 @@ NodeSessionI::waitForApplicationReplication_async(const AMD_NodeSession_waitForA void NodeSessionI::destroy(const Ice::Current& current) { - // - // TODO: XXX: If we set destroy to true now, it's possible that - // the node calls keepAlive on the sesion and tries to create the - // session after getting the ONE and before the node is removed - // from the db... - // { Lock sync(*this); if(_destroy) @@ -177,3 +171,10 @@ NodeSessionI::timestamp() const } return _timestamp; } + +bool +NodeSessionI::isDestroyed() const +{ + Lock sync(*this); + return _destroy; +} diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index a21c86d8b17..28fed1c7b6d 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -40,6 +40,7 @@ public: const NodeInfo& getInfo() const; const LoadInfo& getLoadInfo() const; virtual IceUtil::Time timestamp() const; + bool isDestroyed() const; private: diff --git a/cpp/src/IceGrid/ObjectCache.cpp b/cpp/src/IceGrid/ObjectCache.cpp index c2b58f08833..ee6483c9d9f 100644 --- a/cpp/src/IceGrid/ObjectCache.cpp +++ b/cpp/src/IceGrid/ObjectCache.cpp @@ -114,8 +114,9 @@ ObjectEntryPtr ObjectCache::remove(const Ice::Identity& id) { Lock sync(*this); - ObjectEntryPtr entry = removeImpl(id); + ObjectEntryPtr entry = getImpl(id); assert(entry); + removeImpl(id); map<string, TypeEntry>::iterator p = _types.find(entry->getType()); assert(p != _types.end()); diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index f411e170730..c217a205ea7 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -47,9 +47,22 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { Lock sync(*this); - if(getImpl(name)) + while(true) { - throw ReplicaActiveException(); + ReplicaEntryPtr entry = getImpl(name); + if(entry) + { + if(entry->getSession()->isDestroyed()) + { + wait(); + continue; + } + else + { + throw ReplicaActiveException(); + } + } + break; } if(_traceLevels && _traceLevels->replica > 0) @@ -61,6 +74,11 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) entry = addImpl(name, new ReplicaEntry(name, session)); } + // + // Note: it's safe to do this outside the synchronization because + // remove() can't be called until this method returns (and until + // the replica session is fully created). + // try { _nodes->replicaAdded(session->getInternalRegistry()); @@ -79,9 +97,10 @@ ReplicaCache::remove(const string& name) ReplicaEntryPtr entry; { Lock sync(*this); - - entry = removeImpl(name); + entry = getImpl(name); assert(entry); + removeImpl(name); + notifyAll(); if(_traceLevels && _traceLevels->replica > 0) { diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index 605d0e77595..48250fb52d7 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -115,6 +115,7 @@ ReplicaSessionI::receivedUpdate(TopicName topic, int serial, const string& failu void ReplicaSessionI::destroy(const Ice::Current& current) { + bool shutdown = !current.adapter; { Lock sync(*this); if(_destroy) @@ -126,7 +127,6 @@ ReplicaSessionI::destroy(const Ice::Current& current) _database->removeReplica(_name, this); _wellKnownObjects->unregisterWellKnownObjects(_replicaWellKnownObjects); - bool shutdown = !current.adapter; if(shutdown) { ObjectInfo info; @@ -168,3 +168,10 @@ ReplicaSessionI::getEndpoint(const std::string& name) Lock sync(*this); return _replicaEndpoints[name]; } + +bool +ReplicaSessionI::isDestroyed() const +{ + Lock sync(*this); + return _destroy; +} diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h index aa737b91b06..c01f0290308 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.h +++ b/cpp/src/IceGrid/ReplicaSessionI.h @@ -48,6 +48,7 @@ public: const RegistryInfo& getInfo() const { return _info; } Ice::ObjectPrx getEndpoint(const std::string&); + bool isDestroyed() const; private: diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index 60fde710b13..6feb9f41403 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -143,7 +143,13 @@ public: string failure; try { - _database->addObject(info, true); + const Ice::Identity& id = info.proxy->ice_getIdentity(); + if(id.category != _database->getInstanceName() || id.name.find("Node-") != 0) + { + // Don't replicate node well-known objects. These objects are + // maintained by each replica with each node session. + _database->addObject(info, true); + } } catch(const ObjectExistsException& ex) { @@ -161,7 +167,13 @@ public: string failure; try { - _database->addObject(info, true); + const Ice::Identity& id = info.proxy->ice_getIdentity(); + if(id.category != _database->getInstanceName() || id.name.find("Node-") != 0) + { + // Don't replicate node well-known objects. These objects are + // maintained by each replica with each node session. + _database->addObject(info, true); + } } catch(const DeploymentException& ex) { @@ -178,7 +190,12 @@ public: string failure; try { - _database->removeObject(id); + if(id.category != _database->getInstanceName() || id.name.find("Node-") != 0) + { + // Don't replicate node well-known objects. These objects are + // maintained by each replica with each node session. + _database->removeObject(id); + } } catch(const DeploymentException& ex) { diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 1b9969a4543..48dbaea12dd 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -14,49 +14,9 @@ using namespace std; using namespace IceGrid; -namespace IceGrid -{ - -template<class T> -class InitCB : public T -{ -public: - - InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& subscriberName, - const string& name, int serial) : - _topic(topic), - _observer(observer), - _subscriberName(subscriberName), - _name(name), - _serial(serial) - { - } - - void - ice_response() - { - _topic->subscribe(_observer, _subscriberName, _serial); - } - - void - ice_exception(const Ice::Exception& ex) - { - Ice::Warning out(_observer->ice_getCommunicator()->getLogger()); - out << "couldn't initialize " << _name << " observer:\n" << ex; - } - -private: - - const ObserverTopicPtr _topic; - const Ice::ObjectPrx _observer; - const string _subscriberName; - const string _name; - const int _serial; -}; - -}; ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) : + _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0) { IceStorm::TopicPrx t; @@ -201,9 +161,9 @@ ObserverTopic::waitForSyncedSubscribers(int serial) { os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n"; } - // TODO: XXX -// Ice::Error err(_traceLevels->logger); -// err << os.str(); + + Ice::Error err(_logger); + err << os.str(); } return; } @@ -263,7 +223,7 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `registryUp' update:\n" << ex; } } @@ -286,7 +246,7 @@ RegistryObserverTopic::registryDown(const string& name) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `registryDown' update:\n" << ex; } } @@ -341,7 +301,7 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current& curre } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing 'nodeUp' update:\n" << ex; } } @@ -400,7 +360,7 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `updateServer' update:\n" << ex; } } @@ -453,7 +413,7 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `updateAdapter' update:\n" << ex; } } @@ -478,7 +438,7 @@ NodeObserverTopic::nodeDown(const string& name) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `nodeDown' update:\n" << ex; } } @@ -525,7 +485,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } } @@ -546,7 +506,7 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -568,7 +528,7 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -621,7 +581,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -667,7 +627,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterInit' update:\n" << ex; } } @@ -688,7 +648,7 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -710,7 +670,7 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -732,7 +692,7 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -778,7 +738,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `objectInit' update:\n" << ex; } } @@ -799,7 +759,7 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -821,7 +781,7 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } waitForSyncedSubscribers(serial); @@ -843,7 +803,7 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) } catch(const Ice::LocalException& ex) { - Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); + Ice::Warning out(_logger); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } waitForSyncedSubscribers(serial); diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index 3720d7f734c..f8fc0312c44 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -42,6 +42,7 @@ protected: void updateSerial(int); Ice::Context getContext(int) const; + Ice::LoggerPtr _logger; IceStorm::TopicPrx _topic; Ice::ObjectPrx _basePublisher; std::set<Ice::Identity> _waitForSubscribe; |