diff options
-rw-r--r-- | cppe/demo/IceE/chat/ChatClientDlg.cpp | 28 | ||||
-rw-r--r-- | cppe/demo/IceE/chat/ChatClientDlg.h | 1 | ||||
-rw-r--r-- | cppe/include/IceE/Config.h | 1 | ||||
-rwxr-xr-x | cppe/include/IceE/ObjectAdapter.h | 3 | ||||
-rw-r--r-- | cppe/include/IceE/RouterInfoF.h (renamed from cppe/src/IceE/RouterInfoF.h) | 0 | ||||
-rw-r--r-- | cppe/src/IceE/ObjectAdapter.cpp | 49 | ||||
-rwxr-xr-x | cppe/src/IceE/OutgoingConnectionFactory.cpp | 73 | ||||
-rwxr-xr-x | cppe/src/IceE/OutgoingConnectionFactory.h | 4 | ||||
-rw-r--r-- | cppe/src/IceE/RouterInfo.cpp | 31 | ||||
-rw-r--r-- | cppe/src/IceE/RouterInfo.h | 1 | ||||
-rw-r--r-- | cs/src/Ice/ConnectionFactory.cs | 87 | ||||
-rwxr-xr-x | cs/src/Ice/ObjectAdapterI.cs | 83 | ||||
-rwxr-xr-x | cs/src/Ice/RouterInfo.cs | 22 | ||||
-rw-r--r-- | java/src/Ice/ObjectAdapterI.java | 2 |
14 files changed, 281 insertions, 104 deletions
diff --git a/cppe/demo/IceE/chat/ChatClientDlg.cpp b/cppe/demo/IceE/chat/ChatClientDlg.cpp index 1208ce80590..50f60f7ae47 100644 --- a/cppe/demo/IceE/chat/ChatClientDlg.cpp +++ b/cppe/demo/IceE/chat/ChatClientDlg.cpp @@ -341,29 +341,24 @@ CChatClientDlg::OnLogin() try { string routerStr = Ice::printfToString("Glacier2/router:tcp -p %s -h %s", port.c_str(), host.c_str()); - Glacier2::RouterPrx router = Glacier2::RouterPrx::checkedCast(_communicator->stringToProxy(routerStr)); - assert(router); + _router = Glacier2::RouterPrx::checkedCast(_communicator->stringToProxy(routerStr)); + assert(_router); // - // Set the router on the Communicator. + // Now setup the new router. // - _communicator->setDefaultRouter(router); - - //Ice::PropertiesPtr properties = _communicator->getProperties(); - //properties->setProperty("Chat.Client.Router", routerStr); - - _chat = ChatSessionPrx::uncheckedCast(router->createSession(user, password)); + _chat = ChatSessionPrx::uncheckedCast(_router->createSession(user, password)->ice_router(_router)); // // Add the new router to the object adapter. // - _adapter->addRouter(router); + _adapter->addRouter(_router); // // Create the callback object. This must have the // category as defined by the Glacier2 session. // - string category = router->getServerProxy()->ice_getIdentity().category; + string category = _router->getServerProxy()->ice_getIdentity().category; Ice::Identity callbackReceiverIdent; callbackReceiverIdent.name = "callbackReceiver"; callbackReceiverIdent.category = category; @@ -397,7 +392,7 @@ CChatClientDlg::OnLogin() assert(_callback); _adapter->remove(_callback->ice_getIdentity()); _callback = 0; - +
assert(_chat); _chat = 0; @@ -408,14 +403,21 @@ CChatClientDlg::OnLogin() _ping->getThreadControl().join(); _ping = 0; + // + // Clear the router. + // + assert(_router); try { - Glacier2::RouterPrx::uncheckedCast(_communicator->getDefaultRouter())->destroySession(); + _router->destroySession(); } catch(const Ice::Exception& ex) { AfxMessageBox(CString(ex.toString().c_str()), MB_OK|MB_ICONEXCLAMATION); } + + _adapter->removeRouter(_router); + _router = 0; } // diff --git a/cppe/demo/IceE/chat/ChatClientDlg.h b/cppe/demo/IceE/chat/ChatClientDlg.h index 2d5d37ae301..2861f4b1c91 100644 --- a/cppe/demo/IceE/chat/ChatClientDlg.h +++ b/cppe/demo/IceE/chat/ChatClientDlg.h @@ -39,6 +39,7 @@ protected: Ice::ObjectAdapterPtr _adapter; Demo::ChatCallbackPrx _callback; Demo::ChatSessionPrx _chat; + Glacier2::RouterPrx _router; const LogIPtr _log; SessionPingThreadPtr _ping; diff --git a/cppe/include/IceE/Config.h b/cppe/include/IceE/Config.h index 964a8e656c8..33ddf4c2ff2 100644 --- a/cppe/include/IceE/Config.h +++ b/cppe/include/IceE/Config.h @@ -26,7 +26,6 @@ // #define ICEE_HAS_BATCH - // *********************************************************************** // // User should not change anything below this line! diff --git a/cppe/include/IceE/ObjectAdapter.h b/cppe/include/IceE/ObjectAdapter.h index 1e7b1411ec7..73175413196 100755 --- a/cppe/include/IceE/ObjectAdapter.h +++ b/cppe/include/IceE/ObjectAdapter.h @@ -22,6 +22,7 @@ #ifdef ICEE_HAS_ROUTER
# include <IceE/RouterF.h>
+# include <IceE/RouterInfoF.h>
#endif
#ifdef ICEE_HAS_LOCATOR
@@ -72,6 +73,7 @@ public: #ifdef ICEE_HAS_ROUTER
void addRouter(const RouterPrx&);
+ void removeRouter(const RouterPrx&);
#endif
#ifdef ICEE_HAS_LOCATOR
void setLocator(const LocatorPrx&);
@@ -108,6 +110,7 @@ private: std::vector<IceInternal::IncomingConnectionFactoryPtr> _incomingConnectionFactories;
#ifdef ICEE_HAS_ROUTER
std::vector<IceInternal::EndpointPtr> _routerEndpoints;
+ std::vector<IceInternal::RouterInfoPtr> _routerInfos;
#endif
std::vector<IceInternal::EndpointPtr> _publishedEndpoints;
#ifdef ICEE_HAS_LOCATOR
diff --git a/cppe/src/IceE/RouterInfoF.h b/cppe/include/IceE/RouterInfoF.h index 33f02b33a01..33f02b33a01 100644 --- a/cppe/src/IceE/RouterInfoF.h +++ b/cppe/include/IceE/RouterInfoF.h diff --git a/cppe/src/IceE/ObjectAdapter.cpp b/cppe/src/IceE/ObjectAdapter.cpp index 724a7f2737e..878e9934f09 100644 --- a/cppe/src/IceE/ObjectAdapter.cpp +++ b/cppe/src/IceE/ObjectAdapter.cpp @@ -450,6 +450,8 @@ Ice::ObjectAdapter::addRouter(const RouterPrx& router) RouterInfoPtr routerInfo = _instance->routerManager()->get(router); if(routerInfo) { + _routerInfos.push_back(routerInfo); + // // Add the router's server proxy endpoints to this object // adapter. @@ -472,7 +474,52 @@ Ice::ObjectAdapter::addRouter(const RouterPrx& router) // router's client proxy to use this object adapter for // callbacks. // - _instance->outgoingConnectionFactory()->setRouter(routerInfo->getRouter()); + _instance->outgoingConnectionFactory()->setRouterInfo(routerInfo); + } +} + +void +Ice::ObjectAdapter::removeRouter(const RouterPrx& router) +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + checkForDeactivation(); + + RouterInfoPtr routerInfo = _instance->routerManager()->erase(router); + if(routerInfo) + { + // + // Rebuild the router endpoints from our set of router infos. + // + _routerEndpoints.clear(); + vector<RouterInfoPtr>::iterator p = _routerInfos.begin(); + while(p != _routerInfos.end()) + { + if(*p == routerInfo) + { + p = _routerInfos.erase(p); + continue; + } + ObjectPrx proxy = (*p)->getServerProxy(); + vector<EndpointPtr> endpoints = proxy->__reference()->getEndpoints(); + copy(endpoints.begin(), endpoints.end(), back_inserter(_routerEndpoints)); + ++p; + } + + sort(_routerEndpoints.begin(), _routerEndpoints.end()); // Must be sorted. + _routerEndpoints.erase(unique(_routerEndpoints.begin(), _routerEndpoints.end()), _routerEndpoints.end()); + + // + // Clear this object adapter with the router. + // + routerInfo->setAdapter(0); + + // + // Also modify all existing outgoing connections to the + // router's client proxy to use this object adapter for + // callbacks. + // + _instance->outgoingConnectionFactory()->setRouterInfo(routerInfo); } } #endif diff --git a/cppe/src/IceE/OutgoingConnectionFactory.cpp b/cppe/src/IceE/OutgoingConnectionFactory.cpp index 9d3fcf18e6c..cdf0fe582e7 100755 --- a/cppe/src/IceE/OutgoingConnectionFactory.cpp +++ b/cppe/src/IceE/OutgoingConnectionFactory.cpp @@ -331,7 +331,7 @@ IceInternal::OutgoingConnectionFactory::create(const vector<EndpointPtr>& endpts #ifdef ICEE_HAS_ROUTER void -IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router) +IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); @@ -340,53 +340,50 @@ IceInternal::OutgoingConnectionFactory::setRouter(const RouterPrx& router) throw CommunicatorDestroyedException(__FILE__, __LINE__); } - RouterInfoPtr routerInfo = _instance->routerManager()->get(router); - if(routerInfo) + assert(routerInfo); + // + // Search for connections to the router's client proxy + // endpoints, and update the object adapter for such + // connections, so that callbacks from the router can be + // received over such connections. + // + ObjectPrx proxy = routerInfo->getClientProxy(); +#ifndef ICEE_PURE_CLIENT + ObjectAdapterPtr adapter = routerInfo->getAdapter(); +#endif + vector<EndpointPtr> endpoints = proxy->__reference()->getEndpoints(); + vector<EndpointPtr>::const_iterator p; + for(p = endpoints.begin(); p != endpoints.end(); ++p) { + EndpointPtr endpoint = *p; + // - // Search for connections to the router's client proxy - // endpoints, and update the object adapter for such - // connections, so that callbacks from the router can be - // received over such connections. + // Modify endpoints with overrides. // - ObjectPrx proxy = routerInfo->getClientProxy(); -#ifndef ICEE_PURE_CLIENT - ObjectAdapterPtr adapter = routerInfo->getAdapter(); -#endif - vector<EndpointPtr> endpoints = proxy->__reference()->getEndpoints(); - vector<EndpointPtr>::const_iterator p; - for(p = endpoints.begin(); p != endpoints.end(); ++p) + if(_instance->defaultsAndOverrides()->overrideTimeout) { - EndpointPtr endpoint = *p; + endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + } - // - // Modify endpoints with overrides. - // - if(_instance->defaultsAndOverrides()->overrideTimeout) +#ifndef ICEE_PURE_CLIENT + pair<multimap<EndpointPtr, ConnectionPtr>::iterator, + multimap<EndpointPtr, ConnectionPtr>::iterator> pr = _connections.equal_range(endpoint); + + while(pr.first != pr.second) + { + try { - endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); + pr.first->second->setAdapter(adapter); } - -#ifndef ICEE_PURE_CLIENT - pair<multimap<EndpointPtr, ConnectionPtr>::iterator, - multimap<EndpointPtr, ConnectionPtr>::iterator> pr = _connections.equal_range(endpoint); - - while(pr.first != pr.second) + catch(const Ice::LocalException&) { - try - { - pr.first->second->setAdapter(adapter); - } - catch(const Ice::LocalException&) - { - // - // Ignore, the connection is being closed or closed. - // - } - ++pr.first; + // + // Ignore, the connection is being closed or closed. + // } -#endif + ++pr.first; } +#endif } } diff --git a/cppe/src/IceE/OutgoingConnectionFactory.h b/cppe/src/IceE/OutgoingConnectionFactory.h index b4851d65a48..c14c9ecdb2e 100755 --- a/cppe/src/IceE/OutgoingConnectionFactory.h +++ b/cppe/src/IceE/OutgoingConnectionFactory.h @@ -16,7 +16,7 @@ #include <IceE/ObjectAdapterF.h> #include <IceE/EndpointF.h> #ifdef ICEE_HAS_ROUTER -# include <IceE/RouterF.h> +# include <IceE/RouterInfoF.h> #endif
#include <IceE/Shared.h> #include <IceE/Mutex.h> @@ -36,7 +36,7 @@ public: Ice::ConnectionPtr create(const std::vector<EndpointPtr>&); #ifdef ICEE_HAS_ROUTER - void setRouter(const ::Ice::RouterPrx&); + void setRouterInfo(const RouterInfoPtr&); #endif void removeAdapter(const ::Ice::ObjectAdapterPtr&); #ifdef ICEE_HAS_BATCH diff --git a/cppe/src/IceE/RouterInfo.cpp b/cppe/src/IceE/RouterInfo.cpp index 130166f4e23..6c0e16f16cf 100644 --- a/cppe/src/IceE/RouterInfo.cpp +++ b/cppe/src/IceE/RouterInfo.cpp @@ -83,6 +83,37 @@ IceInternal::RouterManager::get(const RouterPrx& rtr) return _tableHint->second; } +RouterInfoPtr +IceInternal::RouterManager::erase(const RouterPrx& rtr) +{ + RouterInfoPtr info; + if(rtr) + { + RouterPrx router = RouterPrx::uncheckedCast(rtr->ice_router(0)); // The router cannot be routed. + IceUtil::Mutex::Lock sync(*this); + + map<RouterPrx, RouterInfoPtr>::iterator p = _table.end(); + if(_tableHint != _table.end() && _tableHint->first == router) + { + p = _tableHint; + _tableHint = _table.end(); + } + + if(p == _table.end()) + { + p = _table.find(router); + } + + if(p != _table.end()) + { + info = p->second; + _table.erase(p); + } + } + + return info; +} + IceInternal::RouterInfo::RouterInfo(const RouterPrx& router) : _router(router), _routingTable(new RoutingTable) diff --git a/cppe/src/IceE/RouterInfo.h b/cppe/src/IceE/RouterInfo.h index e641a132943..04d7bbe7f95 100644 --- a/cppe/src/IceE/RouterInfo.h +++ b/cppe/src/IceE/RouterInfo.h @@ -39,6 +39,7 @@ public: // the router info if it doesn't exist yet. // RouterInfoPtr get(const Ice::RouterPrx&); + RouterInfoPtr erase(const Ice::RouterPrx&); private: diff --git a/cs/src/Ice/ConnectionFactory.cs b/cs/src/Ice/ConnectionFactory.cs index 031fa0efc61..e91dcc3dc47 100644 --- a/cs/src/Ice/ConnectionFactory.cs +++ b/cs/src/Ice/ConnectionFactory.cs @@ -364,7 +364,7 @@ namespace IceInternal return newConnection; } - public void setRouter(Ice.RouterPrx router) + public void setRouterInfo(IceInternal.RouterInfo routerInfo) { lock(this) { @@ -373,57 +373,54 @@ namespace IceInternal throw new Ice.CommunicatorDestroyedException(); } - RouterInfo routerInfo = _instance.routerManager().get(router); - if(routerInfo != null) + Debug.Assert(routerInfo != null); + // + // Search for connections to the router's client proxy + // endpoints, and update the object adapter for such + // connections, so that callbacks from the router can be + // received over such connections. + // + Ice.ObjectPrx proxy = routerInfo.getClientProxy(); + Ice.ObjectAdapter adapter = routerInfo.getAdapter(); + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); + EndpointI[] endpoints = ((Ice.ObjectPrxHelperBase)proxy).__reference().getEndpoints(); + for(int i = 0; i < endpoints.Length; i++) { + EndpointI endpoint = endpoints[i]; + // - // Search for connections to the router's client proxy - // endpoints, and update the object adapter for such - // connections, so that callbacks from the router can be - // received over such connections. + // Modify endpoints with overrides. // - Ice.ObjectPrx proxy = routerInfo.getClientProxy(); - Ice.ObjectAdapter adapter = routerInfo.getAdapter(); - DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); - EndpointI[] endpoints = ((Ice.ObjectPrxHelperBase)proxy).__reference().getEndpoints(); - for(int i = 0; i < endpoints.Length; i++) + if(defaultsAndOverrides.overrideTimeout) { - EndpointI endpoint = endpoints[i]; - - // - // Modify endpoints with overrides. - // - if(defaultsAndOverrides.overrideTimeout) - { - endpoint = endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); - } + endpoint = endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue); + } - // - // The Ice.ConnectionI object does not take the compression flag of - // endpoints into account, but instead gets the information - // about whether messages should be compressed or not from - // other sources. In order to allow connection sharing for - // endpoints that differ in the value of the compression flag - // only, we always set the compression flag to false here in - // this connection factory. - // - endpoint = endpoint.compress(false); - - LinkedList connectionList = (LinkedList)_connections[endpoints[i]]; - if(connectionList != null) + // + // The Ice.ConnectionI object does not take the compression flag of + // endpoints into account, but instead gets the information + // about whether messages should be compressed or not from + // other sources. In order to allow connection sharing for + // endpoints that differ in the value of the compression flag + // only, we always set the compression flag to false here in + // this connection factory. + // + endpoint = endpoint.compress(false); + + LinkedList connectionList = (LinkedList)_connections[endpoints[i]]; + if(connectionList != null) + { + foreach(Ice.ConnectionI connection in connectionList) { - foreach(Ice.ConnectionI connection in connectionList) + try + { + connection.setAdapter(adapter); + } + catch(Ice.LocalException) { - try - { - connection.setAdapter(adapter); - } - catch(Ice.LocalException) - { - // - // Ignore, the connection is being closed or closed. - // - } + // + // Ignore, the connection is being closed or closed. + // } } } diff --git a/cs/src/Ice/ObjectAdapterI.cs b/cs/src/Ice/ObjectAdapterI.cs index 11f2232785d..53d7d9a4b51 100755 --- a/cs/src/Ice/ObjectAdapterI.cs +++ b/cs/src/Ice/ObjectAdapterI.cs @@ -508,13 +508,16 @@ namespace Ice IceInternal.RouterInfo routerInfo = _instance.routerManager().get(router); if(routerInfo != null) { + _routerInfos.Add(routerInfo); + // // Add the router's server proxy endpoints to this object // adapter. // ObjectPrxHelperBase proxy = (ObjectPrxHelperBase)routerInfo.getServerProxy(); IceInternal.EndpointI[] endpoints = proxy.__reference().getEndpoints(); - for(int i = 0; i < endpoints.Length; ++i) + int i; + for(i = 0; i < endpoints.Length; ++i) { _routerEndpoints.Add(endpoints[i]); } @@ -523,7 +526,8 @@ namespace Ice // // Remove duplicate endpoints, so we have a list of unique endpoints. // - for(int i = 0; i < _routerEndpoints.Count - 1; ++i) + i = 0; + while(i < _routerEndpoints.Count-1) { System.Object o1 = _routerEndpoints[i]; System.Object o2 = _routerEndpoints[i + 1]; @@ -531,8 +535,12 @@ namespace Ice { _routerEndpoints.Remove(i); } + else + { + ++i; + } } - + // // Associate this object adapter with the router. This way, // new outgoing connections to the router's client proxy will @@ -545,7 +553,72 @@ namespace Ice // router's client proxy to use this object adapter for // callbacks. // - _instance.outgoingConnectionFactory().setRouter(routerInfo.getRouter()); + _instance.outgoingConnectionFactory().setRouterInfo(routerInfo); + } + } + } + + public void removeRouter(RouterPrx router) + { + lock(this) + { + checkForDeactivation(); + + IceInternal.RouterInfo routerInfo = _instance.routerManager().erase(router); + if(routerInfo != null) + { + // + // Rebuild the router endpoints from our set of router infos. + // + _routerEndpoints.Clear(); + int i; + int p = 0; + while(p < _routerInfos.Count) + { + if(_routerInfos[p] == routerInfo) + { + _routerInfos.Remove(p); + continue; + } + ObjectPrxHelperBase proxy = (ObjectPrxHelperBase)routerInfo.getServerProxy(); + IceInternal.EndpointI[] endpoints = proxy.__reference().getEndpoints(); + for(i = 0; i < endpoints.Length; ++i) + { + _routerEndpoints.Add(endpoints[i]); + } + ++p; + } + + _routerEndpoints.Sort(); // Must be sorted. + // + // Remove duplicate endpoints, so we have a list of unique endpoints. + // + i = 0; + while(i < _routerEndpoints.Count-1) + { + System.Object o1 = _routerEndpoints[i]; + System.Object o2 = _routerEndpoints[i + 1]; + if(o1.Equals(o2)) + { + _routerEndpoints.Remove(i); + } + else + { + ++i; + } + } + + // + // Clear this object adapter with the router. + // + routerInfo.setAdapter(null); + + // + // Also modify all existing outgoing connections to the + // router's client proxy to use this object adapter for + // callbacks. + // + _instance.outgoingConnectionFactory().setRouterInfo(routerInfo); } } } @@ -728,6 +801,7 @@ namespace Ice _id = instance.properties().getProperty(name + ".AdapterId"); _incomingConnectionFactories = new ArrayList(); _routerEndpoints = new ArrayList(); + _routerInfos = new ArrayList(); _directCount = 0; _waitForDeactivate = false; @@ -1010,6 +1084,7 @@ namespace Ice private readonly string _id; private ArrayList _incomingConnectionFactories; private ArrayList _routerEndpoints; + private ArrayList _routerInfos; private ArrayList _publishedEndpoints; private IceInternal.LocatorInfo _locatorInfo; private int _directCount; diff --git a/cs/src/Ice/RouterInfo.cs b/cs/src/Ice/RouterInfo.cs index 629add8a24a..3740b683d95 100755 --- a/cs/src/Ice/RouterInfo.cs +++ b/cs/src/Ice/RouterInfo.cs @@ -196,6 +196,28 @@ namespace IceInternal return info; } } + + // + // Returns router info for a given router. Automatically creates + // the router info if it doesn't exist yet. + // + public RouterInfo erase(Ice.RouterPrx rtr) + { + RouterInfo info = null; + if(rtr == null) + { + Ice.RouterPrx router = Ice.RouterPrxHelper.uncheckedCast(rtr.ice_router(null)); // The router cannot be routed. + lock(this) + { + info = (RouterInfo)_table[router]; + if(info != null) + { + _table.Remove(router); + } + } + } + return info; + } private Hashtable _table; } diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index bd644eddd9b..f642d2074fb 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -496,6 +496,8 @@ public final class ObjectAdapterI extends LocalObjectImpl implements ObjectAdapt IceInternal.RouterInfo routerInfo = _instance.routerManager().get(router); if(routerInfo != null) { + _routerInfos.add(routerInfo); + // // Add the router's server proxy endpoints to this object // adapter. |