diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-10-22 16:33:13 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-10-22 16:33:13 +0200 |
commit | d8da15a2d803da81b76568d0c8f620f8ed26d0fa (patch) | |
tree | f193067905624d61e0e8a3e6cd7d2498b9bf1873 /java/src | |
parent | Fixed demo dist to allow gradle build of java demos (diff) | |
download | ice-d8da15a2d803da81b76568d0c8f620f8ed26d0fa.tar.bz2 ice-d8da15a2d803da81b76568d0c8f620f8ed26d0fa.tar.xz ice-d8da15a2d803da81b76568d0c8f620f8ed26d0fa.zip |
Fixed ICE-3490: guarantee invocation serialization for proxies which are equal
Diffstat (limited to 'java/src')
9 files changed, 167 insertions, 61 deletions
diff --git a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java index 2c763f56cc2..27f8b15014a 100644 --- a/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/src/main/java/Ice/ObjectPrxHelperBase.java @@ -2757,15 +2757,15 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable { return _requestHandler; } - _requestHandler = createRequestHandler(); - handler = _requestHandler; + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); + _requestHandler = handler; } } else { - handler = createRequestHandler(); + handler = _reference.getInstance().requestHandlerFactory().getRequestHandler(_reference, this); } - return handler.connect(); + return handler.connect(this); } public void @@ -2803,26 +2803,6 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - private IceInternal.RequestHandler - createRequestHandler() - { - if(_reference.getCollocationOptimized()) - { - ObjectAdapter adapter = _reference.getInstance().objectAdapterFactory().findObjectAdapter(this); - if(adapter != null) - { - return new IceInternal.CollocatedRequestHandler(_reference, adapter); - } - } - - IceInternal.RequestHandler handler = new IceInternal.ConnectRequestHandler(_reference, this); - if(_reference.getInstance().queueRequests()) - { - handler = new QueueRequestHandler(_reference.getInstance(), handler); - } - return handler; - } - // // Only for use by IceInternal.ProxyFactory // diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java index e3f5f045f58..bd882384441 100644 --- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java @@ -58,7 +58,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { return this; } diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java index c1636662c8c..4da9f0ab401 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectRequestHandler.java @@ -33,19 +33,24 @@ public class ConnectRequestHandler @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { - Ice.ObjectPrxHelperBase proxy = _proxy; - try + // + // Initiate the connection if connect() is called by the proxy that + // created the handler. + // + if(proxy == _proxy) { _reference.getConnection(this); - + } + + try + { synchronized(this) { if(!initialized()) { - // The proxy request handler will be updated when the connection is set. - _updateRequestHandler = true; + _proxies.add(proxy); return this; } } @@ -56,11 +61,15 @@ public class ConnectRequestHandler throw ex; } - assert(_connection != null); - - RequestHandler handler = new ConnectionRequestHandler(_reference, _connection, _compress); - proxy.__setRequestHandler(this, handler); - return handler; + if(_connectionRequestHandler != null) + { + proxy.__setRequestHandler(this, _connectionRequestHandler); + return _connectionRequestHandler; + } + else + { + return this; + } } @Override @@ -286,15 +295,19 @@ public class ConnectRequestHandler { assert(!_initialized && _exception == null); _exception = ex; + _proxies.clear(); _proxy = null; // Break cyclic reference count. - // - // If some requests were queued, we notify them of the failure. This is done from a thread - // from the client thread pool since this will result in ice_exception callbacks to be - // called. - // flushRequestsWithException(); + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException exc) + { + // Ignore + } notifyAll(); } @@ -313,7 +326,7 @@ public class ConnectRequestHandler } public - ConnectRequestHandler(Reference ref, Ice.ObjectPrx proxy) + ConnectRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) { _reference = ref; _response = _reference.getMode() == Reference.ModeTwoway; @@ -325,7 +338,6 @@ public class ConnectRequestHandler _batchRequestInProgress = false; _batchRequestsSize = Protocol.requestBatchHdr.length; _batchStream = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding, _batchAutoFlush); - _updateRequestHandler = false; } private boolean @@ -452,17 +464,18 @@ public class ConnectRequestHandler } // - // We've finished sending the queued requests and the request handler now send - // the requests over the connection directly. It's time to substitute the - // request handler of the proxy with the more efficient connection request - // handler which does not have any synchronization. This also breaks the cyclic - // reference count with the proxy. - // - // NOTE: _updateRequestHandler is immutable once _flushing = true + // If we aren't caching the connection, don't bother creating a + // connection request handler. Otherwise, update the proxies + // request handler to use the more efficient connection request + // handler. // - if(_updateRequestHandler && _exception == null) + if(_reference.getCacheConnection() && _exception == null) { - _proxy.__setRequestHandler(this, new ConnectionRequestHandler(_reference, _connection, _compress)); + _connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress); + for(Ice.ObjectPrxHelperBase proxy : _proxies) + { + proxy.__setRequestHandler(this, _connectionRequestHandler); + } } synchronized(this) @@ -473,6 +486,15 @@ public class ConnectRequestHandler _initialized = true; _flushing = false; } + try + { + _reference.getInstance().requestHandlerFactory().removeRequestHandler(_reference, this); + } + catch(Ice.CommunicatorDestroyedException ex) + { + // Ignore + } + _proxies.clear(); _proxy = null; // Break cyclic reference count. notifyAll(); } @@ -526,6 +548,7 @@ public class ConnectRequestHandler private boolean _response; private Ice.ObjectPrxHelperBase _proxy; + private java.util.List<Ice.ObjectPrxHelperBase> _proxies = new java.util.ArrayList<Ice.ObjectPrxHelperBase>(); private final boolean _batchAutoFlush; @@ -539,5 +562,6 @@ public class ConnectRequestHandler private boolean _batchRequestInProgress; private int _batchRequestsSize; private BasicStream _batchStream; - private boolean _updateRequestHandler; + + private RequestHandler _connectionRequestHandler; } diff --git a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java index 924b633e670..d9a1fc5e9a4 100644 --- a/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/ConnectionRequestHandler.java @@ -13,10 +13,9 @@ public class ConnectionRequestHandler implements RequestHandler { @Override public RequestHandler - connect() + connect(Ice.ObjectPrxHelperBase proxy) { - assert(false); // This request handler is only created after connection binding. - return null; + return this; } @Override diff --git a/java/src/Ice/src/main/java/IceInternal/Instance.java b/java/src/Ice/src/main/java/IceInternal/Instance.java index 900784eafd5..94a74a959ee 100644 --- a/java/src/Ice/src/main/java/IceInternal/Instance.java +++ b/java/src/Ice/src/main/java/IceInternal/Instance.java @@ -205,6 +205,18 @@ public final class Instance return _referenceFactory; } + public synchronized RequestHandlerFactory + requestHandlerFactory() + { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } + + assert(_requestHandlerFactory != null); + return _requestHandlerFactory; + } + public synchronized ProxyFactory proxyFactory() { @@ -880,6 +892,8 @@ public final class Instance _referenceFactory = new ReferenceFactory(this, communicator); + _requestHandlerFactory = new RequestHandlerFactory(this); + _proxyFactory = new ProxyFactory(this); boolean ipv4 = _initData.properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0; @@ -956,6 +970,7 @@ public final class Instance { IceUtilInternal.Assert.FinalizerAssert(_state == StateDestroyed); IceUtilInternal.Assert.FinalizerAssert(_referenceFactory == null); + IceUtilInternal.Assert.FinalizerAssert(_requestHandlerFactory == null); IceUtilInternal.Assert.FinalizerAssert(_proxyFactory == null); IceUtilInternal.Assert.FinalizerAssert(_outgoingConnectionFactory == null); IceUtilInternal.Assert.FinalizerAssert(_servantFactoryManager == null); @@ -1258,7 +1273,9 @@ public final class Instance //_referenceFactory.destroy(); // No destroy function defined. _referenceFactory = null; - + + _requestHandlerFactory = null; + // _proxyFactory.destroy(); // No destroy function defined. _proxyFactory = null; @@ -1564,6 +1581,7 @@ public final class Instance private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; + private RequestHandlerFactory _requestHandlerFactory; private ProxyFactory _proxyFactory; private OutgoingConnectionFactory _outgoingConnectionFactory; private ObjectFactoryManager _servantFactoryManager; diff --git a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java index 29779078111..ce20fb95e9d 100644 --- a/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java +++ b/java/src/Ice/src/main/java/IceInternal/ProxyOutgoingAsyncBase.java @@ -52,6 +52,11 @@ public abstract class ProxyOutgoingAsyncBase extends OutgoingAsyncBase // try { + // + // It's important to let the retry queue do the retry even if + // the retry interval is 0. This method can be called with the + // connection locked so we can't just retry here. + // _instance.retryQueue().add(this, handleException(exc)); return false; } diff --git a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java index fdb7a672e2a..4d2be680c83 100644 --- a/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/QueueRequestHandler.java @@ -30,14 +30,14 @@ public class QueueRequestHandler implements RequestHandler @Override public RequestHandler - connect() + connect(final Ice.ObjectPrxHelperBase proxy) { performCallable(new Callable<Void>() { @Override public Void call() { - _delegate.connect(); + _delegate.connect(proxy); return null; } }); @@ -184,7 +184,8 @@ public class QueueRequestHandler implements RequestHandler return _delegate.waitForConnection(); } - private <T> T performCallable(Callable<T> callable) { + private <T> T performCallable(Callable<T> callable) + { try { Future<T> future = _executor.submit(callable); diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java index f87b215919f..6bca1b4ff24 100644 --- a/java/src/Ice/src/main/java/IceInternal/RequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandler.java @@ -11,7 +11,7 @@ package IceInternal; public interface RequestHandler extends CancellationHandler { - RequestHandler connect(); + RequestHandler connect(Ice.ObjectPrxHelperBase proxy); RequestHandler update(RequestHandler previousHandler, RequestHandler newHandler); void prepareBatchRequest(BasicStream out) diff --git a/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java new file mode 100644 index 00000000000..d8f79c7c79b --- /dev/null +++ b/java/src/Ice/src/main/java/IceInternal/RequestHandlerFactory.java @@ -0,0 +1,79 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import java.util.Map; +import java.util.HashMap; + +public final class RequestHandlerFactory +{ + RequestHandlerFactory(Instance instance) + { + _instance = instance; + } + + public RequestHandler + getRequestHandler(Reference ref, Ice.ObjectPrxHelperBase proxy) + { + if(ref.getCollocationOptimized()) + { + Ice.ObjectAdapter adapter = _instance.objectAdapterFactory().findObjectAdapter(proxy); + if(adapter != null) + { + return new CollocatedRequestHandler(ref, adapter); + } + } + + if(ref.getCacheConnection()) + { + synchronized(this) + { + RequestHandler handler = _handlers.get(ref); + if(handler != null) + { + return handler; + } + + handler = new ConnectRequestHandler(ref, proxy); + if(_instance.queueRequests()) + { + handler = new QueueRequestHandler(_instance, handler); + } + _handlers.put(ref, handler); + return handler; + } + } + else + { + RequestHandler handler = new ConnectRequestHandler(ref, proxy); + if(_instance.queueRequests()) + { + handler = new QueueRequestHandler(_instance, handler); + } + return handler; + } + } + + void + removeRequestHandler(Reference ref, RequestHandler handler) + { + if(ref.getCacheConnection()) + { + synchronized(this) + { + assert(_handlers.containsKey(ref)); + _handlers.remove(ref); + } + } + } + + private final Instance _instance; + private final Map<Reference, RequestHandler> _handlers = new HashMap<Reference, RequestHandler>(); +} |