summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/ObjectPrxHelper.java70
-rw-r--r--java/src/Ice/_ObjectDel.java3
-rw-r--r--java/src/Ice/_ObjectDelD.java7
-rw-r--r--java/src/Ice/_ObjectDelM.java268
-rw-r--r--java/src/IceInternal/BasicStream.java4
-rw-r--r--java/src/IceInternal/Connection.java741
-rw-r--r--java/src/IceInternal/Instance.java50
-rw-r--r--java/src/IceInternal/Outgoing.java335
-rw-r--r--java/src/IceInternal/OutgoingAsync.java2
-rw-r--r--java/src/IceInternal/ProxyFactory.java63
-rw-r--r--java/src/IceInternal/Reference.java249
11 files changed, 943 insertions, 849 deletions
diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java
index 4b7037a7bc4..07f850ab756 100644
--- a/java/src/Ice/ObjectPrxHelper.java
+++ b/java/src/Ice/ObjectPrxHelper.java
@@ -225,8 +225,8 @@ public class ObjectPrxHelper implements ObjectPrx
{
try
{
- _ObjectDel __del = __getDelegate();
- __del.ice_invoke_async(cb, operation, mode, inParams, context);
+// _ObjectDel __del = __getDelegate();
+// __del.ice_invoke_async(cb, operation, mode, inParams, context);
return;
}
catch(LocalException __ex)
@@ -623,63 +623,15 @@ public class ObjectPrxHelper implements ObjectPrx
_reference.locatorInfo.clearObjectCache(_reference);
}
- ++cnt;
-
- IceInternal.TraceLevels traceLevels = _reference.instance.traceLevels();
- Logger logger = _reference.instance.logger();
- IceInternal.ProxyFactory proxyFactory = _reference.instance.proxyFactory();
-
- //
- // Instance components may be null if Communicator has been destroyed.
- //
- if(traceLevels != null && logger != null && proxyFactory != null)
- {
- int[] retryIntervals = proxyFactory.getRetryIntervals();
-
- if(cnt > retryIntervals.length)
- {
- if(traceLevels.retry >= 1)
- {
- String s = "cannot retry operation call because retry limit has been exceeded\n" + ex.toString();
- logger.trace(traceLevels.retryCat, s);
- }
- throw ex;
- }
-
- if(traceLevels.retry >= 1)
- {
- String s = "re-trying operation call";
- if(cnt > 0 && retryIntervals[cnt - 1] > 0)
- {
- s += " in " + retryIntervals[cnt - 1] + "ms";
- }
- s += " because of exception\n" + ex;
- logger.trace(traceLevels.retryCat, s);
- }
-
- if(cnt > 0)
- {
- //
- // Sleep before retrying.
- //
- try
- {
- Thread.currentThread().sleep(retryIntervals[cnt - 1]);
- }
- catch(InterruptedException ex1)
- {
- }
- }
-
- return cnt;
- }
- else
- {
- //
- // Impossible to retry after Communicator has been destroyed.
- //
- throw ex;
- }
+ IceInternal.ProxyFactory proxyFactory = _reference.instance.proxyFactory();
+ if(proxyFactory != null)
+ {
+ return proxyFactory.checkRetryAfterException(ex, cnt);
+ }
+ else
+ {
+ throw ex; // The communicator is already destroyed, so we cannot retry.
+ }
}
public final synchronized void
diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java
index 5d929566123..486140d701f 100644
--- a/java/src/Ice/_ObjectDel.java
+++ b/java/src/Ice/_ObjectDel.java
@@ -34,7 +34,4 @@ public interface _ObjectDel
boolean ice_invoke(String operation, Ice.OperationMode mode, byte[] inParams, ByteSeqHolder outParams,
java.util.Map context)
throws IceInternal.NonRepeatable;
-
- void ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams,
- java.util.Map context);
}
diff --git a/java/src/Ice/_ObjectDelD.java b/java/src/Ice/_ObjectDelD.java
index cc18359fa7a..d78e47da599 100644
--- a/java/src/Ice/_ObjectDelD.java
+++ b/java/src/Ice/_ObjectDelD.java
@@ -125,13 +125,6 @@ public class _ObjectDelD implements _ObjectDel
throw new CollocationOptimizationException();
}
- public void
- ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams,
- java.util.Map context)
- {
- throw new CollocationOptimizationException();
- }
-
//
// Only for use by ObjectPrx.
//
diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java
index cf3505fc3da..549859a7bbc 100644
--- a/java/src/Ice/_ObjectDelM.java
+++ b/java/src/Ice/_ObjectDelM.java
@@ -175,16 +175,6 @@ public class _ObjectDelM implements _ObjectDel
}
}
- public void
- ice_invoke_async(AMI_Object_ice_invoke cb, String operation, Ice.OperationMode mode, byte[] inParams,
- java.util.Map context)
- {
- cb.__setup(__connection, __reference, operation, mode, context);
- IceInternal.BasicStream __os = cb.__os();
- __os.writeBlob(inParams);
- cb.__invoke();
- }
-
//
// Only for use by ObjectPrx
//
@@ -204,19 +194,6 @@ public class _ObjectDelM implements _ObjectDel
assert(__reference == null);
assert(__connection == null);
- if(from.__connection != null)
- {
- from.__connection.incProxyCount();
- }
-
-// Can not happen, __connection must be null.
-/*
- if(__connection != null)
- {
- __connection.decProxyCount();
- }
-*/
-
__reference = from.__reference;
__connection = from.__connection;
}
@@ -235,244 +212,10 @@ public class _ObjectDelM implements _ObjectDel
assert(__reference == null);
assert(__connection == null);
- __reference = ref;
-
- if(__reference.reverseAdapter != null)
- {
- //
- // If we have a reverse object adapter, we use the incoming
- // connections from such object adapter.
- //
- ObjectAdapterI adapter = (ObjectAdapterI)__reference.reverseAdapter;
- IceInternal.Connection[] connections = adapter.getIncomingConnections();
-
- IceInternal.Endpoint[] endpoints = new IceInternal.Endpoint[connections.length];
- for(int i = 0; i < connections.length; i++)
- {
- endpoints[i] = connections[i].endpoint();
- }
- endpoints = filterEndpoints(endpoints);
-
- if(endpoints.length == 0)
- {
- NoEndpointException e = new NoEndpointException();
- e.proxy = __reference.toString();
- throw e;
- }
-
- int j;
- for(j = 0; j < connections.length; j++)
- {
- if(connections[j].endpoint().equals(endpoints[0]))
- {
- break;
- }
- }
- assert(j < connections.length);
- __connection = connections[j];
- __connection.incProxyCount();
- }
- else
- {
- while(true)
- {
- IceInternal.Endpoint[] endpoints = null;
- BooleanHolder cached = new BooleanHolder();
- cached.value = false;
-
- if(__reference.routerInfo != null)
- {
- //
- // If we route, we send everything to the router's client
- // proxy endpoints.
- //
- ObjectPrx proxy = __reference.routerInfo.getClientProxy();
- endpoints = ((ObjectPrxHelper)proxy).__reference().endpoints;
- }
- else if(__reference.endpoints.length > 0)
- {
- endpoints = __reference.endpoints;
- }
- else if(__reference.locatorInfo != null)
- {
- endpoints = __reference.locatorInfo.getEndpoints(__reference, cached);
- }
-
- IceInternal.Endpoint[] filteredEndpoints = null;
- if(endpoints != null)
- {
- filteredEndpoints = filterEndpoints(endpoints);
- }
- if(filteredEndpoints == null || filteredEndpoints.length == 0)
- {
- NoEndpointException e = new NoEndpointException();
- e.proxy = __reference.toString();
- throw e;
- }
-
- try
- {
- IceInternal.OutgoingConnectionFactory factory = __reference.instance.outgoingConnectionFactory();
- __connection = factory.create(filteredEndpoints);
- assert(__connection != null);
- __connection.incProxyCount();
- }
- catch(LocalException ex)
- {
- if(__reference.routerInfo == null && __reference.endpoints.length == 0)
- {
- assert(__reference.locatorInfo != null);
- __reference.locatorInfo.clearCache(__reference);
-
- if(cached.value)
- {
- IceInternal.TraceLevels traceLevels = __reference.instance.traceLevels();
- Logger logger = __reference.instance.logger();
-
- if(traceLevels.retry >= 2)
- {
- String s = "connection to cached endpoints failed\n" +
- "removing endpoints from cache and trying one more time\n" + ex;
- logger.trace(traceLevels.retryCat, s);
- }
-
- continue;
- }
- }
-
- throw ex;
- }
-
- break;
- }
-
- //
- // If we have a router, set the object adapter for this
- // router (if any) to the new connection, so that
- // callbacks from the router can be received over this new
- // connection.
- //
- if(__reference.routerInfo != null)
- {
- __connection.setAdapter(__reference.routerInfo.getAdapter());
- }
- }
+ __reference = ref;
+ __connection = __reference.getConnection();
}
-
- private IceInternal.Endpoint[]
- filterEndpoints(IceInternal.Endpoint[] allEndpoints)
- {
- java.util.ArrayList endpoints = new java.util.ArrayList();
-
- //
- // Filter out unknown endpoints.
- //
- for(int i = 0; i < allEndpoints.length; i++)
- {
- if(!allEndpoints[i].unknown())
- {
- endpoints.add(allEndpoints[i]);
- }
- }
-
- switch(__reference.mode)
- {
- case IceInternal.Reference.ModeTwoway:
- case IceInternal.Reference.ModeOneway:
- case IceInternal.Reference.ModeBatchOneway:
- {
- //
- // Filter out datagram endpoints.
- //
- java.util.Iterator i = endpoints.iterator();
- while(i.hasNext())
- {
- IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next();
- if(endpoint.datagram())
- {
- i.remove();
- }
- }
- break;
- }
-
- case IceInternal.Reference.ModeDatagram:
- case IceInternal.Reference.ModeBatchDatagram:
- {
- //
- // Filter out non-datagram endpoints.
- //
- java.util.Iterator i = endpoints.iterator();
- while(i.hasNext())
- {
- IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next();
- if(!endpoint.datagram())
- {
- i.remove();
- }
- }
- break;
- }
- }
-
- //
- // Randomize the order of endpoints.
- //
- java.util.Collections.shuffle(endpoints);
-
- //
- // If a secure connection is requested, remove all non-secure
- // endpoints. Otherwise make non-secure endpoints preferred over
- // secure endpoints by partitioning the endpoint vector, so that
- // non-secure endpoints come first.
- //
- if(__reference.secure)
- {
- java.util.Iterator i = endpoints.iterator();
- while(i.hasNext())
- {
- IceInternal.Endpoint endpoint = (IceInternal.Endpoint)i.next();
- if(!endpoint.secure())
- {
- i.remove();
- }
- }
- }
- else
- {
- java.util.Collections.sort(endpoints, __comparator);
- }
-
- IceInternal.Endpoint[] arr = new IceInternal.Endpoint[endpoints.size()];
- endpoints.toArray(arr);
- return arr;
- }
-
- private static class EndpointComparator implements java.util.Comparator
- {
- public int
- compare(java.lang.Object l, java.lang.Object r)
- {
- IceInternal.Endpoint le = (IceInternal.Endpoint)l;
- IceInternal.Endpoint re = (IceInternal.Endpoint)r;
- boolean ls = le.secure();
- boolean rs = re.secure();
- if((ls && rs) || (!ls && !rs))
- {
- return 0;
- }
- else if(!ls && rs)
- {
- return -1;
- }
- else
- {
- return 1;
- }
- }
- }
- private static EndpointComparator __comparator = new EndpointComparator();
-
+
protected IceInternal.Outgoing
getOutgoing(String operation, OperationMode mode, java.util.Map context)
{
@@ -515,11 +258,6 @@ public class _ObjectDelM implements _ObjectDel
finalize()
throws Throwable
{
- if(__connection != null)
- {
- __connection.decProxyCount();
- }
-
while(__outgoingCache != null)
{
IceInternal.Outgoing next = __outgoingCache.next;
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index 780791f11e8..28a65d388a1 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -986,6 +986,10 @@ public class BasicStream
public String[]
readStringSeq()
{
+ //
+ // TODO: This code is dangerous, because it cannot be
+ // checked whether sz is a reasonable value.
+ //
final int sz = readSize();
String[] v = new String[sz];
for(int i = 0; i < sz; i++)
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index bce9f51a5dd..b9919b9adb9 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -19,15 +19,7 @@ public final class Connection extends EventHandler
public synchronized void
validate()
{
- if(_exception != null)
- {
- throw _exception;
- }
-
- if(_state != StateNotValidated)
- {
- return;
- }
+ assert(_state == StateNotValidated);
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
{
@@ -35,24 +27,27 @@ public final class Connection extends EventHandler
{
if(_adapter != null)
{
- //
- // Incoming connections play the active role with
- // respect to connection validation.
- //
- BasicStream os = new BasicStream(_instance);
- os.writeByte(Protocol.magic[0]);
- os.writeByte(Protocol.magic[1]);
- os.writeByte(Protocol.magic[2]);
- os.writeByte(Protocol.magic[3]);
- os.writeByte(Protocol.protocolMajor);
- os.writeByte(Protocol.protocolMinor);
- os.writeByte(Protocol.encodingMajor);
- os.writeByte(Protocol.encodingMinor);
- os.writeByte(Protocol.validateConnectionMsg);
- os.writeByte((byte)0); // Compression status.
- os.writeInt(Protocol.headerSize); // Message size.
- TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ synchronized(_sendMutex)
+ {
+ //
+ // Incoming connections play the active role
+ // with respect to connection validation.
+ //
+ BasicStream os = new BasicStream(_instance);
+ os.writeByte(Protocol.magic[0]);
+ os.writeByte(Protocol.magic[1]);
+ os.writeByte(Protocol.magic[2]);
+ os.writeByte(Protocol.magic[3]);
+ os.writeByte(Protocol.protocolMajor);
+ os.writeByte(Protocol.protocolMinor);
+ os.writeByte(Protocol.encodingMajor);
+ os.writeByte(Protocol.encodingMinor);
+ os.writeByte(Protocol.validateConnectionMsg);
+ os.writeByte((byte)0); // Compression status.
+ os.writeInt(Protocol.headerSize); // Message size.
+ TraceUtil.traceHeader("sending validate connection", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
else
{
@@ -158,24 +153,9 @@ public final class Connection extends EventHandler
}
}
- //
- // We only print warnings after successful connection validation.
- //
- _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
-
- //
- // We only use active connection management after successful
- // connection validation. We don't use active connection
- // management for datagram connections at all, because such
- // "virtual connections" cannot be reestablished.
- //
- if(!_endpoint.datagram())
+ if(_acmTimeout > 0)
{
- _acmTimeout = _instance.properties().getPropertyAsInt("Ice.ConnectionIdleTime");
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
- }
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
}
//
@@ -219,37 +199,21 @@ public final class Connection extends EventHandler
}
}
- public boolean
+ public synchronized boolean
isValidated()
{
- //
- // No synchronization necessary, _state is declared
- // volatile. Synchronization is not possible here anyway,
- // because this function must not block.
- //
return _state > StateNotValidated;
}
- public boolean
+ public synchronized boolean
isDestroyed()
{
- //
- // No synchronization necessary, _state is declared
- // volatile. Synchronization is not possible here anyway,
- // because this function must not block.
- //
return _state >= StateClosing;
}
- public boolean
+ public synchronized boolean
isFinished()
{
- //
- // No synchronization necessary, _transceiver and
- // _dispatchCount are declared volatile. Synchronization is
- // not possible here anyway, because this function must not
- // block.
- //
return _transceiver == null && _dispatchCount == 0;
}
@@ -317,8 +281,8 @@ public final class Connection extends EventHandler
else
{
//
- // We already waited long enough, so let's close this
- // connection!
+ // We already waited long enough, so let's
+ // close this connection!
//
setState(StateClosed, new Ice.CloseTimeoutException());
}
@@ -367,11 +331,14 @@ public final class Connection extends EventHandler
//
// Active connection management for idle connections.
//
- // TODO: Hack: ACM for incoming connections doesn't work right
- // with AMI.
- //
- if(_acmTimeout > 0 && closingOK() && _adapter == null)
+ if(_acmTimeout > 0 &&
+ _requests.isEmpty() &&
+ _asyncRequests.isEmpty() &&
+ !_batchStreamInUse &&
+ _dispatchCount == 0)
{
+ assert(_batchStream.isEmpty());
+
if(System.currentTimeMillis() >= _acmAbsoluteTimeoutMillis)
{
setState(StateClosing, new Ice.ConnectionTimeoutException());
@@ -380,25 +347,6 @@ public final class Connection extends EventHandler
}
}
- public synchronized void
- incProxyCount()
- {
- assert(_proxyCount >= 0);
- ++_proxyCount;
- }
-
- public synchronized void
- decProxyCount()
- {
- assert(_proxyCount > 0);
- --_proxyCount;
-
- if(_proxyCount == 0 && _adapter == null && closingOK())
- {
- setState(StateClosing, new Ice.CloseConnectionException());
- }
- }
-
private final static byte[] _requestHdr =
{
Protocol.magic[0],
@@ -415,122 +363,224 @@ public final class Connection extends EventHandler
(byte)0, (byte)0, (byte)0, (byte)0 // Request ID (placeholder).
};
+ //
+ // TODO: Should not be a member function of Connection.
+ //
public void
prepareRequest(BasicStream os)
{
os.writeBlob(_requestHdr);
}
- public synchronized void
- sendRequest(Outgoing out, boolean oneway)
+ public void
+ sendRequest(BasicStream os, Outgoing out)
{
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
int requestId = 0;
-
- try
+
+ synchronized(this)
{
- BasicStream os = out.os();
- os.pos(10);
-
+ assert(!(out != null && _endpoint.datagram())); // Twoway requests cannot be datagrams.
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
//
- // Fill in the message size and request ID.
+ // Fill in the message size.
//
+ os.pos(10);
os.writeInt(os.size());
- if(!_endpoint.datagram() && !oneway)
+
+ //
+ // Only add to the request map if this is a twoway call.
+ //
+ if(out != null)
{
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
if(requestId <= 0)
{
_nextRequestId = 1;
requestId = _nextRequestId++;
}
+
+ //
+ // Fill in the request ID.
+ //
+ os.pos(Protocol.headerSize);
os.writeInt(requestId);
+
+ //
+ // Add to the requests map.
+ //
+ _requests.put(requestId, out);
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
}
-
- //
- // Send the request.
- //
- TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
}
-
- //
- // Only add to the request map if there was no exception, and if
- // the operation is not oneway.
- //
- if(!_endpoint.datagram() && !oneway)
+
+ try
{
- _requests.put(requestId, out);
+ synchronized(_sendMutex)
+ {
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
+
+ //
+ // Send the request.
+ //
+ TraceUtil.traceRequest("sending request", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
-
- if(_acmTimeout > 0)
+ catch(Ice.LocalException ex)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+
+ if(out != null)
+ {
+ //
+ // If the request has already been removed from
+ // the request map, we are out of luck. It would
+ // mean that finished() has been called already,
+ // and therefore the exception has been set using
+ // the Outgoing::finished() callback. In this
+ // case, we cannot throw the exception here,
+ // because we must not both raise an exception and
+ // have Outgoing::finished() called with an
+ // exception. This means that in some rare cases,
+ // a request will not be retried even though it
+ // could. But I honestly don't know how I could
+ // avoid this, without a very elaborate and
+ // complex design, which would be bad for
+ // performance.
+ //
+ Outgoing o = (Outgoing)_requests.remove(requestId);
+ if(o != null)
+ {
+ assert(o == out);
+ throw _exception;
+ }
+ }
+ else
+ {
+ throw _exception;
+ }
+ }
}
}
- public synchronized void
- sendAsyncRequest(OutgoingAsync out)
+ public void
+ sendAsyncRequest(BasicStream os, OutgoingAsync out)
{
- if(_exception != null)
- {
- throw _exception;
- }
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
int requestId = 0;
-
- try
+
+ synchronized(this)
{
- BasicStream os = out.__os();
- os.pos(10);
-
+ assert(!_endpoint.datagram()); // Twoway requests cannot be datagrams, and async implies twoway.
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
//
- // Fill in the message size and request ID.
+ // Fill in the message size.
//
+ os.pos(10);
os.writeInt(os.size());
+
+ //
+ // Create a new unique request ID.
+ //
requestId = _nextRequestId++;
if(requestId <= 0)
{
_nextRequestId = 1;
requestId = _nextRequestId++;
}
+
+ //
+ // Fill in the request ID.
+ //
+ os.pos(Protocol.headerSize);
os.writeInt(requestId);
//
- // Send the request.
+ // Add to the requests map.
//
- TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
+ _asyncRequests.put(requestId, out);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
}
- catch(Ice.LocalException ex)
+
+ try
{
- setState(StateClosed, ex);
- assert(_exception != null);
- throw _exception;
+ synchronized(_sendMutex)
+ {
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
+
+ //
+ // Send the request.
+ //
+ TraceUtil.traceRequest("sending asynchronous request", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ }
}
-
- //
- // Only add to the request map if there was no exception.
- //
- _asyncRequests.put(requestId, out);
-
- if(_acmTimeout > 0)
+ catch(Ice.LocalException ex)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ assert(_exception != null);
+
+ //
+ // If the request has already been removed from the
+ // async request map, we are out of luck. It would
+ // mean that finished() has been called already, and
+ // therefore the exception has been set using the
+ // OutgoingAsync::__finished() callback. In this case,
+ // we cannot throw the exception here, because we must
+ // not both raise an exception and have
+ // OutgoingAsync::__finished() called with an
+ // exception. This means that in some rare cases, a
+ // request will not be retried even though it
+ // could. But I honestly don't know how I could avoid
+ // this, without a very elaborate and complex design,
+ // which would be bad for performance.
+ //
+ OutgoingAsync o = (OutgoingAsync)_asyncRequests.remove(requestId);
+ if(o != null)
+ {
+ assert(o == out);
+ throw _exception;
+ }
+ }
}
}
@@ -568,6 +618,7 @@ public final class Connection extends EventHandler
{
throw _exception;
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -589,7 +640,7 @@ public final class Connection extends EventHandler
//
// _batchStream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
+ // finishBatchRequest() is called.
//
}
@@ -600,6 +651,7 @@ public final class Connection extends EventHandler
{
throw _exception;
}
+
assert(_state > StateNotValidated);
assert(_state < StateClosing);
@@ -614,135 +666,170 @@ public final class Connection extends EventHandler
notifyAll();
}
- public synchronized void
- abortBatchRequest()
- {
- setState(StateClosed, new Ice.AbortBatchRequestException());
-
- //
- // Give the Connection back.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
-
- public synchronized void
+ public void
flushBatchRequest()
{
- while(_batchStreamInUse && _exception == null)
+ synchronized(this)
{
- try
+ while(_batchStreamInUse && _exception == null)
{
- wait();
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
- catch(InterruptedException ex)
+
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+
+ assert(_state > StateNotValidated);
+ assert(_state < StateClosing);
+
+ if(_batchStream.isEmpty())
{
+ return; // Nothing to do.
}
- }
- if(_exception != null)
- {
- throw _exception;
+ //
+ // Fill in the message size.
+ //
+ _batchStream.pos(10);
+ _batchStream.writeInt(_batchStream.size());
+
+ //
+ // Fill in the number of requests in the batch.
+ //
+ _batchStream.writeInt(_batchRequestNum);
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+
+ //
+ // Prevent that new batch requests are added while we are
+ // flushing.
+ //
+ _batchStreamInUse = true;
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- if(!_batchStream.isEmpty())
+
+ try
{
- try
+ synchronized(_sendMutex)
{
- _batchStream.pos(10);
-
- //
- // Fill in the message size.
- //
- _batchStream.writeInt(_batchStream.size());
-
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.writeInt(_batchRequestNum);
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
//
// Send the batch request.
//
TraceUtil.traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
_transceiver.write(_batchStream, _endpoint.timeout());
-
- //
- // Reset _batchStream so that new batch messages can be sent.
- //
- _batchStream.destroy();
- _batchStream = new BasicStream(_instance);
- _batchRequestNum = 0;
}
- catch(Ice.LocalException ex)
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
{
setState(StateClosed, ex);
assert(_exception != null);
+
+ //
+ // Since batch requests are all oneways (or datagrams), we
+ // must report the exception to the caller.
+ //
throw _exception;
}
-
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
- }
}
- if(_proxyCount == 0 && _adapter == null && closingOK())
+ synchronized(this)
{
- setState(StateClosing, new Ice.CloseConnectionException());
+ //
+ // Reset the batch stream, and notify that flushing is over.
+ //
+ _batchStream.destroy();
+ _batchStream = new BasicStream(_instance);
+ _batchRequestNum = 0;
+ _batchStreamInUse = false;
+ notifyAll();
}
}
- public synchronized void
+ public void
sendResponse(BasicStream os, byte compress)
{
try
{
- if(--_dispatchCount == 0)
+ synchronized(_sendMutex)
{
- notifyAll();
- }
-
- if(_state == StateClosed)
- {
- return;
- }
+ if(_transceiver == null) // Has the transceiver already been closed?
+ {
+ assert(_exception != null);
+ throw _exception; // The exception is immutable at this point.
+ }
- //
- // Fill in the message size.
- //
- os.pos(10);
- final int sz = os.size();
- os.writeInt(sz);
-
- //
- // Send the reply.
- //
- TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
-
- if(_state == StateClosing && _dispatchCount == 0)
- {
- initiateShutdown();
+ //
+ // Fill in the message size.
+ //
+ os.pos(10);
+ os.writeInt(os.size());
+
+ //
+ // Send the reply.
+ //
+ TraceUtil.traceReply("sending reply", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
}
}
catch(Ice.LocalException ex)
{
- setState(StateClosed, ex);
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ }
}
- if(_acmTimeout > 0)
+ synchronized(this)
{
- _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ assert(_state > StateNotValidated);
+
+ try
+ {
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
+ if(_state == StateClosing && _dispatchCount == 0)
+ {
+ initiateShutdown();
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeoutMillis = System.currentTimeMillis() + _acmTimeout * 1000;
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
}
}
public synchronized void
sendNoResponse()
{
+ assert(_state > StateNotValidated);
+
try
{
if(--_dispatchCount == 0)
@@ -750,11 +837,6 @@ public final class Connection extends EventHandler
notifyAll();
}
- if(_state == StateClosed)
- {
- return;
- }
-
if(_state == StateClosing && _dispatchCount == 0)
{
initiateShutdown();
@@ -914,8 +996,7 @@ public final class Connection extends EventHandler
{
if(_warn)
{
- _logger.warning("ignoring close connection message for datagram connection:\n" +
- _transceiver.toString());
+ _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
}
}
else
@@ -946,6 +1027,8 @@ public final class Connection extends EventHandler
synchronized(this)
{
+ assert(_state > StateNotValidated);
+
if(_state == StateClosed)
{
return;
@@ -1015,11 +1098,6 @@ public final class Connection extends EventHandler
{
throw new Ice.UnknownRequestIdException();
}
-
- if(_proxyCount == 0 && _adapter == null && closingOK())
- {
- setState(StateClosing, new Ice.CloseConnectionException());
- }
}
break;
}
@@ -1029,8 +1107,7 @@ public final class Connection extends EventHandler
TraceUtil.traceHeader("received validate connection", stream, _logger, _traceLevels);
if(_warn)
{
- _logger.warning("ignoring unexpected validate connection message:\n" +
- _transceiver.toString());
+ _logger.warning("ignoring unexpected validate connection message:\n" + _desc);
}
break;
}
@@ -1148,7 +1225,7 @@ public final class Connection extends EventHandler
{
threadPool.promoteFollower();
- Ice.LocalException closeException = null;
+ Ice.LocalException exception = null;
IntMap requests = null;
IntMap asyncRequests = null;
@@ -1159,20 +1236,26 @@ public final class Connection extends EventHandler
{
registerWithPool();
}
- else if(_state == StateClosed && _transceiver != null)
+ else if(_state == StateClosed)
{
- try
- {
- _transceiver.close();
- }
- catch(Ice.LocalException ex)
+ //
+ // We must make sure that nobody is sending when we
+ // close the transceiver.
+ //
+ synchronized(_sendMutex)
{
- closeException = ex;
+ try
+ {
+ _transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception = ex;
+ }
+
+ _transceiver = null;
+ notifyAll();
}
-
- _transceiver = null;
- _threadPool = null; // We don't need the thread pool anymore.
- notifyAll();
}
if(_state == StateClosed || _state == StateClosing)
@@ -1207,9 +1290,9 @@ public final class Connection extends EventHandler
}
}
- if(closeException != null)
+ if(exception != null)
{
- throw closeException;
+ throw exception;
}
}
@@ -1219,31 +1302,30 @@ public final class Connection extends EventHandler
setState(StateClosed, ex);
}
- public synchronized String
+ public String
toString()
{
- assert(_transceiver != null);
- return _transceiver.toString();
+ return _desc; // No mutex lock, _desc is immutable.
}
Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter)
{
super(instance);
_transceiver = transceiver;
+ _desc = transceiver.toString();
_endpoint = endpoint;
_adapter = adapter;
- _logger = instance.logger(); // Cached for better performance.
- _traceLevels = instance.traceLevels(); // Cached for better performance.
+ _logger = instance.logger(); // Cached for better performance.
+ _traceLevels = instance.traceLevels(); // Cached for better performance.
_registeredWithPool = false;
- _warn = false;
- _acmTimeout = 0;
+ _warn = _instance.properties().getPropertyAsInt("Ice.Warn.Connections") > 0 ? true : false;
+ _acmTimeout = _endpoint.datagram() ? 0 : _instance.connectionIdleTime();
_acmAbsoluteTimeoutMillis = 0;
_nextRequestId = 1;
_batchStream = new BasicStream(instance);
_batchStreamInUse = false;
_batchRequestNum = 0;
_dispatchCount = 0;
- _proxyCount = 0;
_state = StateNotValidated;
_stateTime = System.currentTimeMillis();
@@ -1266,7 +1348,6 @@ public final class Connection extends EventHandler
assert(_state == StateClosed);
assert(_transceiver == null);
assert(_dispatchCount == 0);
- assert(_proxyCount == 0);
assert(_incomingCache == null);
_batchStream.destroy();
@@ -1400,17 +1481,24 @@ public final class Connection extends EventHandler
{
assert(!_registeredWithPool);
- try
- {
- _transceiver.close();
- }
- catch(Ice.LocalException ex)
+ //
+ // We must make sure that nobody is sending when
+ // we close the transceiver.
+ //
+ synchronized(_sendMutex)
{
- // Here we ignore any exceptions in close().
- }
+ try
+ {
+ _transceiver.close();
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Here we ignore any exceptions in close().
+ }
- _transceiver = null;
- _threadPool = null; // We don't need the thread pool anymore.
+ _transceiver = null;
+ //notifyAll(); // We notify already below.
+ }
}
else
{
@@ -1446,29 +1534,32 @@ public final class Connection extends EventHandler
if(!_endpoint.datagram())
{
- //
- // Before we shut down, we send a close connection
- // message.
- //
- BasicStream os = new BasicStream(_instance);
- os.writeByte(Protocol.magic[0]);
- os.writeByte(Protocol.magic[1]);
- os.writeByte(Protocol.magic[2]);
- os.writeByte(Protocol.magic[3]);
- os.writeByte(Protocol.protocolMajor);
- os.writeByte(Protocol.protocolMinor);
- os.writeByte(Protocol.encodingMajor);
- os.writeByte(Protocol.encodingMinor);
- os.writeByte(Protocol.closeConnectionMsg);
- os.writeByte((byte)0); // Compression status.
- os.writeInt(Protocol.headerSize); // Message size.
-
- //
- // Send the message.
- //
- TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
- _transceiver.write(os, _endpoint.timeout());
- _transceiver.shutdown();
+ synchronized(_sendMutex)
+ {
+ //
+ // Before we shut down, we send a close connection
+ // message.
+ //
+ BasicStream os = new BasicStream(_instance);
+ os.writeByte(Protocol.magic[0]);
+ os.writeByte(Protocol.magic[1]);
+ os.writeByte(Protocol.magic[2]);
+ os.writeByte(Protocol.magic[3]);
+ os.writeByte(Protocol.protocolMajor);
+ os.writeByte(Protocol.protocolMinor);
+ os.writeByte(Protocol.encodingMajor);
+ os.writeByte(Protocol.encodingMinor);
+ os.writeByte(Protocol.closeConnectionMsg);
+ os.writeByte((byte)0); // Compression status.
+ os.writeInt(Protocol.headerSize); // Message size.
+
+ //
+ // Send the message.
+ //
+ TraceUtil.traceHeader("sending close connection", os, _logger, _traceLevels);
+ _transceiver.write(os, _endpoint.timeout());
+ _transceiver.shutdown();
+ }
}
}
@@ -1477,7 +1568,6 @@ public final class Connection extends EventHandler
{
if(!_registeredWithPool)
{
- assert(_threadPool != null);
_threadPool._register(_transceiver.fd(), this);
_registeredWithPool = true;
@@ -1494,7 +1584,6 @@ public final class Connection extends EventHandler
{
if(_registeredWithPool)
{
- assert(_threadPool != null);
_threadPool.unregister(_transceiver.fd());
_registeredWithPool = false;
@@ -1523,7 +1612,7 @@ public final class Connection extends EventHandler
}
while(t != null);
pw.flush();
- String s = msg + ":\n" + sw.toString() + _transceiver.toString();
+ String s = msg + ":\n" + sw.toString() + _desc;
_logger.warning(s);
}
@@ -1581,18 +1670,8 @@ public final class Connection extends EventHandler
}
*/
- private boolean
- closingOK()
- {
- return
- _requests.isEmpty() &&
- _asyncRequests.isEmpty() &&
- !_batchStreamInUse &&
- _batchStream.isEmpty() &&
- _dispatchCount == 0;
- }
-
- private volatile Transceiver _transceiver; // Must be volatile, see comment in isFinished().
+ private Transceiver _transceiver;
+ private final String _desc;
private final Endpoint _endpoint;
private Ice.ObjectAdapter _adapter;
@@ -1602,11 +1681,11 @@ public final class Connection extends EventHandler
private final TraceLevels _traceLevels;
private boolean _registeredWithPool;
- private ThreadPool _threadPool;
+ private final ThreadPool _threadPool;
- private boolean _warn;
+ private final boolean _warn;
- private int _acmTimeout;
+ private final int _acmTimeout;
private long _acmAbsoluteTimeoutMillis;
private int _nextRequestId;
@@ -1619,12 +1698,16 @@ public final class Connection extends EventHandler
private boolean _batchStreamInUse;
private int _batchRequestNum;
- private volatile int _dispatchCount; // Must be volatile, see comment in isDestroyed().
+ private int _dispatchCount;
- private int _proxyCount;
+ private int _state; // The current state.
+ private long _stateTime; // The last time when the state was changed.
- private volatile int _state; // The current state. Must be volatile, see comment in isDestroyed().
- private long _stateTime; // The time when the state was changed the last time.
+ //
+ // We have a separate mutex for sending, so that we don't block
+ // the whole connection when we do a blocking send.
+ //
+ private java.lang.Object _sendMutex = new java.lang.Object();
private Incoming _incomingCache;
private java.lang.Object _incomingCacheMutex = new java.lang.Object();
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index ba460ff50d2..e60d40518a6 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -241,6 +241,13 @@ public class Instance
return _messageSizeMax;
}
+ public int
+ connectionIdleTime()
+ {
+ // No mutex lock, immutable.
+ return _connectionIdleTime;
+ }
+
public void
flushBatchRequests()
{
@@ -297,21 +304,35 @@ public class Instance
_defaultsAndOverrides = new DefaultsAndOverrides(_properties);
- final int defaultMessageSizeMax = 1024;
- final int num = _properties.getPropertyAsIntWithDefault("Ice.MessageSizeMax", defaultMessageSizeMax);
- if(num < 1)
{
- _messageSizeMax = defaultMessageSizeMax * 1024; // Ignore stupid values.
+ final int defaultMessageSizeMax = 1024;
+ int num = _properties.getPropertyAsIntWithDefault("Ice.MessageSizeMax", defaultMessageSizeMax);
+ if(num < 1)
+ {
+ _messageSizeMax = defaultMessageSizeMax * 1024; // Ignore stupid values.
+ }
+ else if(num > 0x7fffffff / 1024)
+ {
+ _messageSizeMax = 0x7fffffff;
+ }
+ else
+ {
+ _messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes
+ }
}
- else if(num > 0x7fffffff / 1024)
- {
- _messageSizeMax = 0x7fffffff;
- }
- else
+
{
- _messageSizeMax = num * 1024; // Property is in kilobytes, _messageSizeMax in bytes
+ int num = _properties.getPropertyAsIntWithDefault("Ice.ConnectionIdleTime", 60);
+ if(num < 0)
+ {
+ _connectionIdleTime = 0;
+ }
+ else
+ {
+ _connectionIdleTime = num;
+ }
}
-
+
_routerManager = new RouterManager();
_locatorManager = new LocatorManager();
@@ -394,11 +415,9 @@ public class Instance
}
//
- // Connection monitor initializations must be done after
- // daemon() is called, since daemon() forks.
+ // Start connection monitor if necessary.
//
- int acmTimeout = _properties.getPropertyAsInt("Ice.ConnectionIdleTime");
- int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", acmTimeout);
+ int interval = _properties.getPropertyAsIntWithDefault("Ice.MonitorConnections", _connectionIdleTime);
if(interval > 0)
{
_connectionMonitor = new ConnectionMonitor(this, interval);
@@ -537,6 +556,7 @@ public class Instance
private final TraceLevels _traceLevels; // Immutable, not reset by destroy().
private final DefaultsAndOverrides _defaultsAndOverrides; // Immutable, not reset by destroy().
private final int _messageSizeMax; // Immutable, not reset by destroy().
+ private final int _connectionIdleTime; // Immutable, not reset by destroy().
private RouterManager _routerManager;
private LocatorManager _locatorManager;
private ReferenceFactory _referenceFactory;
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index 962585b1da3..ea36c10e100 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -43,13 +43,6 @@ public final class Outgoing
public void
destroy()
{
- if(_state == StateUnsent &&
- (_reference.mode == Reference.ModeBatchOneway ||
- _reference.mode == Reference.ModeBatchDatagram))
- {
- _connection.abortBatchRequest();
- }
-
_os.destroy();
_is.destroy();
}
@@ -65,26 +58,48 @@ public final class Outgoing
{
case Reference.ModeTwoway:
{
- Ice.LocalException exception = null;
+ //
+ // We let all exceptions raised by sending directly
+ // propagate to the caller, because they can be
+ // retried without violating "at-most-once". In case
+ // of such exceptions, the connection object does not
+ // call back on this object, so we don't need to lock
+ // the mutex, keep track of state, or save exceptions.
+ //
+ _connection.sendRequest(_os, this);
+
+ //
+ // Wait until the request has completed, or until the
+ // request times out.
+ //
+
+ boolean timedOut = false;
synchronized(this)
{
- _connection.sendRequest(this, false);
- _state = StateInProgress;
+ //
+ // It's possible that the request has already
+ // completed, due to a regular response, or because of
+ // an exception. So we only change the state to "in
+ // progress" if it is still "unsent".
+ //
+ if(_state == StateUnsent)
+ {
+ _state = StateInProgress;
+ }
int timeout = _connection.timeout();
- while(_state == StateInProgress)
+ while(_state == StateInProgress && !timedOut)
{
try
{
if(timeout >= 0)
{
wait(timeout);
-
+
if(_state == StateInProgress)
{
- exception = new Ice.TimeoutException();
- break;
+ timedOut = true;
}
}
else
@@ -97,14 +112,14 @@ public final class Outgoing
}
}
}
-
- if(exception != null)
- {
+
+ if(timedOut)
+ {
//
// Must be called outside the synchronization of
// this object
//
- _connection.exception(exception);
+ _connection.exception(new Ice.TimeoutException());
//
// We must wait until the exception set above has
@@ -156,7 +171,7 @@ public final class Outgoing
{
return false;
}
-
+
assert(_state == StateOK);
break;
}
@@ -164,30 +179,27 @@ public final class Outgoing
case Reference.ModeOneway:
case Reference.ModeDatagram:
{
- try
- {
- _connection.sendRequest(this, true);
- }
- catch(Ice.DatagramLimitException ex)
- {
- throw new NonRepeatable(ex);
- }
- _state = StateInProgress;
+ //
+ // For oneway and datagram requests, the connection
+ // object never calls back on this object. Therefore
+ // we don't need to lock the mutex, keep track of
+ // state, or save exceptions. We simply let all
+ // exceptions from sending propagate to the caller,
+ // because such exceptions can be retried without
+ // violating "at-most-once".
+ //
+ _connection.sendRequest(_os, null);
break;
}
case Reference.ModeBatchOneway:
case Reference.ModeBatchDatagram:
{
- //
- // The state must be set to StateInProgress before calling
- // finishBatchRequest, because otherwise if
- // finishBatchRequest raises an exception, the destructor
- // of this class will call abortBatchRequest, and calling
- // both finishBatchRequest and abortBatchRequest is
- // illegal.
- //
- _state = StateInProgress;
+ //
+ // For batch oneways and datagrams, the same rules as for
+ // regular oneways and datagrams (see comment above)
+ // apply.
+ //
_connection.finishBatchRequest(_os);
break;
}
@@ -199,130 +211,126 @@ public final class Outgoing
public synchronized void
finished(BasicStream is)
{
- //
- // The state might be StateLocalException if there was a
- // timeout in invoke().
- //
- if(_state == StateInProgress)
- {
- _is.swap(is);
- byte status = _is.readByte();
-
- switch((int)status)
- {
- case DispatchStatus._DispatchOK:
- {
- //
- // Input and output parameters are always sent in an
- // encapsulation, which makes it possible to forward
- // oneway requests as blobs.
- //
- _is.startReadEncaps();
- _state = StateOK;
- break;
- }
-
- case DispatchStatus._DispatchUserException:
- {
- //
- // Input and output parameters are always sent in an
- // encapsulation, which makes it possible to forward
- // oneway requests as blobs.
- //
- _is.startReadEncaps();
- _state = StateUserException;
- break;
- }
-
- case DispatchStatus._DispatchObjectNotExist:
- case DispatchStatus._DispatchFacetNotExist:
- case DispatchStatus._DispatchOperationNotExist:
+ assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways.
+
+ assert(_state <= StateInProgress);
+
+ byte status = _is.readByte();
+
+ switch((int)status)
+ {
+ case DispatchStatus._DispatchOK:
+ {
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward
+ // oneway requests as blobs.
+ //
+ _is.startReadEncaps();
+ _state = StateOK;
+ break;
+ }
+
+ case DispatchStatus._DispatchUserException:
+ {
+ //
+ // Input and output parameters are always sent in an
+ // encapsulation, which makes it possible to forward
+ // oneway requests as blobs.
+ //
+ _is.startReadEncaps();
+ _state = StateUserException;
+ break;
+ }
+
+ case DispatchStatus._DispatchObjectNotExist:
+ case DispatchStatus._DispatchFacetNotExist:
+ case DispatchStatus._DispatchOperationNotExist:
+ {
+ _state = StateLocalException;
+
+ Ice.RequestFailedException ex = null;
+ switch((int)status)
{
- _state = StateLocalException;
-
- Ice.RequestFailedException ex = null;
- switch((int)status)
+ case DispatchStatus._DispatchObjectNotExist:
{
- case DispatchStatus._DispatchObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
-
- case DispatchStatus._DispatchFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
-
- case DispatchStatus._DispatchOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
-
- default:
- {
- assert(false);
- break;
- }
+ ex = new Ice.ObjectNotExistException();
+ break;
}
-
- ex.id = new Ice.Identity();
- ex.id.__read(_is);
- ex.facet = _is.readStringSeq();
- ex.operation = _is.readString();
- _exception = ex;
- break;
- }
-
- case DispatchStatus._DispatchUnknownException:
- case DispatchStatus._DispatchUnknownLocalException:
- case DispatchStatus._DispatchUnknownUserException:
- {
- _state = StateLocalException;
-
- Ice.UnknownException ex = null;
- switch((int)status)
+
+ case DispatchStatus._DispatchFacetNotExist:
{
- case DispatchStatus._DispatchUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
-
- case DispatchStatus._DispatchUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
-
- case DispatchStatus._DispatchUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
-
- default:
- {
- assert(false);
- break;
- }
+ ex = new Ice.FacetNotExistException();
+ break;
}
-
- ex.unknown = _is.readString();
- _exception = ex;
- break;
- }
+
+ case DispatchStatus._DispatchOperationNotExist:
+ {
+ ex = new Ice.OperationNotExistException();
+ break;
+ }
+
+ default:
+ {
+ assert(false);
+ break;
+ }
+ }
- default:
- {
- _state = StateLocalException;
- _exception = new Ice.UnknownReplyStatusException();
- break;
- }
- }
- }
+ ex.id = new Ice.Identity();
+ ex.id.__read(_is);
+ ex.facet = _is.readStringSeq();
+ ex.operation = _is.readString();
+ _exception = ex;
+ break;
+ }
+
+ case DispatchStatus._DispatchUnknownException:
+ case DispatchStatus._DispatchUnknownLocalException:
+ case DispatchStatus._DispatchUnknownUserException:
+ {
+ _state = StateLocalException;
+
+ Ice.UnknownException ex = null;
+ switch((int)status)
+ {
+ case DispatchStatus._DispatchUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
+ }
+
+ case DispatchStatus._DispatchUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
+
+ case DispatchStatus._DispatchUnknownUserException:
+ {
+ ex = new Ice.UnknownUserException();
+ break;
+ }
+
+ default:
+ {
+ assert(false);
+ break;
+ }
+ }
+
+ ex.unknown = _is.readString();
+ _exception = ex;
+ break;
+ }
+
+ default:
+ {
+ _state = StateLocalException;
+ _exception = new Ice.UnknownReplyStatusException();
+ break;
+ }
+ }
notify();
}
@@ -330,16 +338,13 @@ public final class Outgoing
public synchronized void
finished(Ice.LocalException ex)
{
- //
- // The state might be StateLocalException if there was a
- // timeout in invoke().
- //
- if(_state == StateInProgress)
- {
- _state = StateLocalException;
- _exception = ex;
- notify();
- }
+ assert(_reference.mode == Reference.ModeTwoway); // Can only be called for twoways.
+
+ assert(_state <= StateInProgress);
+
+ _state = StateLocalException;
+ _exception = ex;
+ notify();
}
public BasicStream
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index f198b8df397..dc7720c70eb 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -95,7 +95,7 @@ public abstract class OutgoingAsync
{
_os.endWriteEncaps();
- _connection.sendAsyncRequest(this);
+ _connection.sendAsyncRequest(_os, this);
if(_connection.timeout() >= 0)
{
diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java
index 926de612160..c03b3fbe5be 100644
--- a/java/src/IceInternal/ProxyFactory.java
+++ b/java/src/IceInternal/ProxyFactory.java
@@ -81,14 +81,67 @@ public final class ProxyFactory
}
}
- public int[]
- getRetryIntervals()
- {
- return _retryIntervals;
+ public int
+ checkRetryAfterException(Ice.LocalException ex, int cnt)
+ {
+ ++cnt;
+
+ TraceLevels traceLevels = _instance.traceLevels();
+ Ice.Logger logger = _instance.logger();
+
+ //
+ // Instance components may be null if Communicator has been destroyed.
+ //
+ if(traceLevels != null && logger != null)
+ {
+ if(cnt > _retryIntervals.length)
+ {
+ if(traceLevels.retry >= 1)
+ {
+ String s = "cannot retry operation call because retry limit has been exceeded\n" + ex.toString();
+ logger.trace(traceLevels.retryCat, s);
+ }
+ throw ex;
+ }
+
+ if(traceLevels.retry >= 1)
+ {
+ String s = "re-trying operation call";
+ if(cnt > 0 && _retryIntervals[cnt - 1] > 0)
+ {
+ s += " in " + _retryIntervals[cnt - 1] + "ms";
+ }
+ s += " because of exception\n" + ex;
+ logger.trace(traceLevels.retryCat, s);
+ }
+
+ if(cnt > 0)
+ {
+ //
+ // Sleep before retrying.
+ //
+ try
+ {
+ Thread.currentThread().sleep(_retryIntervals[cnt - 1]);
+ }
+ catch(InterruptedException ex1)
+ {
+ }
+ }
+
+ return cnt;
+ }
+ else
+ {
+ //
+ // Impossible to retry after Communicator has been destroyed.
+ //
+ throw ex;
+ }
}
//
- // Only for use by Instance
+ // Only for use by Instance.
//
ProxyFactory(Instance instance)
{
diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java
index 508c23ac47c..925ca70f439 100644
--- a/java/src/IceInternal/Reference.java
+++ b/java/src/IceInternal/Reference.java
@@ -573,6 +573,255 @@ public final class Reference
}
//
+ // Get a suitable connection for this reference.
+ //
+ public Connection
+ getConnection()
+ {
+ Connection connection;
+
+ if(reverseAdapter != null)
+ {
+ //
+ // If we have a reverse object adapter, we use the incoming
+ // connections from such object adapter.
+ //
+ Ice.ObjectAdapterI adapter = (Ice.ObjectAdapterI)reverseAdapter;
+ Connection[] connections = adapter.getIncomingConnections();
+
+ Endpoint[] endpoints = new Endpoint[connections.length];
+ for(int i = 0; i < connections.length; i++)
+ {
+ endpoints[i] = connections[i].endpoint();
+ }
+ endpoints = filterEndpoints(endpoints);
+
+ if(endpoints.length == 0)
+ {
+ Ice.NoEndpointException e = new Ice.NoEndpointException();
+ e.proxy = toString();
+ throw e;
+ }
+
+ int j;
+ for(j = 0; j < connections.length; j++)
+ {
+ if(connections[j].endpoint().equals(endpoints[0]))
+ {
+ break;
+ }
+ }
+ assert(j < connections.length);
+ connection = connections[j];
+ }
+ else
+ {
+ while(true)
+ {
+ Endpoint[] endpoints = null;
+ Ice.BooleanHolder cached = new Ice.BooleanHolder();
+ cached.value = false;
+
+ if(routerInfo != null)
+ {
+ //
+ // If we route, we send everything to the router's client
+ // proxy endpoints.
+ //
+ Ice.ObjectPrx proxy = routerInfo.getClientProxy();
+ endpoints = ((Ice.ObjectPrxHelper)proxy).__reference().endpoints;
+ }
+ else if(endpoints.length > 0)
+ {
+ endpoints = endpoints;
+ }
+ else if(locatorInfo != null)
+ {
+ endpoints = locatorInfo.getEndpoints(this, cached);
+ }
+
+ Endpoint[] filteredEndpoints = null;
+ if(endpoints != null)
+ {
+ filteredEndpoints = filterEndpoints(endpoints);
+ }
+ if(filteredEndpoints == null || filteredEndpoints.length == 0)
+ {
+ Ice.NoEndpointException e = new Ice.NoEndpointException();
+ e.proxy = toString();
+ throw e;
+ }
+
+ try
+ {
+ OutgoingConnectionFactory factory = instance.outgoingConnectionFactory();
+ connection = factory.create(filteredEndpoints);
+ assert(connection != null);
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(routerInfo == null && endpoints.length == 0)
+ {
+ assert(locatorInfo != null);
+ locatorInfo.clearCache(this);
+
+ if(cached.value)
+ {
+ TraceLevels traceLevels = instance.traceLevels();
+ Ice.Logger logger = instance.logger();
+
+ if(traceLevels.retry >= 2)
+ {
+ String s = "connection to cached endpoints failed\n" +
+ "removing endpoints from cache and trying one more time\n" + ex;
+ logger.trace(traceLevels.retryCat, s);
+ }
+
+ continue;
+ }
+ }
+
+ throw ex;
+ }
+
+ break;
+ }
+
+ //
+ // If we have a router, set the object adapter for this
+ // router (if any) to the new connection, so that
+ // callbacks from the router can be received over this new
+ // connection.
+ //
+ if(routerInfo != null)
+ {
+ connection.setAdapter(routerInfo.getAdapter());
+ }
+ }
+
+ assert(connection != null);
+ return connection;
+ }
+
+ //
+ // Filter endpoints based on criteria from this reference.
+ //
+ public Endpoint[]
+ filterEndpoints(Endpoint[] allEndpoints)
+ {
+ java.util.ArrayList endpoints = new java.util.ArrayList();
+
+ //
+ // Filter out unknown endpoints.
+ //
+ for(int i = 0; i < allEndpoints.length; i++)
+ {
+ if(!allEndpoints[i].unknown())
+ {
+ endpoints.add(allEndpoints[i]);
+ }
+ }
+
+ switch(mode)
+ {
+ case Reference.ModeTwoway:
+ case Reference.ModeOneway:
+ case Reference.ModeBatchOneway:
+ {
+ //
+ // Filter out datagram endpoints.
+ //
+ java.util.Iterator i = endpoints.iterator();
+ while(i.hasNext())
+ {
+ Endpoint endpoint = (Endpoint)i.next();
+ if(endpoint.datagram())
+ {
+ i.remove();
+ }
+ }
+ break;
+ }
+
+ case Reference.ModeDatagram:
+ case Reference.ModeBatchDatagram:
+ {
+ //
+ // Filter out non-datagram endpoints.
+ //
+ java.util.Iterator i = endpoints.iterator();
+ while(i.hasNext())
+ {
+ Endpoint endpoint = (Endpoint)i.next();
+ if(!endpoint.datagram())
+ {
+ i.remove();
+ }
+ }
+ break;
+ }
+ }
+
+ //
+ // Randomize the order of endpoints.
+ //
+ java.util.Collections.shuffle(endpoints);
+
+ //
+ // If a secure connection is requested, remove all non-secure
+ // endpoints. Otherwise make non-secure endpoints preferred over
+ // secure endpoints by partitioning the endpoint vector, so that
+ // non-secure endpoints come first.
+ //
+ if(secure)
+ {
+ java.util.Iterator i = endpoints.iterator();
+ while(i.hasNext())
+ {
+ Endpoint endpoint = (Endpoint)i.next();
+ if(!endpoint.secure())
+ {
+ i.remove();
+ }
+ }
+ }
+ else
+ {
+ java.util.Collections.sort(endpoints, _comparator);
+ }
+
+ Endpoint[] arr = new Endpoint[endpoints.size()];
+ endpoints.toArray(arr);
+ return arr;
+ }
+
+ static class EndpointComparator implements java.util.Comparator
+ {
+ public int
+ compare(java.lang.Object l, java.lang.Object r)
+ {
+ IceInternal.Endpoint le = (IceInternal.Endpoint)l;
+ IceInternal.Endpoint re = (IceInternal.Endpoint)r;
+ boolean ls = le.secure();
+ boolean rs = re.secure();
+ if((ls && rs) || (!ls && !rs))
+ {
+ return 0;
+ }
+ else if(!ls && rs)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ }
+
+ private static EndpointComparator _comparator = new EndpointComparator();
+
+ //
// Only for use by ReferenceFactory
//
Reference(Instance inst,