summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2008-02-14 18:08:59 +0100
committerBenoit Foucher <benoit@zeroc.com>2008-02-14 18:08:59 +0100
commita95ccc73ef5572109532f3e94498d31df106195d (patch)
tree0062b9c56c133da43a9c7ff5536e9d5b5dbaae89 /java/src
parentMerge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff)
downloadice-a95ccc73ef5572109532f3e94498d31df106195d.tar.bz2
ice-a95ccc73ef5572109532f3e94498d31df106195d.tar.xz
ice-a95ccc73ef5572109532f3e94498d31df106195d.zip
- Fixed bug 2688
- Fixed bug 2674 - Changed connection validation to always use non-blocking IO (bug 1981) - Added distribution/src/common/RELEASE_NOTES.txt - Moved distribution/src/windows/README.DEMOS to distribution/src/common
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AMI_Object_ice_flushBatchRequests.java2
-rw-r--r--java/src/Ice/ConnectionI.java282
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java50
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java4
-rw-r--r--java/src/IceInternal/OutgoingAsync.java68
-rw-r--r--java/src/IceInternal/ProxyFactory.java43
-rw-r--r--java/src/IceInternal/ThreadPool.java33
7 files changed, 262 insertions, 220 deletions
diff --git a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
index 5bd0bad9b44..67eeeeef24d 100644
--- a/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
+++ b/java/src/Ice/AMI_Object_ice_flushBatchRequests.java
@@ -32,7 +32,7 @@ public abstract class AMI_Object_ice_flushBatchRequests extends IceInternal.Batc
}
catch(Ice.LocalException ex)
{
- cnt = proxy.__handleException(delegate, ex, cnt);
+ cnt = proxy.__handleException(delegate, ex, null, cnt);
}
}
catch(Ice.LocalException ex)
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index de67944b223..3005bdff402 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -18,6 +18,23 @@ public final class ConnectionI extends IceInternal.EventHandler
void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
}
+ public class CallFinished implements IceInternal.ThreadPoolWorkItem
+ {
+ public
+ CallFinished(ConnectionI connection)
+ {
+ _connection = connection;
+ }
+
+ public void
+ execute(IceInternal.ThreadPool threadPool)
+ {
+ _connection.finished(threadPool);
+ }
+
+ final private ConnectionI _connection;
+ }
+
public void
start(StartCallback callback)
{
@@ -35,92 +52,59 @@ public final class ConnectionI extends IceInternal.EventHandler
assert(_exception != null);
throw _exception;
}
+ }
+ if(_threadPerConnection)
+ {
//
// In thread per connection mode, we create the thread for the connection. The
// intialization and validation of the connection is taken care of by the thread
- // per connection. If a callback is given, no need to wait, the thread will notify
- // the callback, otherwise wait until the connection is validated.
+ // per connection.
//
- if(_threadPerConnection)
+ try
{
- try
- {
- _thread = new ThreadPerConnection();
- _thread.start();
- }
- catch(java.lang.Exception ex)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- _logger.error("cannot create thread for connection:\n" + sw.toString());
-
- //
- // Clean up.
- //
- _thread = null;
- _state = StateClosed;
-
- Ice.SyscallException e = new Ice.SyscallException();
- e.initCause(ex);
- throw e;
- }
-
- if(callback == null) // Wait for the connection to be validated.
- {
- while(_state <= StateNotValidated)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- if(_state >= StateClosing)
- {
- assert(_exception != null);
- throw _exception;
- }
- }
- return; // We're done.
+ _thread = new ThreadPerConnection();
+ _thread.start();
}
- }
-
- assert(!_threadPerConnection);
-
- //
- // Initialize the connection transceiver and then validate the connection.
- //
- IceInternal.SocketStatus status = initialize();
- if(status == IceInternal.SocketStatus.Finished)
- {
- status = validate();
- }
+ catch(java.lang.Exception ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ _logger.error("cannot create thread for connection:\n" + sw.toString());
+
+ //
+ // Clean up.
+ //
+ _thread = null;
- if(status == IceInternal.SocketStatus.Finished)
- {
- finishStart(null);
- return; // We're done!
+ Ice.SyscallException e = new Ice.SyscallException();
+ e.initCause(ex);
+ throw e;
+ }
}
-
- //
- // If the initialization or validation couldn't be completed without potentially
- // blocking, we register the connection with the selector thread and return.
- //
-
- synchronized(this)
+ else
{
- if(_state == StateClosed)
+ //
+ // Initialize the connection transceiver and then validate the connection.
+ //
+ IceInternal.SocketStatus status = initialize(0);
+ if(status == IceInternal.SocketStatus.Finished)
{
- assert(_exception != null);
- throw _exception;
+ status = validate(0);
}
-
+
+ if(status == IceInternal.SocketStatus.Finished)
+ {
+ finishStart(null);
+ return; // We're done!
+ }
+
+ //
+ // If the initialization or validation couldn't be completed without potentially
+ // blocking, we register the connection with the selector thread and return.
+ //
int timeout;
IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
if(defaultsAndOverrides.overrideConnectTimeout)
@@ -131,49 +115,52 @@ public final class ConnectionI extends IceInternal.EventHandler
{
timeout = _endpoint.timeout();
}
-
- _sendInProgress = true;
- _selectorThread._register(_transceiver.fd(), this, status, timeout);
- }
- }
- catch(Ice.LocalException ex)
- {
- synchronized(this)
- {
- setState(StateClosed, ex);
- //
- // If start is called with a callback, the callback is notified either by the
- // thread per conncetion or the thread pool.
- //
- if(callback != null)
+ synchronized(this)
{
- if(!_threadPerConnection)
+ if(_state == StateClosed)
{
- registerWithPool();
- unregisterWithPool(); // Let finished() do the close.
+ assert(_exception != null);
+ throw _exception;
}
- return;
+ _sendInProgress = true;
+ _selectorThread._register(_transceiver.fd(), this, status, timeout);
}
-
- //
- // Close the transceiver if there's no thread per connection. Otherwise, wait
- // for the thread per connection to take care of it.
- //
- if(_thread == null && _transceiver != null)
+ }
+
+ if(callback == null) // Wait for the connection to be validated.
+ {
+ synchronized(this)
{
- try
+ while(_state <= StateNotValidated)
{
- _transceiver.close();
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
- catch(LocalException e)
+
+ if(_state >= StateClosing)
{
- // Here we ignore any exceptions in close().
+ assert(_exception != null);
+ throw _exception;
}
- _transceiver = null;
}
}
-
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
+ {
+ setState(StateClosed, ex);
+ if(callback != null)
+ {
+ return;
+ }
+ }
waitUntilFinished();
throw ex;
}
@@ -1258,7 +1245,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(state == StateNotInitialized)
{
- IceInternal.SocketStatus status = initialize();
+ IceInternal.SocketStatus status = initialize(0);
if(status != IceInternal.SocketStatus.Finished)
{
return status;
@@ -1267,7 +1254,7 @@ public final class ConnectionI extends IceInternal.EventHandler
if(state <= StateNotValidated)
{
- IceInternal.SocketStatus status = validate();
+ IceInternal.SocketStatus status = validate(0);
if(status != IceInternal.SocketStatus.Finished)
{
return status;
@@ -1308,8 +1295,15 @@ public final class ConnectionI extends IceInternal.EventHandler
}
else
{
- registerWithPool();
- unregisterWithPool(); // Let finished() do the close.
+ if(!_registeredWithPool)
+ {
+ _threadPool.execute(new CallFinished(this));
+ ++_finishedCount; // For each unregistration, finished() is called once.
+ }
+ else
+ {
+ unregisterWithPool();
+ }
}
notifyAll();
@@ -1654,7 +1648,7 @@ public final class ConnectionI extends IceInternal.EventHandler
_transceiver.shutdownWrite();
}
- else if(_state <= StateNotValidated || _threadPerConnection)
+ else if(_threadPerConnection)
{
//
// If we are in thread per connection mode and the thread is started, we
@@ -1665,8 +1659,15 @@ public final class ConnectionI extends IceInternal.EventHandler
}
else
{
- registerWithPool();
- unregisterWithPool(); // Let finished() do the close.
+ if(!_registeredWithPool)
+ {
+ _threadPool.execute(new CallFinished(this));
+ ++_finishedCount; // For each unregistration, finished() is called once.
+ }
+ else
+ {
+ unregisterWithPool();
+ }
//
// Prevent further writes.
@@ -1751,28 +1752,14 @@ public final class ConnectionI extends IceInternal.EventHandler
}
private IceInternal.SocketStatus
- initialize()
+ initialize(int timeout)
{
- int timeout = 0;
- if(_startCallback == null || _threadPerConnection)
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
- }
-
try
{
IceInternal.SocketStatus status = _transceiver.initialize(timeout);
if(status != IceInternal.SocketStatus.Finished)
{
- if(_startCallback == null || _threadPerConnection)
+ if(timeout != 0)
{
throw new Ice.TimeoutException();
}
@@ -1804,24 +1791,10 @@ public final class ConnectionI extends IceInternal.EventHandler
}
private IceInternal.SocketStatus
- validate()
+ validate(int timeout)
{
if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
{
- int timeout = 0;
- if(_startCallback == null || _threadPerConnection)
- {
- IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- if(defaultsAndOverrides.overrideConnectTimeout)
- {
- timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
- }
- else
- {
- timeout = _endpoint.timeout();
- }
- }
-
if(_adapter != null) // The server side has the active role for connection validation.
{
IceInternal.BasicStream os = _stream;
@@ -1841,14 +1814,14 @@ public final class ConnectionI extends IceInternal.EventHandler
else
{
// The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(_startCallback != null && !_threadPerConnection);
+ assert(!_threadPerConnection);
}
try
{
if(!_transceiver.write(os.getBuffer(), timeout))
{
- if(_startCallback == null || _threadPerConnection)
+ if(timeout != 0)
{
throw new Ice.TimeoutException();
}
@@ -1871,14 +1844,14 @@ public final class ConnectionI extends IceInternal.EventHandler
else
{
// The stream can only be non-empty if we're doing a non-blocking connection validation.
- assert(_startCallback != null && !_threadPerConnection);
+ assert(!_threadPerConnection);
}
try
{
if(!_transceiver.read(is.getBuffer(), timeout, _hasMoreData))
{
- if(_startCallback == null || _threadPerConnection)
+ if(timeout != 0)
{
throw new Ice.TimeoutException();
}
@@ -2430,10 +2403,21 @@ public final class ConnectionI extends IceInternal.EventHandler
IceInternal.SocketStatus status;
- status = initialize();
+ int timeout;
+ IceInternal.DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideConnectTimeout)
+ {
+ timeout = defaultsAndOverrides.overrideConnectTimeoutValue;
+ }
+ else
+ {
+ timeout = _endpoint.timeout();
+ }
+
+ status = initialize(timeout);
assert(status == IceInternal.SocketStatus.Finished);
- status = validate();
+ status = validate(timeout);
assert(status == IceInternal.SocketStatus.Finished);
}
catch(LocalException ex)
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 2a84c5690c0..5621fbbbbbb 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -70,11 +70,11 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(IceInternal.LocalExceptionWrapper __ex)
{
- __cnt = __handleExceptionWrapperRelaxed(__del, __ex, __cnt);
+ __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt);
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -111,11 +111,11 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(IceInternal.LocalExceptionWrapper __ex)
{
- __cnt = __handleExceptionWrapperRelaxed(__del, __ex, __cnt);
+ __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt);
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -152,11 +152,11 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(IceInternal.LocalExceptionWrapper __ex)
{
- __cnt = __handleExceptionWrapperRelaxed(__del, __ex, __cnt);
+ __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt);
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -193,11 +193,11 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(IceInternal.LocalExceptionWrapper __ex)
{
- __cnt = __handleExceptionWrapperRelaxed(__del, __ex, __cnt);
+ __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt);
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -237,16 +237,16 @@ public class ObjectPrxHelperBase implements ObjectPrx
{
if(mode == OperationMode.Nonmutating || mode == OperationMode.Idempotent)
{
- __cnt = __handleExceptionWrapperRelaxed(__del, __ex, __cnt);
+ __cnt = __handleExceptionWrapperRelaxed(__del, __ex, null, __cnt);
}
else
{
- __handleExceptionWrapper(__del, __ex);
+ __handleExceptionWrapper(__del, __ex, null);
}
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -715,7 +715,7 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
}
@@ -760,7 +760,7 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
catch(LocalException __ex)
{
- __cnt = __handleException(__del, __ex, __cnt);
+ __cnt = __handleException(__del, __ex, null, __cnt);
}
}
@@ -853,7 +853,7 @@ public class ObjectPrxHelperBase implements ObjectPrx
}
public final int
- __handleException(_ObjectDel delegate, LocalException ex, int cnt)
+ __handleException(_ObjectDel delegate, LocalException ex, IceInternal.OutgoingAsync out, int cnt)
{
//
// Only _delegate needs to be mutex protected here.
@@ -871,10 +871,9 @@ public class ObjectPrxHelperBase implements ObjectPrx
throw ex;
}
- IceInternal.ProxyFactory proxyFactory;
try
{
- proxyFactory = _reference.getInstance().proxyFactory();
+ return _reference.getInstance().proxyFactory().checkRetryAfterException(ex, _reference, out, cnt);
}
catch(CommunicatorDestroyedException e)
{
@@ -885,11 +884,10 @@ public class ObjectPrxHelperBase implements ObjectPrx
throw ex;
}
- return proxyFactory.checkRetryAfterException(ex, _reference, cnt);
}
public final void
- __handleExceptionWrapper(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex)
+ __handleExceptionWrapper(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex, IceInternal.OutgoingAsync out)
{
synchronized(this)
{
@@ -903,14 +901,20 @@ public class ObjectPrxHelperBase implements ObjectPrx
{
throw ex.get();
}
+
+ if(out != null)
+ {
+ out.__send();
+ }
}
public final int
- __handleExceptionWrapperRelaxed(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex, int cnt)
+ __handleExceptionWrapperRelaxed(_ObjectDel delegate, IceInternal.LocalExceptionWrapper ex,
+ IceInternal.OutgoingAsync out, int cnt)
{
if(!ex.retry())
{
- return __handleException(delegate, ex.get(), cnt);
+ return __handleException(delegate, ex.get(), out, cnt);
}
else
{
@@ -921,6 +925,12 @@ public class ObjectPrxHelperBase implements ObjectPrx
_delegate = null;
}
}
+
+ if(out != null)
+ {
+ out.__send(cnt);
+ }
+
return cnt;
}
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 8144356b60a..94c3c3721d6 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -429,7 +429,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- assert(_exception != null && !_requests.isEmpty());
+ assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
_reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
@@ -448,7 +448,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- assert(_exception != null && !_requests.isEmpty());
+ assert(_exception == null && !_requests.isEmpty());
_exception = ex;
_reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
{
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 31bf5557462..6cb5f0cdb70 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -241,7 +241,6 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
try
{
handleException(exc); // This will throw if the invocation can't be retried.
- __send();
}
catch(Ice.LocalException ex)
{
@@ -263,7 +262,6 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
try
{
handleException(ex); // This will throw if the invocation can't be retried.
- __send();
}
catch(Ice.LocalException exc)
{
@@ -271,6 +269,38 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
}
}
+ public final void
+ __send(int cnt)
+ {
+ //
+ // This method is called by the proxy to retry an invocation. It's safe to update
+ // the count here without synchronization, no other threads can access this object.
+ //
+ _cnt = cnt;
+ __send();
+ }
+
+ public final void
+ __send()
+ {
+ try
+ {
+ _sent = false;
+ _response = false;
+ _delegate = _proxy.__getDelegate(true);
+ _delegate.__getRequestHandler().sendAsyncRequest(this);
+ return;
+ }
+ catch(LocalExceptionWrapper ex)
+ {
+ handleException(ex);
+ }
+ catch(Ice.LocalException ex)
+ {
+ handleException(ex);
+ }
+ }
+
protected final void
__prepare(Ice.ObjectPrx prx, String operation, Ice.OperationMode mode, java.util.Map<String, String> context)
{
@@ -341,30 +371,6 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
__os.startWriteEncaps();
}
- protected final void
- __send()
- {
- while(true)
- {
- try
- {
- _sent = false;
- _response = false;
- _delegate = _proxy.__getDelegate(true);
- _delegate.__getRequestHandler().sendAsyncRequest(this);
- return;
- }
- catch(LocalExceptionWrapper ex)
- {
- handleException(ex);
- }
- catch(Ice.LocalException ex)
- {
- handleException(ex);
- }
- }
- }
-
protected abstract void __response(boolean ok);
protected void
@@ -388,11 +394,11 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
{
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
}
else
{
- _proxy.__handleExceptionWrapper(_delegate, ex);
+ _proxy.__handleExceptionWrapper(_delegate, ex, this);
}
}
@@ -429,16 +435,16 @@ public abstract class OutgoingAsync extends OutgoingAsyncMessageCallback
{
if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent)
{
- _cnt = _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ _proxy.__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
}
else
{
- _proxy.__handleExceptionWrapper(_delegate, ex);
+ _proxy.__handleExceptionWrapper(_delegate, ex, this);
}
}
catch(Ice.LocalException ex)
{
- _cnt = _proxy.__handleException(_delegate, ex, _cnt);
+ _proxy.__handleException(_delegate, ex, this, _cnt);
}
}
diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java
index d2a9382b30a..ac05fe1d57b 100644
--- a/java/src/IceInternal/ProxyFactory.java
+++ b/java/src/IceInternal/ProxyFactory.java
@@ -85,7 +85,7 @@ public final class ProxyFactory
}
public int
- checkRetryAfterException(Ice.LocalException ex, Reference ref, int cnt)
+ checkRetryAfterException(Ice.LocalException ex, Reference ref, final OutgoingAsync out, int cnt)
{
TraceLevels traceLevels = _instance.traceLevels();
Ice.Logger logger = _instance.initializationData().logger;
@@ -129,6 +129,11 @@ public final class ProxyFactory
String s = "retrying operation call to add proxy to router\n" + ex.toString();
logger.trace(traceLevels.retryCat, s);
}
+
+ if(out != null)
+ {
+ out.__send(cnt);
+ }
return cnt; // We must always retry, so we don't look at the retry count.
}
else
@@ -200,17 +205,39 @@ public final class ProxyFactory
logger.trace(traceLevels.retryCat, s);
}
- if(cnt > 0)
+ if(interval > 0)
{
- //
- // Sleep before retrying.
- //
- try
+ if(out != null)
{
- Thread.currentThread().sleep(interval);
+ final int count = cnt;
+ _instance.timer().schedule(new TimerTask()
+ {
+ public void
+ runTimerTask()
+ {
+ out.__send(count);
+ }
+ }, interval);
+ }
+ else
+ {
+ //
+ // Sleep before retrying.
+ //
+ try
+ {
+ Thread.currentThread().sleep(interval);
+ }
+ catch(InterruptedException ex1)
+ {
+ }
}
- catch(InterruptedException ex1)
+ }
+ else
+ {
+ if(out != null)
{
+ out.__send(cnt);
}
}
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 8c7a74cdc1d..f38ea735b6f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -589,10 +589,7 @@ public final class ThreadPool
}
catch(java.nio.channels.ClosedChannelException ex)
{
- //
- // This is expected if the transceiver finishConnect() call failed
- // and the connection is a background connection.
- //
+ assert(false);
}
_handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
@@ -619,10 +616,7 @@ public final class ThreadPool
assert(pair != null);
handler = pair.handler;
finished = true;
- if(pair.key != null)
- {
- pair.key.cancel();
- }
+ pair.key.cancel();
if(TRACE_REGISTRATION)
{
@@ -1155,7 +1149,7 @@ public final class ThreadPool
select()
{
int ret = 0;
-
+ int spuriousWakeUp = 0;
while(true)
{
try
@@ -1204,6 +1198,27 @@ public final class ThreadPool
trace("select() returned " + ret + ", _keys.size() = " + _keys.size());
}
+ if(ret == 0 && _timeout <= 0)
+ {
+ //
+ // This is necessary to prevent a busy loop in case of a spurious wake-up which
+ // sometime occurs in the client thread pool when the communicator is destroyed.
+ // If there are too many successive spurious wake-ups, we log an error.
+ //
+ try
+ {
+ Thread.currentThread().sleep(1);
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ if(++spuriousWakeUp > 100)
+ {
+ _instance.initializationData().logger.error("spurious selector wake up in `" + _prefix + "'");
+ }
+ }
+
break;
}
}