summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/ConnectionFactory.cpp90
-rw-r--r--cpp/src/Ice/ConnectionFactory.h4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp10
-rw-r--r--cpp/src/Ice/ObjectAdapterI.cpp64
-rw-r--r--cpp/src/Ice/ObjectAdapterI.h8
-rw-r--r--cpp/src/Ice/RouterInfo.cpp31
-rw-r--r--cpp/src/Ice/RouterInfo.h1
7 files changed, 142 insertions, 66 deletions
diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp
index d6ae53d56ce..b720f2b274c 100644
--- a/cpp/src/Ice/ConnectionFactory.cpp
+++ b/cpp/src/Ice/ConnectionFactory.cpp
@@ -370,7 +370,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpt
}
void
-IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router)
+IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
@@ -379,61 +379,59 @@ IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router)
throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
- RouterInfoPtr routerInfo = _instance->routerManager()->get(router);
- if(routerInfo)
+ assert(routerInfo);
+
+ //
+ // Search for connections to the router's client proxy endpoints,
+ // and update the object adapter for such connections, so that
+ // callbacks from the router can be received over such
+ // connections.
+ //
+ ObjectPrx proxy = routerInfo->getClientProxy();
+ ObjectAdapterPtr adapter = routerInfo->getAdapter();
+ vector<EndpointIPtr> endpoints = proxy->__reference()->getEndpoints();
+ vector<EndpointIPtr>::const_iterator p;
+ for(p = endpoints.begin(); p != endpoints.end(); ++p)
{
+ EndpointIPtr endpoint = *p;
+
//
- // Search for connections to the router's client proxy
- // endpoints, and update the object adapter for such
- // connections, so that callbacks from the router can be
- // received over such connections.
+ // Modify endpoints with overrides.
//
- ObjectPrx proxy = routerInfo->getClientProxy();
- ObjectAdapterPtr adapter = routerInfo->getAdapter();
- vector<EndpointIPtr> endpoints = proxy->__reference()->getEndpoints();
- vector<EndpointIPtr>::const_iterator p;
- for(p = endpoints.begin(); p != endpoints.end(); ++p)
+ if(_instance->defaultsAndOverrides()->overrideTimeout)
{
- EndpointIPtr endpoint = *p;
+ endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
+ }
- //
- // Modify endpoints with overrides.
- //
- if(_instance->defaultsAndOverrides()->overrideTimeout)
- {
- endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
- }
+ //
+ // The Connection object does not take the compression flag of
+ // endpoints into account, but instead gets the information
+ // about whether messages should be compressed or not from
+ // other sources. In order to allow connection sharing for
+ // endpoints that differ in the value of the compression flag
+ // only, we always set the compression flag to false here in
+ // this connection factory.
+ //
+ endpoint = endpoint->compress(false);
- //
- // The Connection object does not take the compression flag of
- // endpoints into account, but instead gets the information
- // about whether messages should be compressed or not from
- // other sources. In order to allow connection sharing for
- // endpoints that differ in the value of the compression flag
- // only, we always set the compression flag to false here in
- // this connection factory.
- //
- endpoint = endpoint->compress(false);
+ pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
+ multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint);
- pair<multimap<EndpointIPtr, ConnectionIPtr>::iterator,
- multimap<EndpointIPtr, ConnectionIPtr>::iterator> pr = _connections.equal_range(endpoint);
-
- while(pr.first != pr.second)
+ while(pr.first != pr.second)
+ {
+ try
{
- try
- {
- pr.first->second->setAdapter(adapter);
- }
- catch(const Ice::LocalException&)
- {
- //
- // Ignore, the connection is being closed or closed.
- //
- }
- ++pr.first;
+ pr.first->second->setAdapter(adapter);
+ }
+ catch(const Ice::LocalException&)
+ {
+ //
+ // Ignore, the connection is being closed or closed.
+ //
}
+ ++pr.first;
}
- }
+ }
}
void
diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h
index 22d3dc47e1e..9beccb40c31 100644
--- a/cpp/src/Ice/ConnectionFactory.h
+++ b/cpp/src/Ice/ConnectionFactory.h
@@ -20,7 +20,7 @@
#include <Ice/EndpointIF.h>
#include <Ice/AcceptorF.h>
#include <Ice/TransceiverF.h>
-#include <Ice/RouterF.h>
+#include <Ice/RouterInfoF.h>
#include <Ice/EventHandler.h>
#include <list>
#include <set>
@@ -45,7 +45,7 @@ public:
void waitUntilFinished();
Ice::ConnectionIPtr create(const std::vector<EndpointIPtr>&, bool&);
- void setRouter(const ::Ice::RouterPrx&);
+ void setRouterInfo(const RouterInfoPtr&);
void removeAdapter(const ::Ice::ObjectAdapterPtr&);
void flushBatchRequests();
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index f9baff297ef..7082d4aed1e 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1145,8 +1145,8 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
// dispatch count with any old adapter is zero.
//
// A deadlock can occur if we wait while an operation is
- // outstanding on this adapter that holds a lock while
- // calling this function (e.g., __getDelegate).
+ // outstanding on this adapter that holds a lock while calling
+ // this function (e.g., __getDelegate).
//
// In order to avoid such a deadlock, we only wait if the new
// adapter is different than the current one.
@@ -1161,10 +1161,10 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter)
}
//
- // We never change the thread pool with which we were initially
- // registered, even if we add or remove an object adapter.
+ // We never change the thread pool with which we were
+ // initially registered, even if we add or remove an object
+ // adapter.
//
-
_adapter = adapter;
if(_adapter)
{
diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp
index 79a947e6a02..32d6157771c 100644
--- a/cpp/src/Ice/ObjectAdapterI.cpp
+++ b/cpp/src/Ice/ObjectAdapterI.cpp
@@ -495,6 +495,8 @@ Ice::ObjectAdapterI::addRouter(const RouterPrx& router)
RouterInfoPtr routerInfo = _instance->routerManager()->get(router);
if(routerInfo)
{
+ _routerInfos.push_back(routerInfo);
+
//
// Add the router's server proxy endpoints to this object
// adapter.
@@ -517,7 +519,52 @@ Ice::ObjectAdapterI::addRouter(const RouterPrx& router)
// router's client proxy to use this object adapter for
// callbacks.
//
- _instance->outgoingConnectionFactory()->setRouter(routerInfo->getRouter());
+ _instance->outgoingConnectionFactory()->setRouterInfo(routerInfo);
+ }
+}
+
+void
+Ice::ObjectAdapterI::removeRouter(const RouterPrx& router)
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ checkForDeactivation();
+
+ RouterInfoPtr routerInfo = _instance->routerManager()->erase(router);
+ if(routerInfo)
+ {
+ //
+ // Rebuild the router endpoints from our set of router infos.
+ //
+ _routerEndpoints.clear();
+ vector<RouterInfoPtr>::iterator p = _routerInfos.begin();
+ while(p != _routerInfos.end())
+ {
+ if(*p == routerInfo)
+ {
+ p = _routerInfos.erase(p);
+ continue;
+ }
+ ObjectPrx proxy = (*p)->getServerProxy();
+ vector<EndpointIPtr> endpoints = proxy->__reference()->getEndpoints();
+ copy(endpoints.begin(), endpoints.end(), back_inserter(_routerEndpoints));
+ ++p;
+ }
+
+ sort(_routerEndpoints.begin(), _routerEndpoints.end()); // Must be sorted.
+ _routerEndpoints.erase(unique(_routerEndpoints.begin(), _routerEndpoints.end()), _routerEndpoints.end());
+
+ //
+ // Clear this object adapter with the router.
+ //
+ routerInfo->setAdapter(0);
+
+ //
+ // Also modify all existing outgoing connections to the
+ // router's client proxy to use this object adapter for
+ // callbacks.
+ //
+ _instance->outgoingConnectionFactory()->setRouterInfo(routerInfo);
}
}
@@ -801,11 +848,9 @@ Ice::ObjectAdapterI::newProxy(const Identity& ident, const string& facet) const
//
// Create a reference with the adapter id.
//
- ReferencePtr ref = _instance->referenceFactory()->create(ident, _instance->getDefaultContext(), facet,
- Reference::ModeTwoway, false, _id,
- 0, _locatorInfo,
- _instance->defaultsAndOverrides()->
- defaultCollocationOptimization);
+ ReferencePtr ref = _instance->referenceFactory()->create(
+ ident, _instance->getDefaultContext(), facet, Reference::ModeTwoway, false, _id,
+ 0, _locatorInfo, _instance->defaultsAndOverrides()->defaultCollocationOptimization);
//
// Return a proxy for the reference.
@@ -843,10 +888,9 @@ Ice::ObjectAdapterI::newDirectProxy(const Identity& ident, const string& facet)
//
// Create a reference and return a proxy for this reference.
//
- ReferencePtr ref = _instance->referenceFactory()->create(ident, _instance->getDefaultContext(), facet,
- Reference::ModeTwoway, false, endpoints, 0,
- _instance->defaultsAndOverrides()->
- defaultCollocationOptimization);
+ ReferencePtr ref = _instance->referenceFactory()->create(
+ ident, _instance->getDefaultContext(), facet, Reference::ModeTwoway, false, endpoints, 0,
+ _instance->defaultsAndOverrides()-> defaultCollocationOptimization);
return _instance->proxyFactory()->referenceToProxy(ref);
}
diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h
index b1901111c60..c595b6e0877 100644
--- a/cpp/src/Ice/ObjectAdapterI.h
+++ b/cpp/src/Ice/ObjectAdapterI.h
@@ -21,10 +21,11 @@
#include <Ice/ServantManagerF.h>
#include <Ice/ProxyF.h>
#include <Ice/ObjectF.h>
-#include <Ice/Exception.h>
+#include <Ice/RouterInfoF.h>
#include <Ice/EndpointIF.h>
#include <Ice/LocatorInfoF.h>
#include <Ice/ThreadPoolF.h>
+#include <Ice/Exception.h>
#include <Ice/Process.h>
#include <list>
@@ -68,6 +69,7 @@ public:
virtual ObjectPrx createReverseProxy(const Identity&) const;
virtual void addRouter(const RouterPrx&);
+ virtual void removeRouter(const RouterPrx&);
virtual void setLocator(const LocatorPrx&);
// virtual LocatorPrx getLocator() const;
@@ -104,6 +106,7 @@ private:
const std::string _id;
std::vector<IceInternal::IncomingConnectionFactoryPtr> _incomingConnectionFactories;
std::vector<IceInternal::EndpointIPtr> _routerEndpoints;
+ std::vector<IceInternal::RouterInfoPtr> _routerInfos;
std::vector<IceInternal::EndpointIPtr> _publishedEndpoints;
IceInternal::LocatorInfoPtr _locatorInfo;
int _directCount; // The number of direct proxies dispatching on this object adapter.
@@ -116,12 +119,11 @@ private:
ProcessI(const CommunicatorPtr&);
virtual void shutdown(const Current&);
-
virtual void writeMessage(const std::string&, Int, const Current&);
private:
- CommunicatorPtr _communicator;
+ const CommunicatorPtr _communicator;
};
};
diff --git a/cpp/src/Ice/RouterInfo.cpp b/cpp/src/Ice/RouterInfo.cpp
index c0e5e64e9a9..93915ee1377 100644
--- a/cpp/src/Ice/RouterInfo.cpp
+++ b/cpp/src/Ice/RouterInfo.cpp
@@ -79,6 +79,37 @@ IceInternal::RouterManager::get(const RouterPrx& rtr)
return _tableHint->second;
}
+RouterInfoPtr
+IceInternal::RouterManager::erase(const RouterPrx& rtr)
+{
+ RouterInfoPtr info;
+ if(rtr)
+ {
+ RouterPrx router = RouterPrx::uncheckedCast(rtr->ice_router(0)); // The router cannot be routed.
+ IceUtil::Mutex::Lock sync(*this);
+
+ map<RouterPrx, RouterInfoPtr>::iterator p = _table.end();
+ if(_tableHint != _table.end() && _tableHint->first == router)
+ {
+ p = _tableHint;
+ _tableHint = _table.end();
+ }
+
+ if(p == _table.end())
+ {
+ p = _table.find(router);
+ }
+
+ if(p != _table.end())
+ {
+ info = p->second;
+ _table.erase(p);
+ }
+ }
+
+ return info;
+}
+
IceInternal::RouterInfo::RouterInfo(const RouterPrx& router) :
_router(router),
_routingTable(new RoutingTable)
diff --git a/cpp/src/Ice/RouterInfo.h b/cpp/src/Ice/RouterInfo.h
index 815ea325b0b..60b99554e36 100644
--- a/cpp/src/Ice/RouterInfo.h
+++ b/cpp/src/Ice/RouterInfo.h
@@ -33,6 +33,7 @@ public:
// the router info if it doesn't exist yet.
//
RouterInfoPtr get(const Ice::RouterPrx&);
+ RouterInfoPtr erase(const Ice::RouterPrx&);
private: