diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/IceGrid/AdminSessionI.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/IceGrid/AdminSessionI.cpp')
-rw-r--r-- | cpp/src/IceGrid/AdminSessionI.cpp | 111 |
1 files changed, 92 insertions, 19 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp index cfaf3fd686d..3be228f8ae5 100644 --- a/cpp/src/IceGrid/AdminSessionI.cpp +++ b/cpp/src/IceGrid/AdminSessionI.cpp @@ -20,6 +20,66 @@ using namespace std; using namespace IceGrid; +namespace +{ + + +class SubscriberForwarderI : public Ice::BlobjectArrayAsync +{ + class CallbackI: public IceUtil::Shared + { + public: + + virtual void + exception(const Ice::Exception& ex, const Ice::AMD_Object_ice_invokePtr& amdCB) + { + try + { + ex.ice_throw(); + } + catch(const Ice::CloseConnectionException&) + { + amdCB->ice_response(true, Ice::ByteSeq()); // Bi-dir connection is closed, ignore. + } + catch(const Ice::Exception& ex) + { + amdCB->ice_exception(ex); + } + } + + virtual void + response(bool ok, + const pair<const Ice::Byte*, const Ice::Byte*>& outP, + const Ice::AMD_Object_ice_invokePtr& amdCB) + { + amdCB->ice_response(ok, outP); + } + }; + +public: + + SubscriberForwarderI(const Ice::ObjectPrx& proxy) : + _proxy(proxy), + _callback(newCallback_Object_ice_invoke(new CallbackI(), &CallbackI::response, &CallbackI::exception)) + { + } + + virtual void + ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, + const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + const Ice::Current& current) + { + _proxy->begin_ice_invoke(current.operation, current.mode, inParams, current.ctx, _callback, amdCB); + } + +private: + + const Ice::ObjectPrx _proxy; + const Ice::Callback_Object_ice_invokePtr _callback; +}; + +} + FileIteratorI::FileIteratorI(const AdminSessionIPtr& session, const FileReaderPrx& reader, const string& filename, @@ -187,12 +247,11 @@ AdminSessionI::setObserversByIdentity(const Ice::Identity& registryObserver, throw ex; } - setupObserverSubscription(RegistryObserverTopicName, toProxy(registryObserver, current.con, current.encoding)); - setupObserverSubscription(NodeObserverTopicName, toProxy(nodeObserver, current.con, current.encoding)); - setupObserverSubscription(ApplicationObserverTopicName, toProxy(appObserver, current.con, current.encoding)); - setupObserverSubscription(AdapterObserverTopicName, toProxy(adapterObserver, current.con, current.encoding)); - setupObserverSubscription(ObjectObserverTopicName, toProxy(objectObserver, current.con, current.encoding)); - + setupObserverSubscription(RegistryObserverTopicName, addForwarder(registryObserver, current), true); + setupObserverSubscription(NodeObserverTopicName, addForwarder(nodeObserver, current), true); + setupObserverSubscription(ApplicationObserverTopicName, addForwarder(appObserver, current), true); + setupObserverSubscription(AdapterObserverTopicName, addForwarder(adapterObserver, current), true); + setupObserverSubscription(ObjectObserverTopicName, addForwarder(objectObserver, current), true); } int @@ -321,38 +380,52 @@ AdminSessionI::destroy(const Ice::Current&) } void -AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer) +AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer, bool forwarder) { - if(_observers[name] && _observers[name] != observer) + if(_observers.find(name) != _observers.end() && _observers[name].first != observer) { - _database->getObserverTopic(name)->unsubscribe(_observers[name]); - _observers[name] = 0; + _database->getObserverTopic(name)->unsubscribe(_observers[name].first); + if(_observers[name].second) + { + try + { + // Unregister forwarder object + _registry->getRegistryAdapter()->remove(_observers[name].first->ice_getIdentity()); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + } + } + _observers.erase(name); } if(observer) { - _observers[name] = observer; - _database->getObserverTopic(name)->subscribe(_observers[name]); + _observers[name].first = observer; + _observers[name].second = forwarder; + _database->getObserverTopic(name)->subscribe(observer); } } Ice::ObjectPrx -AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection, const Ice::EncodingVersion& v) +AdminSessionI::addForwarder(const Ice::Identity& id, const Ice::Current& current) { - return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id)->ice_encodingVersion(v); + if(id.name.empty()) + { + return Ice::ObjectPrx(); + } + Ice::ObjectPrx prx = current.con->createProxy(id)->ice_encodingVersion(current.encoding); + return _registry->getRegistryAdapter()->addWithUUID(new SubscriberForwarderI(prx)); } FileIteratorPrx -AdminSessionI::addFileIterator(const FileReaderPrx& reader, - const string& filename, - int nLines, - const Ice::Current& current) +AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, int nLines, const Ice::Current& c) { Lock sync(*this); if(_destroyed) { Ice::ObjectNotExistException ex(__FILE__, __LINE__); - ex.id = current.id; + ex.id = c.id; throw ex; } |