diff options
Diffstat (limited to 'cs')
-rw-r--r-- | cs/src/Ice/CollocatedRequestHandler.cs | 2 | ||||
-rw-r--r-- | cs/src/Ice/ConnectRequestHandler.cs | 76 | ||||
-rw-r--r-- | cs/src/Ice/ConnectionRequestHandler.cs | 5 | ||||
-rw-r--r-- | cs/src/Ice/Instance.cs | 20 | ||||
-rw-r--r-- | cs/src/Ice/Makefile | 1 | ||||
-rw-r--r-- | cs/src/Ice/Makefile.mak | 1 | ||||
-rw-r--r-- | cs/src/Ice/OutgoingAsync.cs | 5 | ||||
-rw-r--r-- | cs/src/Ice/Proxy.cs | 22 | ||||
-rw-r--r-- | cs/src/Ice/RequestHandler.cs | 2 | ||||
-rw-r--r-- | cs/src/Ice/RequestHandlerFactory.cs | 71 | ||||
-rw-r--r-- | cs/test/Ice/hold/AllTests.cs | 79 |
11 files changed, 189 insertions, 95 deletions
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index a0e1a354938..90079d3bc40 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -42,7 +42,7 @@ namespace IceInternal _batchStream = new BasicStream(@ref.getInstance(), Ice.Util.currentProtocolEncoding, _batchAutoFlush); } - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { return this; } diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs index a5953343b10..b516af9a4e7 100644 --- a/cs/src/Ice/ConnectRequestHandler.cs +++ b/cs/src/Ice/ConnectRequestHandler.cs @@ -36,19 +36,24 @@ namespace IceInternal internal Ice.AsyncCallback sentCallback = null; } - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { - Ice.ObjectPrxHelperBase proxy = _proxy; - try + // + // Initiate the connection if connect() is called by the proxy that + // created the handler. + // + if(Object.ReferenceEquals(proxy, _proxy)) { _reference.getConnection(this); + } + try + { lock(this) { if(!initialized()) { - // The proxy request handler will be updated when the connection is set. - _updateRequestHandler = true; + _proxies.Add(proxy); return this; } } @@ -59,11 +64,15 @@ namespace IceInternal throw ex; } - Debug.Assert(_connection != null); - - RequestHandler handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy.setRequestHandler__(this, handler); - return handler; + if(_connectionRequestHandler != null) + { + proxy.setRequestHandler__(this, _connectionRequestHandler); + return _connectionRequestHandler; + } + else + { + return this; + } } public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) @@ -276,13 +285,17 @@ namespace IceInternal { Debug.Assert(!_initialized && _exception == null); _exception = ex; + _proxies.Clear(); _proxy = null; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException) + { + // Ignore + } flushRequestsWithException(); System.Threading.Monitor.PulseAll(this); @@ -313,7 +326,6 @@ namespace IceInternal _batchRequestInProgress = false; _batchRequestsSize = Protocol.requestBatchHdr.Length; _batchStream = new BasicStream(@ref.getInstance(), Ice.Util.currentProtocolEncoding, _batchAutoFlush); - _updateRequestHandler = false; } private bool initialized() @@ -424,17 +436,18 @@ namespace IceInternal } // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true + // If we aren't caching the connection, don't bother creating a + // connection request handler. Otherwise, update the proxies + // request handler to use the more efficient connection request + // handler. // - if(_updateRequestHandler && _exception == null) + if(_reference.getCacheConnection() && _exception == null) { - _proxy.setRequestHandler__(this, new ConnectionRequestHandler(_reference, _connection, _compress)); + _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + foreach(Ice.ObjectPrxHelperBase prx in _proxies) + { + prx.setRequestHandler__(this, _connectionRequestHandler); + } } lock(this) @@ -445,6 +458,15 @@ namespace IceInternal _initialized = true; _flushing = false; } + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException) + { + // Ignore + } + _proxies.Clear(); _proxy = null; // Break cyclic reference count. System.Threading.Monitor.PulseAll(this); } @@ -471,6 +493,7 @@ namespace IceInternal private bool _response; private Ice.ObjectPrxHelperBase _proxy; + private List<Ice.ObjectPrxHelperBase> _proxies = new List<Ice.ObjectPrxHelperBase>(); private bool _batchAutoFlush; @@ -484,6 +507,7 @@ namespace IceInternal private bool _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; - private bool _updateRequestHandler; + + private RequestHandler _connectionRequestHandler; } } diff --git a/cs/src/Ice/ConnectionRequestHandler.cs b/cs/src/Ice/ConnectionRequestHandler.cs index a0f1edec56f..683fefabc5e 100644 --- a/cs/src/Ice/ConnectionRequestHandler.cs +++ b/cs/src/Ice/ConnectionRequestHandler.cs @@ -16,10 +16,9 @@ namespace IceInternal { public class ConnectionRequestHandler : RequestHandler { - public RequestHandler connect() + public RequestHandler connect(Ice.ObjectPrxHelperBase proxy) { - Debug.Assert(false); // This request handler is only created after connection binding. - return null; + return this; } public RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler) diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index 03b6a11fb06..3ec55288b22 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -117,6 +117,20 @@ namespace IceInternal } } + public RequestHandlerFactory requestHandlerFactory() + { + lock(this) + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + Debug.Assert(_requestHandlerFactory != null); + return _requestHandlerFactory; + } + } + public ProxyFactory proxyFactory() { lock(this) @@ -817,6 +831,8 @@ namespace IceInternal _proxyFactory = new ProxyFactory(this); + _requestHandlerFactory = new RequestHandlerFactory(this); + bool ipv4 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; bool ipv6 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv6", 1) > 0; if(!ipv4 && !ipv6) @@ -1178,6 +1194,9 @@ namespace IceInternal _referenceFactory = null; // No destroy function defined. + _requestHandlerFactory = null; + + // No destroy function defined. // _proxyFactory.destroy(); _proxyFactory = null; @@ -1421,6 +1440,7 @@ namespace IceInternal private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; + private RequestHandlerFactory _requestHandlerFactory; private ProxyFactory _proxyFactory; private OutgoingConnectionFactory _outgoingConnectionFactory; private ObjectFactoryManager _servantFactoryManager; diff --git a/cs/src/Ice/Makefile b/cs/src/Ice/Makefile index dbdd6671d1c..a7f918b555c 100644 --- a/cs/src/Ice/Makefile +++ b/cs/src/Ice/Makefile @@ -90,6 +90,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RequestHandlerFactory.cs \ ResponseHandler.cs \ RetryQueue.cs \ RouterInfo.cs \ diff --git a/cs/src/Ice/Makefile.mak b/cs/src/Ice/Makefile.mak index 454302543aa..09eeee5484f 100644 --- a/cs/src/Ice/Makefile.mak +++ b/cs/src/Ice/Makefile.mak @@ -91,6 +91,7 @@ SRCS = Acceptor.cs \ ReferenceFactory.cs \ ReplyStatus.cs \ RequestHandler.cs \ + RequestHandlerFactory.cs \ ResponseHandler.cs \ RetryQueue.cs \ RouterInfo.cs \ diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 6da49814641..f7c3e7d5d11 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -150,6 +150,11 @@ namespace IceInternal // try { + // + // It's important to let the retry queue do the retry even if + // the retry interval is 0. This method can be called with the + // connection locked so we can't just retry here. + // instance_.retryQueue().add(this, handleException(exc)); return null; } diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs index c815004d284..7846ddffe3a 100644 --- a/cs/src/Ice/Proxy.cs +++ b/cs/src/Ice/Proxy.cs @@ -2432,15 +2432,15 @@ namespace Ice { return _requestHandler; } - _requestHandler = createRequestHandler(); - handler = _requestHandler; + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); + _requestHandler = handler; } } else { - handler = createRequestHandler(); + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); } - return handler.connect(); + return handler.connect(this); } public void setRequestHandler__(IceInternal.RequestHandler previous, IceInternal.RequestHandler handler) @@ -2464,20 +2464,6 @@ namespace Ice } } - private IceInternal.RequestHandler createRequestHandler() - { - if(_reference.getCollocationOptimized()) - { - ObjectAdapter adapter = _reference.getInstance().objectAdapterFactory().findObjectAdapter(this); - if(adapter != null) - { - return new IceInternal.CollocatedRequestHandler(_reference, adapter); - } - } - - return new IceInternal.ConnectRequestHandler(_reference, this); - } - // // Only for use by IceInternal.ProxyFactory // diff --git a/cs/src/Ice/RequestHandler.cs b/cs/src/Ice/RequestHandler.cs index 6cef5d16d4b..56f53f8910c 100644 --- a/cs/src/Ice/RequestHandler.cs +++ b/cs/src/Ice/RequestHandler.cs @@ -19,7 +19,7 @@ namespace IceInternal public interface RequestHandler : CancellationHandler { - RequestHandler connect(); + RequestHandler connect(Ice.ObjectPrxHelperBase proxy); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); void prepareBatchRequest(BasicStream @out); diff --git a/cs/src/Ice/RequestHandlerFactory.cs b/cs/src/Ice/RequestHandlerFactory.cs new file mode 100644 index 00000000000..e25d9733c33 --- /dev/null +++ b/cs/src/Ice/RequestHandlerFactory.cs @@ -0,0 +1,71 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +using System.Collections.Generic; +using System.Diagnostics; + +namespace IceInternal +{ + public class RequestHandlerFactory + { + internal RequestHandlerFactory(Instance instance) + { + _instance = instance; + } + + public RequestHandler + getRequestHandler(Reference rf, Ice.ObjectPrxHelperBase proxy) + { + if(rf.getCollocationOptimized()) + { + Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy); + if(adapter != null) + { + return new CollocatedRequestHandler(rf, adapter); + } + } + + if(rf.getCacheConnection()) + { + lock(this) + { + RequestHandler handler; + if(_handlers.TryGetValue(rf, out handler)) + { + return handler; + } + + handler = new ConnectRequestHandler(rf, proxy); + _handlers.Add(rf, handler); + return handler; + } + } + else + { + return new ConnectRequestHandler(rf, proxy); + } + } + + internal void + removeRequestHandler(Reference rf, RequestHandler handler) + { + if(rf.getCacheConnection()) + { + lock(this) + { + Debug.Assert(_handlers[rf] == handler); + _handlers.Remove(rf); + } + } + } + + readonly Instance _instance; + readonly Dictionary<Reference, RequestHandler> _handlers = new Dictionary<Reference, RequestHandler>(); + } +} diff --git a/cs/test/Ice/hold/AllTests.cs b/cs/test/Ice/hold/AllTests.cs index 5f210443414..c1eb3588422 100644 --- a/cs/test/Ice/hold/AllTests.cs +++ b/cs/test/Ice/hold/AllTests.cs @@ -75,29 +75,6 @@ public class AllTests : TestCommon.TestApp { } - public void - sent(bool sync) - { - lock(this) - { - _sent = true; - System.Threading.Monitor.Pulse(this); - } - } - - public void - waitForSent() - { - lock(this) - { - while(!_sent) - { - System.Threading.Monitor.Wait(this); - } - } - } - - private bool _sent = false; private Condition _condition; private int _expected; } @@ -157,16 +134,15 @@ public class AllTests : TestCommon.TestApp { Condition cond = new Condition(true); int value = 0; - SetCB cb = null; + Ice.AsyncResult result = null; while(cond.value()) { - cb = new SetCB(cond, value); - hold.begin_set(++value, value < 500 ? rand.Next(5) : 0). - whenCompleted(cb.response, cb.exception).whenSent(cb.sent); + SetCB cb = new SetCB(cond, value); + result = hold.begin_set(++value, value < 500 ? rand.Next(5) : 0).whenCompleted(cb.response, + cb.exception); if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } if(value > 100000) @@ -177,11 +153,7 @@ public class AllTests : TestCommon.TestApp break; } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForSent(); } WriteLine("ok"); @@ -190,26 +162,20 @@ public class AllTests : TestCommon.TestApp { Condition cond = new Condition(true); int value = 0; - SetCB cb = null; + Ice.AsyncResult result = null; while(value < 3000 && cond.value()) { - cb = new SetCB(cond, value); - holdSerialized.begin_set(++value, value < 500 ? rand.Next(5) : 0). - whenCompleted(cb.response, cb.exception).whenSent(cb.sent); + SetCB cb = new SetCB(cond, value); + result = holdSerialized.begin_set(++value, 0).whenCompleted(cb.response, cb.exception); if(value % 100 == 0) { - cb.waitForSent(); - cb = null; + result.waitForSent(); } } - if(cb != null) - { - cb.waitForSent(); - cb = null; - } + result.waitForCompleted(); test(cond.value()); - for(int i = 0; i < 20000; ++i) + for(int i = 0; i < 10000; ++i) { holdSerializedOneway.setOneway(value + 1, value); ++value; @@ -221,6 +187,27 @@ public class AllTests : TestCommon.TestApp } WriteLine("ok"); + Write("testing serialization... "); + Flush(); + { + int value = 0; + holdSerialized.set(value, 0); + Ice.AsyncResult result = null; + for(int i = 0; i < 10000; ++i) + { + // Create a new proxy for each request + result = ((HoldPrx)holdSerialized.ice_oneway()).begin_setOneway(value + 1, value); + ++value; + if((i % 100) == 0) + { + result.waitForSent(); + holdSerialized.ice_getConnection().close(false); + } + } + result.waitForCompleted(); + } + WriteLine("ok"); + Write("testing waitForHold... "); Flush(); { |