summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/AdminSessionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/IceGrid/AdminSessionI.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/IceGrid/AdminSessionI.cpp')
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp111
1 files changed, 92 insertions, 19 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index cfaf3fd686d..3be228f8ae5 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -20,6 +20,66 @@
using namespace std;
using namespace IceGrid;
+namespace
+{
+
+
+class SubscriberForwarderI : public Ice::BlobjectArrayAsync
+{
+ class CallbackI: public IceUtil::Shared
+ {
+ public:
+
+ virtual void
+ exception(const Ice::Exception& ex, const Ice::AMD_Object_ice_invokePtr& amdCB)
+ {
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::CloseConnectionException&)
+ {
+ amdCB->ice_response(true, Ice::ByteSeq()); // Bi-dir connection is closed, ignore.
+ }
+ catch(const Ice::Exception& ex)
+ {
+ amdCB->ice_exception(ex);
+ }
+ }
+
+ virtual void
+ response(bool ok,
+ const pair<const Ice::Byte*, const Ice::Byte*>& outP,
+ const Ice::AMD_Object_ice_invokePtr& amdCB)
+ {
+ amdCB->ice_response(ok, outP);
+ }
+ };
+
+public:
+
+ SubscriberForwarderI(const Ice::ObjectPrx& proxy) :
+ _proxy(proxy),
+ _callback(newCallback_Object_ice_invoke(new CallbackI(), &CallbackI::response, &CallbackI::exception))
+ {
+ }
+
+ virtual void
+ ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB,
+ const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ const Ice::Current& current)
+ {
+ _proxy->begin_ice_invoke(current.operation, current.mode, inParams, current.ctx, _callback, amdCB);
+ }
+
+private:
+
+ const Ice::ObjectPrx _proxy;
+ const Ice::Callback_Object_ice_invokePtr _callback;
+};
+
+}
+
FileIteratorI::FileIteratorI(const AdminSessionIPtr& session,
const FileReaderPrx& reader,
const string& filename,
@@ -187,12 +247,11 @@ AdminSessionI::setObserversByIdentity(const Ice::Identity& registryObserver,
throw ex;
}
- setupObserverSubscription(RegistryObserverTopicName, toProxy(registryObserver, current.con, current.encoding));
- setupObserverSubscription(NodeObserverTopicName, toProxy(nodeObserver, current.con, current.encoding));
- setupObserverSubscription(ApplicationObserverTopicName, toProxy(appObserver, current.con, current.encoding));
- setupObserverSubscription(AdapterObserverTopicName, toProxy(adapterObserver, current.con, current.encoding));
- setupObserverSubscription(ObjectObserverTopicName, toProxy(objectObserver, current.con, current.encoding));
-
+ setupObserverSubscription(RegistryObserverTopicName, addForwarder(registryObserver, current), true);
+ setupObserverSubscription(NodeObserverTopicName, addForwarder(nodeObserver, current), true);
+ setupObserverSubscription(ApplicationObserverTopicName, addForwarder(appObserver, current), true);
+ setupObserverSubscription(AdapterObserverTopicName, addForwarder(adapterObserver, current), true);
+ setupObserverSubscription(ObjectObserverTopicName, addForwarder(objectObserver, current), true);
}
int
@@ -321,38 +380,52 @@ AdminSessionI::destroy(const Ice::Current&)
}
void
-AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer)
+AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& observer, bool forwarder)
{
- if(_observers[name] && _observers[name] != observer)
+ if(_observers.find(name) != _observers.end() && _observers[name].first != observer)
{
- _database->getObserverTopic(name)->unsubscribe(_observers[name]);
- _observers[name] = 0;
+ _database->getObserverTopic(name)->unsubscribe(_observers[name].first);
+ if(_observers[name].second)
+ {
+ try
+ {
+ // Unregister forwarder object
+ _registry->getRegistryAdapter()->remove(_observers[name].first->ice_getIdentity());
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ }
+ }
+ _observers.erase(name);
}
if(observer)
{
- _observers[name] = observer;
- _database->getObserverTopic(name)->subscribe(_observers[name]);
+ _observers[name].first = observer;
+ _observers[name].second = forwarder;
+ _database->getObserverTopic(name)->subscribe(observer);
}
}
Ice::ObjectPrx
-AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection, const Ice::EncodingVersion& v)
+AdminSessionI::addForwarder(const Ice::Identity& id, const Ice::Current& current)
{
- return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id)->ice_encodingVersion(v);
+ if(id.name.empty())
+ {
+ return Ice::ObjectPrx();
+ }
+ Ice::ObjectPrx prx = current.con->createProxy(id)->ice_encodingVersion(current.encoding);
+ return _registry->getRegistryAdapter()->addWithUUID(new SubscriberForwarderI(prx));
}
FileIteratorPrx
-AdminSessionI::addFileIterator(const FileReaderPrx& reader,
- const string& filename,
- int nLines,
- const Ice::Current& current)
+AdminSessionI::addFileIterator(const FileReaderPrx& reader, const string& filename, int nLines, const Ice::Current& c)
{
Lock sync(*this);
if(_destroyed)
{
Ice::ObjectNotExistException ex(__FILE__, __LINE__);
- ex.id = current.id;
+ ex.id = c.id;
throw ex;
}