diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-20 17:44:41 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-20 17:44:41 +0000 |
commit | f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad (patch) | |
tree | 520786ae72c4376b505f21f8adf9f5ea522cf9bf /java/src | |
parent | Win32 fixes (diff) | |
download | ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.bz2 ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.tar.xz ice-f86bb34ec33de67fcc569d1f8cf6df2a6b7af6ad.zip |
C++ -> Java
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/ObjectPrxHelper.java | 70 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDel.java | 3 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDelD.java | 7 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDelM.java | 268 | ||||
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 4 | ||||
-rw-r--r-- | java/src/IceInternal/Connection.java | 741 | ||||
-rw-r--r-- | java/src/IceInternal/Instance.java | 50 | ||||
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 335 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/ProxyFactory.java | 63 | ||||
-rw-r--r-- | java/src/IceInternal/Reference.java | 249 |
11 files changed, 943 insertions, 849 deletions
diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java index 4b7037a7bc4..07f850ab756 100644 --- a/java/src/Ice/ObjectPrxHelper.java +++ b/java/src/Ice/ObjectPrxHelper.java @@ -225,8 +225,8 @@ public class ObjectPrxHelper implements ObjectPrx { try { - _ObjectDel __del = __getDelegate(); - __del.ice_invoke_async(cb, operation, mode, inParams, context); +// _ObjectDel __del = __getDelegate(); +// __del.ice_invoke_async(cb, operation, mode, inParams, context); return; } catch(LocalException __ex) @@ -623,63 +623,15 @@ public class ObjectPrxHelper implements ObjectPrx _reference.locatorInfo.clearObjectCache(_reference); } - ++cnt; - - IceInternal.TraceLevels traceLevels = _reference.instance.traceLevels(); - Logger logger = _reference.instance.logger(); - IceInternal.ProxyFactory proxyFactory = _reference.instance.proxyFactory(); - - // - // Instance components may be null if Communicator has been destroyed. - // - if(traceLevels != null && logger != null && proxyFactory != null) - { - int[] retryIntervals = proxyFactory.getRetryIntervals(); - - if(cnt > retryIntervals.length) - { - if(traceLevels.retry >= 1) - { - String s = "cannot retry operation call because retry limit has been exceeded\n" + ex.toString(); - logger.trace(traceLevels.retryCat, s); - } - throw ex; - } - - if(traceLevels.retry >= 1) - { - String s = "re-trying operation call"; - if(cnt > 0 && retryIntervals[cnt - 1] > 0) - { - s += " in " + retryIntervals[cnt - 1] + "ms"; - } - s += " because of exception\n" + ex; - logger.trace(traceLevels.retryCat, s); - } - - if(cnt > 0) - { - // - // Sleep before retrying. - // - try - { - Thread.currentThread().sleep(retryIntervals[cnt - 1]); - } - catch(InterruptedException ex1) - { - } - } - - return cnt; - } - else - { - // - // Impossible to retry after Communicator has been destroyed. - // - throw ex; - } + IceInternal.ProxyFactory proxyFactory = _reference.instance.proxyFactory(); + if(proxyFactory != null) + { + return proxyFactory.checkRetryAfterException(ex, cnt); + } + else + { + throw ex; // The communicator is already destroyed, so we cannot retry. + } } public final synchronized void diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java index 5d929566123..486140d701f 100644 --- a/java/src/Ice/_ObjectDel.java +++ b/java/src/Ice/_ObjectDel.java @@ -34,7 +34,4 @@ public interface _ObjectDel boolean ice_invoke(String operation, Ice.OperationMode mode, byte[] inParams, ByteSeqHolder outParams, java.util.Map context) throws IceInternal.NonRepeatable; - - void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams, - java.util.Map context); } diff --git a/java/src/Ice/_ObjectDelD.java b/java/src/Ice/_ObjectDelD.java index cc18359fa7a..d78e47da599 100644 --- a/java/src/Ice/_ObjectDelD.java +++ b/java/src/Ice/_ObjectDelD.java @@ -125,13 +125,6 @@ public class _ObjectDelD implements _ObjectDel throw new CollocationOptimizationException(); } - public void - ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams, - java.util.Map context) - { - throw new CollocationOptimizationException(); - } - // // Only for use by ObjectPrx. // diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index cf3505fc3da..549859a7bbc 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -175,16 +175,6 @@ public class _ObjectDelM implements _ObjectDel } } - public void - ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams, - java.util.Map context) - { - cb.__setup(__connection, __reference, operation, mode, context); - IceInternal.BasicStream __os = cb.__os(); - __os.writeBlob(inParams); - cb.__invoke(); - } - // // Only for use by ObjectPrx // @@ -204,19 +194,6 @@ public class _ObjectDelM implements _ObjectDel assert(__reference == null); assert(__connection == null); - if(from.__connection != null) - { - from.__connection.incProxyCount(); - } - -// Can not happen, __connection must be null. -/* - if(__connection != null) - { - __connection.decProxyCount(); - } -*/ - __reference = from.__reference; __connection = from.__connection; } @@ -235,244 +212,10 @@ public class _ObjectDelM implements _ObjectDel assert(__reference == null); assert(__connection == null); - __reference = ref; - - if(__reference.reverseAdapter != null) - { - // - // If we have a reverse object adapter, we use the incoming - // connections from such object adapter. - // - ObjectAdapterI adapter = (ObjectAdapterI)__reference.reverseAdapter; - IceInternal.Connection[] connections = adapter.getIncomingConnections(); - - IceInternal.Endpoint[] endpoints = new IceInternal.Endpoint[connections.length]; - for(int i = 0; i < connections.length; i++) - { - endpoints[i] = connections[i].endpoint(); - } - endpoints = filterEndpoints(endpoints); - - if(endpoints.length == 0) - { - NoEndpointException e = new NoEndpointException(); - e.proxy = __reference.toString(); - throw e; - } - - int j; - for(j = 0; j < connections.length; j++) - { - if(connections[j].endpoint().equals(endpoints[0])) - { - break; - } - } - assert(j < connections.length); - __connection = connections[j]; - __connection.incProxyCount(); - } - else - { - while(true) - { - IceInternal.Endpoint[] endpoints = null; - BooleanHolder cached = new BooleanHolder(); - cached.value = false; - - if(__reference.routerInfo != null) - { - // - // If we route, we send everything to the router's client - // proxy endpoints. - // - ObjectPrx proxy = __reference.routerInfo.getClientProxy(); - endpoints = ((ObjectPrxHelper)proxy).__reference().endpoints; - } - else if(__reference.endpoints.length > 0) - { - endpoints = __reference.endpoints; - } - else if(__reference.locatorInfo != null) - { - endpoints = __reference.locatorInfo.getEndpoints(__reference, cached); - } - - IceInternal.Endpoint[] filteredEndpoints = null; - if(endpoints != null) - { - filteredEndpoints = filterEndpoints(endpoints); - } - if(filteredEndpoints == null || filteredEndpoints.length == 0) - { - NoEndpointException e = new NoEndpointException(); - e.proxy = __reference.toString(); - throw e; - } - - try - { - IceInternal.OutgoingConnectionFactory factory = __reference.instance.outgoingConnectionFactory(); - __connection = factory.create(filteredEndpoints); - assert(__connection != null); - __connection.incProxyCount(); - } - catch(LocalException ex) - { - if(__reference.routerInfo == null && __reference.endpoints.length == 0) - { - assert(__reference.locatorInfo != null); - __reference.locatorInfo.clearCache(__reference); - - if(cached.value) - { - IceInternal.TraceLevels traceLevels = __reference.instance.traceLevels(); - Logger logger = __reference.instance.logger(); - - if(traceLevels.retry >= 2) - { - String s = "connection to cached endpoints failed\n" + - "removing endpoints from cache and trying one more time\n" + ex; - logger.trace(traceLevels.retryCat, s); - } - - continue; - } - } - - throw ex; - } - - break; - } - - // - // If we have a router, set the object adapter for this - // router (if any) to the new connection, so that - // callbacks from the router can be received over this new - // connection. - // - if(__reference.routerInfo != null) - { - __connection.setAdapter(__reference.routerInfo.getAdapter()); - } - } + __reference = ref; + __connection = __reference.getConnection(); } - - private IceInternal.Endpoint[] - filterEndpoints(IceInternal.Endpoint[] allEndpoints) - { - java.util.ArrayList endpoints = new java.util.ArrayList(); - - // - // Filter out unknown endpoints. - // - for(int i = 0; i < allEndpoints.length; i++) - { - if(!allEndpoints[i].unknown()) - { - endpoints.add(allEndpoints[i]); - } - } - - switch(__reference.mode) - { - case IceInternal.Reference.ModeTwoway: - case IceInternal.Reference.ModeOneway: - case IceInternal.Reference.ModeBatchOneway: - { - // - // Filter out datagram endpoints. - // - java.util.Iterator i = endpoints.iterator(); - while(i.hasNext()) - { - IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next(); - if(endpoint.datagram()) - { - i.remove(); - } - } - break; - } - - case IceInternal.Reference.ModeDatagram: - case IceInternal.Reference.ModeBatchDatagram: - { - // - // Filter out non-datagram endpoints. - // - java.util.Iterator i = endpoints.iterator(); - while(i.hasNext()) - { - IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next(); - if(!endpoint.datagram()) - { - i.remove(); - } - } - break; - } - } - - // - // Randomize the order of endpoints. - // - java.util.Collections.shuffle(endpoints); - - // - // If a secure connection is requested, remove all non-secure - // endpoints. Otherwise make non-secure endpoints preferred over - // secure endpoints by partitioning the endpoint vector, so that - // non-secure endpoints come first. - // - if(__reference.secure) - { - java.util.Iterator i = endpoints.iterator(); - while(i.hasNext()) - { - IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next(); - if(!endpoint.secure()) - { - i.remove(); - } - } - } - else - { - java.util.Collections.sort(endpoints, __comparator); - } - - IceInternal.Endpoint[] arr = new IceInternal.Endpoint[endpoints.size()]; - endpoints.toArray(arr); - return arr; - } - - private static class EndpointComparator implements java.util.Comparator - { - public int - compare(java.lang.Object l, java.lang.Object r) - { - IceInternal.Endpoint le = (IceInternal.Endpoint)l; - IceInternal.Endpoint re = (IceInternal.Endpoint)r; - boolean ls = le.secure(); - boolean rs = re.secure(); - if((ls && rs) || (!ls && !rs)) - { - return 0; - } - else if(!ls && rs) - { - return -1; - } - else - { - return 1; - } - } - } - private static EndpointComparator __comparator = new EndpointComparator(); - + protected IceInternal.Outgoing getOutgoing(String operation, OperationMode mode, java.util.Map context) { @@ -515,11 +258,6 @@ public class _ObjectDelM implements _ObjectDel finalize() throws Throwable { - if(__connection != null) - { - __connection.decProxyCount(); - } - while(__outgoingCache != null) { IceInternal.Outgoing next = __outgoingCache.next; diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 780791f11e8..28a65d388a1 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -986,6 +986,10 @@ public class BasicStream public String[] readStringSeq() { + // + // TODO: This code is dangerous, because it cannot be + // checked whether sz is a reasonable value. + // final int sz = readSize(); String[] v = new String[sz]; for(int i = 0; i < sz; i++) diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index bce9f51a5dd..b9919b9adb9 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -19,15 +19,7 @@ public final class Connection extends EventHandler public synchronized void validate() { - if(_exception != null) - { - throw _exception; - } - - if(_state != StateNotValidated) - { - return; - } + assert(_state == StateNotValidated); if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. { @@ -35,24 +27,27 @@ public final class Connection extends EventHandler { if(_adapter != null) { - // - // Incoming connections play the active role with - // respect to connection validation. - // - BasicStream os = new BasicStream(_instance); - os.writeByte(Protocol.magic[0]); - os.writeByte(Protocol.magic[1]); - os.writeByte(Protocol.magic[2]); - os.writeByte(Protocol.magic[3]); - os.writeByte(Protocol.protocolMajor); - os.writeByte(Protocol.protocolMinor); - os.writeByte(Protocol.encodingMajor); - os.writeByte(Protocol.encodingMinor); - os.writeByte(Protocol.validateConnectionMsg); - os.writeByte((byte)0); // Compression status. - os.writeInt(Protocol.headerSize); // Message size. - TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + synchronized(_sendMutex) + { + // + // Incoming connections play the active role + // with respect to connection validation. + // + BasicStream os = new BasicStream(_instance); + os.writeByte(Protocol.magic[0]); + os.writeByte(Protocol.magic[1]); + os.writeByte(Protocol.magic[2]); + os.writeByte(Protocol.magic[3]); + os.writeByte(Protocol.protocolMajor); + os.writeByte(Protocol.protocolMinor); + os.writeByte(Protocol.encodingMajor); + os.writeByte(Protocol.encodingMinor); + os.writeByte(Protocol.validateConnectionMsg); + os.writeByte((byte)0); // Compression status. + os.writeInt(Protocol.headerSize); // Message size. + TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } else { @@ -158,24 +153,9 @@ public final class Connection extends EventHandler } } - // - // We only print warnings after successful connection validation. - // - _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; - - // - // We only use active connection management after successful - // connection validation. We don't use active connection - // management for datagram connections at all, because such - // "virtual connections" cannot be reestablished. - // - if(!_endpoint.datagram()) + if(_acmTimeout > 0) { - _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime"); - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; } // @@ -219,37 +199,21 @@ public final class Connection extends EventHandler } } - public boolean + public synchronized boolean isValidated() { - // - // No synchronization necessary, _state is declared - // volatile. Synchronization is not possible here anyway, - // because this function must not block. - // return _state > StateNotValidated; } - public boolean + public synchronized boolean isDestroyed() { - // - // No synchronization necessary, _state is declared - // volatile. Synchronization is not possible here anyway, - // because this function must not block. - // return _state >= StateClosing; } - public boolean + public synchronized boolean isFinished() { - // - // No synchronization necessary, _transceiver and - // _dispatchCount are declared volatile. Synchronization is - // not possible here anyway, because this function must not - // block. - // return _transceiver == null && _dispatchCount == 0; } @@ -317,8 +281,8 @@ public final class Connection extends EventHandler else { // - // We already waited long enough, so let's close this - // connection! + // We already waited long enough, so let's + // close this connection! // setState(StateClosed, new Ice.CloseTimeoutException()); } @@ -367,11 +331,14 @@ public final class Connection extends EventHandler // // Active connection management for idle connections. // - // TODO: Hack: ACM for incoming connections doesn't work right - // with AMI. - // - if(_acmTimeout > 0 && closingOK() && _adapter == null) + if(_acmTimeout > 0 && + _requests.isEmpty() && + _asyncRequests.isEmpty() && + !_batchStreamInUse && + _dispatchCount == 0) { + assert(_batchStream.isEmpty()); + if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis) { setState(StateClosing, new Ice.ConnectionTimeoutException()); @@ -380,25 +347,6 @@ public final class Connection extends EventHandler } } - public synchronized void - incProxyCount() - { - assert(_proxyCount >= 0); - ++_proxyCount; - } - - public synchronized void - decProxyCount() - { - assert(_proxyCount > 0); - --_proxyCount; - - if(_proxyCount == 0 && _adapter == null && closingOK()) - { - setState(StateClosing, new Ice.CloseConnectionException()); - } - } - private final static byte[] _requestHdr = { Protocol.magic[0], @@ -415,122 +363,224 @@ public final class Connection extends EventHandler (byte)0, (byte)0, (byte)0, (byte)0 // Request ID (placeholder). }; + // + // TODO: Should not be a member function of Connection. + // public void prepareRequest(BasicStream os) { os.writeBlob(_requestHdr); } - public synchronized void - sendRequest(Outgoing out, boolean oneway) + public void + sendRequest(BasicStream os, Outgoing out) { - if(_exception != null) - { - throw _exception; - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - int requestId = 0; - - try + + synchronized(this) { - BasicStream os = out.os(); - os.pos(10); - + assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams. + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + // - // Fill in the message size and request ID. + // Fill in the message size. // + os.pos(10); os.writeInt(os.size()); - if(!_endpoint.datagram() && !oneway) + + // + // Only add to the request map if this is a twoway call. + // + if(out != null) { + // + // Create a new unique request ID. + // requestId = _nextRequestId++; if(requestId <= 0) { _nextRequestId = 1; requestId = _nextRequestId++; } + + // + // Fill in the request ID. + // + os.pos(Protocol.headerSize); os.writeInt(requestId); + + // + // Add to the requests map. + // + _requests.put(requestId, out); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; } - - // - // Send the request. - // - TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; } - - // - // Only add to the request map if there was no exception, and if - // the operation is not oneway. - // - if(!_endpoint.datagram() && !oneway) + + try { - _requests.put(requestId, out); + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + TraceUtil.traceRequest("sending request", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } - - if(_acmTimeout > 0) + catch(Ice.LocalException ex) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + if(out != null) + { + // + // If the request has already been removed from + // the request map, we are out of luck. It would + // mean that finished() has been called already, + // and therefore the exception has been set using + // the Outgoing::finished() callback. In this + // case, we cannot throw the exception here, + // because we must not both raise an exception and + // have Outgoing::finished() called with an + // exception. This means that in some rare cases, + // a request will not be retried even though it + // could. But I honestly don't know how I could + // avoid this, without a very elaborate and + // complex design, which would be bad for + // performance. + // + Outgoing o = (Outgoing)_requests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } + else + { + throw _exception; + } + } } } - public synchronized void - sendAsyncRequest(OutgoingAsync out) + public void + sendAsyncRequest(BasicStream os, OutgoingAsync out) { - if(_exception != null) - { - throw _exception; - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - int requestId = 0; - - try + + synchronized(this) { - BasicStream os = out.__os(); - os.pos(10); - + assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway. + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + // - // Fill in the message size and request ID. + // Fill in the message size. // + os.pos(10); os.writeInt(os.size()); + + // + // Create a new unique request ID. + // requestId = _nextRequestId++; if(requestId <= 0) { _nextRequestId = 1; requestId = _nextRequestId++; } + + // + // Fill in the request ID. + // + os.pos(Protocol.headerSize); os.writeInt(requestId); // - // Send the request. + // Add to the requests map. // - TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); + _asyncRequests.put(requestId, out); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } } - catch(Ice.LocalException ex) + + try { - setState(StateClosed, ex); - assert(_exception != null); - throw _exception; + synchronized(_sendMutex) + { + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } + + // + // Send the request. + // + TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + } } - - // - // Only add to the request map if there was no exception. - // - _asyncRequests.put(requestId, out); - - if(_acmTimeout > 0) + catch(Ice.LocalException ex) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + synchronized(this) + { + setState(StateClosed, ex); + assert(_exception != null); + + // + // If the request has already been removed from the + // async request map, we are out of luck. It would + // mean that finished() has been called already, and + // therefore the exception has been set using the + // OutgoingAsync::__finished() callback. In this case, + // we cannot throw the exception here, because we must + // not both raise an exception and have + // OutgoingAsync::__finished() called with an + // exception. This means that in some rare cases, a + // request will not be retried even though it + // could. But I honestly don't know how I could avoid + // this, without a very elaborate and complex design, + // which would be bad for performance. + // + OutgoingAsync o = (OutgoingAsync)_asyncRequests.remove(requestId); + if(o != null) + { + assert(o == out); + throw _exception; + } + } } } @@ -568,6 +618,7 @@ public final class Connection extends EventHandler { throw _exception; } + assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -589,7 +640,7 @@ public final class Connection extends EventHandler // // _batchStream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. + // finishBatchRequest() is called. // } @@ -600,6 +651,7 @@ public final class Connection extends EventHandler { throw _exception; } + assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -614,135 +666,170 @@ public final class Connection extends EventHandler notifyAll(); } - public synchronized void - abortBatchRequest() - { - setState(StateClosed, new Ice.AbortBatchRequestException()); - - // - // Give the Connection back. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); - } - - public synchronized void + public void flushBatchRequest() { - while(_batchStreamInUse && _exception == null) + synchronized(this) { - try + while(_batchStreamInUse && _exception == null) { - wait(); + try + { + wait(); + } + catch(InterruptedException ex) + { + } } - catch(InterruptedException ex) + + if(_exception != null) + { + throw _exception; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + if(_batchStream.isEmpty()) { + return; // Nothing to do. } - } - if(_exception != null) - { - throw _exception; + // + // Fill in the message size. + // + _batchStream.pos(10); + _batchStream.writeInt(_batchStream.size()); + + // + // Fill in the number of requests in the batch. + // + _batchStream.writeInt(_batchRequestNum); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + + // + // Prevent that new batch requests are added while we are + // flushing. + // + _batchStreamInUse = true; } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - if(!_batchStream.isEmpty()) + + try { - try + synchronized(_sendMutex) { - _batchStream.pos(10); - - // - // Fill in the message size. - // - _batchStream.writeInt(_batchStream.size()); - - // - // Fill in the number of requests in the batch. - // - _batchStream.writeInt(_batchRequestNum); + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } // // Send the batch request. // TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); _transceiver.write(_batchStream, _endpoint.timeout()); - - // - // Reset _batchStream so that new batch messages can be sent. - // - _batchStream.destroy(); - _batchStream = new BasicStream(_instance); - _batchRequestNum = 0; } - catch(Ice.LocalException ex) + } + catch(Ice.LocalException ex) + { + synchronized(this) { setState(StateClosed, ex); assert(_exception != null); + + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // throw _exception; } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; - } } - if(_proxyCount == 0 && _adapter == null && closingOK()) + synchronized(this) { - setState(StateClosing, new Ice.CloseConnectionException()); + // + // Reset the batch stream, and notify that flushing is over. + // + _batchStream.destroy(); + _batchStream = new BasicStream(_instance); + _batchRequestNum = 0; + _batchStreamInUse = false; + notifyAll(); } } - public synchronized void + public void sendResponse(BasicStream os, byte compress) { try { - if(--_dispatchCount == 0) + synchronized(_sendMutex) { - notifyAll(); - } - - if(_state == StateClosed) - { - return; - } + if(_transceiver == null) // Has the transceiver already been closed? + { + assert(_exception != null); + throw _exception; // The exception is immutable at this point. + } - // - // Fill in the message size. - // - os.pos(10); - final int sz = os.size(); - os.writeInt(sz); - - // - // Send the reply. - // - TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); + // + // Fill in the message size. + // + os.pos(10); + os.writeInt(os.size()); + + // + // Send the reply. + // + TraceUtil.traceReply("sending reply", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); } } catch(Ice.LocalException ex) { - setState(StateClosed, ex); + synchronized(this) + { + setState(StateClosed, ex); + } } - if(_acmTimeout > 0) + synchronized(this) { - _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + assert(_state > StateNotValidated); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000; + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } } } public synchronized void sendNoResponse() { + assert(_state > StateNotValidated); + try { if(--_dispatchCount == 0) @@ -750,11 +837,6 @@ public final class Connection extends EventHandler notifyAll(); } - if(_state == StateClosed) - { - return; - } - if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -914,8 +996,7 @@ public final class Connection extends EventHandler { if(_warn) { - _logger.warning("ignoring close connection message for datagram connection:\n" + - _transceiver.toString()); + _logger.warning("ignoring close connection message for datagram connection:\n" + _desc); } } else @@ -946,6 +1027,8 @@ public final class Connection extends EventHandler synchronized(this) { + assert(_state > StateNotValidated); + if(_state == StateClosed) { return; @@ -1015,11 +1098,6 @@ public final class Connection extends EventHandler { throw new Ice.UnknownRequestIdException(); } - - if(_proxyCount == 0 && _adapter == null && closingOK()) - { - setState(StateClosing, new Ice.CloseConnectionException()); - } } break; } @@ -1029,8 +1107,7 @@ public final class Connection extends EventHandler TraceUtil.traceHeader("received validate connection", stream, _logger, _traceLevels); if(_warn) { - _logger.warning("ignoring unexpected validate connection message:\n" + - _transceiver.toString()); + _logger.warning("ignoring unexpected validate connection message:\n" + _desc); } break; } @@ -1148,7 +1225,7 @@ public final class Connection extends EventHandler { threadPool.promoteFollower(); - Ice.LocalException closeException = null; + Ice.LocalException exception = null; IntMap requests = null; IntMap asyncRequests = null; @@ -1159,20 +1236,26 @@ public final class Connection extends EventHandler { registerWithPool(); } - else if(_state == StateClosed && _transceiver != null) + else if(_state == StateClosed) { - try - { - _transceiver.close(); - } - catch(Ice.LocalException ex) + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + synchronized(_sendMutex) { - closeException = ex; + try + { + _transceiver.close(); + } + catch(Ice.LocalException ex) + { + exception = ex; + } + + _transceiver = null; + notifyAll(); } - - _transceiver = null; - _threadPool = null; // We don't need the thread pool anymore. - notifyAll(); } if(_state == StateClosed || _state == StateClosing) @@ -1207,9 +1290,9 @@ public final class Connection extends EventHandler } } - if(closeException != null) + if(exception != null) { - throw closeException; + throw exception; } } @@ -1219,31 +1302,30 @@ public final class Connection extends EventHandler setState(StateClosed, ex); } - public synchronized String + public String toString() { - assert(_transceiver != null); - return _transceiver.toString(); + return _desc; // No mutex lock, _desc is immutable. } Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter) { super(instance); _transceiver = transceiver; + _desc = transceiver.toString(); _endpoint = endpoint; _adapter = adapter; - _logger = instance.logger(); // Cached for better performance. - _traceLevels = instance.traceLevels(); // Cached for better performance. + _logger = instance.logger(); // Cached for better performance. + _traceLevels = instance.traceLevels(); // Cached for better performance. _registeredWithPool = false; - _warn = false; - _acmTimeout = 0; + _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false; + _acmTimeout = _endpoint.datagram() ? 0 : _instance.connectionIdleTime(); _acmAbsoluteTimeoutMillis = 0; _nextRequestId = 1; _batchStream = new BasicStream(instance); _batchStreamInUse = false; _batchRequestNum = 0; _dispatchCount = 0; - _proxyCount = 0; _state = StateNotValidated; _stateTime = System.currentTimeMillis(); @@ -1266,7 +1348,6 @@ public final class Connection extends EventHandler assert(_state == StateClosed); assert(_transceiver == null); assert(_dispatchCount == 0); - assert(_proxyCount == 0); assert(_incomingCache == null); _batchStream.destroy(); @@ -1400,17 +1481,24 @@ public final class Connection extends EventHandler { assert(!_registeredWithPool); - try - { - _transceiver.close(); - } - catch(Ice.LocalException ex) + // + // We must make sure that nobody is sending when + // we close the transceiver. + // + synchronized(_sendMutex) { - // Here we ignore any exceptions in close(). - } + try + { + _transceiver.close(); + } + catch(Ice.LocalException ex) + { + // Here we ignore any exceptions in close(). + } - _transceiver = null; - _threadPool = null; // We don't need the thread pool anymore. + _transceiver = null; + //notifyAll(); // We notify already below. + } } else { @@ -1446,29 +1534,32 @@ public final class Connection extends EventHandler if(!_endpoint.datagram()) { - // - // Before we shut down, we send a close connection - // message. - // - BasicStream os = new BasicStream(_instance); - os.writeByte(Protocol.magic[0]); - os.writeByte(Protocol.magic[1]); - os.writeByte(Protocol.magic[2]); - os.writeByte(Protocol.magic[3]); - os.writeByte(Protocol.protocolMajor); - os.writeByte(Protocol.protocolMinor); - os.writeByte(Protocol.encodingMajor); - os.writeByte(Protocol.encodingMinor); - os.writeByte(Protocol.closeConnectionMsg); - os.writeByte((byte)0); // Compression status. - os.writeInt(Protocol.headerSize); // Message size. - - // - // Send the message. - // - TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver.write(os, _endpoint.timeout()); - _transceiver.shutdown(); + synchronized(_sendMutex) + { + // + // Before we shut down, we send a close connection + // message. + // + BasicStream os = new BasicStream(_instance); + os.writeByte(Protocol.magic[0]); + os.writeByte(Protocol.magic[1]); + os.writeByte(Protocol.magic[2]); + os.writeByte(Protocol.magic[3]); + os.writeByte(Protocol.protocolMajor); + os.writeByte(Protocol.protocolMinor); + os.writeByte(Protocol.encodingMajor); + os.writeByte(Protocol.encodingMinor); + os.writeByte(Protocol.closeConnectionMsg); + os.writeByte((byte)0); // Compression status. + os.writeInt(Protocol.headerSize); // Message size. + + // + // Send the message. + // + TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels); + _transceiver.write(os, _endpoint.timeout()); + _transceiver.shutdown(); + } } } @@ -1477,7 +1568,6 @@ public final class Connection extends EventHandler { if(!_registeredWithPool) { - assert(_threadPool != null); _threadPool._register(_transceiver.fd(), this); _registeredWithPool = true; @@ -1494,7 +1584,6 @@ public final class Connection extends EventHandler { if(_registeredWithPool) { - assert(_threadPool != null); _threadPool.unregister(_transceiver.fd()); _registeredWithPool = false; @@ -1523,7 +1612,7 @@ public final class Connection extends EventHandler } while(t != null); pw.flush(); - String s = msg + ":\n" + sw.toString() + _transceiver.toString(); + String s = msg + ":\n" + sw.toString() + _desc; _logger.warning(s); } @@ -1581,18 +1670,8 @@ public final class Connection extends EventHandler } */ - private boolean - closingOK() - { - return - _requests.isEmpty() && - _asyncRequests.isEmpty() && - !_batchStreamInUse && - _batchStream.isEmpty() && - _dispatchCount == 0; - } - - private volatile Transceiver _transceiver; // Must be volatile, see comment in isFinished(). + private Transceiver _transceiver; + private final String _desc; private final Endpoint _endpoint; private Ice.ObjectAdapter _adapter; @@ -1602,11 +1681,11 @@ public final class Connection extends EventHandler private final TraceLevels _traceLevels; private boolean _registeredWithPool; - private ThreadPool _threadPool; + private final ThreadPool _threadPool; - private boolean _warn; + private final boolean _warn; - private int _acmTimeout; + private final int _acmTimeout; private long _acmAbsoluteTimeoutMillis; private int _nextRequestId; @@ -1619,12 +1698,16 @@ public final class Connection extends EventHandler private boolean _batchStreamInUse; private int _batchRequestNum; - private volatile int _dispatchCount; // Must be volatile, see comment in isDestroyed(). + private int _dispatchCount; - private int _proxyCount; + private int _state; // The current state. + private long _stateTime; // The last time when the state was changed. - private volatile int _state; // The current state. Must be volatile, see comment in isDestroyed(). - private long _stateTime; // The time when the state was changed the last time. + // + // We have a separate mutex for sending, so that we don't block + // the whole connection when we do a blocking send. + // + private java.lang.Object _sendMutex = new java.lang.Object(); private Incoming _incomingCache; private java.lang.Object _incomingCacheMutex = new java.lang.Object(); diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index ba460ff50d2..e60d40518a6 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -241,6 +241,13 @@ public class Instance return _messageSizeMax; } + public int + connectionIdleTime() + { + // No mutex lock, immutable. + return _connectionIdleTime; + } + public void flushBatchRequests() { @@ -297,21 +304,35 @@ public class Instance _defaultsAndOverrides = new DefaultsAndOverrides(_properties); - final int defaultMessageSizeMax = 1024; - final int num = _properties.getPropertyAsIntWithDefault("Ice.MessageSizeMax", defaultMessageSizeMax); - if(num < 1) { - _messageSizeMax = defaultMessageSizeMax * 1024; // Ignore stupid values. + final int defaultMessageSizeMax = 1024; + int num = _properties.getPropertyAsIntWithDefault("Ice.MessageSizeMax", defaultMessageSizeMax); + if(num < 1) + { + _messageSizeMax = defaultMessageSizeMax * 1024; // Ignore stupid values. + } + else if(num > 0x7fffffff / 1024) + { + _messageSizeMax = 0x7fffffff; + } + else + { + _messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes + } } - else if(num > 0x7fffffff / 1024) - { - _messageSizeMax = 0x7fffffff; - } - else + { - _messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes + int num = _properties.getPropertyAsIntWithDefault("Ice.ConnectionIdleTime", 60); + if(num < 0) + { + _connectionIdleTime = 0; + } + else + { + _connectionIdleTime = num; + } } - + _routerManager = new RouterManager(); _locatorManager = new LocatorManager(); @@ -394,11 +415,9 @@ public class Instance } // - // Connection monitor initializations must be done after - // daemon() is called, since daemon() forks. + // Start connection monitor if necessary. // - int acmTimeout = _properties.getPropertyAsInt("Ice.ConnectionIdleTime"); - int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", acmTimeout); + int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", _connectionIdleTime); if(interval > 0) { _connectionMonitor = new ConnectionMonitor(this, interval); @@ -537,6 +556,7 @@ public class Instance private final TraceLevels _traceLevels; // Immutable, not reset by destroy(). private final DefaultsAndOverrides _defaultsAndOverrides; // Immutable, not reset by destroy(). private final int _messageSizeMax; // Immutable, not reset by destroy(). + private final int _connectionIdleTime; // Immutable, not reset by destroy(). private RouterManager _routerManager; private LocatorManager _locatorManager; private ReferenceFactory _referenceFactory; diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index 962585b1da3..ea36c10e100 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -43,13 +43,6 @@ public final class Outgoing public void destroy() { - if(_state == StateUnsent && - (_reference.mode == Reference.ModeBatchOneway || - _reference.mode == Reference.ModeBatchDatagram)) - { - _connection.abortBatchRequest(); - } - _os.destroy(); _is.destroy(); } @@ -65,26 +58,48 @@ public final class Outgoing { case Reference.ModeTwoway: { - Ice.LocalException exception = null; + // + // We let all exceptions raised by sending directly + // propagate to the caller, because they can be + // retried without violating "at-most-once". In case + // of such exceptions, the connection object does not + // call back on this object, so we don't need to lock + // the mutex, keep track of state, or save exceptions. + // + _connection.sendRequest(_os, this); + + // + // Wait until the request has completed, or until the + // request times out. + // + + boolean timedOut = false; synchronized(this) { - _connection.sendRequest(this, false); - _state = StateInProgress; + // + // It's possible that the request has already + // completed, due to a regular response, or because of + // an exception. So we only change the state to "in + // progress" if it is still "unsent". + // + if(_state == StateUnsent) + { + _state = StateInProgress; + } int timeout = _connection.timeout(); - while(_state == StateInProgress) + while(_state == StateInProgress && !timedOut) { try { if(timeout >= 0) { wait(timeout); - + if(_state == StateInProgress) { - exception = new Ice.TimeoutException(); - break; + timedOut = true; } } else @@ -97,14 +112,14 @@ public final class Outgoing } } } - - if(exception != null) - { + + if(timedOut) + { // // Must be called outside the synchronization of // this object // - _connection.exception(exception); + _connection.exception(new Ice.TimeoutException()); // // We must wait until the exception set above has @@ -156,7 +171,7 @@ public final class Outgoing { return false; } - + assert(_state == StateOK); break; } @@ -164,30 +179,27 @@ public final class Outgoing case Reference.ModeOneway: case Reference.ModeDatagram: { - try - { - _connection.sendRequest(this, true); - } - catch(Ice.DatagramLimitException ex) - { - throw new NonRepeatable(ex); - } - _state = StateInProgress; + // + // For oneway and datagram requests, the connection + // object never calls back on this object. Therefore + // we don't need to lock the mutex, keep track of + // state, or save exceptions. We simply let all + // exceptions from sending propagate to the caller, + // because such exceptions can be retried without + // violating "at-most-once". + // + _connection.sendRequest(_os, null); break; } case Reference.ModeBatchOneway: case Reference.ModeBatchDatagram: { - // - // The state must be set to StateInProgress before calling - // finishBatchRequest, because otherwise if - // finishBatchRequest raises an exception, the destructor - // of this class will call abortBatchRequest, and calling - // both finishBatchRequest and abortBatchRequest is - // illegal. - // - _state = StateInProgress; + // + // For batch oneways and datagrams, the same rules as for + // regular oneways and datagrams (see comment above) + // apply. + // _connection.finishBatchRequest(_os); break; } @@ -199,130 +211,126 @@ public final class Outgoing public synchronized void finished(BasicStream is) { - // - // The state might be StateLocalException if there was a - // timeout in invoke(). - // - if(_state == StateInProgress) - { - _is.swap(is); - byte status = _is.readByte(); - - switch((int)status) - { - case DispatchStatus._DispatchOK: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateOK; - break; - } - - case DispatchStatus._DispatchUserException: - { - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // oneway requests as blobs. - // - _is.startReadEncaps(); - _state = StateUserException; - break; - } - - case DispatchStatus._DispatchObjectNotExist: - case DispatchStatus._DispatchFacetNotExist: - case DispatchStatus._DispatchOperationNotExist: + assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways. + + assert(_state <= StateInProgress); + + byte status = _is.readByte(); + + switch((int)status) + { + case DispatchStatus._DispatchOK: + { + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateOK; + break; + } + + case DispatchStatus._DispatchUserException: + { + // + // Input and output parameters are always sent in an + // encapsulation, which makes it possible to forward + // oneway requests as blobs. + // + _is.startReadEncaps(); + _state = StateUserException; + break; + } + + case DispatchStatus._DispatchObjectNotExist: + case DispatchStatus._DispatchFacetNotExist: + case DispatchStatus._DispatchOperationNotExist: + { + _state = StateLocalException; + + Ice.RequestFailedException ex = null; + switch((int)status) { - _state = StateLocalException; - - Ice.RequestFailedException ex = null; - switch((int)status) + case DispatchStatus._DispatchObjectNotExist: { - case DispatchStatus._DispatchObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } - - case DispatchStatus._DispatchFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - - case DispatchStatus._DispatchOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } - - default: - { - assert(false); - break; - } + ex = new Ice.ObjectNotExistException(); + break; } - - ex.id = new Ice.Identity(); - ex.id.__read(_is); - ex.facet = _is.readStringSeq(); - ex.operation = _is.readString(); - _exception = ex; - break; - } - - case DispatchStatus._DispatchUnknownException: - case DispatchStatus._DispatchUnknownLocalException: - case DispatchStatus._DispatchUnknownUserException: - { - _state = StateLocalException; - - Ice.UnknownException ex = null; - switch((int)status) + + case DispatchStatus._DispatchFacetNotExist: { - case DispatchStatus._DispatchUnknownException: - { - ex = new Ice.UnknownException(); - break; - } - - case DispatchStatus._DispatchUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } - - case DispatchStatus._DispatchUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } - - default: - { - assert(false); - break; - } + ex = new Ice.FacetNotExistException(); + break; } - - ex.unknown = _is.readString(); - _exception = ex; - break; - } + + case DispatchStatus._DispatchOperationNotExist: + { + ex = new Ice.OperationNotExistException(); + break; + } + + default: + { + assert(false); + break; + } + } - default: - { - _state = StateLocalException; - _exception = new Ice.UnknownReplyStatusException(); - break; - } - } - } + ex.id = new Ice.Identity(); + ex.id.__read(_is); + ex.facet = _is.readStringSeq(); + ex.operation = _is.readString(); + _exception = ex; + break; + } + + case DispatchStatus._DispatchUnknownException: + case DispatchStatus._DispatchUnknownLocalException: + case DispatchStatus._DispatchUnknownUserException: + { + _state = StateLocalException; + + Ice.UnknownException ex = null; + switch((int)status) + { + case DispatchStatus._DispatchUnknownException: + { + ex = new Ice.UnknownException(); + break; + } + + case DispatchStatus._DispatchUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } + + case DispatchStatus._DispatchUnknownUserException: + { + ex = new Ice.UnknownUserException(); + break; + } + + default: + { + assert(false); + break; + } + } + + ex.unknown = _is.readString(); + _exception = ex; + break; + } + + default: + { + _state = StateLocalException; + _exception = new Ice.UnknownReplyStatusException(); + break; + } + } notify(); } @@ -330,16 +338,13 @@ public final class Outgoing public synchronized void finished(Ice.LocalException ex) { - // - // The state might be StateLocalException if there was a - // timeout in invoke(). - // - if(_state == StateInProgress) - { - _state = StateLocalException; - _exception = ex; - notify(); - } + assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways. + + assert(_state <= StateInProgress); + + _state = StateLocalException; + _exception = ex; + notify(); } public BasicStream diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index f198b8df397..dc7720c70eb 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -95,7 +95,7 @@ public abstract class OutgoingAsync { _os.endWriteEncaps(); - _connection.sendAsyncRequest(this); + _connection.sendAsyncRequest(_os, this); if(_connection.timeout() >= 0) { diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java index 926de612160..c03b3fbe5be 100644 --- a/java/src/IceInternal/ProxyFactory.java +++ b/java/src/IceInternal/ProxyFactory.java @@ -81,14 +81,67 @@ public final class ProxyFactory } } - public int[] - getRetryIntervals() - { - return _retryIntervals; + public int + checkRetryAfterException(Ice.LocalException ex, int cnt) + { + ++cnt; + + TraceLevels traceLevels = _instance.traceLevels(); + Ice.Logger logger = _instance.logger(); + + // + // Instance components may be null if Communicator has been destroyed. + // + if(traceLevels != null && logger != null) + { + if(cnt > _retryIntervals.length) + { + if(traceLevels.retry >= 1) + { + String s = "cannot retry operation call because retry limit has been exceeded\n" + ex.toString(); + logger.trace(traceLevels.retryCat, s); + } + throw ex; + } + + if(traceLevels.retry >= 1) + { + String s = "re-trying operation call"; + if(cnt > 0 && _retryIntervals[cnt - 1] > 0) + { + s += " in " + _retryIntervals[cnt - 1] + "ms"; + } + s += " because of exception\n" + ex; + logger.trace(traceLevels.retryCat, s); + } + + if(cnt > 0) + { + // + // Sleep before retrying. + // + try + { + Thread.currentThread().sleep(_retryIntervals[cnt - 1]); + } + catch(InterruptedException ex1) + { + } + } + + return cnt; + } + else + { + // + // Impossible to retry after Communicator has been destroyed. + // + throw ex; + } } // - // Only for use by Instance + // Only for use by Instance. // ProxyFactory(Instance instance) { diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java index 508c23ac47c..925ca70f439 100644 --- a/java/src/IceInternal/Reference.java +++ b/java/src/IceInternal/Reference.java @@ -573,6 +573,255 @@ public final class Reference } // + // Get a suitable connection for this reference. + // + public Connection + getConnection() + { + Connection connection; + + if(reverseAdapter != null) + { + // + // If we have a reverse object adapter, we use the incoming + // connections from such object adapter. + // + Ice.ObjectAdapterI adapter = (Ice.ObjectAdapterI)reverseAdapter; + Connection[] connections = adapter.getIncomingConnections(); + + Endpoint[] endpoints = new Endpoint[connections.length]; + for(int i = 0; i < connections.length; i++) + { + endpoints[i] = connections[i].endpoint(); + } + endpoints = filterEndpoints(endpoints); + + if(endpoints.length == 0) + { + Ice.NoEndpointException e = new Ice.NoEndpointException(); + e.proxy = toString(); + throw e; + } + + int j; + for(j = 0; j < connections.length; j++) + { + if(connections[j].endpoint().equals(endpoints[0])) + { + break; + } + } + assert(j < connections.length); + connection = connections[j]; + } + else + { + while(true) + { + Endpoint[] endpoints = null; + Ice.BooleanHolder cached = new Ice.BooleanHolder(); + cached.value = false; + + if(routerInfo != null) + { + // + // If we route, we send everything to the router's client + // proxy endpoints. + // + Ice.ObjectPrx proxy = routerInfo.getClientProxy(); + endpoints = ((Ice.ObjectPrxHelper)proxy).__reference().endpoints; + } + else if(endpoints.length > 0) + { + endpoints = endpoints; + } + else if(locatorInfo != null) + { + endpoints = locatorInfo.getEndpoints(this, cached); + } + + Endpoint[] filteredEndpoints = null; + if(endpoints != null) + { + filteredEndpoints = filterEndpoints(endpoints); + } + if(filteredEndpoints == null || filteredEndpoints.length == 0) + { + Ice.NoEndpointException e = new Ice.NoEndpointException(); + e.proxy = toString(); + throw e; + } + + try + { + OutgoingConnectionFactory factory = instance.outgoingConnectionFactory(); + connection = factory.create(filteredEndpoints); + assert(connection != null); + } + catch(Ice.LocalException ex) + { + if(routerInfo == null && endpoints.length == 0) + { + assert(locatorInfo != null); + locatorInfo.clearCache(this); + + if(cached.value) + { + TraceLevels traceLevels = instance.traceLevels(); + Ice.Logger logger = instance.logger(); + + if(traceLevels.retry >= 2) + { + String s = "connection to cached endpoints failed\n" + + "removing endpoints from cache and trying one more time\n" + ex; + logger.trace(traceLevels.retryCat, s); + } + + continue; + } + } + + throw ex; + } + + break; + } + + // + // If we have a router, set the object adapter for this + // router (if any) to the new connection, so that + // callbacks from the router can be received over this new + // connection. + // + if(routerInfo != null) + { + connection.setAdapter(routerInfo.getAdapter()); + } + } + + assert(connection != null); + return connection; + } + + // + // Filter endpoints based on criteria from this reference. + // + public Endpoint[] + filterEndpoints(Endpoint[] allEndpoints) + { + java.util.ArrayList endpoints = new java.util.ArrayList(); + + // + // Filter out unknown endpoints. + // + for(int i = 0; i < allEndpoints.length; i++) + { + if(!allEndpoints[i].unknown()) + { + endpoints.add(allEndpoints[i]); + } + } + + switch(mode) + { + case Reference.ModeTwoway: + case Reference.ModeOneway: + case Reference.ModeBatchOneway: + { + // + // Filter out datagram endpoints. + // + java.util.Iterator i = endpoints.iterator(); + while(i.hasNext()) + { + Endpoint endpoint = (Endpoint)i.next(); + if(endpoint.datagram()) + { + i.remove(); + } + } + break; + } + + case Reference.ModeDatagram: + case Reference.ModeBatchDatagram: + { + // + // Filter out non-datagram endpoints. + // + java.util.Iterator i = endpoints.iterator(); + while(i.hasNext()) + { + Endpoint endpoint = (Endpoint)i.next(); + if(!endpoint.datagram()) + { + i.remove(); + } + } + break; + } + } + + // + // Randomize the order of endpoints. + // + java.util.Collections.shuffle(endpoints); + + // + // If a secure connection is requested, remove all non-secure + // endpoints. Otherwise make non-secure endpoints preferred over + // secure endpoints by partitioning the endpoint vector, so that + // non-secure endpoints come first. + // + if(secure) + { + java.util.Iterator i = endpoints.iterator(); + while(i.hasNext()) + { + Endpoint endpoint = (Endpoint)i.next(); + if(!endpoint.secure()) + { + i.remove(); + } + } + } + else + { + java.util.Collections.sort(endpoints, _comparator); + } + + Endpoint[] arr = new Endpoint[endpoints.size()]; + endpoints.toArray(arr); + return arr; + } + + static class EndpointComparator implements java.util.Comparator + { + public int + compare(java.lang.Object l, java.lang.Object r) + { + IceInternal.Endpoint le = (IceInternal.Endpoint)l; + IceInternal.Endpoint re = (IceInternal.Endpoint)r; + boolean ls = le.secure(); + boolean rs = re.secure(); + if((ls && rs) || (!ls && !rs)) + { + return 0; + } + else if(!ls && rs) + { + return -1; + } + else + { + return 1; + } + } + } + + private static EndpointComparator _comparator = new EndpointComparator(); + + // // Only for use by ReferenceFactory // Reference(Instance inst, |