summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2007-12-12 18:54:19 +0100
committerBenoit Foucher <benoit@zeroc.com>2007-12-12 18:54:19 +0100
commit3dff2b82d498d2e29dc4c42c4053557e16a373d4 (patch)
tree4242da8678ce8f36e34b9d821212cf78519af415 /java/src
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.bz2
ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.tar.xz
ice-3dff2b82d498d2e29dc4c42c4053557e16a373d4.zip
Fixed bug 2592
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AMI_Object_ice_flushBatchRequests.java26
-rw-r--r--java/src/Ice/AMI_Object_ice_invoke.java7
-rw-r--r--java/src/Ice/ConnectionI.java44
-rw-r--r--java/src/Ice/ObjectAdapterI.java10
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java40
-rw-r--r--java/src/Ice/_ObjectDel.java3
-rw-r--r--java/src/Ice/_ObjectDelM.java15
-rw-r--r--java/src/IceInternal/BatchOutgoing.java22
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java86
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java299
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java32
-rw-r--r--java/src/IceInternal/EndpointI.java7
-rw-r--r--java/src/IceInternal/Outgoing.java54
-rw-r--r--java/src/IceInternal/OutgoingAsync.java395
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java138
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java230
-rw-r--r--java/src/IceInternal/RequestHandler.java3
-rw-r--r--java/src/IceInternal/TcpEndpointI.java6
-rw-r--r--java/src/IceInternal/TcpTransceiver.java1
-rw-r--r--java/src/IceInternal/ThreadPool.java58
-rw-r--r--java/src/IceInternal/Transceiver.java8
-rw-r--r--java/src/IceInternal/UdpEndpointI.java6
-rw-r--r--java/src/IceInternal/UdpTransceiver.java1
-rw-r--r--java/src/IceSSL/EndpointI.java6
-rw-r--r--java/src/IceSSL/TransceiverI.java1
25 files changed, 713 insertions, 785 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
index ca68d1c2382..31495520034 100644
--- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
+++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
@@ -15,21 +15,29 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc
public final void __invoke(Ice.ObjectPrx prx)
{
- Ice._ObjectDel delegate;
- IceInternal.RequestHandler handler;
+ __acquire(prx);
try
{
+ //
+ // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
+ // requests were queued with the connection, they would be lost without being noticed.
+ //
+ Ice._ObjectDel delegate = null;
+ int cnt = -1; // Don't retry.
Ice.ObjectPrxHelperBase proxy = (Ice.ObjectPrxHelperBase)prx;
- __prepare(proxy.__reference().getInstance());
- delegate = proxy.__getDelegate(true);
- handler = delegate.__getRequestHandler();
+ try
+ {
+ delegate = proxy.__getDelegate(true);
+ delegate.__getRequestHandler().flushAsyncBatchRequests(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ cnt = proxy.__handleException(delegate, ex, cnt);
+ }
}
catch(Ice.LocalException ex)
{
- __finished(ex);
- return;
+ __release(ex);
}
-
- handler.flushAsyncBatchRequests(this);
}
}
diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java
index 1965ac41b48..b891e74375a 100644
--- a/java/src/Ice/AMI_Object_ice_invoke.java
+++ b/java/src/Ice/AMI_Object_ice_invoke.java
@@ -17,18 +17,18 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync
public final void __invoke(Ice.ObjectPrx prx, String operation, OperationMode mode,
byte[] inParams, java.util.Map context)
{
+ __acquire(prx);
try
{
__prepare(prx, operation, mode, context);
__os.writeBlob(inParams);
__os.endWriteEncaps();
+ __send();
}
catch(LocalException ex)
{
- __finished(ex);
- return;
+ __release(ex);
}
- __send();
}
protected final void __response(boolean ok) // ok == true means no user exception.
@@ -45,5 +45,6 @@ public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync
return;
}
ice_response(ok, outParams);
+ __release();
}
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index efdb68da8c1..425bdba5d14 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -903,6 +903,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(_batchRequestNum == 0)
{
+ out.sent(false);
return true;
}
@@ -971,6 +972,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(_batchRequestNum == 0)
{
+ outAsync.__sent(this);
return;
}
@@ -2035,10 +2037,6 @@ public final class ConnectionI extends IceInternal.EventHandler
return IceInternal.SocketStatus.NeedWrite;
}
}
- catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
- {
- throw ex.get();
- }
catch(Ice.TimeoutException ex)
{
throw new Ice.ConnectTimeoutException();
@@ -2169,21 +2167,10 @@ public final class ConnectionI extends IceInternal.EventHandler
}
- try
+ if(!_transceiver.write(message.stream.getBuffer(), timeout))
{
- if(!_transceiver.write(message.stream.getBuffer(), timeout))
- {
- assert(timeout == 0);
- return false;
- }
- }
- catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
- {
- if(!ex.retry())
- {
- message.sent(this, timeout == 0);
- }
- throw ex.get();
+ assert(timeout == 0);
+ return false;
}
message.sent(this, timeout == 0); // timeout == 0 indicates that this is called by the selector thread.
@@ -2276,25 +2263,14 @@ public final class ConnectionI extends IceInternal.EventHandler
IceInternal.TraceUtil.traceSend(stream, _logger, _traceLevels);
}
- try
+ if(!foreground && _transceiver.write(message.stream.getBuffer(), 0))
{
- if(!foreground && _transceiver.write(message.stream.getBuffer(), 0))
+ message.sent(this, false);
+ if(_acmTimeout > 0)
{
- message.sent(this, false);
- if(_acmTimeout > 0)
- {
- _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
- }
- return false;
+ _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
}
- }
- catch(IceInternal.LocalExceptionWrapper ex) // Java-specific workaround in Transceiver.write().
- {
- if(!ex.retry())
- {
- message.sent(this, false);
- }
- throw ex.get();
+ return false;
}
_sendStreams.addLast(message);
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index adb890ae473..1b39ac340e6 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -941,16 +941,6 @@ public final class ObjectAdapterI implements ObjectAdapter
for(int i = 0; i < endpoints.size(); ++i)
{
IceInternal.EndpointI endp = (IceInternal.EndpointI)endpoints.get(i);
- //
- // TODO: Remove when we no longer support SSL for JDK 1.4.
- //
- if(!_threadPerConnection && endp.requiresThreadPerConnection())
- {
- Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endp.toString();
- throw ex;
- }
-
IceInternal.IncomingConnectionFactory factory =
new IceInternal.IncomingConnectionFactory(instance, endp, this, _name);
_incomingConnectionFactories.add(factory);
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 87c3b86f3d2..4269def1407 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -790,8 +790,8 @@ public class ObjectPrxHelperBase implements ObjectPrx
try
{
__del = __getDelegate(false);
- // Wait for the connection to be established.
- return __del.__getRequestHandler().getConnection(true);
+ return __del.__getRequestHandler().getConnection(true); // Wait for the connection to be established.
+
}
catch(LocalException __ex)
{
@@ -826,24 +826,21 @@ public class ObjectPrxHelperBase implements ObjectPrx
public void
ice_flushBatchRequests()
{
- int __cnt = 0;
- while(true)
+ //
+ // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
+ // requests were queued with the connection, they would be lost without being noticed.
+ //
+ _ObjectDel __del = null;
+ int __cnt = -1; // Don't retry.
+ try
{
- _ObjectDel __del = null;
- try
- {
- __del = __getDelegate(false);
- __del.ice_flushBatchRequests();
- return;
- }
- catch(IceInternal.LocalExceptionWrapper __ex)
- {
- __handleExceptionWrapper(__del, __ex);
- }
- catch(LocalException __ex)
- {
- __cnt = __handleException(__del, __ex, __cnt);
- }
+ __del = __getDelegate(false);
+ __del.ice_flushBatchRequests();
+ return;
+ }
+ catch(LocalException __ex)
+ {
+ __cnt = __handleException(__del, __ex, __cnt);
}
}
@@ -949,6 +946,11 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
}
+ if(cnt == -1) // Don't retry if the retry count is -1.
+ {
+ throw ex;
+ }
+
IceInternal.ProxyFactory proxyFactory;
try
{
diff --git a/java/src/Ice/_ObjectDel.java b/java/src/Ice/_ObjectDel.java
index 8eefc506fca..dcc4feeb334 100644
--- a/java/src/Ice/_ObjectDel.java
+++ b/java/src/Ice/_ObjectDel.java
@@ -27,8 +27,7 @@ public interface _ObjectDel
java.util.Map context)
throws IceInternal.LocalExceptionWrapper;
- void ice_flushBatchRequests()
- throws IceInternal.LocalExceptionWrapper;
+ void ice_flushBatchRequests();
IceInternal.RequestHandler __getRequestHandler();
void __setRequestHandler(IceInternal.RequestHandler handler);
diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java
index 8bc56496b33..a060df4fae3 100644
--- a/java/src/Ice/_ObjectDelM.java
+++ b/java/src/Ice/_ObjectDelM.java
@@ -205,22 +205,9 @@ public class _ObjectDelM implements _ObjectDel
public void
ice_flushBatchRequests()
- throws IceInternal.LocalExceptionWrapper
{
IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(__handler);
- try
- {
- out.invoke();
- }
- catch(Ice.LocalException ex)
- {
- //
- // We never retry flusing the batch requests as the connection batched
- // requests were discarded and the caller needs to be notified of the
- // failure.
- //
- throw new IceInternal.LocalExceptionWrapper(ex, false);
- }
+ out.invoke();
}
public IceInternal.RequestHandler
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
index 1e0071f047a..eecd4cb692a 100644
--- a/java/src/IceInternal/BatchOutgoing.java
+++ b/java/src/IceInternal/BatchOutgoing.java
@@ -34,23 +34,25 @@ public final class BatchOutgoing implements OutgoingMessageCallback
if(_handler != null && !_handler.flushBatchRequests(this) ||
_connection != null && !_connection.flushBatchRequests(this))
- synchronized(this)
{
- while(_exception == null && !_sent)
+ synchronized(this)
{
- try
+ while(_exception == null && !_sent)
{
- wait();
+ try
+ {
+ wait();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
}
- catch(java.lang.InterruptedException ex)
+
+ if(_exception != null)
{
+ throw _exception;
}
}
-
- if(_exception != null)
- {
- throw _exception;
- }
}
}
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index 94e15c2dca3..b96e3e047cf 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -9,98 +9,18 @@
package IceInternal;
-public abstract class BatchOutgoingAsync implements OutgoingAsyncMessageCallback
+public abstract class BatchOutgoingAsync extends OutgoingAsyncMessageCallback
{
- public
- BatchOutgoingAsync()
- {
- }
-
- public abstract void ice_exception(Ice.LocalException ex);
-
- public final BasicStream
- __os()
- {
- return __os;
- }
-
public final void
__sent(final Ice.ConnectionI connection)
{
- synchronized(_monitor)
- {
- cleanup();
- }
+ __release();
}
public final void
__finished(Ice.LocalException exc)
{
- try
- {
- ice_exception(exc);
- }
- catch(java.lang.Exception ex)
- {
- warning(ex);
- }
- finally
- {
- synchronized(_monitor)
- {
- cleanup();
- }
- }
- }
-
- protected final void
- __prepare(Instance instance)
- {
- synchronized(_monitor)
- {
- while(__os != null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- assert(__os == null);
- __os = new BasicStream(instance);
- }
- }
-
- private final void
- warning(java.lang.Exception ex)
- {
- if(__os != null) // Don't print anything if cleanup() was already called.
- {
- if(__os.instance().initializationData().properties.getPropertyAsIntWithDefault(
- "Ice.Warn.AMICallback", 1) > 0)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtil.OutputBase out = new IceUtil.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by AMI callback:\n");
- ex.printStackTrace(pw);
- pw.flush();
- __os.instance().initializationData().logger.warning(sw.toString());
- }
- }
- }
-
- private final void
- cleanup()
- {
- __os = null;
- _monitor.notify();
+ __exception(exc);
}
- protected BasicStream __os;
- private final java.lang.Object _monitor = new java.lang.Object();
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 792674f62ac..8bba5a6fc5a 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -35,6 +35,29 @@ public class ConnectRequestHandler
BasicStream os = null;
}
+ public RequestHandler
+ connect()
+ {
+ _reference.getConnection(this);
+
+ synchronized(this)
+ {
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ else if(_connection != null)
+ {
+ return new ConnectionRequestHandler(_reference, _connection, _compress);
+ }
+ else
+ {
+ _updateRequestHandler = true; // The proxy request handler will be updated when the connection is set.
+ return this;
+ }
+ }
+ }
+
public void
prepareBatchRequest(BasicStream os)
{
@@ -114,33 +137,29 @@ public class ConnectRequestHandler
sendRequest(Outgoing out)
throws LocalExceptionWrapper
{
- return (!getConnection(true).sendRequest(out, _compress, _response) || _response) ? _connection : null;
+ if(!getConnection(true).sendRequest(out, _compress, _response) || _response)
+ {
+ return _connection; // The request has been sent or we're expecting a response.
+ }
+ else
+ {
+ return null; // The request hasn't been sent yet.
+ }
}
public void
sendAsyncRequest(OutgoingAsync out)
+ throws LocalExceptionWrapper
{
- try
- {
- synchronized(this)
+ synchronized(this)
+ {
+ if(!initialized())
{
- if(!initialized())
- {
- _requests.add(new Request(out));
- return;
- }
+ _requests.add(new Request(out));
+ return;
}
-
- _connection.sendAsyncRequest(out, _compress, _response);
- }
- catch(LocalExceptionWrapper ex)
- {
- out.__finished(ex);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
}
+ _connection.sendAsyncRequest(out, _compress, _response);
}
public boolean
@@ -152,23 +171,15 @@ public class ConnectRequestHandler
public void
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
- try
- {
- synchronized(this)
+ synchronized(this)
+ {
+ if(!initialized())
{
- if(!initialized())
- {
- _requests.add(new Request(out));
- return;
- }
+ _requests.add(new Request(out));
+ return;
}
-
- _connection.flushAsyncBatchRequests(out);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
}
+ _connection.flushAsyncBatchRequests(out);
}
public Outgoing
@@ -256,43 +267,46 @@ public class ConnectRequestHandler
// add this proxy to the router info object.
//
RouterInfo ri = _reference.getRouterInfo();
- if(ri != null)
+ if(ri != null && !ri.addProxy(_proxy, this))
{
- if(!ri.addProxy(_proxy, this))
- {
- return; // The request handler will be initialized once addProxy returns.
- }
+ return; // The request handler will be initialized once addProxy returns.
}
+ //
+ // We can now send the queued requests.
+ //
flushRequests();
}
- public void
- setException(Ice.LocalException ex)
+ public synchronized void
+ setException(final Ice.LocalException ex)
{
- synchronized(this)
- {
- assert(!_initialized && _exception == null);
- _exception = ex;
- _proxy = null; // Break cyclic reference count.
- _delegate = null; // Break cyclic reference count.
- notifyAll();
- }
+ assert(!_initialized && _exception == null);
+ assert(_updateRequestHandler || _requests.isEmpty());
- java.util.Iterator p = _requests.iterator();
- while(p.hasNext())
+ _exception = ex;
+ _proxy = null; // Break cyclic reference count.
+ _delegate = null; // Break cyclic reference count.
+
+ //
+ // If some requests were queued, we notify them of the failure. This is done from a thread
+ // from the client thread pool since this will result in ice_exception callbacks to be
+ // called.
+ //
+ if(!_requests.isEmpty())
{
- Request request = (Request)p.next();
- if(request.out != null)
- {
- request.out.__finished(ex);
- }
- else if(request.batchOut != null)
- {
- request.batchOut.__finished(ex);
- }
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
}
- _requests.clear();
+
+ notifyAll();
}
//
@@ -301,6 +315,10 @@ public class ConnectRequestHandler
public void
addedProxy()
{
+ //
+ // The proxy was added to the router info, we're now ready to send the
+ // queued requests.
+ //
flushRequests();
}
@@ -319,25 +337,6 @@ public class ConnectRequestHandler
_updateRequestHandler = false;
}
- public RequestHandler
- connect()
- {
- _reference.getConnection(this);
-
- synchronized(this)
- {
- if(_connection != null)
- {
- return new ConnectionRequestHandler(_reference, _connection, _compress);
- }
- else
- {
- _updateRequestHandler = true;
- return this;
- }
- }
- }
-
private boolean
initialized()
{
@@ -348,7 +347,7 @@ public class ConnectRequestHandler
}
else
{
- while(_flushing)
+ while(_flushing && _exception == null)
{
try
{
@@ -395,66 +394,92 @@ public class ConnectRequestHandler
//
_flushing = true;
}
-
- java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true
- while(p.hasNext())
+
+ try
{
- Request request = (Request)p.next();
- if(request.out != null)
+ java.util.Iterator p = _requests.iterator(); // _requests is immutable when _flushing = true
+ while(p.hasNext())
{
- try
+ Request request = (Request)p.next();
+ if(request.out != null)
{
_connection.sendAsyncRequest(request.out, _compress, _response);
}
- catch(LocalExceptionWrapper ex)
+ else if(request.batchOut != null)
{
- request.out.__finished(ex);
+ _connection.flushAsyncBatchRequests(request.batchOut);
}
- catch(Ice.LocalException ex)
+ else
{
- request.out.__finished(ex);
+ BasicStream os = new BasicStream(request.os.instance());
+ _connection.prepareBatchRequest(os);
+ try
+ {
+ request.os.pos(0);
+ os.writeBlob(request.os.readBlob(request.os.size()));
+ _connection.finishBatchRequest(os, _compress);
+ }
+ catch(Ice.LocalException ex)
+ {
+ _connection.abortBatchRequest();
+ throw ex;
+ }
}
+ p.remove();
}
- else if(request.batchOut != null)
+ }
+ catch(final LocalExceptionWrapper ex)
+ {
+ synchronized(this)
{
- try
- {
- _connection.flushAsyncBatchRequests(request.batchOut);
- }
- catch(Ice.LocalException ex)
- {
- request.batchOut.__finished(ex);
- }
+ assert(_exception != null && !_requests.isEmpty());
+ _exception = ex.get();
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
+ return;
}
- else
+ }
+ catch(final Ice.LocalException ex)
+ {
+ synchronized(this)
{
- //
- // TODO: Add sendBatchRequest() method to ConnectionI?
- //
- try
- {
- BasicStream os = new BasicStream(request.os.instance());
- _connection.prepareBatchRequest(os);
- request.os.pos(0);
- os.writeBlob(request.os.readBlob(request.os.size()));
- _connection.finishBatchRequest(os, _compress);
- }
- catch(Ice.LocalException ex)
- {
- _connection.abortBatchRequest();
- _exception = ex;
- }
+ assert(_exception != null && !_requests.isEmpty());
+ _exception = ex;
+ _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ flushRequestsWithException(ex);
+ };
+ });
+ return;
}
}
- _requests.clear();
synchronized(this)
{
+ assert(!_initialized);
_initialized = true;
_flushing = false;
notifyAll();
}
+ //
+ // We've finished sending the queued requests and the request handler now send
+ // the requests over the connection directly. It's time to substitute the
+ // request handler of the proxy with the more efficient connection request
+ // handler which does not have any synchronization. This also breaks the cyclic
+ // reference count with the proxy.
+ //
if(_updateRequestHandler && _exception == null)
{
_proxy.__setRequestHandler(_delegate, new ConnectionRequestHandler(_reference, _connection, _compress));
@@ -463,6 +488,44 @@ public class ConnectRequestHandler
_delegate = null; // Break cyclic reference count.
}
+ void
+ flushRequestsWithException(Ice.LocalException ex)
+ {
+ java.util.Iterator p = _requests.iterator();
+ while(p.hasNext())
+ {
+ Request request = (Request)p.next();
+ if(request.out != null)
+ {
+ request.out.__finished(ex);
+ }
+ else if(request.batchOut != null)
+ {
+ request.batchOut.__finished(ex);
+ }
+ }
+ _requests.clear();
+ }
+
+ void
+ flushRequestsWithException(LocalExceptionWrapper ex)
+ {
+ java.util.Iterator p = _requests.iterator();
+ while(p.hasNext())
+ {
+ Request request = (Request)p.next();
+ if(request.out != null)
+ {
+ request.out.__finished(ex);
+ }
+ else if(request.batchOut != null)
+ {
+ request.batchOut.__finished(ex.get());
+ }
+ }
+ _requests.clear();
+ }
+
private final Reference _reference;
private final boolean _batchAutoFlush;
private Ice.ObjectPrxHelperBase _proxy;
@@ -474,7 +537,7 @@ public class ConnectRequestHandler
private boolean _response;
private Ice.LocalException _exception = null;
- private java.util.ArrayList _requests = new java.util.ArrayList();
+ private java.util.List _requests = new java.util.LinkedList();
private boolean _batchRequestInProgress;
private int _batchRequestsSize;
private BasicStream _batchStream;
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index 2f1b1c031ed..727ad6244d9 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -33,24 +33,21 @@ public class ConnectionRequestHandler implements RequestHandler
sendRequest(Outgoing out)
throws LocalExceptionWrapper
{
- return (!_connection.sendRequest(out, _compress, _response) || _response) ? _connection : null;
+ if(!_connection.sendRequest(out, _compress, _response) || _response)
+ {
+ return _connection; // The request has been sent or we're expecting a response.
+ }
+ else
+ {
+ return null; // The request hasn't been sent yet.
+ }
}
public void
sendAsyncRequest(OutgoingAsync out)
+ throws LocalExceptionWrapper
{
- try
- {
- _connection.sendAsyncRequest(out, _compress, _response);
- }
- catch(LocalExceptionWrapper ex)
- {
- out.__finished(ex);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
- }
+ _connection.sendAsyncRequest(out, _compress, _response);
}
public boolean
@@ -62,14 +59,7 @@ public class ConnectionRequestHandler implements RequestHandler
public void
flushAsyncBatchRequests(BatchOutgoingAsync out)
{
- try
- {
- _connection.flushAsyncBatchRequests(out);
- }
- catch(Ice.LocalException ex)
- {
- out.__finished(ex);
- }
+ _connection.flushAsyncBatchRequests(out);
}
public Outgoing
diff --git a/java/src/IceInternal/EndpointI.java b/java/src/IceInternal/EndpointI.java
index 8086bf138fd..3be504cada4 100644
--- a/java/src/IceInternal/EndpointI.java
+++ b/java/src/IceInternal/EndpointI.java
@@ -115,13 +115,6 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable
public abstract boolean equals(java.lang.Object obj);
public abstract int compareTo(java.lang.Object obj); // From java.lang.Comparable.
- //
- // Returns true if the endpoint's transport requires thread-per-connection.
- //
- // TODO: Remove this when we no longer support SSL for JDK 1.4.
- //
- public abstract boolean requiresThreadPerConnection();
-
public java.util.List
connectors(java.util.List addresses)
{
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index 2830f59e4b5..4f578b37adb 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -64,6 +64,7 @@ public final class Outgoing implements OutgoingMessageCallback
_state = StateInProgress;
Ice.ConnectionI connection = _handler.sendRequest(this);
+ assert(connection != null);
boolean timedOut = false;
@@ -186,52 +187,33 @@ public final class Outgoing implements OutgoingMessageCallback
case Reference.ModeOneway:
case Reference.ModeDatagram:
{
- try
+ _state = StateInProgress;
+ if(_handler.sendRequest(this) != null)
{
- _state = StateInProgress;
- if(_handler.sendRequest(this) != null)
+ //
+ // If the handler returns the connection, we must wait for the sent callback.
+ //
+ synchronized(this)
{
- //
- // If the handler returns the connection, we must wait for the sent callback.
- //
- synchronized(this)
+ while(_state != StateFailed && !_sent)
{
- while(_state != StateFailed && !_sent)
+ try
{
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
+ wait();
}
-
- if(_exception != null)
+ catch(java.lang.InterruptedException ex)
{
- assert(!_sent);
- throw _exception;
}
}
- }
- return true;
- }
- catch(Ice.LocalException ex) // Java specfic work-around (see ConnectionI.sendRequest())
- {
- if(!_sent) // The send might have failed but the request might still be sent...
- {
- throw ex;
- }
- else
- {
- //
- // We wrap the exception into a LocalExceptionWrapper to indicate that
- // the request cannot be resent without potentially violating the
- // "at-most-once" principle.
- //
- throw new IceInternal.LocalExceptionWrapper(ex, false);
+
+ if(_exception != null)
+ {
+ assert(!_sent);
+ throw _exception;
+ }
}
}
+ return true;
}
case Reference.ModeBatchOneway:
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 184355e8e06..fcf104ed942 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,35 +9,22 @@
package IceInternal;
-public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
+public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
- public
- OutgoingAsync()
- {
- }
-
- public abstract void ice_exception(Ice.LocalException ex);
-
- public final BasicStream
- __os()
- {
- return __os;
- }
-
public final void
__sent(final Ice.ConnectionI connection)
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
_sent = true;
if(!_proxy.ice_isTwoway())
{
- cleanup(); // No response expected, we're done with the OutgoingAsync.
+ __release();
}
else if(_response)
{
- _monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
}
else if(connection.timeout() >= 0)
{
@@ -63,7 +50,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
byte replyStatus;
try
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
assert(__os != null);
_response = true;
@@ -77,7 +64,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
{
try
{
- _monitor.wait();
+ __monitor.wait();
}
catch(java.lang.InterruptedException ex)
{
@@ -216,14 +203,8 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
}
catch(java.lang.Exception ex)
{
- warning(ex);
- }
- finally
- {
- synchronized(_monitor)
- {
- cleanup();
- }
+ __warning(ex);
+ __release();
}
}
@@ -231,8 +212,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
public final void
__finished(Ice.LocalException exc)
{
- boolean retry = false;
- synchronized(_monitor)
+ synchronized(__monitor)
{
if(__os != null) // Might be called from __prepare or before __prepare
{
@@ -245,236 +225,213 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
{
try
{
- _monitor.wait();
+ __monitor.wait();
}
catch(java.lang.InterruptedException ex)
{
}
}
-
- //
- // A CloseConnectionException indicates graceful
- // server shutdown, and is therefore always repeatable
- // without violating "at-most-once". That's because by
- // sending a close connection message, the server
- // guarantees that all outstanding requests can safely
- // be repeated. Otherwise, we can also retry if the
- // operation mode is Nonmutating or Idempotent.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once".
- //
- if(!_sent ||
- _mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent ||
- exc instanceof Ice.CloseConnectionException || exc instanceof Ice.ObjectNotExistException)
- {
- retry = true;
- }
- }
- }
-
- if(retry)
- {
- try
- {
- _cnt = _proxy.__handleException(_delegate, exc, _cnt);
- __send();
- return;
- }
- catch(Ice.LocalException ex)
- {
}
}
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
try
{
- ice_exception(exc);
- }
- catch(java.lang.Exception ex)
- {
- warning(ex);
+ handleException(exc); // This will throw if the invocation can't be retried.
+ __send();
}
- finally
+ catch(Ice.LocalException ex)
{
- synchronized(_monitor)
- {
- cleanup();
- }
+ __exception(ex);
}
}
public final void
__finished(LocalExceptionWrapper ex)
{
+ assert(__os != null && !_sent);
+
//
- // NOTE: This is called if sendRequest/sendAsyncRequest fails with
- // a LocalExceptionWrapper exception. It's not possible for the
- // timer to be set at this point because the request couldn't be
- // sent.
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback. The LocalExceptionWrapper exception is only called
+ // before the invocation is sent.
//
- assert(!_sent && _timerTask == null);
try
{
- if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
- {
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
- }
- else
- {
- _proxy.__handleExceptionWrapper(_delegate, ex);
- }
+ handleException(ex); // This will throw if the invocation can't be retried.
__send();
}
catch(Ice.LocalException exc)
{
- try
- {
- ice_exception(exc);
- }
- catch(java.lang.Exception exl)
+ __exception(exc);
+ }
+ }
+
+ protected final void
+ __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context)
+ {
+ assert(__os != null);
+
+ _proxy = (Ice.ObjectPrxHelperBase)prx;
+ _delegate = null;
+ _cnt = 0;
+ _mode = mode;
+
+ //
+ // Can't call async via a batch proxy.
+ //
+ if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
+ {
+ throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
+ }
+
+ __os.writeBlob(IceInternal.Protocol.requestHdr);
+
+ Reference ref = _proxy.__reference();
+
+ ref.getIdentity().__write(__os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ String facet = ref.getFacet();
+ if(facet == null || facet.length() == 0)
+ {
+ __os.writeStringSeq(null);
+ }
+ else
+ {
+ String[] facetPath = { facet };
+ __os.writeStringSeq(facetPath);
+ }
+
+ __os.writeString(operation);
+
+ __os.writeByte((byte)mode.value());
+
+ if(context != null)
+ {
+ //
+ // Explicit context
+ //
+ Ice.ContextHelper.write(__os, context);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
+ java.util.Map prxContext = ref.getContext();
+
+ if(implicitContext == null)
{
- warning(exl);
+ Ice.ContextHelper.write(__os, prxContext);
}
- finally
+ else
{
- synchronized(_monitor)
- {
- cleanup();
- }
+ implicitContext.write(prxContext, __os);
}
}
+
+ __os.startWriteEncaps();
}
protected final void
- __prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map context)
+ __send()
{
- synchronized(_monitor)
+ while(true)
{
try
{
- //
- // We must first wait for other requests to finish.
- //
- while(__os != null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- //
- // Can't call async via a batch proxy.
- //
- _proxy = (Ice.ObjectPrxHelperBase)prx;
- if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
- {
- throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
- }
-
- _delegate = null;
- _cnt = 0;
- _mode = mode;
_sent = false;
- _response = false;
-
- Reference ref = _proxy.__reference();
- assert(__is == null);
- __is = new BasicStream(ref.getInstance());
- assert(__os == null);
- __os = new BasicStream(ref.getInstance());
-
- __os.writeBlob(IceInternal.Protocol.requestHdr);
-
- ref.getIdentity().__write(__os);
-
- //
- // For compatibility with the old FacetPath.
- //
- String facet = ref.getFacet();
- if(facet == null || facet.length() == 0)
- {
- __os.writeStringSeq(null);
- }
- else
- {
- String[] facetPath = { facet };
- __os.writeStringSeq(facetPath);
- }
-
- __os.writeString(operation);
-
- __os.writeByte((byte)mode.value());
-
- if(context != null)
- {
- //
- // Explicit context
- //
- Ice.ContextHelper.write(__os, context);
- }
- else
- {
- //
- // Implicit context
- //
- Ice.ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
- java.util.Map prxContext = ref.getContext();
-
- if(implicitContext == null)
- {
- Ice.ContextHelper.write(__os, prxContext);
- }
- else
- {
- implicitContext.write(prxContext, __os);
- }
- }
-
- __os.startWriteEncaps();
+ _response = false;
+ _delegate = _proxy.__getDelegate(true);
+ _delegate.__getRequestHandler().sendAsyncRequest(this);
+ return;
+ }
+ catch(LocalExceptionWrapper ex)
+ {
+ handleException(ex);
}
catch(Ice.LocalException ex)
{
- cleanup();
- throw ex;
+ handleException(ex);
}
}
}
- protected final void
- __send()
+ protected abstract void __response(boolean ok);
+
+ private void
+ handleException(LocalExceptionWrapper ex)
{
- //
- // NOTE: no synchronization needed. At this point, no other threads can be calling on this object.
- //
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ {
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy.__handleExceptionWrapper(_delegate, ex);
+ }
+ }
- RequestHandler handler;
+ private void
+ handleException(Ice.LocalException exc)
+ {
try
{
- _delegate = _proxy.__getDelegate(true);
- handler = _delegate.__getRequestHandler();
+ //
+ // A CloseConnectionException indicates graceful
+ // server shutdown, and is therefore always repeatable
+ // without violating "at-most-once". That's because by
+ // sending a close connection message, the server
+ // guarantees that all outstanding requests can safely
+ // be repeated.
+ //
+ // An ObjectNotExistException can always be retried as
+ // well without violating "at-most-once".
+ //
+ if(!_sent ||
+ exc instanceof Ice.CloseConnectionException ||
+ exc instanceof Ice.ObjectNotExistException)
+ {
+ throw exc;
+ }
+
+ //
+ // Throw the exception wrapped in a LocalExceptionWrapper, to
+ // indicate that the request cannot be resent without
+ // potentially violating the "at-most-once" principle.
+ //
+ throw new LocalExceptionWrapper(exc, false);
+ }
+ catch(LocalExceptionWrapper ex)
+ {
+ if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
+ {
+ _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy.__handleExceptionWrapper(_delegate, ex);
+ }
}
catch(Ice.LocalException ex)
{
- __finished(ex);
- return;
+ _cnt = _proxy.__handleException(_delegate, ex, _cnt);
}
-
- _sent = false;
- _response = false;
- handler.sendAsyncRequest(this);
}
- protected abstract void __response(boolean ok);
-
private final void
__runTimerTask(Ice.ConnectionI connection)
{
- synchronized(_monitor)
+ synchronized(__monitor)
{
assert(_timerTask != null && _sent); // Can only be set once the request is sent.
@@ -483,7 +440,7 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
connection = null;
}
_timerTask = null;
- _monitor.notifyAll();
+ __monitor.notifyAll();
}
if(connection != null)
@@ -492,49 +449,11 @@ public abstract class OutgoingAsync implements OutgoingAsyncMessageCallback
}
}
- private final void
- warning(java.lang.Exception ex)
- {
- if(__os != null) // Don't print anything if cleanup() was already called.
- {
- Reference ref = _proxy.__reference();
- if(ref.getInstance().initializationData().properties.getPropertyAsIntWithDefault(
- "Ice.Warn.AMICallback", 1) > 0)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- IceUtil.OutputBase out = new IceUtil.OutputBase(pw);
- out.setUseTab(false);
- out.print("exception raised by AMI callback:\n");
- ex.printStackTrace(pw);
- pw.flush();
- ref.getInstance().initializationData().logger.warning(sw.toString());
- }
- }
- }
-
- private final void
- cleanup()
- {
- assert(_timerTask == null);
-
- __is = null;
- __os = null;
-
- _monitor.notify();
- }
-
- protected BasicStream __is;
- protected BasicStream __os;
-
private boolean _sent;
private boolean _response;
private Ice.ObjectPrxHelperBase _proxy;
private Ice._ObjectDel _delegate;
private int _cnt;
private Ice.OperationMode _mode;
-
private TimerTask _timerTask;
-
- private final java.lang.Object _monitor = new java.lang.Object();
}
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 2575dd33b38..677075f59e9 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -9,8 +9,140 @@
package IceInternal;
-public interface OutgoingAsyncMessageCallback
+abstract public class OutgoingAsyncMessageCallback
{
- void __sent(Ice.ConnectionI connection);
- void __finished(Ice.LocalException ex);
+ public abstract void __sent(Ice.ConnectionI connection);
+ public abstract void __finished(Ice.LocalException ex);
+ public abstract void ice_exception(Ice.LocalException ex);
+
+ public final BasicStream
+ __os()
+ {
+ return __os;
+ }
+
+ public void
+ __exception(Ice.LocalException exc)
+ {
+ try
+ {
+ ice_exception(exc);
+ }
+ catch(java.lang.Exception ex)
+ {
+ __warning(ex);
+ }
+ finally
+ {
+ __release();
+ }
+ }
+
+ protected synchronized void
+ finalize()
+ throws Throwable
+ {
+ assert(__os == null);
+ assert(__is == null);
+ }
+
+ protected void
+ __acquire(Ice.ObjectPrx proxy)
+ {
+ synchronized(__monitor)
+ {
+ //
+ // We must first wait for other requests to finish.
+ //
+ while(__os != null)
+ {
+ try
+ {
+ __monitor.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ Reference ref = ((Ice.ObjectPrxHelperBase)proxy).__reference();
+ assert(__is == null);
+ __is = new BasicStream(ref.getInstance());
+ assert(__os == null);
+ __os = new BasicStream(ref.getInstance());
+ }
+ }
+
+ protected void
+ __release(final Ice.LocalException ex)
+ {
+ synchronized(__monitor)
+ {
+ assert(__os != null);
+
+ //
+ // This is called by the invoking thread to release the callback following a direct
+ // failure to marhsall/send the request. We call the ice_exception() callback with
+ // the thread pool to avoid potential deadlocks in case the invoking thread locked
+ // some mutexes/resources (which couldn't be re-acquired by the callback).
+ //
+
+ try
+ {
+ __os.instance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ {
+ public void
+ execute(ThreadPool threadPool)
+ {
+ threadPool.promoteFollower();
+ __exception(ex);
+ }
+ });
+ }
+ catch(Ice.CommunicatorDestroyedException exc)
+ {
+ __release();
+ throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly.
+ }
+ }
+ }
+
+ protected void
+ __release()
+ {
+ synchronized(__monitor)
+ {
+ assert(__is != null);
+ __is = null;
+
+ assert(__os != null);
+ __os = null;
+
+ __monitor.notify();
+ }
+ }
+
+ protected void
+ __warning(java.lang.Exception ex)
+ {
+ if(__os != null) // Don't print anything if release() was already called.
+ {
+ Instance instance = __os.instance();
+ if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ IceUtil.OutputBase out = new IceUtil.OutputBase(pw);
+ out.setUseTab(false);
+ out.print("exception raised by AMI callback:\n");
+ ex.printStackTrace(pw);
+ pw.flush();
+ instance.initializationData().logger.warning(sw.toString());
+ }
+ }
+ }
+
+ protected final java.lang.Object __monitor = new java.lang.Object();
+ protected BasicStream __is;
+ protected BasicStream __os;
}; \ No newline at end of file
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 601b522bf10..107ec4986a7 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -55,7 +55,7 @@ public final class OutgoingConnectionFactory
// anymore. Only then we can be sure the _connections
// contains all connections.
//
- while(!_destroyed || !_pending.isEmpty() || !_pendingEndpoints.isEmpty())
+ while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
{
try
{
@@ -100,6 +100,7 @@ public final class OutgoingConnectionFactory
// methods on member objects.
//
_connections = null;
+ _connectionsByEndpoint = null;
}
}
@@ -110,20 +111,6 @@ public final class OutgoingConnectionFactory
assert(endpts.length > 0);
//
- // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove
- // the threadPerConnection argument.
- //
- for(int i = 0; i < endpts.length; i++)
- {
- if(!tpc && endpts[i].requiresThreadPerConnection())
- {
- Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString();
- throw ex;
- }
- }
-
- //
// Apply the overrides.
//
java.util.List endpoints = applyOverrides(endpts);
@@ -271,20 +258,6 @@ public final class OutgoingConnectionFactory
assert(endpts.length > 0);
//
- // TODO: Remove when we no longer support SSL for JDK 1.4. We can also remove
- // the threadPerConnection argument.
- //
- for(int i = 0; i < endpts.length; i++)
- {
- if(!tpc && endpts[i].requiresThreadPerConnection())
- {
- Ice.FeatureNotSupportedException ex = new Ice.FeatureNotSupportedException();
- ex.unsupportedFeature = "endpoint requires thread-per-connection:\n" + endpts[i].toString();
- throw ex;
- }
- }
-
- //
// Apply the overrides.
//
java.util.List endpoints = applyOverrides(endpts);
@@ -292,16 +265,24 @@ public final class OutgoingConnectionFactory
//
// Try to find a connection to one of the given endpoints.
//
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
- if(connection != null)
+ try
+ {
+ Ice.BooleanHolder compress = new Ice.BooleanHolder();
+ Ice.ConnectionI connection = findConnection(endpoints, tpc, compress);
+ if(connection != null)
+ {
+ callback.setConnection(connection, compress.value);
+ return;
+ }
+ }
+ catch(Ice.LocalException ex)
{
- callback.setConnection(connection, compress.value);
+ callback.setException(ex);
return;
}
ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType, tpc);
- cb.getConnection();
+ cb.getConnectors();
}
public synchronized void
@@ -456,6 +437,9 @@ public final class OutgoingConnectionFactory
{
IceUtil.Assert.FinalizerAssert(_destroyed);
IceUtil.Assert.FinalizerAssert(_connections == null);
+ IceUtil.Assert.FinalizerAssert(_connectionsByEndpoint == null);
+ IceUtil.Assert.FinalizerAssert(_pendingConnectCount == 0);
+ IceUtil.Assert.FinalizerAssert(_pending.isEmpty());
super.finalize();
}
@@ -486,6 +470,11 @@ public final class OutgoingConnectionFactory
synchronized private Ice.ConnectionI
findConnection(java.util.List endpoints, boolean tpc, Ice.BooleanHolder compress)
{
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
+
DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
assert(!endpoints.isEmpty());
@@ -573,25 +562,29 @@ public final class OutgoingConnectionFactory
}
synchronized private void
- addPendingEndpoints(java.util.List endpoints)
+ incPendingConnectCount()
{
+ //
+ // Keep track of the number of pending connects. The outgoing connection factory
+ // waitUntilFinished() method waits for all the pending connects to terminate before
+ // to return. This ensures that the communicator client thread pool isn't destroyed
+ // too soon and will still be available to execute the ice_exception() callbacks for
+ // the asynchronous requests waiting on a connection to be established.
+ //
+
if(_destroyed)
{
throw new Ice.CommunicatorDestroyedException();
}
- _pendingEndpoints.addAll(endpoints);
+ ++_pendingConnectCount;
}
synchronized private void
- removePendingEndpoints(java.util.List endpoints)
+ decPendingConnectCount()
{
- java.util.Iterator p = endpoints.iterator();
- while(p.hasNext())
- {
- _pendingEndpoints.remove(p.next());
- }
-
- if(_destroyed)
+ --_pendingConnectCount;
+ assert(_pendingConnectCount >= 0);
+ if(_destroyed && _pendingConnectCount == 0)
{
notifyAll();
}
@@ -963,8 +956,7 @@ public final class OutgoingConnectionFactory
public boolean threadPerConnection;
}
- private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors,
- ThreadPoolWorkItem
+ private static class ConnectCallback implements Ice.ConnectionI.StartCallback, EndpointI_connectors
{
ConnectCallback(OutgoingConnectionFactory f, java.util.List endpoints, boolean more,
CreateConnectionCallback cb, Ice.EndpointSelectionType selType, boolean threadPerConnection)
@@ -984,8 +976,6 @@ public final class OutgoingConnectionFactory
public synchronized void
connectionStartCompleted(Ice.ConnectionI connection)
{
- assert(_exception == null && connection == _connection);
-
boolean compress;
DefaultsAndOverrides defaultsAndOverrides = _factory._instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideCompress)
@@ -996,19 +986,34 @@ public final class OutgoingConnectionFactory
{
compress = _current.endpoint.compress();
}
-
+
_factory.finishGetConnection(_connectors, this, connection);
- _factory.removePendingEndpoints(_endpoints);
_callback.setConnection(connection, compress);
+ _factory.decPendingConnectCount(); // Must be called last.
}
public synchronized void
connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
{
- assert(_exception == null && connection == _connection);
+ assert(_current != null);
- _exception = ex;
- handleException();
+ _factory.handleException(ex, _current, connection, _hasMore || _iter.hasNext());
+ if(ex instanceof Ice.CommunicatorDestroyedException) // No need to continue.
+ {
+ _factory.finishGetConnection(_connectors, this, null);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
+ else if(_iter.hasNext()) // Try the next connector.
+ {
+ nextConnector();
+ }
+ else
+ {
+ _factory.finishGetConnection(_connectors, this, null);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
+ }
}
//
@@ -1033,8 +1038,7 @@ public final class OutgoingConnectionFactory
if(_endpointsIter.hasNext())
{
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
+ nextEndpoint();
}
else
{
@@ -1055,8 +1059,7 @@ public final class OutgoingConnectionFactory
_factory.handleException(ex, _hasMore || _endpointsIter.hasNext());
if(_endpointsIter.hasNext())
{
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
+ nextEndpoint();
}
else if(!_connectors.isEmpty())
{
@@ -1069,46 +1072,56 @@ public final class OutgoingConnectionFactory
}
else
{
- _exception = ex;
- _factory._instance.clientThreadPool().execute(this);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
}
}
- //
- // Methods from ThreadPoolWorkItem
- //
- public void
- execute(ThreadPool threadPool)
+ void
+ getConnectors()
{
- threadPool.promoteFollower();
- assert(_exception != null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
+ try
+ {
+ //
+ // Notify the factory that there's an async connect pending. This is necessary
+ // to prevent the outgoing connection factory to be destroyed before all the
+ // pending asynchronous connects are finished.
+ //
+ _factory.incPendingConnectCount();
+ }
+ catch(Ice.LocalException ex)
+ {
+ _callback.setException(ex);
+ return;
+ }
+
+ nextEndpoint();
}
void
- getConnection()
+ nextEndpoint()
{
- //
- // First, get the connectors for all the endpoints.
- //
- if(_endpointsIter.hasNext())
+ try
{
- try
- {
- _factory.addPendingEndpoints(_endpoints);
- _currentEndpoint = (EndpointI)_endpointsIter.next();
- _currentEndpoint.connectors_async(this);
- }
- catch(Ice.LocalException ex)
- {
- _callback.setException(ex);
- }
- return;
+ assert(_endpointsIter.hasNext());
+ _currentEndpoint = (EndpointI)_endpointsIter.next();
+ _currentEndpoint.connectors_async(this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
}
+ }
+ void
+ getConnection()
+ {
try
{
+ //
+ // If all the connectors have been created, we ask the factory to get a
+ // connection.
+ //
Ice.BooleanHolder compress = new Ice.BooleanHolder();
Ice.ConnectionI connection = _factory.getConnection(_connectors, this, compress);
if(connection == null)
@@ -1122,54 +1135,30 @@ public final class OutgoingConnectionFactory
return;
}
- _factory.removePendingEndpoints(_endpoints);
_callback.setConnection(connection, compress.value);
+ _factory.decPendingConnectCount(); // Must be called last.
}
catch(Ice.LocalException ex)
{
- _exception = ex;
- _factory._instance.clientThreadPool().execute(this);
+ _callback.setException(ex);
+ _factory.decPendingConnectCount(); // Must be called last.
}
}
void
nextConnector()
{
- _current = (ConnectorInfo)_iter.next();
+ Ice.ConnectionI connection = null;
try
{
- _exception = null;
- _connection = _factory.createConnection(_current.connector.connect(0), _current);
- _connection.start(this);
+ assert(_iter.hasNext());
+ _current = (ConnectorInfo)_iter.next();
+ connection = _factory.createConnection(_current.connector.connect(0), _current);
+ connection.start(this);
}
catch(Ice.LocalException ex)
{
- _exception = ex;
- handleException();
- }
- }
-
- private void
- handleException()
- {
- assert(_current != null && _exception != null);
-
- _factory.handleException(_exception, _current, _connection, _hasMore || _iter.hasNext());
- if(_exception instanceof Ice.CommunicatorDestroyedException) // No need to continue.
- {
- _factory.finishGetConnection(_connectors, this, null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
- }
- else if(_iter.hasNext()) // Try the next connector.
- {
- nextConnector();
- }
- else
- {
- _factory.finishGetConnection(_connectors, this, null);
- _factory.removePendingEndpoints(_endpoints);
- _callback.setException(_exception);
+ connectionStartFailed(connection, ex);
}
}
@@ -1184,16 +1173,13 @@ public final class OutgoingConnectionFactory
private java.util.List _connectors = new java.util.ArrayList();
private java.util.Iterator _iter;
private ConnectorInfo _current;
- private Ice.LocalException _exception;
- private Ice.ConnectionI _connection;
}
private final Instance _instance;
private boolean _destroyed;
private java.util.HashMap _connections = new java.util.HashMap();
- private java.util.HashMap _pending = new java.util.HashMap();
-
private java.util.HashMap _connectionsByEndpoint = new java.util.HashMap();
- private java.util.LinkedList _pendingEndpoints = new java.util.LinkedList();
+ private java.util.HashMap _pending = new java.util.HashMap();
+ private int _pendingConnectCount = 0;
}
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index 49df9e6bc8d..e3a9efd7307 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -18,7 +18,8 @@ public interface RequestHandler
Ice.ConnectionI sendRequest(Outgoing out)
throws LocalExceptionWrapper;
- void sendAsyncRequest(OutgoingAsync out);
+ void sendAsyncRequest(OutgoingAsync out)
+ throws LocalExceptionWrapper;
boolean flushBatchRequests(BatchOutgoing out);
void flushAsyncBatchRequests(BatchOutgoingAsync out);
diff --git a/java/src/IceInternal/TcpEndpointI.java b/java/src/IceInternal/TcpEndpointI.java
index 90b3073b3ce..e4a62706b19 100644
--- a/java/src/IceInternal/TcpEndpointI.java
+++ b/java/src/IceInternal/TcpEndpointI.java
@@ -482,12 +482,6 @@ final class TcpEndpointI extends EndpointI
return _host.compareTo(p._host);
}
- public boolean
- requiresThreadPerConnection()
- {
- return false;
- }
-
public java.util.List
connectors(java.util.List addresses)
{
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index 533b91adfc7..bfa7c3ced27 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -179,7 +179,6 @@ final class TcpTransceiver implements Transceiver
public boolean
write(Buffer buf, int timeout)
- throws LocalExceptionWrapper
{
while(writeBuffer(buf.b))
{
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 6306ca10475..2ba09235fec 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -145,7 +145,6 @@ public final class ThreadPool
assert(!_destroyed);
assert(_handlerMap.isEmpty());
- assert(_changes.isEmpty());
assert(_workItems.isEmpty());
_destroyed = true;
setInterrupt();
@@ -197,7 +196,10 @@ public final class ThreadPool
public synchronized void
execute(ThreadPoolWorkItem workItem)
{
- assert(!_destroyed);
+ if(_destroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
_workItems.add(workItem);
setInterrupt();
}
@@ -529,9 +531,21 @@ public final class ThreadPool
// 3. A work item has been scheduled.
//
- // Thread pool destroyed?
- //
- if(_destroyed)
+ if(!_workItems.isEmpty())
+ {
+ //
+ // Work items must be executed first even if the thread pool is destroyed.
+ //
+
+ //
+ // Remove the interrupt channel from the selected key set.
+ //
+ _keys.remove(_fdIntrReadKey);
+ clearInterrupt();
+ assert(!_workItems.isEmpty());
+ workItem = (ThreadPoolWorkItem)_workItems.removeFirst();
+ }
+ else if(_destroyed)
{
if(TRACE_SHUTDOWN)
{
@@ -539,27 +553,22 @@ public final class ThreadPool
}
//
- // Don't clear the interrupt fd if
- // destroyed, so that the other threads
- // exit as well.
+ // Don't clear the interrupt fd if destroyed, so that the other threads exit as well.
//
return true;
}
-
- //
- // Remove the interrupt channel from the
- // selected key set.
- //
- _keys.remove(_fdIntrReadKey);
-
- clearInterrupt();
-
- //
- // An event handler must have been registered
- // or unregistered.
- //
- if(!_changes.isEmpty())
+ else
{
+ //
+ // Remove the interrupt channel from the selected key set.
+ //
+ _keys.remove(_fdIntrReadKey);
+ clearInterrupt();
+
+ //
+ // An event handler must have been registered or unregistered.
+ //
+ assert(!_changes.isEmpty());
FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
if(change.handler != null) // Addition if handler is set.
@@ -627,11 +636,6 @@ public final class ThreadPool
// outside the thread synchronization.
}
}
- else
- {
- assert(!_workItems.isEmpty());
- workItem = (ThreadPoolWorkItem)_workItems.removeFirst();
- }
}
else
{
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index 7dce61206b8..fbc61da6ff7 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -40,13 +40,7 @@ public interface Transceiver
// block until all the data is written or the specified timeout
// expires.
//
- // NOTE: In Java, write() can raise LocalExceptionWrapper to indicate that
- // retrying may not be safe, which is necessary to address an issue
- // in the IceSSL implementation for JDK 1.4. We can remove this if
- // we ever drop support for JDK 1.4 (also see Ice.ConnectionI).
- //
- boolean write(Buffer buf, int timeout)
- throws LocalExceptionWrapper;
+ boolean write(Buffer buf, int timeout);
//
// Read data.
diff --git a/java/src/IceInternal/UdpEndpointI.java b/java/src/IceInternal/UdpEndpointI.java
index 1d635a7dff2..190fdf061c2 100644
--- a/java/src/IceInternal/UdpEndpointI.java
+++ b/java/src/IceInternal/UdpEndpointI.java
@@ -703,12 +703,6 @@ final class UdpEndpointI extends EndpointI
return _host.compareTo(p._host);
}
- public boolean
- requiresThreadPerConnection()
- {
- return false;
- }
-
public java.util.List
connectors(java.util.List addresses)
{
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index e04d4f0b84b..cebd0876374 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -94,7 +94,6 @@ final class UdpTransceiver implements Transceiver
public boolean
write(Buffer buf, int timeout)
- throws LocalExceptionWrapper
{
assert(buf.b.position() == 0);
final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead);
diff --git a/java/src/IceSSL/EndpointI.java b/java/src/IceSSL/EndpointI.java
index f2a66c95f13..84a94885599 100644
--- a/java/src/IceSSL/EndpointI.java
+++ b/java/src/IceSSL/EndpointI.java
@@ -483,12 +483,6 @@ final class EndpointI extends IceInternal.EndpointI
return _host.compareTo(p._host);
}
- public boolean
- requiresThreadPerConnection()
- {
- return false;
- }
-
public java.util.List
connectors(java.util.List addresses)
{
diff --git a/java/src/IceSSL/TransceiverI.java b/java/src/IceSSL/TransceiverI.java
index defae854a84..97ce62c558c 100644
--- a/java/src/IceSSL/TransceiverI.java
+++ b/java/src/IceSSL/TransceiverI.java
@@ -226,7 +226,6 @@ final class TransceiverI implements IceInternal.Transceiver
//
public synchronized boolean
write(IceInternal.Buffer buf, int timeout)
- throws IceInternal.LocalExceptionWrapper
{
//
// If the handshake isn't completed yet, we shouldn't be writing.