diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-09-03 11:01:11 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-09-03 11:01:11 -0230 |
commit | 3b0588532354adf7bf3b86e611a8ae4996bfe6ad (patch) | |
tree | 253961cb83af7bc3b1dfc7633a8f934789476cd1 /java | |
parent | More work on ICE-2400: the send log thread now uses a separate communicator t... (diff) | |
download | ice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.tar.bz2 ice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.tar.xz ice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.zip |
- C#, Java: Removed Outgoing, fixed generated code to make synchronous
requests using AMI.
- Java: AsyncResult is now an interface.
- Added --arg to allTests.py.
- Fixed operations, adapterDeactivation and metrics test to work with
background IO.
- Added Collocated interrupt test.
- Added support for batch oneway requests using AMI.
- Added test in operations for batch oneway requests using AMI.
Diffstat (limited to 'java')
56 files changed, 2449 insertions, 3514 deletions
diff --git a/java/.classpath b/java/.classpath index 4755711bc41..4a4d89b8e9e 100644 --- a/java/.classpath +++ b/java/.classpath @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry excluding="ant/|test/**/lambda/*|test/android/|test/ejb/|test/Ice/translator/|test/Ice/translator/Test/" kind="src" path="src"/> + <classpathentry excluding="ant/|test/**/lambda/*|test/android/|test/ejb/|test/Ice/translator/|test/Ice/translator/Test/|test/Slice/generation/classes/test/Slice/generation/Test/" kind="src" path="src"/> <classpathentry kind="src" path="generated"/> <classpathentry kind="src" path="generated.test"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> diff --git a/java/.settings/org.eclipse.jdt.core.prefs b/java/.settings/org.eclipse.jdt.core.prefs index 9060ba01e38..f19526f9df4 100644 --- a/java/.settings/org.eclipse.jdt.core.prefs +++ b/java/.settings/org.eclipse.jdt.core.prefs @@ -95,7 +95,7 @@ org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_c org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16 org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16 org.eclipse.jdt.core.formatter.alignment_for_assignment=0 -org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16 +org.eclipse.jdt.core.formatter.alignment_for_binary_expression=18 org.eclipse.jdt.core.formatter.alignment_for_compact_if=16 org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80 org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0 @@ -167,7 +167,7 @@ org.eclipse.jdt.core.formatter.indent_empty_lines=false org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true -org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false +org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true org.eclipse.jdt.core.formatter.indentation.size=8 org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert @@ -334,8 +334,8 @@ org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=do no org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=do not insert org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=do not insert org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=do not insert -org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert -org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert +org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=do not insert +org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=do not insert org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert @@ -368,6 +368,6 @@ org.eclipse.jdt.core.formatter.tabulation.char=space org.eclipse.jdt.core.formatter.tabulation.size=4 org.eclipse.jdt.core.formatter.use_on_off_tags=false org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=true -org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true +org.eclipse.jdt.core.formatter.wrap_before_binary_operator=false org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true diff --git a/java/config/build.properties b/java/config/build.properties index 5249125ce55..3da87e59437 100644 --- a/java/config/build.properties +++ b/java/config/build.properties @@ -41,7 +41,7 @@ jgoodies-looks.version = 2.5.2 # # Define debug as on if you want to build with debug information. # -debug = on +#debug = on # # Define lint with your preferred -Xlint options. diff --git a/java/demo/Ice/latency/config.client b/java/demo/Ice/latency/config.client index 9beb0c1f2f7..3bb7b2067e7 100644 --- a/java/demo/Ice/latency/config.client +++ b/java/demo/Ice/latency/config.client @@ -25,4 +25,4 @@ Ice.ACM.Client.Timeout=0 #Ice.Admin.Endpoints=tcp -h localhost -p 10003 Ice.Admin.InstanceName=client IceMX.Metrics.Debug.GroupBy=id -IceMX.Metrics.ByParent.GroupBy=parent +IceMX.Metrics.ByParent.GroupBy=parent
\ No newline at end of file diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index eed405ad00c..7c015d67c8c 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -14,50 +14,29 @@ package Ice; * With this object, an application can obtain several attributes of the * invocation and discover its outcome. **/ -public class AsyncResult +public interface AsyncResult { - protected AsyncResult(Communicator communicator, IceInternal.Instance instance, String op, - IceInternal.CallbackBase del) - { - _communicator = communicator; - _instance = instance; - _operation = op; - _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, false, false); - _state = 0; - _sentSynchronously = false; - _exception = null; - _callback = del; - } /** * Returns the communicator that sent the invocation. * * @return The communicator. **/ - public Communicator getCommunicator() - { - return _communicator; - } + public Communicator getCommunicator(); /** * Returns the connection that was used for the invocation. * * @return The connection. **/ - public Connection getConnection() - { - return null; - } + public Connection getConnection(); /** * Returns the proxy that was used to call the <code>begin_</code> method. * * @return The proxy. **/ - public ObjectPrx getProxy() - { - return null; - } + public ObjectPrx getProxy(); /** * Indicates whether the result of an invocation is available. @@ -65,34 +44,12 @@ public class AsyncResult * @return True if the result is available, which means a call to the <code>end_</code> * method will not block. The method returns false if the result is not yet available. **/ - public final boolean isCompleted() - { - synchronized(_monitor) - { - return (_state & Done) > 0; - } - } + public boolean isCompleted(); /** * Blocks the caller until the result of the invocation is available. **/ - public final void waitForCompleted() - { - synchronized(_monitor) - { - while((_state & Done) == 0) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - } - } - } + public void waitForCompleted(); /** * When you call the <code>begin_</code> method, the Ice run time attempts to @@ -105,48 +62,17 @@ public class AsyncResult * * @return True if the request has been sent, or false if the request is queued. **/ - public final boolean isSent() - { - synchronized(_monitor) - { - return (_state & Sent) > 0; - } - } + public boolean isSent(); /** * Blocks the caller until the request has been written to the client-side transport. **/ - public final void waitForSent() - { - synchronized(_monitor) - { - while((_state & Sent) == 0 && _exception == null) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - } - } - } + public void waitForSent(); /** * If the invocation failed with a local exception, throws the local exception. **/ - public final void throwLocalException() - { - synchronized(_monitor) - { - if(_exception != null) - { - throw _exception; - } - } - } + public void throwLocalException(); /** * This method returns true if a request was written to the client-side @@ -157,385 +83,12 @@ public class AsyncResult * @return True if the request was sent without being queued, or false * otherwise. **/ - public final boolean sentSynchronously() - { - return _sentSynchronously; // No lock needed, immutable once __send() is called - } + public boolean sentSynchronously(); /** * Returns the name of the operation. * * @return The operation name. **/ - public final String getOperation() - { - return _operation; - } - - public final IceInternal.BasicStream __getOs() - { - return _os; - } - - public IceInternal.BasicStream - __startReadParams() - { - _is.startReadEncaps(); - return _is; - } - - public void - __endReadParams() - { - _is.endReadEncaps(); - } - - public void - __readEmptyParams() - { - _is.skipEmptyEncaps(null); - } - - public byte[] - __readParamEncaps() - { - return _is.readEncaps(null); - } - - public final boolean __wait() - { - synchronized(_monitor) - { - if((_state & EndCalled) > 0) - { - throw new java.lang.IllegalArgumentException("end_ method called more than once"); - } - _state |= EndCalled; - while((_state & Done) == 0) - { - try - { - _monitor.wait(); - } - catch(InterruptedException ex) - { - // - // Remove the EndCalled flag since it should be possible to - // call end_* again on the AsyncResult. - // - _state &= ~EndCalled; - throw new Ice.OperationInterruptedException(); - } - } - if(_exception != null) - { - //throw (LocalException)_exception.fillInStackTrace(); - throw _exception; - } - return (_state & OK) > 0; - } - } - - public final void __throwUserException() - throws UserException - { - try - { - _is.startReadEncaps(); - _is.throwException(null); - } - catch(UserException ex) - { - _is.endReadEncaps(); - throw ex; - } - } - - public final void __invokeExceptionAsync(final Ice.Exception ex) - { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) - { - @Override - public void - run() - { - __invokeException(ex); - } - }); - } - catch(CommunicatorDestroyedException exc) - { - throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly. - } - } - - public final void __invokeException(Ice.Exception ex) - { - synchronized(_monitor) - { - _state |= Done; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception = ex; - _monitor.notifyAll(); - } - - __invokeCompleted(); - } - - protected final void __invokeSentInternal() - { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback != null) - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); - } - - try - { - _callback.__sent(this); - } - catch(RuntimeException ex) - { - __warning(ex); - } - catch(AssertionError exc) - { - __error(exc); - } - catch(OutOfMemoryError exc) - { - __error(exc); - } - finally - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(null); - } - } - } - - if(_observer != null) - { - Ice.ObjectPrx proxy = getProxy(); - if(proxy == null || !proxy.ice_isTwoway()) - { - _observer.detach(); - } - } - } - - public void - __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) - { - if(_observer != null) - { - _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - public void __attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) - { - if(_observer != null) - { - _childObserver = _observer.getCollocatedObserver(adapter, - requestId, - _os.size() - IceInternal.Protocol.headerSize - 4); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - public final void __invokeSentAsync() - { - // - // This is called when it's not safe to call the sent callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) - { - @Override - public void - run() - { - __invokeSentInternal(); - } - }); - } - catch(CommunicatorDestroyedException exc) - { - } - } - - public static void __check(AsyncResult r, ObjectPrx prx, String operation) - { - __check(r, operation); - if(r.getProxy() != prx) - { - throw new IllegalArgumentException("Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); - } - } - - public static void __check(AsyncResult r, Connection con, String operation) - { - __check(r, operation); - if(r.getConnection() != con) - { - throw new IllegalArgumentException("Connection for call to end_" + operation + - " does not match connection that was used to call corresponding begin_" + - operation + " method"); - } - } - - public static void __check(AsyncResult r, Communicator com, String operation) - { - __check(r, operation); - if(r.getCommunicator() != com) - { - throw new IllegalArgumentException("Communicator for call to end_" + operation + - " does not match communicator that was used to call corresponding " + - "begin_" + operation + " method"); - } - } - - protected static void __check(AsyncResult r, String operation) - { - if(r == null) - { - throw new IllegalArgumentException("AsyncResult == null"); - } - else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality - { - throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " + - r.getOperation()); - } - } - - protected final void __invokeCompleted() - { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback != null) - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); - } - - try - { - _callback.__completed(this); - } - catch(RuntimeException ex) - { - __warning(ex); - } - catch(AssertionError exc) - { - __error(exc); - } - catch(OutOfMemoryError exc) - { - __error(exc); - } - finally - { - if(_instance.useApplicationClassLoader()) - { - Thread.currentThread().setContextClassLoader(null); - } - } - } - - if(_observer != null) - { - _observer.detach(); - _observer = null; - } - } - - protected void - __runTimerTask() - { - IceInternal.RequestHandler handler; - synchronized(_monitor) - { - handler = _timeoutRequestHandler; - _timeoutRequestHandler = null; - } - - if(handler != null) - { - handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this, - new Ice.InvocationTimeoutException()); - } - } - - protected final void __warning(RuntimeException ex) - { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex); - _instance.initializationData().logger.warning(s); - } - } - - protected final void __error(Error error) - { - String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error); - _instance.initializationData().logger.error(s); - } - - protected Communicator _communicator; - protected IceInternal.Instance _instance; - protected String _operation; - protected Ice.Connection _cachedConnection; - - protected java.lang.Object _monitor = new java.lang.Object(); - protected IceInternal.BasicStream _is; - protected IceInternal.BasicStream _os; - - protected IceInternal.RequestHandler _timeoutRequestHandler; - protected java.util.concurrent.Future<?> _future; - - protected static final byte OK = 0x1; - protected static final byte Done = 0x2; - protected static final byte Sent = 0x4; - protected static final byte EndCalled = 0x8; - - protected byte _state; - protected boolean _sentSynchronously; - protected Ice.Exception _exception; - - protected Ice.Instrumentation.InvocationObserver _observer; - protected Ice.Instrumentation.ChildInvocationObserver _childObserver; - - private IceInternal.CallbackBase _callback; -} + public String getOperation(); +}
\ No newline at end of file diff --git a/java/src/Ice/Callback.java b/java/src/Ice/Callback.java index f0e8eb86c7d..b4cabf9a30e 100644 --- a/java/src/Ice/Callback.java +++ b/java/src/Ice/Callback.java @@ -48,4 +48,10 @@ public abstract class Callback extends IceInternal.CallbackBase { sent(r); } + + @Override + public final boolean __hasSentCallback() + { + return true; + } } diff --git a/java/src/Ice/Callback_Communicator_flushBatchRequests.java b/java/src/Ice/Callback_Communicator_flushBatchRequests.java index 7e9e8e2f1ff..3ebab69b4e4 100644 --- a/java/src/Ice/Callback_Communicator_flushBatchRequests.java +++ b/java/src/Ice/Callback_Communicator_flushBatchRequests.java @@ -46,4 +46,10 @@ public abstract class Callback_Communicator_flushBatchRequests extends IceIntern { sent(__result.sentSynchronously()); } + + @Override + public final boolean __hasSentCallback() + { + return true; + } } diff --git a/java/src/Ice/Callback_Connection_flushBatchRequests.java b/java/src/Ice/Callback_Connection_flushBatchRequests.java index e9e0976024e..ceea4e06140 100644 --- a/java/src/Ice/Callback_Connection_flushBatchRequests.java +++ b/java/src/Ice/Callback_Connection_flushBatchRequests.java @@ -46,4 +46,10 @@ public abstract class Callback_Connection_flushBatchRequests extends IceInternal { sent(__result.sentSynchronously()); } + + @Override + public final boolean __hasSentCallback() + { + return true; + } } diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 757fc85daf5..bb95a3583f7 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -283,8 +283,9 @@ public final class CommunicatorI implements Communicator public void end_flushBatchRequests(AsyncResult r) { - AsyncResult.__check(r, this, __flushBatchRequests_name); - r.__wait(); + IceInternal.AsyncResultI ri = (IceInternal.AsyncResultI)r; + IceInternal.AsyncResultI.check(ri, this, __flushBatchRequests_name); + ri.__wait(); } @Override diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index b304efa82b5..00645ce8dc9 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -14,30 +14,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public interface StartCallback { void connectionStartCompleted(ConnectionI connection); + void connectionStartFailed(ConnectionI connection, Ice.LocalException ex); } private class TimeoutCallback implements Runnable { @Override - public void - run() + public void run() { timedOut(); } } - public void - start(StartCallback callback) + public void start(StartCallback callback) { try { synchronized(this) { - if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + // The connection might already be closed if the communicator + // was destroyed. + if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) @@ -62,18 +63,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne callback.connectionStartCompleted(this); } - public void - startAndWait() - throws InterruptedException + public void startAndWait() throws InterruptedException { try { synchronized(this) { - if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. + // The connection might already be closed if the communicator + // was destroyed. + if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None)) @@ -85,8 +86,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosing) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } @@ -104,8 +105,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - activate() + public synchronized void activate() { if(_state <= StateNotValidated) { @@ -120,8 +120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateActive); } - public synchronized void - hold() + public synchronized void hold() { if(_state <= StateNotValidated) { @@ -135,8 +134,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public final static int ObjectAdapterDeactivated = 0; public final static int CommunicatorDestroyed = 1; - synchronized public void - destroy(int reason) + synchronized public void destroy(int reason) { switch(reason) { @@ -155,8 +153,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - close(boolean force) + synchronized public void close(boolean force) { if(force) { @@ -171,7 +168,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // requests to be retried, regardless of whether the // server has processed them or not. // - while(!_requests.isEmpty() || !_asyncRequests.isEmpty()) + while(!_asyncRequests.isEmpty()) { try { @@ -187,37 +184,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized boolean - isActiveOrHolding() + public synchronized boolean isActiveOrHolding() { return _state > StateNotValidated && _state < StateClosing; } - public synchronized boolean - isFinished() + public synchronized boolean isFinished() { if(_state != StateFinished || _dispatchCount != 0) { return false; } - assert(_state == StateFinished); + assert (_state == StateFinished); return true; } - public synchronized void - throwException() + public synchronized void throwException() { if(_exception != null) { - assert(_state >= StateClosing); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_state >= StateClosing); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } - public synchronized void - waitUntilHolding() - throws InterruptedException + public synchronized void waitUntilHolding() throws InterruptedException { while(_state < StateHolding || _dispatchCount > 0) { @@ -225,9 +217,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - waitUntilFinished() - throws InterruptedException + public synchronized void waitUntilFinished() throws InterruptedException { // // We wait indefinitely until the connection is finished and all @@ -240,7 +230,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne wait(); } - assert(_state == StateFinished); + assert (_state == StateFinished); // // Clear the OA. See bug 1673 for the details of why this is necessary. @@ -248,19 +238,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _adapter = null; } - synchronized public void - updateObserver() + synchronized public void updateObserver() { if(_state < StateNotValidated || _state > StateClosed) { return; } - assert(_instance.getObserver() != null); - _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), - _endpoint, - toConnectionState(_state), - _observer); + assert (_instance.getObserver() != null); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint, + toConnectionState(_state), _observer); if(_observer != null) { _observer.attach(); @@ -272,8 +259,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - synchronized public void - monitor(long now, IceInternal.ACMConfig acm) + synchronized public void monitor(long now, IceInternal.ACMConfig acm) { if(_state != StateActive) { @@ -285,7 +271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // If writing or reading, nothing to do, the connection // timeout will kick-in if writes or reads don't progress. - // This check is necessary because the actitivy timer is + // This check is necessary because the activity timer is // only set when a message is fully read/written. // return; @@ -317,7 +303,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout)) { if(acm.close == ACMClose.CloseOnIdleForceful || - (acm.close != ACMClose.CloseOnIdle && (!_requests.isEmpty() || !_asyncRequests.isEmpty()))) + (acm.close != ACMClose.CloseOnIdle && (!_asyncRequests.isEmpty()))) { // // Close the connection if we didn't receive a heartbeat in @@ -325,8 +311,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // setState(StateClosed, new ConnectionTimeoutException()); } - else if(acm.close != ACMClose.CloseOnInvocation && - _dispatchCount == 0 && _batchStream.isEmpty() && _requests.isEmpty() && _asyncRequests.isEmpty()) + else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() && + _asyncRequests.isEmpty()) { // // The connection is idle, close it. @@ -336,88 +322,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - synchronized public boolean - sendRequest(IceInternal.Outgoing out, boolean compress, boolean response) - throws IceInternal.RetryException - { - final IceInternal.BasicStream os = out.os(); - - if(_exception != null) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Ensure the message isn't bigger than what we can send with the - // transport. - // - _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); - - int requestId = 0; - if(response) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - os.pos(IceInternal.Protocol.headerSize); - os.writeInt(requestId); - } - - out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - os.size() - IceInternal.Protocol.headerSize - 4); - - // - // Send the message. If it can't be sent without blocking the message is added - // to _sendStreams and it will be sent by the selector thread or by this thread - // if flush is true. - // - boolean sent = false; - try - { - OutgoingMessage message = new OutgoingMessage(out, out.os(), compress, requestId); - sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - if(response) - { - // - // Add to the requests map. - // - _requests.put(requestId, out); - } - - return sent; // The request was sent. - } - - synchronized public int - sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) - throws IceInternal.RetryException + synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response) + throws IceInternal.RetryException { - final IceInternal.BasicStream os = out.__getOs(); + final IceInternal.BasicStream os = out.getOs(); if(_exception != null) { @@ -426,11 +334,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // to send our request, we always try to send the request // again. // - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); + throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace()); } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert (_state > StateNotValidated); + assert (_state < StateClosing); // // Ensure the message isn't bigger than what we can send with the @@ -458,8 +366,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne os.writeInt(requestId); } - out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - os.size() - IceInternal.Protocol.headerSize - 4); + out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() - + IceInternal.Protocol.headerSize - 4); int status; try @@ -469,8 +377,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(response) @@ -483,31 +391,30 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return status; } - public synchronized void - prepareBatchRequest(IceInternal.BasicStream os) - throws IceInternal.RetryException + public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException { waitBatchStreamInUse(); if(_exception != null) { // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that - // some previous batch requests were not sent. + // If there were no batch requests queued when the connection + // failed, we can safely retry with a new connection. Otherwise, we + // must throw to notify the caller that some previous batch requests + // were not sent. // if(_batchStream.isEmpty()) { - throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace()); + throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace()); } else { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert (_state > StateNotValidated); + assert (_state < StateClosing); if(_batchStream.isEmpty()) { @@ -532,8 +439,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // } - public void - finishBatchRequest(IceInternal.BasicStream os, boolean compress) + public void finishBatchRequest(IceInternal.BasicStream os, boolean compress) { try { @@ -553,9 +459,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_batchAutoFlush) { // - // Throw memory limit exception if the first message added causes us to go over - // limit. Otherwise put aside the marshalled message that caused limit to be - // exceeded and rollback stream to the marker. + // Throw memory limit exception if the first message added + // causes us to go over limit. Otherwise put aside the + // marshalled message that caused limit to be exceeded and + // rollback stream to the marker. + // try { _transceiver.checkSendSize(_batchStream.getBuffer(), _instance.messageSizeMax()); @@ -601,31 +509,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } // // Reset the batch stream. // _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; // - // Check again if the last request doesn't exceed the maximum message size. + // Check again if the last request doesn't exceed the + // maximum message size. // - if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) + if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax()) { - IceInternal.Ex.throwMemoryLimitException( - IceInternal.Protocol.requestBatchHdr.length + lastRequest.length, - _instance.messageSizeMax()); + IceInternal.Ex.throwMemoryLimitException(IceInternal.Protocol.requestBatchHdr.length + + lastRequest.length, _instance.messageSizeMax()); } // - // Start a new batch with the last message that caused us to go over the limit. + // Start a new batch with the last message that caused us to + // go over the limit. // _batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr); _batchStream.writeBlob(lastRequest); @@ -637,7 +546,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ++_batchRequestNum; // - // We compress the whole batch if there is at least one compressed + // We compress the whole batch if there is at least one + // compressed // message. // if(compress) @@ -648,7 +558,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Notify about the batch stream not being in use anymore. // - assert(_batchStreamInUse); + assert (_batchStreamInUse); _batchStreamInUse = false; notifyAll(); } @@ -660,173 +570,103 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - public synchronized void - abortBatchRequest() + public synchronized void abortBatchRequest() { _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; - assert(_batchStreamInUse); + assert (_batchStreamInUse); _batchStreamInUse = false; notifyAll(); } @Override - public void - flushBatchRequests() + public void flushBatchRequests() { - IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name); - try - { - out.invoke(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } + end_flushBatchRequests(begin_flushBatchRequests()); } private static final String __flushBatchRequests_name = "flushBatchRequests"; @Override - public Ice.AsyncResult - begin_flushBatchRequests() + public Ice.AsyncResult begin_flushBatchRequests() { return begin_flushBatchRequestsInternal(null); } @Override - public Ice.AsyncResult - begin_flushBatchRequests(Callback cb) + public Ice.AsyncResult begin_flushBatchRequests(Callback cb) { return begin_flushBatchRequestsInternal(cb); } @Override - public Ice.AsyncResult - begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) + public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb) { return begin_flushBatchRequestsInternal(cb); } @Override - public AsyncResult - begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb, - IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, - IceInternal.Functional_BoolCallback __sentCb) + public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb, + IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, + IceInternal.Functional_BoolCallback __sentCb) { - return begin_flushBatchRequestsInternal( - new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb) + return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb) + { + @Override + public final void __completed(AsyncResult __result) + { + try { - @Override - public final void __completed(AsyncResult __result) - { - try - { - __result.getConnection().end_flushBatchRequests(__result); - } - catch(Exception __ex) - { - __exceptionCb.apply(__ex); - } - } - }); + __result.getConnection().end_flushBatchRequests(__result); + } + catch(Exception __ex) + { + __exceptionCb.apply(__ex); + } + } + }); } - private Ice.AsyncResult - begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) + private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb) { - IceInternal.ConnectionBatchOutgoingAsync result = - new IceInternal.ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb); + IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this, + _communicator, _instance, __flushBatchRequests_name, cb); try { result.__invoke(); } catch(LocalException __ex) { - result.__invokeExceptionAsync(__ex); + result.invokeExceptionAsync(__ex); } return result; } @Override - public void - end_flushBatchRequests(AsyncResult r) + public void end_flushBatchRequests(AsyncResult ir) { - AsyncResult.__check(r, this, __flushBatchRequests_name); + IceInternal.AsyncResultI r = (IceInternal.AsyncResultI) ir; + IceInternal.AsyncResultI.check(r, this, __flushBatchRequests_name); r.__wait(); } - synchronized public boolean - flushBatchRequests(IceInternal.BatchOutgoing out) - { - waitBatchStreamInUse(); - if(_exception != null) - { - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - if(_batchRequestNum == 0) - { - out.sent(); - return true; - } - - // - // Fill in the number of requests in the batch. - // - _batchStream.pos(IceInternal.Protocol.headerSize); - _batchStream.writeInt(_batchRequestNum); - - out.attachRemoteObserver(initConnectionInfo(), _endpoint, - _batchStream.size() - IceInternal.Protocol.headerSize - 4); - - _batchStream.swap(out.os()); - - // - // Send the batch stream. - // - boolean sent = false; - try - { - OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0); - sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0; - } - catch(Ice.LocalException ex) - { - setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); - } - - // - // Reset the batch stream. - // - _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - return sent; - } - - synchronized public int - flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) + synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync) { waitBatchStreamInUse(); if(_exception != null) { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(_batchRequestNum == 0) { int status = IceInternal.AsyncStatus.Sent; - if(outAsync.__sent()) + if(outAsync.sent()) { status |= IceInternal.AsyncStatus.InvokeSentCallback; } @@ -839,10 +679,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchStream.pos(IceInternal.Protocol.headerSize); _batchStream.writeInt(_batchRequestNum); - outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, - _batchStream.size() - IceInternal.Protocol.headerSize - 4); + outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() - + IceInternal.Protocol.headerSize - 4); - _batchStream.swap(outAsync.__getOs()); + _batchStream.swap(outAsync.getOs()); // // Send the batch stream. @@ -850,21 +690,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne int status; try { - OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__getOs(), _batchRequestCompress, 0); + OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0); status = sendMessage(message); } catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } // // Reset the batch stream. // _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchRequestNum = 0; _batchRequestCompress = false; _batchMarker = 0; @@ -872,8 +712,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - setCallback(ConnectionCallback callback) + synchronized public void setCallback(ConnectionCallback callback) { if(_state > StateClosing) { @@ -883,8 +722,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat) + synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, + Ice.Optional<ACMHeartbeat> heartbeat) { if(_monitor != null) { @@ -893,10 +732,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _monitor.remove(this); } _monitor = _monitor.acm(timeout, close, heartbeat); - + if(_monitor.getACM().timeout <= 0) { - _acmLastActivity = -1; // Disable the recording of last activity. + _acmLastActivity = -1; // Disable the recording of last + // activity. } else if(_state == StateActive && _acmLastActivity == -1) { @@ -912,59 +752,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public Ice.ACM - getACM() + synchronized public Ice.ACM getACM() { return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff); } - synchronized public boolean - requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex) - { - java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); - while(it.hasNext()) - { - OutgoingMessage o = it.next(); - if(o.out == out) - { - if(o.requestId > 0) - { - _requests.remove(o.requestId); - } - - // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. - // - o.timedOut(); - if(o != _sendStreams.getFirst()) - { - it.remove(); - } - out.finished(ex); - return true; // We're done. - } - } - - if(out instanceof IceInternal.Outgoing) - { - IceInternal.Outgoing o = (IceInternal.Outgoing)out; - java.util.Iterator<IceInternal.Outgoing> it2 = _requests.values().iterator(); - while(it2.hasNext()) - { - if(it2.next() == o) - { - o.finished(ex); - it2.remove(); - return true; // We're done. - } - } - } - return false; - } - - public boolean - asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex) + synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, + Ice.LocalException ex) { java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator(); while(it.hasNext()) @@ -978,42 +772,44 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // If the request is being sent, don't remove it from the send streams, - // it will be removed once the sending is finished. + // If the request is being sent, don't remove it from the send + // streams, it will be removed once the sending is finished. + // + // Note that since we swapped the message stream to _writeStream + // it's fine if the OutgoingAsync output stream is released (and + // as long as canceled requests cannot be retried). // o.timedOut(); if(o != _sendStreams.getFirst()) { it.remove(); } - outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + outAsync.dispatchInvocationCancel(ex, _threadPool, this); return true; // We're done } } if(outAsync instanceof IceInternal.OutgoingAsync) { - IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; + IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync; java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator(); while(it2.hasNext()) { if(it2.next() == o) { it2.remove(); - outAsync.__dispatchInvocationCancel(ex, _threadPool, this); + outAsync.dispatchInvocationCancel(ex, _threadPool, this); return true; // We're done. } } } - return false; } @Override - synchronized public void - sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag) + synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag) { - assert(_state > StateNotValidated); + assert (_state > StateNotValidated); try { @@ -1028,8 +824,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } sendMessage(new OutgoingMessage(os, compressFlag != 0, true)); @@ -1046,10 +842,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - synchronized public void - sendNoResponse() + synchronized public void sendNoResponse() { - assert(_state > StateNotValidated); + assert (_state > StateNotValidated); try { if(--_dispatchCount == 0) @@ -1063,8 +858,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_state >= StateClosed) { - assert(_exception != null); - throw (Ice.LocalException)_exception.fillInStackTrace(); + assert (_exception != null); + throw (Ice.LocalException) _exception.fillInStackTrace(); } if(_state == StateClosing && _dispatchCount == 0) @@ -1079,27 +874,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public boolean - systemException(int requestId, Ice.SystemException ex) + public boolean systemException(int requestId, Ice.SystemException ex) { return false; // System exceptions aren't marshalled. } - public IceInternal.EndpointI - endpoint() + public IceInternal.EndpointI endpoint() { - return _endpoint; // No mutex protection necessary, _endpoint is immutable. + return _endpoint; // No mutex protection necessary, _endpoint is + // immutable. } - public IceInternal.Connector - connector() + public IceInternal.Connector connector() { - return _connector; // No mutex protection necessary, _connector is immutable. + return _connector; // No mutex protection necessary, _connector is + // immutable. } @Override - public synchronized void - setAdapter(ObjectAdapter adapter) + public synchronized void setAdapter(ObjectAdapter adapter) { if(_state <= StateNotValidated || _state >= StateClosing) { @@ -1110,7 +903,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_adapter != null) { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + _servantManager = ((ObjectAdapterI) _adapter).getServantManager(); if(_servantManager == null) { _adapter = null; @@ -1129,22 +922,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public synchronized ObjectAdapter - getAdapter() + public synchronized ObjectAdapter getAdapter() { return _adapter; } @Override - public Endpoint - getEndpoint() + public Endpoint getEndpoint() { - return _endpoint; // No mutex protection necessary, _endpoint is immutable. + return _endpoint; // No mutex protection necessary, _endpoint is + // immutable. } @Override - public ObjectPrx - createProxy(Identity ident) + public ObjectPrx createProxy(Identity ident) { // // Create a reference and return a reverse proxy for this @@ -1157,8 +948,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Operations from EventHandler // @Override - public void - message(IceInternal.ThreadPoolCurrent current) + public void message(IceInternal.ThreadPoolCurrent current) { StartCallback startCB = null; java.util.List<OutgoingMessage> sentCBs = null; @@ -1213,7 +1003,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } if(_observer != null && !_readHeader) { - assert(!buf.b.hasRemaining()); + assert (!buf.b.hasRemaining()); observerFinishRead(buf); } @@ -1277,7 +1067,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_endpoint.datagram()) { - throw new Ice.DatagramLimitException(); // The message was truncated. + // The message was truncated. + throw new Ice.DatagramLimitException(); } continue; } @@ -1286,7 +1077,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne int newOp = readOp | writeOp; readyOp = readyOp & ~newOp; - assert(readyOp != 0 || newOp != 0); + assert (readyOp != 0 || newOp != 0); if(_state <= StateNotValidated) { @@ -1329,7 +1120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else { - assert(_state <= StateClosingPending); + assert (_state <= StateClosingPending); // // We parse messages first, if we receive a close @@ -1337,7 +1128,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if((readyOp & IceInternal.SocketOperation.Read) != 0) { - info = new MessageInfo(current.stream); // Optimization: use the thread's stream. + // Optimization: use the thread's stream. + info = new MessageInfo(current.stream); newOp |= parseMessage(info); } @@ -1411,19 +1203,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne current.ioCompleted(); } - if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. + if(!_dispatcher) // Optimization, call dispatch() directly if there's no + // dispatcher. { dispatch(startCB, sentCBs, info); } else { - if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback + // No need for the stream if heartbeat callback + if(info != null && info.heartbeatCallback == null) { // - // Create a new stream for the dispatch instead of using the thread - // pool's thread stream. + // Create a new stream for the dispatch instead of using the + // thread pool's thread stream. // - assert(info.stream == current.stream); + assert (info.stream == current.stream); IceInternal.BasicStream stream = info.stream; info.stream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); info.stream.swap(stream); @@ -1432,21 +1226,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne final StartCallback finalStartCB = startCB; final java.util.List<OutgoingMessage> finalSentCBs = sentCBs; final MessageInfo finalInfo = info; - _threadPool.dispatchFromThisThread( - new IceInternal.DispatchWorkItem(this) + _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this) + { + @Override + public void run() { - @Override - public void - run() - { - dispatch(finalStartCB, finalSentCBs, finalInfo); - } - }); + dispatch(finalStartCB, finalSentCBs, finalInfo); + } + }); } } - protected void - dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) + protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) { int count = 0; @@ -1467,7 +1258,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { for(OutgoingMessage msg : sentCBs) { - msg.outAsync.__invokeSent(); + msg.outAsync.invokeSent(); } ++count; } @@ -1480,7 +1271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.outAsync != null) { - info.outAsync.__finished(info.stream); + info.outAsync.finished(info.stream); ++count; } @@ -1504,8 +1295,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.invokeNum > 0) { - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, - info.adapter); + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); // // Don't increase count, the dispatch count is @@ -1552,19 +1342,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public void - finished(IceInternal.ThreadPoolCurrent current) + public void finished(IceInternal.ThreadPoolCurrent current) { synchronized(this) { - assert(_state == StateClosed); + assert (_state == StateClosed); unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write); } // - // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and - // unecessary thread creation, especially if this is called on shutdown). + // If there are no callbacks to call, we don't call ioCompleted() since + // we're not going to call code that will potentially block (this avoids + // promoting a new leader and unecessary thread creation, especially if + // this is called on shutdown). // if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null) { @@ -1573,27 +1363,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } current.ioCompleted(); - if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher. + if(!_dispatcher) // Optimization, call finish() directly if there's no + // dispatcher. { finish(); } else { - _threadPool.dispatchFromThisThread( - new IceInternal.DispatchWorkItem(this) + _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this) + { + @Override + public void run() { - @Override - public void - run() - { - finish(); - } - }); + finish(); + } + }); } } - public void - finish() + public void finish() { if(_startCallback != null) { @@ -1618,28 +1406,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.finished(_exception); if(p.requestId > 0) // Make sure finished isn't called twice. { - if(p.out != null) - { - _requests.remove(p.requestId); - } - else - { - _asyncRequests.remove(p.requestId); - } + _asyncRequests.remove(p.requestId); } } _sendStreams.clear(); } - for(IceInternal.Outgoing p : _requests.values()) - { - p.finished(_exception); - } - _requests.clear(); - for(IceInternal.OutgoingAsync p : _asyncRequests.values()) { - p.__finished(_exception); + p.finished(_exception); } _asyncRequests.clear(); @@ -1657,8 +1432,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // This must be done last as this will cause waitUntilFinished() to return (and communicator - // objects such as the timer might be destroyed too). + // This must be done last as this will cause waitUntilFinished() to + // return (and communicator objects such as the timer might be destroyed + // too). // synchronized(this) { @@ -1671,21 +1447,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public String - toString() + public String toString() { return _toString(); } @Override - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { return _transceiver.fd(); } - public synchronized void - timedOut() + public synchronized void timedOut() { if(_state <= StateNotValidated) { @@ -1702,49 +1475,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - public String - type() + public String type() { return _type; // No mutex lock, _type is immutable. } @Override - public int - timeout() + public int timeout() { - return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable. + return _endpoint.timeout(); // No mutex protection necessary, _endpoint + // is immutable. } @Override - public synchronized ConnectionInfo - getInfo() + public synchronized ConnectionInfo getInfo() { if(_state >= StateClosed) { - throw (Ice.LocalException)_exception.fillInStackTrace(); + throw (Ice.LocalException) _exception.fillInStackTrace(); } return initConnectionInfo(); } @Override - public String - _toString() + public String _toString() { return _desc; // No mutex lock, _desc is immutable. } - public synchronized void - exception(LocalException ex) + public synchronized void exception(LocalException ex) { setState(StateClosed, ex); } @Override - public synchronized void - invokeException(int requestId, LocalException ex, int invokeNum) + public synchronized void invokeException(int requestId, LocalException ex, int invokeNum) { // - // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't + // Fatal exception while invoking a request. Since + // sendResponse/sendNoResponse isn't // called in case of a fatal exception we decrement _dispatchCount here. // @@ -1752,9 +1521,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(invokeNum > 0) { - assert(_dispatchCount > 0); + assert (_dispatchCount > 0); _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); + assert (_dispatchCount >= 0); if(_dispatchCount == 0) { if(_state == StateFinished) @@ -1767,8 +1536,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor, - IceInternal.Transceiver transceiver, IceInternal.Connector connector, - IceInternal.EndpointI endpoint, ObjectAdapter adapter) + IceInternal.Transceiver transceiver, IceInternal.Connector connector, IceInternal.EndpointI endpoint, + ObjectAdapter adapter) { _communicator = communicator; _instance = instance; @@ -1780,7 +1549,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _endpoint = endpoint; _adapter = adapter; final Ice.InitializationData initData = instance.initializationData(); - _dispatcher = initData.dispatcher != null; // Cached for better performance. + // Cached for better performance. + _dispatcher = initData.dispatcher != null; _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); @@ -1802,7 +1572,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _nextRequestId = 1; _batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false; _batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, - _batchAutoFlush); + _batchAutoFlush); _batchStreamInUse = false; _batchRequestNum = 0; _batchRequestCompress = false; @@ -1828,7 +1598,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_adapter != null) { - _servantManager = ((ObjectAdapterI)_adapter).getServantManager(); + _servantManager = ((ObjectAdapterI) _adapter).getServantManager(); } else { @@ -1839,7 +1609,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_adapter != null) { - _threadPool = ((ObjectAdapterI)_adapter).getThreadPool(); + _threadPool = ((ObjectAdapterI) _adapter).getThreadPool(); } else { @@ -1858,9 +1628,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } @Override - protected synchronized void - finalize() - throws Throwable + protected synchronized void finalize() throws Throwable { try { @@ -1868,7 +1636,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished); IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0); IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty()); - IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty()); IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty()); } catch(java.lang.Exception ex) @@ -1889,14 +1656,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private static final int StateClosed = 6; private static final int StateFinished = 7; - private void - setState(int state, LocalException ex) + private void setState(int state, LocalException ex) { // // If setState() is called with an exception, then only closed // and closing states are permissible. // - assert(state >= StateClosing); + assert (state >= StateClosing); if(_state == state) // Don't switch twice. { @@ -1908,7 +1674,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // If we are in closed state, an exception must be set. // - assert(_state != StateClosed); + assert (_state != StateClosed); _exception = ex; @@ -1924,8 +1690,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) { warning("connection exception", _exception); } @@ -1940,8 +1705,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(state); } - private void - setState(int state) + private void setState(int state) { // // We don't want to send close connection messages if the endpoint @@ -1969,83 +1733,83 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { switch(state) { - case StateNotInitialized: - { - assert(false); - break; - } - - case StateNotValidated: - { - if(_state != StateNotInitialized) + case StateNotInitialized: { - assert(_state == StateClosed); - return; + assert (false); + break; } - break; - } - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) + case StateNotValidated: { - return; + if(_state != StateNotInitialized) + { + assert (_state == StateClosed); + return; + } + break; } - _threadPool.register(this, IceInternal.SocketOperation.Read); - break; - } - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) + case StateActive: { - return; + // + // Can only switch from holding or not validated to + // active. + // + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } + _threadPool.register(this, IceInternal.SocketOperation.Read); + break; } - if(_state == StateActive) + + case StateHolding: { - _threadPool.unregister(this, IceInternal.SocketOperation.Read); + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + if(_state == StateActive) + { + _threadPool.unregister(this, IceInternal.SocketOperation.Read); + } + break; } - break; - } - case StateClosing: - case StateClosingPending: - { - // - // Can't change back from closing pending. - // - if(_state >= StateClosingPending) + case StateClosing: + case StateClosingPending: { - return; + // + // Can't change back from closing pending. + // + if(_state >= StateClosingPending) + { + return; + } + break; } - break; - } - case StateClosed: - { - if(_state == StateFinished) + case StateClosed: { - return; + if(_state == StateFinished) + { + return; + } + _threadPool.finish(this); + break; } - _threadPool.finish(this); - break; - } - case StateFinished: - { - assert(_state == StateClosed); - _transceiver.close(); - _communicator = null; - break; - } + case StateFinished: + { + assert (_state == StateClosed); + _transceiver.close(); + _communicator = null; + break; + } } } catch(Ice.LocalException ex) @@ -2086,10 +1850,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne Ice.Instrumentation.ConnectionState newState = toConnectionState(state); if(oldState != newState) { - _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), - _endpoint, - newState, - _observer); + _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint, newState, + _observer); if(_observer != null) { _observer.attach(); @@ -2106,8 +1868,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _exception instanceof ForcedCloseConnectionException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || - _exception instanceof ObjectAdapterDeactivatedException || - (_exception instanceof ConnectionLostException && _state >= StateClosing))) + _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing))) { _observer.failed(_exception.ice_name()); } @@ -2130,11 +1891,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - initiateShutdown() + private void initiateShutdown() { - assert(_state == StateClosing); - assert(_dispatchCount == 0); + assert (_state == StateClosing); + assert (_dispatchCount == 0); if(_shutdownInitiated) { @@ -2148,12 +1908,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Before we shut down, we send a close connection message. // IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, - IceInternal.Protocol.currentProtocolEncoding); + IceInternal.Protocol.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(os); IceInternal.Protocol.currentProtocolEncoding.__write(os); os.writeByte(IceInternal.Protocol.closeConnectionMsg); - os.writeByte((byte)0); // compression status: always report 0 for CloseConnection in Java. + os.writeByte((byte) 0); // compression status: always report 0 for + // CloseConnection in Java. os.writeInt(IceInternal.Protocol.headerSize); // Message size. if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0) @@ -2161,7 +1922,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the the transceiver of the graceful connection + // closure. // int op = _transceiver.closing(true, _exception); if(op != 0) @@ -2173,20 +1935,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - heartbeat() + private void heartbeat() { - assert(_state == StateActive); + assert (_state == StateActive); if(!_endpoint.datagram()) { IceInternal.BasicStream os = new IceInternal.BasicStream(_instance, - IceInternal.Protocol.currentProtocolEncoding); + IceInternal.Protocol.currentProtocolEncoding); os.writeBlob(IceInternal.Protocol.magic); IceInternal.Protocol.currentProtocol.__write(os); IceInternal.Protocol.currentProtocolEncoding.__write(os); os.writeByte(IceInternal.Protocol.validateConnectionMsg); - os.writeByte((byte)0); + os.writeByte((byte) 0); os.writeInt(IceInternal.Protocol.headerSize); // Message size. try @@ -2197,13 +1958,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne catch(Ice.LocalException ex) { setState(StateClosed, ex); - assert(_exception != null); + assert (_exception != null); } } } - private boolean - initialize(int operation) + private boolean initialize(int operation) { int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData); if(s != IceInternal.SocketOperation.None) @@ -2214,7 +1974,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // Update the connection description once the transceiver is initialized. + // Update the connection description once the transceiver is + // initialized. // _desc = _transceiver.toString(); setState(StateNotValidated); @@ -2222,12 +1983,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return true; } - private boolean - validate(int operation) + private boolean validate(int operation) { - if(!_endpoint.datagram()) // Datagram connections are always implicitly validated. + if(!_endpoint.datagram()) // Datagram connections are always implicitly + // validated. { - if(_adapter != null) // The server side has the active role for connection validation. + if(_adapter != null) // The server side has the active role for + // connection validation. { if(_writeStream.isEmpty()) { @@ -2235,8 +1997,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.Protocol.currentProtocol.__write(_writeStream); IceInternal.Protocol.currentProtocolEncoding.__write(_writeStream); _writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg); - _writeStream.writeByte((byte)0); // Compression status (always zero for validate connection). - _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message size. + _writeStream.writeByte((byte) 0); // Compression status + // (always zero for + // validate connection). + _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message + // size. IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels); _writeStream.prepareWrite(); } @@ -2262,7 +2027,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishWrite(_writeStream.getBuffer()); } } - else // The client side has the passive role for connection validation. + else + // The client side has the passive role for connection validation. { if(_readStream.isEmpty()) { @@ -2291,7 +2057,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishRead(_readStream.getBuffer()); } - assert(_readStream.pos() == IceInternal.Protocol.headerSize); + assert (_readStream.pos() == IceInternal.Protocol.headerSize); _readStream.pos(0); byte[] m = _readStream.readBlob(4); if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || @@ -2313,7 +2079,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { throw new ConnectionNotValidatedException(); } - _readStream.readByte(); // Ignore compression status for validate connection. + _readStream.readByte(); // Ignore compression status for + // validate connection. int size = _readStream.readInt(); if(size != IceInternal.Protocol.headerSize) { @@ -2335,8 +2102,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return true; } - private int - sendNextMessage(java.util.List<OutgoingMessage> callbacks) + private int sendNextMessage(java.util.List<OutgoingMessage> callbacks) { if(_sendStreams.isEmpty()) { @@ -2344,13 +2110,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } else if(_state == StateClosingPending && _writeStream.pos() == 0) { - // Message wasn't sent, empty the _writeStream, we're not going to send more data. + // Message wasn't sent, empty the _writeStream, we're not going to + // send more data. OutgoingMessage message = _sendStreams.getFirst(); _writeStream.swap(message.stream); return IceInternal.SocketOperation.None; } - assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); + assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size()); try { while(true) @@ -2390,7 +2157,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Otherwise, prepare the next message stream for writing. // message = _sendStreams.getFirst(); - assert(!message.prepared); + assert (!message.prepared); IceInternal.BasicStream stream = message.stream; message.stream = doCompress(stream, message.compress); @@ -2435,8 +2202,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. + // If all the messages were sent and we are in the closing state, we + // schedule the close timeout to wait for the peer to close the + // connection. // if(_state == StateClosing && _dispatchCount == 0) { @@ -2451,10 +2219,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return IceInternal.SocketOperation.None; } - private int - sendMessage(OutgoingMessage message) + private int sendMessage(OutgoingMessage message) { - assert(_state < StateClosed); + assert (_state < StateClosed); if(!_sendStreams.isEmpty()) { @@ -2464,11 +2231,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } // - // Attempt to send the message without blocking. If the send blocks, we register - // the connection with the selector thread. + // Attempt to send the message without blocking. If the send blocks, we + // register the connection with the selector thread. // - assert(!message.prepared); + assert (!message.prepared); IceInternal.BasicStream stream = message.stream; @@ -2523,15 +2290,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return IceInternal.AsyncStatus.Queued; } - private IceInternal.BasicStream - doCompress(IceInternal.BasicStream uncompressed, boolean compress) + private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress) { boolean compressionSupported = false; if(compress) { // - // Don't check whether compression support is available unless the proxy - // is configured for compression. + // Don't check whether compression support is available unless the + // proxy is configured for compression. // compressionSupported = IceInternal.BasicStream.compressible(); } @@ -2548,7 +2314,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Set compression status. // cstream.pos(9); - cstream.writeByte((byte)2); + cstream.writeByte((byte) 2); // // Write the size of the compressed stream into the header. @@ -2557,11 +2323,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne cstream.writeInt(cstream.size()); // - // Write the compression status and size of the compressed stream into the header of the - // uncompressed stream -- we need this to trace requests correctly. + // Write the compression status and size of the compressed + // stream into the header of the uncompressed stream -- we need + // this to trace requests correctly. // uncompressed.pos(9); - uncompressed.writeByte((byte)2); + uncompressed.writeByte((byte) 2); uncompressed.writeInt(cstream.size()); return cstream; @@ -2569,7 +2336,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } uncompressed.pos(9); - uncompressed.writeByte((byte)(compressionSupported ? 1 : 0)); + uncompressed.writeByte((byte) (compressionSupported ? 1 : 0)); // // Not compressed, fill in the message size. @@ -2597,17 +2364,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ConnectionCallback heartbeatCallback; } - private int - parseMessage(MessageInfo info) + private int parseMessage(MessageInfo info) { - assert(_state > StateNotValidated && _state < StateClosed); + assert (_state > StateNotValidated && _state < StateClosed); _readStream.swap(info.stream); _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); _readHeader = true; - assert(info.stream.pos() == info.stream.size()); + assert (info.stream.pos() == info.stream.size()); // // Connection is validated on first message. This is only used by @@ -2626,7 +2392,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.stream.pos(8); byte messageType = info.stream.readByte(); info.compress = info.stream.readByte(); - if(info.compress == (byte)2) + if(info.compress == (byte) 2) { if(IceInternal.BasicStream.compressible()) { @@ -2636,7 +2402,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { FeatureNotSupportedException ex = new FeatureNotSupportedException(); ex.unsupportedFeature = "Cannot uncompress compressed message: " - + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; + + "org.apache.tools.bzip2.CBZip2OutputStream was not found"; throw ex; } } @@ -2659,7 +2425,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne setState(StateClosingPending, new CloseConnectionException()); // - // Notify the the transceiver of the graceful connection closure. + // Notify the the transceiver of the graceful connection + // closure. // int op = _transceiver.closing(false, _exception); if(op != 0) @@ -2675,9 +2442,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state >= StateClosing) { - IceInternal.TraceUtil.trace("received request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); + IceInternal.TraceUtil.trace("received request during closing\n" + + "(ignored by server, client will retry)", info.stream, _logger, + _traceLevels); } else { @@ -2695,9 +2462,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_state >= StateClosing) { - IceInternal.TraceUtil.trace("received batch request during closing\n" + - "(ignored by server, client will retry)", - info.stream, _logger, _traceLevels); + IceInternal.TraceUtil.trace("received batch request during closing\n" + + "(ignored by server, client will retry)", info.stream, _logger, + _traceLevels); } else { @@ -2719,18 +2486,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - IceInternal.Outgoing out = _requests.remove(info.requestId); - if(out != null) - { - out.finished(info.stream); - } - else + info.outAsync = _asyncRequests.remove(info.requestId); + if(info.outAsync != null) { - info.outAsync = _asyncRequests.remove(info.requestId); - if(info.outAsync != null) - { - ++_dispatchCount; - } + ++_dispatchCount; } notifyAll(); // Notify threads blocked in close(false) break; @@ -2750,7 +2509,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne default: { IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream, - _logger, _traceLevels); + _logger, _traceLevels); throw new UnknownMessageException(); } } @@ -2773,9 +2532,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read; } - private void - invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, - IceInternal.ServantManager servantManager, ObjectAdapter adapter) + private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress, + IceInternal.ServantManager servantManager, ObjectAdapter adapter) { // // Note: In contrast to other private or protected methods, this @@ -2811,7 +2569,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { invokeException(requestId, ex, invokeNum); } - catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace. + catch(java.lang.AssertionError ex) // Upon assertion, we print the stack + // trace. { UnknownException uex = new UnknownException(ex); java.io.StringWriter sw = new java.io.StringWriter(); @@ -2842,8 +2601,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - scheduleTimeout(int status) + private void scheduleTimeout(int status) { int timeout; if(_state < StateActive) @@ -2892,8 +2650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _readTimeoutFuture.cancel(false); } - _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, java.util.concurrent.TimeUnit.MILLISECONDS); } if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0) { @@ -2902,17 +2659,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeTimeoutFuture.cancel(false); } _writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + java.util.concurrent.TimeUnit.MILLISECONDS); } } catch(Throwable ex) { - assert(false); + assert (false); } } - private void - unscheduleTimeout(int status) + private void unscheduleTimeout(int status) { if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null) { @@ -2927,8 +2683,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private ConnectionInfo - initConnectionInfo() + private ConnectionInfo initConnectionInfo() { if(_info != null) { @@ -2941,19 +2696,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.incoming = _connector == null; if(_state > StateNotInitialized) { - _info = info; // Cache the connection information only if initialized. + _info = info; // Cache the connection information only if + // initialized. } return info; } - private Ice.Instrumentation.ConnectionState - toConnectionState(int state) + private Ice.Instrumentation.ConnectionState toConnectionState(int state) { return connectionStateMap[state]; } - private void - warning(String msg, java.lang.Exception ex) + private void warning(String msg, java.lang.Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); @@ -2963,42 +2717,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _logger.warning(s); } - private void - observerStartRead(IceInternal.Buffer buf) + private void observerStartRead(IceInternal.Buffer buf) { if(_readStreamPos >= 0) { - assert(!buf.empty()); + assert (!buf.empty()); _observer.receivedBytes(buf.b.position() - _readStreamPos); } _readStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void - observerFinishRead(IceInternal.Buffer buf) + private void observerFinishRead(IceInternal.Buffer buf) { if(_readStreamPos == -1) { return; } - assert(buf.b.position() >= _readStreamPos); + assert (buf.b.position() >= _readStreamPos); _observer.receivedBytes(buf.b.position() - _readStreamPos); _readStreamPos = -1; } - private void - observerStartWrite(IceInternal.Buffer buf) + private void observerStartWrite(IceInternal.Buffer buf) { if(_writeStreamPos >= 0) { - assert(!buf.empty()); + assert (!buf.empty()); _observer.sentBytes(buf.b.position() - _writeStreamPos); } _writeStreamPos = buf.empty() ? -1 : buf.b.position(); } - private void - observerFinishWrite(IceInternal.Buffer buf) + private void observerFinishWrite(IceInternal.Buffer buf) { if(_writeStreamPos == -1) { @@ -3011,8 +2761,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStreamPos = -1; } - private IceInternal.Incoming - getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) + private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId) { IceInternal.Incoming in = null; @@ -3041,8 +2790,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne return in; } - private void - reclaimIncoming(IceInternal.Incoming in) + private void reclaimIncoming(IceInternal.Incoming in) { if(_cacheBuffers > 0) { @@ -3058,8 +2806,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } } - private void - reap() + private void reap() { if(_monitor != null) { @@ -3086,7 +2833,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { wait(); } - catch (InterruptedException e) + catch(InterruptedException e) { interrupted = true; } @@ -3110,17 +2857,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = 0; } - OutgoingMessage(IceInternal.OutgoingMessageCallback out, IceInternal.BasicStream stream, boolean compress, - int requestId) - { - this.stream = stream; - this.compress = compress; - this.out = out; - this.requestId = requestId; - } - OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress, - int requestId) + int requestId) { this.stream = stream; this.compress = compress; @@ -3128,56 +2866,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne this.requestId = requestId; } - public void - timedOut() + public void timedOut() { - assert((out != null || outAsync != null)); - out = null; + assert (outAsync != null); outAsync = null; } - public void - adopt() + public void adopt() { if(adopt) { - IceInternal.BasicStream stream = - new IceInternal.BasicStream(this.stream.instance(), IceInternal.Protocol.currentProtocolEncoding); + IceInternal.BasicStream stream = new IceInternal.BasicStream(this.stream.instance(), + IceInternal.Protocol.currentProtocolEncoding); stream.swap(this.stream); this.stream = stream; adopt = false; } } - public boolean - sent() + public boolean sent() { - if(out != null) + if(outAsync != null) { - out.sent(); - } - else if(outAsync != null) - { - return outAsync.__sent(); + return outAsync.sent(); } return false; } - public void - finished(Ice.LocalException ex) + public void finished(Ice.LocalException ex) { - if(out != null) - { - out.finished(ex); - } - else if(outAsync != null) + if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } public IceInternal.BasicStream stream; - public IceInternal.OutgoingMessageCallback out; public IceInternal.OutgoingAsyncMessageCallback outAsync; public boolean compress; public int requestId; @@ -3219,10 +2943,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private int _nextRequestId; - private java.util.Map<Integer, IceInternal.Outgoing> _requests = - new java.util.HashMap<Integer, IceInternal.Outgoing>(); - private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = - new java.util.HashMap<Integer, IceInternal.OutgoingAsync>(); + private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, IceInternal.OutgoingAsync>(); private LocalException _exception; @@ -3262,14 +2983,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private ConnectionCallback _callback; private static Ice.Instrumentation.ConnectionState connectionStateMap[] = { - Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized - Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated - Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive - Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding - Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing - Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending - Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed - Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized + Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated + Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive + Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing + Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed + Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished }; } diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java index 45c290df6e2..c390823e61f 100644 --- a/java/src/Ice/ObjectPrxHelper.java +++ b/java/src/Ice/ObjectPrxHelper.java @@ -152,7 +152,6 @@ public class ObjectPrxHelper extends ObjectPrxHelperBase public static String ice_staticId() { - return Ice.ObjectImpl.ice_staticId(); + return Ice.ObjectImpl.ice_staticId(); } - } diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index cb7623ecbdd..edda748ea7a 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -9,8 +9,12 @@ package Ice; +import java.util.LinkedList; +import java.util.List; + import Ice.Instrumentation.InvocationObserver; import IceInternal.QueueRequestHandler; +import IceInternal.RetryException; /** * Base class for all proxies. @@ -86,39 +90,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable ice_isA(String __id, java.util.Map<String, String> __context, boolean __explicitCtx) { __checkTwowayOnly(__ice_isA_name); - IceInternal.Outgoing __og = getOutgoing(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx); - try - { - try - { - IceInternal.BasicStream __os = __og.startWriteParams(Ice.FormatType.DefaultFormat); - __os.writeString(__id); - __og.endWriteParams(); - } - catch(LocalException __ex) - { - __og.abort(__ex); - } - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(UserException __ex) - { - throw new UnknownUserException(__ex.ice_name(), __ex); - } - } - IceInternal.BasicStream __is = __og.startReadParams(); - boolean __ret = __is.readBool(); - __og.endReadParams(); - return __ret; - } - finally - { - reclaimOutgoing(__og); - } + return end_ice_isA(begin_ice_isA(__id, __context, __explicitCtx, true, null)); } /** @@ -131,7 +103,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id) { - return begin_ice_isA(__id, null, false, null); + return begin_ice_isA(__id, null, false, false, null); } /** @@ -145,7 +117,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context) { - return begin_ice_isA(__id, __context, true, null); + return begin_ice_isA(__id, __context, true, false, null); } /** @@ -159,7 +131,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id, Callback __cb) { - return begin_ice_isA(__id, null, false, __cb); + return begin_ice_isA(__id, null, false, false, __cb); } /** @@ -174,7 +146,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context, Callback __cb) { - return begin_ice_isA(__id, __context, true, __cb); + return begin_ice_isA(__id, __context, true, false, __cb); } /** @@ -188,7 +160,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id, Callback_Object_ice_isA __cb) { - return begin_ice_isA(__id, null, false, __cb); + return begin_ice_isA(__id, null, false, false, __cb); } /** @@ -203,7 +175,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context, Callback_Object_ice_isA __cb) { - return begin_ice_isA(__id, __context, true, __cb); + return begin_ice_isA(__id, __context, true, false, __cb); } /** @@ -220,7 +192,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_BoolCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_isA(__id, null, false, __responseCb, __exceptionCb, null); + return begin_ice_isA(__id, null, false, false, __responseCb, __exceptionCb, null); } /** @@ -239,7 +211,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_isA(__id, null, false, __responseCb, __exceptionCb, __sentCb); + return begin_ice_isA(__id, null, false, false, __responseCb, __exceptionCb, __sentCb); } /** @@ -258,7 +230,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_BoolCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_isA(__id, __context, true, __responseCb, __exceptionCb, null); + return begin_ice_isA(__id, __context, true, false, __responseCb, __exceptionCb, null); } /** @@ -279,18 +251,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_isA(__id, __context, true, __responseCb, __exceptionCb, __sentCb); + return begin_ice_isA(__id, __context, true, false, __responseCb, __exceptionCb, __sentCb); } private final AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context, boolean __explicitCtx, + boolean __synchronous, IceInternal.Functional_BoolCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_isA(__id, __context, __explicitCtx, + return begin_ice_isA(__id, __context, __explicitCtx, __synchronous, new IceInternal.Functional_TwowayCallbackBool(__responseCb, __exceptionCb, __sentCb) { @Override @@ -303,21 +276,21 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private AsyncResult begin_ice_isA(String __id, java.util.Map<String, String> __context, boolean __explicitCtx, - IceInternal.CallbackBase __cb) + boolean __synchronous, IceInternal.CallbackBase __cb) { __checkAsyncTwowayOnly(__ice_isA_name); - IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_isA_name, __cb); + IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_isA_name, __cb); try { - __result.__prepare(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx); - IceInternal.BasicStream __os = __result.__startWriteParams(Ice.FormatType.DefaultFormat); + __result.prepare(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); + IceInternal.BasicStream __os = __result.startWriteParams(Ice.FormatType.DefaultFormat); __os.writeString(__id); - __result.__endWriteParams(); - __result.__invoke(true); + __result.endWriteParams(); + __result.invoke(true); } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @@ -330,25 +303,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable **/ @Override public final boolean - end_ice_isA(AsyncResult __result) + end_ice_isA(AsyncResult __iresult) { - AsyncResult.__check(__result, this, __ice_isA_name); - if(!__result.__wait()) + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_isA_name); + try { - try + if(!__result.__wait()) { - __result.__throwUserException(); + try + { + __result.throwUserException(); + } + catch(UserException __ex) + { + throw new UnknownUserException(__ex.ice_name(), __ex); + } } - catch(UserException __ex) + boolean __ret; + IceInternal.BasicStream __is = __result.startReadParams(); + __ret = __is.readBool(); + __result.endReadParams(); + return __ret; + } + finally + { + if(__result != null) { - throw new UnknownUserException(__ex.ice_name(), __ex); + __result.cacheMessageBuffers(); } } - boolean __ret; - IceInternal.BasicStream __is = __result.__startReadParams(); - __ret = __is.readBool(); - __result.__endReadParams(); - return __ret; } static public final void __ice_isA_completed(TwowayCallbackBool __cb, Ice.AsyncResult __result) @@ -398,16 +382,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable private void ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx) { - IceInternal.Outgoing __og = getOutgoing(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx); - try - { - __og.writeEmptyParams(); - __invoke(__og); - } - finally - { - reclaimOutgoing(__og); - } + end_ice_ping(begin_ice_ping(__context, __explicitCtx, true, null)); } /** @@ -419,7 +394,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping() { - return begin_ice_ping(null, false, null); + return begin_ice_ping(null, false, false, null); } /** @@ -432,7 +407,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping(java.util.Map<String, String> __context) { - return begin_ice_ping(__context, true, null); + return begin_ice_ping(__context, true, false, null); } /** @@ -445,7 +420,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping(Callback __cb) { - return begin_ice_ping(null, false, __cb); + return begin_ice_ping(null, false, false, __cb); } /** @@ -459,7 +434,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping(java.util.Map<String, String> __context, Callback __cb) { - return begin_ice_ping(__context, true, __cb); + return begin_ice_ping(__context, true, false, __cb); } /** @@ -472,7 +447,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping(Callback_Object_ice_ping __cb) { - return begin_ice_ping(null, false, __cb); + return begin_ice_ping(null, false, false, __cb); } /** @@ -486,7 +461,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ping(java.util.Map<String, String> __context, Callback_Object_ice_ping __cb) { - return begin_ice_ping(__context, true, __cb); + return begin_ice_ping(__context, true, false, __cb); } /** @@ -501,7 +476,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_ping(null, false, + return begin_ice_ping(null, false, false, new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, null)); } @@ -519,8 +494,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_ping(null, false, new - IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, __sentCb)); + return begin_ice_ping(null, false, false, new IceInternal.Functional_OnewayCallback(__responseCb, + __exceptionCb, __sentCb)); } /** @@ -537,8 +512,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_VoidCallback __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_ping(__context, true, - new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, null)); + return begin_ice_ping(__context, true, false, new IceInternal.Functional_OnewayCallback(__responseCb, + __exceptionCb, null)); } /** @@ -557,23 +532,23 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_ping(__context, true, + return begin_ice_ping(__context, true, false, new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, __sentCb)); } - private AsyncResult - begin_ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb) + private AsyncResult begin_ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx, + boolean __synchronous, IceInternal.CallbackBase __cb) { - IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_ping_name, __cb); + IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_ping_name, __cb); try { - __result.__prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx); - __result.__writeEmptyParams(); - __result.__invoke(true); + __result.prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); + __result.writeEmptyParams(); + __result.invoke(true); } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @@ -623,30 +598,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx) { __checkTwowayOnly(__ice_id_name); - IceInternal.Outgoing __og = getOutgoing(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx); - try - { - __og.writeEmptyParams(); - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(UserException __ex) - { - throw new UnknownUserException(__ex.ice_name(), __ex); - } - } - IceInternal.BasicStream __is = __og.startReadParams(); - String[] __ret = __is.readStringSeq(); - __og.endReadParams(); - return __ret; - } - finally - { - reclaimOutgoing(__og); - } + return end_ice_ids(begin_ice_ids(__context, __explicitCtx, true, null)); } /** @@ -658,7 +610,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids() { - return begin_ice_ids(null, false, null); + return begin_ice_ids(null, false, false, null); } /** @@ -671,7 +623,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids(java.util.Map<String, String> __context) { - return begin_ice_ids(__context, true, null); + return begin_ice_ids(__context, true, false, null); } /** @@ -684,7 +636,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids(Callback __cb) { - return begin_ice_ids(null, false, __cb); + return begin_ice_ids(null, false, false,__cb); } /** @@ -698,7 +650,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids(java.util.Map<String, String> __context, Callback __cb) { - return begin_ice_ids(__context, true, __cb); + return begin_ice_ids(__context, true, false,__cb); } /** @@ -711,7 +663,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids(Callback_Object_ice_ids __cb) { - return begin_ice_ids(null, false, __cb); + return begin_ice_ids(null, false, false,__cb); } /** @@ -725,10 +677,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_ids(java.util.Map<String, String> __context, Callback_Object_ice_ids __cb) { - return begin_ice_ids(__context, true, __cb); + return begin_ice_ids(__context, true, false,__cb); } - class FunctionalCallback_Object_ice_ids extends IceInternal.Functional_TwowayCallbackArg1<String[]> + private class FunctionalCallback_Object_ice_ids extends IceInternal.Functional_TwowayCallbackArg1<String[]> { FunctionalCallback_Object_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, @@ -756,7 +708,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_ids(null, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, null)); + return begin_ice_ids(null, false, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, + null)); } /** @@ -773,8 +726,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_ids(null, false, - new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, __sentCb)); + return begin_ice_ids(null, false, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, + __sentCb)); } /** @@ -791,8 +744,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<String[]> __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_ids(__context, true, - new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, null)); + return begin_ice_ids(__context, true, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, + null)); } /** @@ -811,24 +764,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_ids(__context, true, + return begin_ice_ids(__context, true, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, __sentCb)); } - private AsyncResult - begin_ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb) + private AsyncResult begin_ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx, + boolean __synchronous, IceInternal.CallbackBase __cb) { __checkAsyncTwowayOnly(__ice_ids_name); - IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_ids_name, __cb); + IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_ids_name, __cb); try { - __result.__prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx); - __result.__writeEmptyParams(); - __result.__invoke(true); + __result.prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); + __result.writeEmptyParams(); + __result.invoke(true); } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @@ -842,25 +795,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable **/ @Override public final String[] - end_ice_ids(AsyncResult __result) + end_ice_ids(AsyncResult __iresult) { - AsyncResult.__check(__result, this, __ice_ids_name); - if(!__result.__wait()) + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_ids_name); + try { - try + if(!__result.__wait()) { - __result.__throwUserException(); + try + { + __result.throwUserException(); + } + catch(UserException __ex) + { + throw new UnknownUserException(__ex.ice_name(), __ex); + } } - catch(UserException __ex) + String[] __ret = null; + IceInternal.BasicStream __is = __result.startReadParams(); + __ret = StringSeqHelper.read(__is); + __result.endReadParams(); + return __ret; + } + finally + { + if(__result != null) { - throw new UnknownUserException(__ex.ice_name(), __ex); + __result.cacheMessageBuffers(); } } - String[] __ret = null; - IceInternal.BasicStream __is = __result.__startReadParams(); - __ret = StringSeqHelper.read(__is); - __result.__endReadParams(); - return __ret; } static public final void __ice_ids_completed(TwowayCallbackArg1<String[]> __cb, AsyncResult __result) @@ -914,30 +878,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable ice_id(java.util.Map<String, String> __context, boolean __explicitCtx) { __checkTwowayOnly(__ice_id_name); - IceInternal.Outgoing __og = getOutgoing(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx); - try - { - __og.writeEmptyParams(); - if(!__og.invoke()) - { - try - { - __og.throwUserException(); - } - catch(UserException __ex) - { - throw new UnknownUserException(__ex.ice_name(), __ex); - } - } - IceInternal.BasicStream __is = __og.startReadParams(); - String __ret = __is.readString(); - __og.endReadParams(); - return __ret; - } - finally - { - reclaimOutgoing(__og); - } + return end_ice_id(begin_ice_id(__context, __explicitCtx, true, null)); } /** @@ -949,7 +890,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id() { - return begin_ice_id(null, false, null); + return begin_ice_id(null, false, false, null); } /** @@ -962,7 +903,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id(java.util.Map<String, String> __context) { - return begin_ice_id(__context, true, null); + return begin_ice_id(__context, true, false, null); } /** @@ -975,7 +916,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id(Callback __cb) { - return begin_ice_id(null, false, __cb); + return begin_ice_id(null, false, false, __cb); } /** @@ -989,7 +930,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id(java.util.Map<String, String> __context, Callback __cb) { - return begin_ice_id(__context, true, __cb); + return begin_ice_id(__context, true, false, __cb); } /** @@ -1002,7 +943,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id(Callback_Object_ice_id __cb) { - return begin_ice_id(null, false, __cb); + return begin_ice_id(null, false, false, __cb); } /** @@ -1016,10 +957,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_id(java.util.Map<String, String> __context, Callback_Object_ice_id __cb) { - return begin_ice_id(__context, true, __cb); + return begin_ice_id(__context, true, false, __cb); } - class FunctionalCallback_Object_ice_id extends IceInternal.Functional_TwowayCallbackArg1<String> + private class FunctionalCallback_Object_ice_id extends IceInternal.Functional_TwowayCallbackArg1<String> { FunctionalCallback_Object_ice_id(IceInternal.Functional_GenericCallback1<String> responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, @@ -1047,7 +988,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_id(null, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null)); + return begin_ice_id(null, false, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null)); } /** @@ -1064,7 +1005,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_id(null, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb)); + return begin_ice_id(null, false, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb)); } /** @@ -1081,7 +1022,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<String> __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb) { - return begin_ice_id(__context, true, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null)); + return begin_ice_id(__context, true, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null)); } /** @@ -1100,24 +1041,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) { - return begin_ice_id(__context, true, + return begin_ice_id(__context, true, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb)); } - private AsyncResult - begin_ice_id(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb) + private AsyncResult begin_ice_id(java.util.Map<String, String> __context, boolean __explicitCtx, + boolean __synchronous, IceInternal.CallbackBase __cb) { __checkAsyncTwowayOnly(__ice_id_name); - IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_id_name, __cb); + IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_id_name, __cb); try { - __result.__prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx); - __result.__writeEmptyParams(); - __result.__invoke(true); + __result.prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous); + __result.writeEmptyParams(); + __result.invoke(true); } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @@ -1130,25 +1071,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable **/ @Override public final String - end_ice_id(AsyncResult __result) + end_ice_id(AsyncResult __iresult) { - AsyncResult.__check(__result, this, __ice_id_name); - if(!__result.__wait()) + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_id_name); + try { - try + if(!__result.__wait()) { - __result.__throwUserException(); + try + { + __result.throwUserException(); + } + catch(UserException __ex) + { + throw new UnknownUserException(__ex.ice_name(), __ex); + } } - catch(UserException __ex) + String __ret = null; + IceInternal.BasicStream __is = __result.startReadParams(); + __ret = __is.readString(); + __result.endReadParams(); + return __ret; + } + finally + { + if(__result != null) { - throw new UnknownUserException(__ex.ice_name(), __ex); + __result.cacheMessageBuffers(); } } - String __ret = null; - IceInternal.BasicStream __is = __result.__startReadParams(); - __ret = __is.readString(); - __result.__endReadParams(); - return __ret; } static public final void __ice_id_completed(TwowayCallbackArg1<String> __cb, AsyncResult __result) @@ -1223,24 +1175,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams, java.util.Map<String, String> context, boolean explicitCtx) { - IceInternal.Outgoing __og = getOutgoing(operation, mode, context, explicitCtx); - try - { - __og.writeParamEncaps(inParams); - boolean ok = __og.invoke(); - if(_reference.getMode() == IceInternal.Reference.ModeTwoway) - { - if(outParams != null) - { - outParams.value = __og.readParamEncaps(); - } - } - return ok; - } - finally - { - reclaimOutgoing(__og); - } + return end_ice_invoke(outParams, begin_ice_invoke(operation, mode, inParams, context, explicitCtx, true, null)); } private static final String __ice_invoke_name = "ice_invoke"; @@ -1260,7 +1195,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams) { - return begin_ice_invoke(operation, mode, inParams, null, false, null); + return begin_ice_invoke(operation, mode, inParams, null, false, false, null); } /** @@ -1281,7 +1216,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context) { - return begin_ice_invoke(operation, mode, inParams, __context, true, null); + return begin_ice_invoke(operation, mode, inParams, __context, true, false, null); } /** @@ -1301,7 +1236,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, Callback __cb) { - return begin_ice_invoke(operation, mode, inParams, null, false, __cb); + return begin_ice_invoke(operation, mode, inParams, null, false, false, __cb); } /** @@ -1323,7 +1258,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context, Callback __cb) { - return begin_ice_invoke(operation, mode, inParams, __context, true, __cb); + return begin_ice_invoke(operation, mode, inParams, __context, true, false, __cb); } /** @@ -1343,7 +1278,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public final AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, Callback_Object_ice_invoke __cb) { - return begin_ice_invoke(operation, mode, inParams, null, false, __cb); + return begin_ice_invoke(operation, mode, inParams, null, false, false, __cb); } /** @@ -1365,7 +1300,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context, Callback_Object_ice_invoke __cb) { - return begin_ice_invoke(operation, mode, inParams, __context, true, __cb); + return begin_ice_invoke(operation, mode, inParams, __context, true, false, __cb); } /** @@ -1389,7 +1324,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, IceInternal.Functional_BoolCallback sentCb) { - return begin_ice_invoke(operation, mode, inParams, null, false, responseCb, exceptionCb, sentCb); + return begin_ice_invoke(operation, mode, inParams, null, false, false, responseCb, exceptionCb, sentCb); } /** @@ -1411,7 +1346,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable FunctionalCallback_Object_ice_invoke_Response responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb) { - return begin_ice_invoke(operation, mode, inParams, null, false, responseCb, exceptionCb, null); + return begin_ice_invoke(operation, mode, inParams, null, false, false, responseCb, exceptionCb, null); } /** @@ -1437,7 +1372,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb, IceInternal.Functional_BoolCallback sentCb) { - return begin_ice_invoke(operation, mode, inParams, context, true, responseCb, exceptionCb, sentCb); + return begin_ice_invoke(operation, mode, inParams, context, true, false, responseCb, exceptionCb, sentCb); } /** @@ -1461,12 +1396,13 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable FunctionalCallback_Object_ice_invoke_Response responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb) { - return begin_ice_invoke(operation, mode, inParams, context, true, responseCb, exceptionCb, null); + return begin_ice_invoke(operation, mode, inParams, context, true, false, responseCb, exceptionCb, null); } private final AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context, boolean __explicitCtx, + boolean __synchronous, FunctionalCallback_Object_ice_invoke_Response __responseCb, IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, IceInternal.Functional_BoolCallback __sentCb) @@ -1495,24 +1431,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable FunctionalCallback_Object_ice_invoke_Response __responseCb; } - return begin_ice_invoke(operation, mode, inParams, __context, __explicitCtx, + return begin_ice_invoke(operation, mode, inParams, __context, __explicitCtx, __synchronous, new CB(__responseCb, __exceptionCb, __sentCb)); } private AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context, - boolean __explicitCtx, IceInternal.CallbackBase __cb) + boolean __explicitCtx, boolean __synchronous, IceInternal.CallbackBase __cb) { - IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_invoke_name, __cb); + IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_invoke_name, __cb); try { - __result.__prepare(operation, mode, __context, __explicitCtx); - __result.__writeParamEncaps(inParams); - __result.__invoke(true); + __result.prepare(operation, mode, __context, __explicitCtx, __synchronous); + __result.writeParamEncaps(inParams); + __result.invoke(true); } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @@ -1530,18 +1466,29 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable **/ @Override public final boolean - end_ice_invoke(ByteSeqHolder outParams, AsyncResult __result) + end_ice_invoke(ByteSeqHolder outParams, AsyncResult __iresult) { - AsyncResult.__check(__result, this, __ice_invoke_name); - boolean ok = __result.__wait(); - if(_reference.getMode() == IceInternal.Reference.ModeTwoway) + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_invoke_name); + try + { + boolean ok = __result.__wait(); + if(_reference.getMode() == IceInternal.Reference.ModeTwoway) + { + if(outParams != null) + { + outParams.value = __result.readParamEncaps(); + } + } + return ok; + } + finally { - if(outParams != null) + if(__result != null) { - outParams.value = __result.__readParamEncaps(); + __result.cacheMessageBuffers(); } } - return ok; } public static void __ice_invoke_completed(_Callback_Object_ice_invoke __cb, AsyncResult __result) @@ -2389,13 +2336,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable handler = __getRequestHandler(); try { + // Wait for the connection to be established. return handler.waitForConnection(); } - catch (InterruptedException e) + catch(InterruptedException e) { throw new Ice.OperationInterruptedException(); } } + catch(RetryException e) + { + // Clear request handler and retry. + __setRequestHandler(handler, null); + } catch(Ice.Exception ex) { try @@ -2479,15 +2432,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable public void ice_flushBatchRequests() { - IceInternal.BatchOutgoing __og = new IceInternal.BatchOutgoing(this, __ice_flushBatchRequests_name); - try - { - __og.invoke(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } + end_ice_flushBatchRequests(begin_ice_flushBatchRequests()); } /** @@ -2577,16 +2522,17 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } catch(Exception __ex) { - __result.__invokeExceptionAsync(__ex); + __result.invokeExceptionAsync(__ex); } return __result; } @Override public void - end_ice_flushBatchRequests(AsyncResult __result) + end_ice_flushBatchRequests(AsyncResult __iresult) { - AsyncResult.__check(__result, this, __ice_flushBatchRequests_name); + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; + IceInternal.AsyncResultI.check(__result, this, __ice_flushBatchRequests_name); __result.__wait(); } @@ -2708,50 +2654,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - public void - __invoke(IceInternal.Outgoing __og) + public final void + __end(AsyncResult __iresult, String operation) { - // - // Helper for operations without out/return parameters and user - // exceptions. - // - boolean __ok = __og.invoke(); - if(__og.hasResponse()) + IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult; + IceInternal.AsyncResultI.check(__result, this, operation); + try { - if(!__ok) + boolean ok = __result.__wait(); + if(_reference.getMode() == IceInternal.Reference.ModeTwoway) { - try - { - __og.throwUserException(); - } - catch(UserException __ex) + if(!ok) { - throw new UnknownUserException(__ex.ice_name(), __ex); + try + { + __result.throwUserException(); + } + catch(UserException __ex) + { + throw new UnknownUserException(__ex.ice_name(), __ex); + } } + __result.readEmptyParams(); } - __og.readEmptyParams(); } - } - - public final void - __end(AsyncResult __result, String operation) - { - AsyncResult.__check(__result, this, operation); - boolean ok = __result.__wait(); - if(_reference.getMode() == IceInternal.Reference.ModeTwoway) + finally { - if(!ok) + if(__result != null) { - try - { - __result.__throwUserException(); - } - catch(UserException __ex) - { - throw new UnknownUserException(__ex.ice_name(), __ex); - } + __result.cacheMessageBuffers(); } - __result.__readEmptyParams(); } } @@ -2835,6 +2767,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } } + + public void + cacheMessageBuffers(IceInternal.BasicStream is, IceInternal.BasicStream os) + { + synchronized(this) + { + if(_streamCache == null) + { + _streamCache = new LinkedList<StreamCacheEntry>(); + } + _streamCache.add(new StreamCacheEntry(is, os)); + } + } private IceInternal.RequestHandler createRequestHandler() @@ -2873,6 +2818,150 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable _reference = ref; } + protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String id, Class<T> proxyCls, Class<?> helperCls) + { + return checkedCastImpl(obj, null, false, null, false, id, proxyCls, helperCls); + } + + protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, java.util.Map<String, String> ctx, String id, + Class<T> proxyCls, Class<?> helperCls) + { + return checkedCastImpl(obj, ctx, true, null, false, id, proxyCls, helperCls); + } + + protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String facet, String id, Class<T> proxyCls, + Class<?> helperCls) + { + return checkedCastImpl(obj, null, false, facet, true, id, proxyCls, helperCls); + } + + protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String facet, java.util.Map<String, String> ctx, + String id, Class<T> proxyCls, Class<?> helperCls) + { + return checkedCastImpl(obj, ctx, true, facet, true, id, proxyCls, helperCls); + } + + protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, java.util.Map<String, String> ctx, boolean explicitCtx, + String facet, boolean explicitFacet, String id, Class<T> proxyCls, + Class<?> helperCls) + { + T d = null; + if(obj != null) + { + if(explicitFacet) + { + obj = obj.ice_facet(facet); + } + if(proxyCls.isInstance(obj)) + { + d = proxyCls.cast(obj); + } + else + { + try + { + final boolean b = explicitCtx ? obj.ice_isA(id, ctx) : obj.ice_isA(id); + if(b) + { + ObjectPrxHelperBase h = null; + try + { + h = ObjectPrxHelperBase.class.cast(helperCls.newInstance()); + } + catch(InstantiationException ex) + { + throw new SyscallException(ex); + } + catch(IllegalAccessException ex) + { + throw new SyscallException(ex); + } + h.__copyFrom(obj); + d = proxyCls.cast(h); + } + } + catch(FacetNotExistException ex) + { + } + } + } + return d; + } + + protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, Class<T> proxyCls, Class<?> helperCls) + { + return uncheckedCastImpl(obj, null, false, proxyCls, helperCls); + } + + protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, String facet, Class<T> proxyCls, Class<?> helperCls) + { + return uncheckedCastImpl(obj, facet, true, proxyCls, helperCls); + } + + protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, String facet, boolean explicitFacet, Class<T> proxyCls, + Class<?> helperCls) + { + T d = null; + if(obj != null) + { + try + { + if(explicitFacet) + { + ObjectPrxHelperBase h = ObjectPrxHelperBase.class.cast(helperCls.newInstance()); + h.__copyFrom(obj.ice_facet(facet)); + d = proxyCls.cast(h); + } + else + { + if(proxyCls.isInstance(obj)) + { + d = proxyCls.cast(obj); + } + else + { + ObjectPrxHelperBase h = ObjectPrxHelperBase.class.cast(helperCls.newInstance()); + h.__copyFrom(obj); + d = proxyCls.cast(h); + } + } + } + catch(InstantiationException ex) + { + throw new SyscallException(ex); + } + catch(IllegalAccessException ex) + { + throw new SyscallException(ex); + } + } + return d; + } + + protected IceInternal.OutgoingAsync + getOutgoingAsync(String operation, IceInternal.CallbackBase cb) + { + StreamCacheEntry cacheEntry = null; + if(_reference.getInstance().cacheMessageBuffers() > 0) + { + synchronized(this) + { + if(_streamCache != null && !_streamCache.isEmpty()) + { + cacheEntry = _streamCache.remove(0); + } + } + } + if(cacheEntry == null) + { + return new IceInternal.OutgoingAsync(this, operation, cb); + } + else + { + return new IceInternal.OutgoingAsync(this, operation, cb, cacheEntry.is, cacheEntry.os); + } + } + private final ObjectPrxHelperBase newInstance(IceInternal.Reference ref) { @@ -2938,55 +3027,20 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable } } - protected IceInternal.Outgoing - getOutgoing(String operation, OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx) + private static class StreamCacheEntry { - IceInternal.Outgoing out = null; - if(_reference.getInstance().cacheMessageBuffers() > 0) - { - synchronized(this) - { - if(_outgoingCache != null) - { - out = _outgoingCache; - _outgoingCache = _outgoingCache.next; - out.next = null; - } - } - } - if(out == null) - { - out = new IceInternal.Outgoing(this, operation, mode, context, explicitCtx); - } - else + StreamCacheEntry(IceInternal.BasicStream is, IceInternal.BasicStream os) { - out.reset(this, operation, mode, context, explicitCtx); + this.is = is; + this.os = os; } - return out; - } - - protected void - reclaimOutgoing(IceInternal.Outgoing out) - { - out.detach(); - if(_reference.getInstance().cacheMessageBuffers() > 0) - { - // - // Clear references to Ice objects as soon as possible. - // - out.reclaim(); - - synchronized(this) - { - out.next = _outgoingCache; - _outgoingCache = out; - } - } + IceInternal.BasicStream is; + IceInternal.BasicStream os; } private transient IceInternal.Reference _reference; private transient IceInternal.RequestHandler _requestHandler; - private transient IceInternal.Outgoing _outgoingCache; + private transient List<StreamCacheEntry> _streamCache; public static final long serialVersionUID = 0L; } diff --git a/java/src/Ice/OnewayCallback.java b/java/src/Ice/OnewayCallback.java index f7403e60526..3401c98fc9d 100644 --- a/java/src/Ice/OnewayCallback.java +++ b/java/src/Ice/OnewayCallback.java @@ -50,6 +50,12 @@ public abstract class OnewayCallback extends IceInternal.CallbackBase } @Override + public final boolean __hasSentCallback() + { + return true; + } + + @Override public final void __completed(AsyncResult __result) { try diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java new file mode 100644 index 00000000000..9b211f0bb72 --- /dev/null +++ b/java/src/IceInternal/AsyncResultI.java @@ -0,0 +1,578 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +import Ice.AsyncResult; +import Ice.Communicator; +import Ice.CommunicatorDestroyedException; +import Ice.Connection; +import Ice.ObjectPrx; +import Ice.UserException; + +/** + * An AsyncResult object is the return value of an asynchronous invocation. + * With this object, an application can obtain several attributes of the + * invocation and discover its outcome. + **/ +public class AsyncResultI implements Ice.AsyncResult +{ + protected AsyncResultI(Communicator communicator, IceInternal.Instance instance, String op, + IceInternal.CallbackBase del) + { + _communicator = communicator; + _instance = instance; + _operation = op; + _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding); + _state = 0; + _sentSynchronously = false; + _exception = null; + _callback = del; + } + + protected AsyncResultI(Communicator communicator, Instance instance, String op, CallbackBase del, BasicStream is, + BasicStream os) + { + _communicator = communicator; + _instance = instance; + _operation = op; + _os = os; + _is = is; + _state = 0; + _sentSynchronously = false; + _exception = null; + _callback = del; + } + + /** + * Returns the communicator that sent the invocation. + * + * @return The communicator. + **/ + @Override + public Communicator getCommunicator() + { + return _communicator; + } + + /** + * Returns the connection that was used for the invocation. + * + * @return The connection. + **/ + @Override + public Connection getConnection() + { + return null; + } + + /** + * Returns the proxy that was used to call the <code>begin_</code> method. + * + * @return The proxy. + **/ + @Override + public ObjectPrx getProxy() + { + return null; + } + + /** + * Indicates whether the result of an invocation is available. + * + * @return True if the result is available, which means a call to the <code>end_</code> + * method will not block. The method returns false if the result is not yet available. + **/ + @Override + public final boolean isCompleted() + { + synchronized(_monitor) + { + return (_state & StateDone) > 0; + } + } + + /** + * Blocks the caller until the result of the invocation is available. + **/ + @Override + public final void waitForCompleted() + { + synchronized(_monitor) + { + while((_state & StateDone) == 0) + { + try + { + _monitor.wait(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } + } + } + } + + /** + * When you call the <code>begin_</code> method, the Ice run time attempts to + * write the corresponding request to the client-side transport. If the + * transport cannot accept the request, the Ice run time queues the request + * for later transmission. This method returns true if, at the time it is called, + * the request has been written to the local transport (whether it was initially + * queued or not). Otherwise, if the request is still queued, this method returns + * false. + * + * @return True if the request has been sent, or false if the request is queued. + **/ + @Override + public final boolean isSent() + { + synchronized(_monitor) + { + return (_state & StateSent) > 0; + } + } + + /** + * Blocks the caller until the request has been written to the client-side transport. + **/ + @Override + public final void waitForSent() + { + synchronized(_monitor) + { + while((_state & StateSent) == 0 && _exception == null) + { + try + { + _monitor.wait(); + } + catch(InterruptedException ex) + { + throw new Ice.OperationInterruptedException(); + } + } + } + } + + /** + * If the invocation failed with a local exception, throws the local exception. + **/ + @Override + public final void throwLocalException() + { + synchronized(_monitor) + { + if(_exception != null) + { + throw _exception; + } + } + } + + /** + * This method returns true if a request was written to the client-side + * transport without first being queued. If the request was initially + * queued, this method returns false (independent of whether the request + * is still in the queue or has since been written to the client-side transport). + * + * @return True if the request was sent without being queued, or false + * otherwise. + **/ + @Override + public final boolean sentSynchronously() + { + return _sentSynchronously; // No lock needed, immutable once __send() is called + } + + /** + * Returns the name of the operation. + * + * @return The operation name. + **/ + @Override + public final String getOperation() + { + return _operation; + } + + public final IceInternal.BasicStream getOs() + { + return _os; + } + + public IceInternal.BasicStream + startReadParams() + { + _is.startReadEncaps(); + return _is; + } + + public void + endReadParams() + { + _is.endReadEncaps(); + } + + public void + readEmptyParams() + { + _is.skipEmptyEncaps(null); + } + + public byte[] + readParamEncaps() + { + return _is.readEncaps(null); + } + + public final boolean __wait() + { + synchronized(_monitor) + { + if((_state & StateEndCalled) > 0) + { + throw new java.lang.IllegalArgumentException("end_ method called more than once"); + } + _state |= StateEndCalled; + while((_state & StateDone) == 0) + { + try + { + _monitor.wait(); + } + catch(InterruptedException ex) + { + // + // Remove the EndCalled flag since it should be possible to + // call end_* again on the AsyncResult. + // + _state &= ~StateEndCalled; + throw new Ice.OperationInterruptedException(); + } + } + if(_exception != null) + { + //throw (LocalException)_exception.fillInStackTrace(); + throw _exception; + } + return (_state & StateOK) > 0; + } + } + + public final void throwUserException() + throws UserException + { + try + { + _is.startReadEncaps(); + _is.throwException(null); + } + catch(UserException ex) + { + _is.endReadEncaps(); + throw ex; + } + } + + public void invokeExceptionAsync(final Ice.Exception ex) + { + // + // This is called when it's not safe to call the exception callback synchronously + // from this thread. Instead the exception callback is called asynchronously from + // the client thread pool. + // + try + { + _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) + { + @Override + public void + run() + { + invokeException(ex); + } + }); + } + catch(CommunicatorDestroyedException exc) + { + throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly. + } + } + + public final void invokeException(Ice.Exception ex) + { + synchronized(_monitor) + { + _state |= StateDone; + //_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation + _exception = ex; + _monitor.notifyAll(); + } + + invokeCompleted(); + } + + protected final void invokeSentInternal() + { + // + // Note: no need to change the _state here, specializations are responsible for + // changing the state. + // + + if(_callback != null) + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); + } + + try + { + _callback.__sent(this); + } + catch(RuntimeException ex) + { + warning(ex); + } + catch(AssertionError exc) + { + error(exc); + } + catch(OutOfMemoryError exc) + { + error(exc); + } + finally + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(null); + } + } + } + + if(_observer != null) + { + Ice.ObjectPrx proxy = getProxy(); + if(proxy == null || !proxy.ice_isTwoway()) + { + _observer.detach(); + } + } + } + + public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) + { + if(_observer != null) + { + _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); + if(_childObserver != null) + { + _childObserver.attach(); + } + } + } + + void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) + { + if(_observer != null) + { + _childObserver = _observer.getCollocatedObserver(adapter, + requestId, + _os.size() - IceInternal.Protocol.headerSize - 4); + if(_childObserver != null) + { + _childObserver.attach(); + } + } + } + + final void invokeSentAsync() + { + // + // This is called when it's not safe to call the sent callback synchronously + // from this thread. Instead the exception callback is called asynchronously from + // the client thread pool. + // + try + { + _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection) + { + @Override + public void + run() + { + invokeSentInternal(); + } + }); + } + catch(CommunicatorDestroyedException exc) + { + } + } + + public static void check(AsyncResult r, ObjectPrx prx, String operation) + { + check(r, operation); + if(r.getProxy() != prx) + { + throw new IllegalArgumentException("Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding begin_" + + operation + " method"); + } + } + + public static void check(AsyncResult r, Connection con, String operation) + { + check(r, operation); + if(r.getConnection() != con) + { + throw new IllegalArgumentException("Connection for call to end_" + operation + + " does not match connection that was used to call corresponding begin_" + + operation + " method"); + } + } + + public static void check(AsyncResult r, Communicator com, String operation) + { + check(r, operation); + if(r.getCommunicator() != com) + { + throw new IllegalArgumentException("Communicator for call to end_" + operation + + " does not match communicator that was used to call corresponding " + + "begin_" + operation + " method"); + } + } + + public void cacheMessageBuffers() + { + } + + protected final void invokeCompleted() + { + // + // Note: no need to change the _state here, specializations are responsible for + // changing the state. + // + + if(_callback != null) + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader()); + } + + try + { + _callback.__completed(this); + } + catch(RuntimeException ex) + { + warning(ex); + } + catch(AssertionError exc) + { + error(exc); + } + catch(OutOfMemoryError exc) + { + error(exc); + } + finally + { + if(_instance.useApplicationClassLoader()) + { + Thread.currentThread().setContextClassLoader(null); + } + } + + cacheMessageBuffers(); + } + + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + } + + protected void + timeout() + { + IceInternal.RequestHandler handler; + synchronized(_monitor) + { + handler = _timeoutRequestHandler; + _timeoutRequestHandler = null; + } + + if(handler != null) + { + handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this, + new Ice.InvocationTimeoutException()); + } + } + + private static void check(AsyncResult r, String operation) + { + if(r == null) + { + throw new IllegalArgumentException("AsyncResult == null"); + } + else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality + { + throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " + + r.getOperation()); + } + } + + private final void warning(RuntimeException ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex); + _instance.initializationData().logger.warning(s); + } + } + + private final void error(Error error) + { + String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error); + _instance.initializationData().logger.error(s); + } + + protected Communicator _communicator; + protected IceInternal.Instance _instance; + protected String _operation; + protected Ice.Connection _cachedConnection; + + protected java.lang.Object _monitor = new java.lang.Object(); + protected IceInternal.BasicStream _is; + protected IceInternal.BasicStream _os; + + protected IceInternal.RequestHandler _timeoutRequestHandler; + protected java.util.concurrent.Future<?> _future; + + protected static final byte StateOK = 0x1; + protected static final byte StateDone = 0x2; + protected static final byte StateSent = 0x4; + protected static final byte StateEndCalled = 0x8; + protected static final byte StateCachedBuffers = 0x10; + + protected byte _state; + protected boolean _sentSynchronously; + protected Ice.Exception _exception; + + protected Ice.Instrumentation.InvocationObserver _observer; + protected Ice.Instrumentation.ChildInvocationObserver _childObserver; + + protected IceInternal.CallbackBase _callback; +} diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java deleted file mode 100644 index a5f958eb10b..00000000000 --- a/java/src/IceInternal/BatchOutgoing.java +++ /dev/null @@ -1,220 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -import Ice.Instrumentation.Observer; -import Ice.Instrumentation.InvocationObserver; - -public final class BatchOutgoing implements OutgoingMessageCallback -{ - public - BatchOutgoing(Ice.ConnectionI connection, Instance instance, String op) - { - _connection = connection; - _sent = false; - _os = new BasicStream(instance, Protocol.currentProtocolEncoding); - _observer = IceInternal.ObserverHelper.get(instance, op); - } - - public - BatchOutgoing(Ice.ObjectPrxHelperBase proxy, String op) - { - _proxy = proxy; - _sent = false; - _os = new BasicStream(proxy.__reference().getInstance(), Protocol.currentProtocolEncoding); - _observer = IceInternal.ObserverHelper.get(proxy, op); - Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol()); - } - - public void - invoke() - throws InterruptedException - { - assert(_proxy != null || _connection != null); - - if(_connection != null) - { - if(_connection.flushBatchRequests(this)) - { - return; - } - - synchronized(this) - { - while(_exception == null && !_sent) - { - wait(); - } - if(_exception != null) - { - throw _exception; - } - } - return; - } - - RequestHandler handler = null; - try - { - handler = _proxy.__getRequestHandler(); - if(handler.sendRequest(this)) - { - return; - } - - boolean timedOut = false; - synchronized(this) - { - int timeout = _proxy.__reference().getInvocationTimeout(); - if(timeout > 0) - { - long now = Time.currentMonotonicTimeMillis(); - long deadline = now + timeout; - while(_exception == null && !_sent && !timedOut) - { - wait(deadline - now); - if(_exception == null && !_sent) - { - now = Time.currentMonotonicTimeMillis(); - timedOut = now >= deadline; - } - } - } - else - { - while(_exception == null && !_sent) - { - wait(); - } - } - } - - if(timedOut) - { - if(handler.requestCanceled(this, new Ice.InvocationTimeoutException())) - { - synchronized(this) - { - while(_exception == null) - { - wait(); - } - } - } - } - - if(_exception != null) - { - throw (Ice.Exception)_exception.fillInStackTrace(); - } - } - catch(RetryException ex) - { - // - // Clear request handler but don't retry or throw. Retrying - // isn't useful, there were no batch requests associated with - // the proxy's request handler. - // - _proxy.__setRequestHandler(handler, null); - } - catch(Ice.Exception ex) - { - _proxy.__setRequestHandler(handler, null); // Clear request handler - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - throw ex; // Throw to notify the user that batch requests were potentially lost. - } - } - - @Override - public boolean - send(Ice.ConnectionI connection, boolean compress, boolean response) - { - return connection.flushBatchRequests(this); - } - - @Override - public void - invokeCollocated(CollocatedRequestHandler handler) - { - handler.invokeBatchRequests(this); - } - - @Override - synchronized public void - sent() - { - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - _sent = true; - notify(); - } - - @Override - public synchronized void - finished(Ice.Exception ex) - { - if(_childObserver != null) - { - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - _exception = ex; - notify(); - } - - public BasicStream - os() - { - return _os; - } - - public void - attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int size) - { - if(_observer != null) - { - _childObserver = _observer.getRemoteObserver(info, endpt, 0, size); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - public void - attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) - { - if(_observer != null) - { - _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - private Ice.ObjectPrxHelperBase _proxy; - private Ice.ConnectionI _connection; - private BasicStream _os; - private boolean _sent; - private Ice.Exception _exception; - - private InvocationObserver _observer; - private Observer _childObserver; - -} diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index 74777db0d4b..d5953310639 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -9,16 +9,16 @@ package IceInternal; -public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable +public class BatchOutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback { - public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) + BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) { super(communicator, instance, operation, callback); } @Override public int - __send(Ice.ConnectionI connection, boolean compress, boolean response) + send(Ice.ConnectionI connection, boolean compress, boolean response) { _cachedConnection = connection; return connection.flushAsyncBatchRequests(this); @@ -26,18 +26,18 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync @Override public int - __invokeCollocated(CollocatedRequestHandler handler) + invokeCollocated(CollocatedRequestHandler handler) { return handler.invokeAsyncBatchRequests(this); } @Override public boolean - __sent() + sent() { synchronized(_monitor) { - _state |= Done | OK | Sent; + _state |= StateDone | StateOK | StateSent; //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization if(_childObserver != null) { @@ -51,20 +51,30 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync _timeoutRequestHandler = null; } _monitor.notifyAll(); + + if(_callback == null || !_callback.__hasSentCallback()) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } return true; } } @Override public void - __invokeSent() + invokeSent() { - __invokeSentInternal(); + invokeSentInternal(); } @Override public void - __finished(Ice.Exception exc) + finished(Ice.Exception exc) { synchronized(_monitor) { @@ -81,29 +91,20 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync _timeoutRequestHandler = null; } } - __invokeException(exc); + invokeException(exc); } @Override public void - __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) + dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { - threadPool.dispatch( - new DispatchWorkItem(connection) + threadPool.dispatch(new DispatchWorkItem(connection) + { + @Override + public void run() { - @Override - public void - run() - { - BatchOutgoingAsync.this.__finished(ex); - } - }); - } - - @Override - public void - run() - { - __runTimerTask(); + BatchOutgoingAsync.this.finished(ex); + } + }); } } diff --git a/java/src/IceInternal/Buffer.java b/java/src/IceInternal/Buffer.java index b14d732c643..fc7f88966c1 100644 --- a/java/src/IceInternal/Buffer.java +++ b/java/src/IceInternal/Buffer.java @@ -21,7 +21,6 @@ public class Buffer this(maxCapacity, direct, java.nio.ByteOrder.LITTLE_ENDIAN); } - public Buffer(int maxCapacity, boolean direct, java.nio.ByteOrder order) { b = _emptyBuffer; @@ -32,13 +31,11 @@ public class Buffer _order = order; } - public Buffer(byte[] data) { this(data, java.nio.ByteOrder.LITTLE_ENDIAN); } - public Buffer(byte[] data, java.nio.ByteOrder order) { b = java.nio.ByteBuffer.wrap(data); @@ -50,13 +47,11 @@ public class Buffer _order = order; } - public Buffer(java.nio.ByteBuffer data) { this(data, java.nio.ByteOrder.LITTLE_ENDIAN); } - public Buffer(java.nio.ByteBuffer data, java.nio.ByteOrder order) { b = data; diff --git a/java/src/IceInternal/CallbackBase.java b/java/src/IceInternal/CallbackBase.java index 61c711f0308..13c9e40f708 100644 --- a/java/src/IceInternal/CallbackBase.java +++ b/java/src/IceInternal/CallbackBase.java @@ -13,6 +13,7 @@ public abstract class CallbackBase { public abstract void __completed(Ice.AsyncResult r); public abstract void __sent(Ice.AsyncResult r); + public abstract boolean __hasSentCallback(); public static void check(boolean cb) { diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 4c79b2e3c58..1d97dc9a8d6 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -11,39 +11,10 @@ package IceInternal; public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { - class InvokeAll extends DispatchWorkItem + private class InvokeAllAsync extends DispatchWorkItem { - public - InvokeAll(OutgoingMessageCallback out, BasicStream os, int requestId, int invokeNum, boolean batch) - { - _out = out; - _os = os; - _requestId = requestId; - _invokeNum = invokeNum; - _batch = batch; - } - - @Override - public void - run() - { - if(sent(_out)) - { - invokeAll(_os, _requestId, _invokeNum, _batch); - } - } - - private final OutgoingMessageCallback _out; - private final BasicStream _os; - private final int _requestId; - private final int _invokeNum; - private final boolean _batch; - }; - - class InvokeAllAsync extends DispatchWorkItem - { - public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, - boolean batch) + private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum, + boolean batch) { _outAsync = outAsync; _os = os; @@ -53,8 +24,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - public void - run() + public void run() { if(sentAsync(_outAsync)) { @@ -63,18 +33,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } private final OutgoingAsyncMessageCallback _outAsync; - private final BasicStream _os; + private BasicStream _os; private final int _requestId; private final int _invokeNum; private final boolean _batch; }; - private void - fillInValue(BasicStream os, int pos, int value) - { - os.rewriteInt(pos, value); - } - public CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter) { @@ -207,50 +171,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } @Override - public boolean - sendRequest(OutgoingMessageCallback out) - { - out.invokeCollocated(this); - return !_response && _reference.getInvocationTimeout() == 0; - } - - @Override public int sendAsyncRequest(OutgoingAsyncMessageCallback outAsync) { - return outAsync.__invokeCollocated(this); - } - - @Override - synchronized public boolean - requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) - { - Integer requestId = _sendRequests.get(out); - if(requestId != null) - { - if(requestId > 0) - { - _requests.remove(requestId); - } - out.finished(ex); - _sendRequests.remove(out); - return true; - } - else if(out instanceof Outgoing) - { - Outgoing o = (Outgoing)out; - assert(o != null); - for(java.util.Map.Entry<Integer, Outgoing> e : _requests.entrySet()) - { - if(e.getValue() == o) - { - out.finished(ex); - _requests.remove(e.getKey()); - return true; // We're done. - } - } - } - return false; + return outAsync.invokeCollocated(this); } @Override @@ -265,7 +189,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler _asyncRequests.remove(requestId); } _sendAsyncRequests.remove(outAsync); - outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); return true; // We're done } @@ -278,7 +202,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler if(e.getValue() == o) { _asyncRequests.remove(e.getKey()); - outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); return true; // We're done } } @@ -286,175 +210,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return false; } - public void - invokeRequest(Outgoing out) - { - int requestId = 0; - if(_reference.getInvocationTimeout() > 0 || _response) - { - synchronized(this) - { - if(_response) - { - requestId = ++_requestId; - _requests.put(requestId, out); - } - if(_reference.getInvocationTimeout() > 0) - { - _sendRequests.put(out, requestId); - } - } - } - - out.attachCollocatedObserver(_adapter, requestId); - - if(_reference.getInvocationTimeout() > 0) - { - // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. - _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false)); - } - else if(_dispatcher) - { - _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out.sent(); - invokeAll(out.os(), requestId, 1, false); - } - } - - public int - invokeAsyncRequest(OutgoingAsync outAsync) - { - int requestId = 0; - if(_reference.getInvocationTimeout() > 0 || _response) - { - synchronized(this) - { - if(_response) - { - requestId = ++_requestId; - _asyncRequests.put(requestId, outAsync); - } - if(_reference.getInvocationTimeout() > 0) - { - _sendAsyncRequests.put(outAsync, requestId); - } - } - } - - outAsync.__attachCollocatedObserver(_adapter, requestId); - - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false)); - - return AsyncStatus.Queued; - } - - public void - invokeBatchRequests(BatchOutgoing out) - { - int invokeNum; - synchronized(this) - { - waitStreamInUse(); - invokeNum = _batchRequestNum; - - if(_batchRequestNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _sendRequests.put(out, 0); - } - - assert(!_batchStream.isEmpty()); - _batchStream.swap(out.os()); - - // - // Reset the batch stream. - // - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - out.attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true)); - } - else if(_dispatcher) - { - _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true)); - } - else // Optimization: directly call invokeAll if there's no dispatcher. - { - out.sent(); - invokeAll(out.os(), 0, invokeNum, true); - } - } - else - { - out.sent(); - } - } - - public int - invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) - { - int invokeNum; - synchronized(this) - { - waitStreamInUse(); - - invokeNum = _batchRequestNum; - if(_batchRequestNum > 0) - { - if(_reference.getInvocationTimeout() > 0) - { - _sendAsyncRequests.put(outAsync, 0); - } - - assert(!_batchStream.isEmpty()); - _batchStream.swap(outAsync.__getOs()); - - // - // Reset the batch stream. - // - BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, - _batchAutoFlush); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchMarker = 0; - } - } - - outAsync.__attachCollocatedObserver(_adapter, 0); - - if(invokeNum > 0) - { - _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true)); - return AsyncStatus.Queued; - } - else if(outAsync.__sent()) - { - return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback; - } - else - { - return AsyncStatus.Sent; - } - } - @Override public void - sendResponse(int requestId, BasicStream os, byte status) + sendResponse(int requestId, final BasicStream os, byte status) { OutgoingAsync outAsync = null; synchronized(this) @@ -469,25 +227,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler TraceUtil.traceRecv(os, _logger, _traceLevels); } - Outgoing out = _requests.get(requestId); - if(out != null) - { - out.finished(os); - _requests.remove(requestId); - } - else + outAsync = _asyncRequests.get(requestId); + if(outAsync != null) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + _asyncRequests.remove(requestId); } } if(outAsync != null) { - outAsync.__finished(os); + outAsync.finished(os); } _adapter.decDirectCount(); } @@ -517,19 +266,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - Outgoing out = _requests.remove(requestId); - if(out != null) - { - out.finished(ex); - } - else - { - outAsync = _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } _adapter.decDirectCount(); @@ -556,25 +297,104 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler return null; } - boolean - sent(OutgoingMessageCallback out) + void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous) { - if(_reference.getInvocationTimeout() > 0) + int requestId = 0; + if(_reference.getInvocationTimeout() > 0 || _response) { synchronized(this) { - if(_sendRequests.remove(out) == null) + if(_response) { - return false; // The request timed-out. + requestId = ++_requestId; + _asyncRequests.put(requestId, outAsync); + } + if(_reference.getInvocationTimeout() > 0) + { + _sendAsyncRequests.put(outAsync, requestId); } } } - out.sent(); - return true; + + outAsync.attachCollocatedObserver(_adapter, requestId); + + if(synchronous) + { + // + // Treat this collocated call as if it is a synchronous invocation. + // + if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response) + { + // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise. + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + else if(_dispatcher) + { + _adapter.getThreadPool().dispatchFromThisThread( + new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + else // Optimization: directly call invokeAll if there's no dispatcher. + { + if(sentAsync(outAsync)) + { + invokeAll(outAsync.getOs(), requestId, 1, false); + } + } + } + else + { + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false)); + } + } + + int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync) + { + int invokeNum; + synchronized(this) + { + waitStreamInUse(); + + invokeNum = _batchRequestNum; + if(_batchRequestNum > 0) + { + if(_reference.getInvocationTimeout() > 0) + { + _sendAsyncRequests.put(outAsync, 0); + } + + assert(!_batchStream.isEmpty()); + _batchStream.swap(outAsync.getOs()); + + // + // Reset the batch stream. + // + BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding, + _batchAutoFlush); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchMarker = 0; + } + } + + outAsync.attachCollocatedObserver(_adapter, 0); + + if(invokeNum > 0) + { + _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true)); + return AsyncStatus.Queued; + } + else if(outAsync.sent()) + { + return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback; + } + else + { + return AsyncStatus.Sent; + } } - boolean - sentAsync(OutgoingAsyncMessageCallback outAsync) + private boolean + sentAsync(final OutgoingAsyncMessageCallback outAsync) { if(_reference.getInvocationTimeout() > 0) { @@ -586,14 +406,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } } - if(outAsync.__sent()) + + if(outAsync.sent()) { - outAsync.__invokeSent(); + outAsync.invokeSent(); } return true; } - void + private void invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch) { if(batch) @@ -668,7 +489,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } - void + private void handleException(int requestId, Ice.Exception ex) { if(requestId == 0) @@ -679,25 +500,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - Outgoing out = _requests.get(requestId); - if(out != null) - { - out.finished(ex); - _requests.remove(requestId); - } - else + outAsync = _asyncRequests.get(requestId); + if(outAsync != null) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + _asyncRequests.remove(requestId); } } if(outAsync != null) { - outAsync.__finished(ex); + outAsync.finished(ex); } } @@ -731,6 +543,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler } } + private void + fillInValue(BasicStream os, int pos, int value) + { + os.rewriteInt(pos, value); + } + private final Reference _reference; private final boolean _dispatcher; private final boolean _response; @@ -741,12 +559,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler private int _requestId; - private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests = - new java.util.HashMap<OutgoingMessageCallback, Integer>(); private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests = new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>(); - private java.util.Map<Integer, Outgoing> _requests = new java.util.HashMap<Integer, Outgoing>(); private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>(); private BasicStream _batchStream; diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java index 3c3d07bcc4e..6ebd6fb3a3e 100644 --- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java +++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java @@ -9,7 +9,7 @@ package IceInternal; -public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult +public class CommunicatorBatchOutgoingAsync extends IceInternal.AsyncResultI { public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback) @@ -49,21 +49,21 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult @Override public boolean - __sent() + sent() { if(_childObserver != null) { _childObserver.detach(); _childObserver = null; } - check(false); + doCheck(false); return false; } // TODO: MJN: This is missing a test. @Override public void - __finished(Ice.Exception ex) + finished(Ice.Exception ex) { if(_childObserver != null) { @@ -71,12 +71,12 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult _childObserver.detach(); _childObserver = null; } - check(false); + doCheck(false); } @Override public void - __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) + attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) { if(CommunicatorBatchOutgoingAsync.this._observer != null) { @@ -105,17 +105,17 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult } catch(Ice.LocalException ex) { - check(false); + doCheck(false); throw ex; } } public void ready() { - check(true); + doCheck(true); } - private void check(boolean userThread) + private void doCheck(boolean userThread) { synchronized(_monitor) { @@ -124,21 +124,32 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult { return; } - _state |= Done | OK | Sent; + _state |= StateDone | StateOK | StateSent; _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation _monitor.notifyAll(); } - // - // sentSynchronously_ is immutable here. - // - if(!_sentSynchronously || !userThread) + if(_callback == null || !_callback.__hasSentCallback()) { - __invokeSentAsync(); + if(_observer != null) + { + _observer.detach(); + _observer = null; + } } else { - __invokeSentInternal(); + // + // sentSynchronously_ is immutable here. + // + if(!_sentSynchronously || !userThread) + { + invokeSentAsync(); + } + else + { + invokeSentInternal(); + } } } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 9cc70b6a76d..25d9859b997 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -14,7 +14,7 @@ import Ice.ConnectionI; public class ConnectRequestHandler implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback { - static class Request + static private class Request { Request(BasicStream os) { @@ -22,17 +22,11 @@ public class ConnectRequestHandler this.os.swap(os); } - Request(OutgoingMessageCallback out) - { - this.out = out; - } - Request(OutgoingAsyncMessageCallback out) { this.outAsync = out; } - OutgoingMessageCallback out = null; OutgoingAsyncMessageCallback outAsync = null; BasicStream os = null; } @@ -135,29 +129,6 @@ public class ConnectRequestHandler } @Override - public boolean - sendRequest(OutgoingMessageCallback out) - throws RetryException - { - synchronized(this) - { - try - { - if(!initialized()) - { - _requests.add(new Request(out)); - return false; // Not sent - } - } - catch(Ice.LocalException ex) - { - throw new RetryException(ex); - } - } - return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response. - } - - @Override public int sendAsyncRequest(OutgoingAsyncMessageCallback out) throws RetryException @@ -177,37 +148,7 @@ public class ConnectRequestHandler throw new RetryException(ex); } } - return out.__send(_connection, _compress, _response); - } - - @Override - public boolean - requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) - { - synchronized(this) - { - if(_exception != null) - { - return false; // The request has been notified of a failure already. - } - - if(!initialized()) - { - java.util.Iterator<Request> it = _requests.iterator(); - while(it.hasNext()) - { - Request request = it.next(); - if(request.out == out) - { - out.finished(ex); - it.remove(); - return true; - } - } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. - } - } - return _connection.requestCanceled(out, ex); + return out.send(_connection, _compress, _response); } @Override @@ -230,7 +171,7 @@ public class ConnectRequestHandler if(request.outAsync == outAsync) { it.remove(); - outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); + outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null); return true; // We're done } } @@ -249,7 +190,8 @@ public class ConnectRequestHandler @Override synchronized public ConnectionI - getConnection() { + getConnection() + { if(_exception != null) { throw (Ice.LocalException)_exception.fillInStackTrace(); @@ -259,12 +201,17 @@ public class ConnectRequestHandler return _connection; } } - + @Override synchronized public ConnectionI waitForConnection() - throws InterruptedException + throws InterruptedException, RetryException { + if(_exception != null) + { + throw new RetryException(_exception); + } + // // Wait for the connection establishment to complete or fail. // @@ -440,13 +387,9 @@ public class ConnectRequestHandler while(p.hasNext()) { Request request = p.next(); - if(request.out != null) - { - request.out.send(_connection, _compress, _response); - } - else if(request.outAsync != null) + if(request.outAsync != null) { - if((request.outAsync.__send(_connection, _compress, _response) & + if((request.outAsync.send(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0) { sentCallbacks.add(request.outAsync); @@ -485,14 +428,13 @@ public class ConnectRequestHandler assert(_exception == null && !_requests.isEmpty()); _exception = ex.get(); _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) + { + @Override + public void run() { - @Override - public void - run() - { - flushRequestsWithException(); - }; - }); + flushRequestsWithException(); + }; + }); } } catch(final Ice.LocalException ex) @@ -502,32 +444,29 @@ public class ConnectRequestHandler assert(_exception == null && !_requests.isEmpty()); _exception = ex; _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) + { + @Override + public void run() { - @Override - public void - run() - { - flushRequestsWithException(); - }; - }); + flushRequestsWithException(); + }; + }); } } if(!sentCallbacks.isEmpty()) { - _reference.getInstance().clientThreadPool().dispatch( - new DispatchWorkItem(_connection) + _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection) + { + @Override + public void run() { - @Override - public void - run() + for(OutgoingAsyncMessageCallback callback : sentCallbacks) { - for(OutgoingAsyncMessageCallback callback : sentCallbacks) - { - callback.__invokeSent(); - } - }; - }); + callback.invokeSent(); + } + }; + }); } // @@ -590,13 +529,9 @@ public class ConnectRequestHandler { for(Request request : _requests) { - if(request.out != null) - { - request.out.finished(_exception); - } - else if(request.outAsync != null) + if(request.outAsync != null) { - request.outAsync.__finished(_exception); + request.outAsync.finished(_exception); } } _requests.clear(); diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java index 47870ff2494..1b9fd2e0e3c 100644 --- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java +++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java @@ -26,7 +26,7 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync _sentSynchronously = true; if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSent(); + invokeSent(); } } } diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java index 4dcf9db63e8..5494b6f70e3 100644 --- a/java/src/IceInternal/ConnectionRequestHandler.java +++ b/java/src/IceInternal/ConnectionRequestHandler.java @@ -13,44 +13,31 @@ public class ConnectionRequestHandler implements RequestHandler { @Override public void - prepareBatchRequest(BasicStream out) throws RetryException { + prepareBatchRequest(BasicStream out) + throws RetryException + { _connection.prepareBatchRequest(out); } @Override public void - finishBatchRequest(BasicStream out) { + finishBatchRequest(BasicStream out) + { _connection.finishBatchRequest(out, _compress); } @Override public void - abortBatchRequest() { + abortBatchRequest() + { _connection.abortBatchRequest(); } @Override - public boolean - sendRequest(OutgoingMessageCallback out) - throws RetryException { - // - // Finished if sent and no response. - // - return out.send(_connection, _compress, _response) && !_response; - } - - @Override - public int - sendAsyncRequest(OutgoingAsyncMessageCallback out) - throws RetryException { - return out.__send(_connection, _compress, _response); - } - - @Override - public boolean - requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex) + public int sendAsyncRequest(OutgoingAsyncMessageCallback out) + throws RetryException { - return _connection.requestCanceled(out, ex); + return out.send(_connection, _compress, _response); } @Override diff --git a/java/src/IceInternal/DispatchObserverI.java b/java/src/IceInternal/DispatchObserverI.java index 4aab7c449bb..6dbc20c2066 100644 --- a/java/src/IceInternal/DispatchObserverI.java +++ b/java/src/IceInternal/DispatchObserverI.java @@ -43,7 +43,7 @@ public class DispatchObserverI } } - final MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>() + final private MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>() { @Override public void diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index ee49e23ddf2..532880174bd 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -9,7 +9,7 @@ package IceInternal; -public class EndpointHostResolver +class EndpointHostResolver { EndpointHostResolver(Instance instance) { @@ -31,8 +31,7 @@ public class EndpointHostResolver } } - public java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType selType, - IPEndpointI endpoint) + java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint) { // // Try to get the addresses without DNS lookup. If this doesn't @@ -89,7 +88,7 @@ public class EndpointHostResolver return connectors; } - synchronized public void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint, + synchronized void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint, final EndpointI_connectors callback) { // @@ -172,7 +171,7 @@ public class EndpointHostResolver }); } - synchronized public void destroy() + synchronized void destroy() { assert(!_destroyed); _destroyed = true; @@ -184,7 +183,7 @@ public class EndpointHostResolver _executor.shutdown(); } - public void joinWithThread() + void joinWithThread() throws InterruptedException { // Wait for the executor to terminate. @@ -201,7 +200,7 @@ public class EndpointHostResolver } } - synchronized public void updateObserver() + synchronized void updateObserver() { Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) diff --git a/java/src/IceInternal/Functional_CallbackBase.java b/java/src/IceInternal/Functional_CallbackBase.java index 4665f8b49a6..5e344f32141 100644 --- a/java/src/IceInternal/Functional_CallbackBase.java +++ b/java/src/IceInternal/Functional_CallbackBase.java @@ -37,6 +37,12 @@ public abstract class Functional_CallbackBase extends IceInternal.CallbackBase } @Override + public final boolean __hasSentCallback() + { + return __sentCb != null; + } + + @Override public abstract void __completed(Ice.AsyncResult __result); protected final Functional_GenericCallback1<Ice.Exception> __exceptionCb; diff --git a/java/src/IceInternal/HttpParser.java b/java/src/IceInternal/HttpParser.java index b5567b080b6..c3532393cbd 100644 --- a/java/src/IceInternal/HttpParser.java +++ b/java/src/IceInternal/HttpParser.java @@ -20,7 +20,7 @@ final class HttpParser _state = State.Init; } - enum Type + private enum Type { Unknown, Request, @@ -627,17 +627,6 @@ final class HttpParser return _state == State.Complete; } - Type type() - { - return _type; - } - - String method() - { - assert(_type == Type.Request); - return _method.toString(); - } - String uri() { assert(_type == Type.Request); @@ -675,11 +664,6 @@ final class HttpParser return null; } - java.util.Map<String, String> headers() - { - return _headers; - } - private Type _type; private StringBuffer _method = new StringBuffer(); @@ -694,7 +678,7 @@ final class HttpParser private int _status; private String _reason; - enum State + private enum State { Init, Type, diff --git a/java/src/IceInternal/IncomingBase.java b/java/src/IceInternal/IncomingBase.java index e02c15565c6..6879e8cdf2f 100644 --- a/java/src/IceInternal/IncomingBase.java +++ b/java/src/IceInternal/IncomingBase.java @@ -9,7 +9,7 @@ package IceInternal; -public class IncomingBase +class IncomingBase { protected IncomingBase(Instance instance, ResponseHandler handler, Ice.ConnectionI connection, Ice.ObjectAdapter adapter, diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index d091c0fd45c..e52546b26fa 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -13,6 +13,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import Ice.CommunicatorDestroyedException; + public final class Instance { private class ObserverUpdaterI implements Ice.Instrumentation.ObserverUpdater @@ -605,12 +607,16 @@ public final class Instance public boolean queueRequests() { - return _hasQueueExecutor; + return _queueExecutor != null; } - public ExecutorService + synchronized public ExecutorService getQueueExecutor() { + if(_state == StateDestroyed) + { + throw new Ice.CommunicatorDestroyedException(); + } return _queueExecutor; } @@ -853,19 +859,11 @@ public final class Instance if(_initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0) { - _hasQueueExecutor = true; _queueExecutor = Executors.newFixedThreadPool(1, Util.createThreadFactory(_initData.properties, Util.createThreadName(_initData.properties, "Ice.BackgroundIO"))); - // - // If background IO is enabled message buffers cannot be cached. - // - _cacheMessageBuffers = 0; - } - else - { - _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2); } + _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2); } catch(Ice.LocalException ex) { @@ -1072,7 +1070,7 @@ public final class Instance ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; EndpointHostResolver endpointHostResolver = null; - + ExecutorService queueExecutor = null; synchronized(this) { _objectAdapterFactory = null; @@ -1152,6 +1150,9 @@ public final class Instance _adminAdapter = null; _adminFacets.clear(); + queueExecutor = _queueExecutor; + _queueExecutor = null; + _typeToClassMap.clear(); _state = StateDestroyed; @@ -1180,18 +1181,18 @@ public final class Instance throw new Ice.OperationInterruptedException(); } - if(_queueExecutor != null) + if(queueExecutor != null) { - _queueExecutor.shutdown(); + queueExecutor.shutdown(); try { - _queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { throw new Ice.OperationInterruptedException(); } - _queueExecutor = null; + queueExecutor = null; } if(_initData.properties.getPropertyAsInt("Ice.Warn.UnusedProperties") > 0) @@ -1334,6 +1335,5 @@ public final class Instance final private boolean _useApplicationClassLoader; private static boolean _oneOffDone = false; - private boolean _hasQueueExecutor = false; private ExecutorService _queueExecutor; } diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java index 8c9ffc57d32..bf96a9c0d0d 100644 --- a/java/src/IceInternal/InvocationObserverI.java +++ b/java/src/IceInternal/InvocationObserverI.java @@ -205,10 +205,10 @@ public class InvocationObserverI delegate = _delegate.getRemoteObserver(con, edpt, requestId, sz); } return getObserver("Remote", - new RemoteInvocationHelper(con, edpt, requestId, sz), - RemoteMetrics.class, - RemoteObserverI.class, - delegate); + new RemoteInvocationHelper(con, edpt, requestId, sz), + RemoteMetrics.class, + RemoteObserverI.class, + delegate); } @Override @@ -220,12 +220,11 @@ public class InvocationObserverI { delegate = _delegate.getCollocatedObserver(adapter, requestId, sz); } - return getObserver( - "Collocated", - new CollocatedInvocationHelper(adapter, requestId, sz), - CollocatedMetrics.class, - CollocatedObserverI.class, - delegate); + return getObserver("Collocated", + new CollocatedInvocationHelper(adapter, requestId, sz), + CollocatedMetrics.class, + CollocatedObserverI.class, + delegate); } final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>() diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java index c06e7bf9550..324ccbe124d 100644 --- a/java/src/IceInternal/Network.java +++ b/java/src/IceInternal/Network.java @@ -487,94 +487,6 @@ public final class Network } } - public static java.nio.channels.SocketChannel - doAccept(java.nio.channels.ServerSocketChannel fd, int timeout) - { - java.nio.channels.SocketChannel result = null; - while(result == null) - { - try - { - result = fd.accept(); - if(result == null) - { - java.nio.channels.Selector selector = java.nio.channels.Selector.open(); - - try - { - while(true) - { - try - { - fd.register(selector, java.nio.channels.SelectionKey.OP_ACCEPT); - int n; - if(timeout > 0) - { - n = selector.select(timeout); - } - else if(timeout == 0) - { - n = selector.selectNow(); - } - else - { - n = selector.select(); - } - - if(n == 0) - { - throw new Ice.TimeoutException(); - } - - break; - } - catch(java.io.IOException ex) - { - if(interrupted(ex)) - { - continue; - } - throw new Ice.SocketException(ex); - } - } - } - finally - { - try - { - selector.close(); - } - catch(java.io.IOException ex) - { - // Ignore - } - } - } - } - catch(java.io.IOException ex) - { - if(interrupted(ex)) - { - continue; - } - throw new Ice.SocketException(ex); - } - } - - try - { - java.net.Socket socket = result.socket(); - socket.setTcpNoDelay(true); - socket.setKeepAlive(true); - } - catch(java.io.IOException ex) - { - throw new Ice.SocketException(ex); - } - - return result; - } - public static void setSendBufferSize(java.nio.channels.SocketChannel fd, int size) { @@ -1189,22 +1101,6 @@ public final class Network } public static String - fdToString(java.net.Socket fd) - { - if(fd == null) - { - return "<closed>"; - } - - java.net.InetAddress localAddr = fd.getLocalAddress(); - int localPort = fd.getLocalPort(); - java.net.InetAddress remoteAddr = fd.getInetAddress(); - int remotePort = fd.getPort(); - - return addressesToString(localAddr, localPort, remoteAddr, remotePort); - } - - public static String addressesToString(java.net.InetAddress localAddr, int localPort, java.net.InetAddress remoteAddr, int remotePort, NetworkProxy proxy, java.net.InetSocketAddress target) { diff --git a/java/src/IceInternal/OpaqueEndpointI.java b/java/src/IceInternal/OpaqueEndpointI.java index b4f291f7751..b14beda4036 100644 --- a/java/src/IceInternal/OpaqueEndpointI.java +++ b/java/src/IceInternal/OpaqueEndpointI.java @@ -174,14 +174,6 @@ final class OpaqueEndpointI extends EndpointI } // - // Get the encoded endpoint. - // - public byte[] rawBytes() - { - return _rawBytes; - } - - // // Return a server side transceiver for this endpoint, or null if a // transceiver can only be created by an acceptor. In case a // transceiver is created, this operation also returns a new diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java deleted file mode 100644 index eec1e4eb8f0..00000000000 --- a/java/src/IceInternal/Outgoing.java +++ /dev/null @@ -1,736 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -import Ice.Instrumentation.ChildInvocationObserver; -import Ice.Instrumentation.InvocationObserver; - -public final class Outgoing implements OutgoingMessageCallback -{ - public - Outgoing(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context, - boolean explicitCtx) - { - Reference ref = proxy.__reference(); - _state = StateUnsent; - _sent = false; - _proxy = proxy; - _mode = mode; - _handler = null; - _observer = IceInternal.ObserverHelper.get(proxy, op, context); - _encoding = Protocol.getCompatibleEncoding(ref.getEncoding()); - _os = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding); - - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol())); - - writeHeader(op, mode, context, explicitCtx); - } - - // - // These functions allow this object to be reused, rather than reallocated. - // - public void - reset(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context, - boolean explicitCtx) - { - Reference ref = proxy.__reference(); - _state = StateUnsent; - _exception = null; - _sent = false; - _proxy = proxy; - _mode = mode; - _handler = null; - _observer = IceInternal.ObserverHelper.get(proxy, op, context); - _encoding = Protocol.getCompatibleEncoding(ref.getEncoding()); - - Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol())); - - writeHeader(op, mode, context, explicitCtx); - } - - public void - detach() - { - if(_observer != null) - { - _observer.detach(); - } - } - - public void - reclaim() - { - if(_is != null) - { - _is.reset(); - } - _os.reset(); - } - - // Returns true if ok, false if user exception. - public boolean - invoke() - { - assert(_state == StateUnsent); - - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _state = StateInProgress; - _handler.finishBatchRequest(_os); - return true; - } - - int cnt = 0; - while(true) - { - try - { - _state = StateInProgress; - _exception = null; - _sent = false; - - _handler = _proxy.__getRequestHandler(); - try - { - if(_handler.sendRequest(this)) // Request sent and no response expected, we're done. - { - return true; - } - } - catch(Ice.OperationInterruptedException ex) - { - if(_handler.requestCanceled(this, ex)) - { - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. - // In this case, the exception should be set on the Outgoing. - // - synchronized(this) - { - boolean interrupted = false; - while(_exception == null) - { - try - { - wait(); - } - catch(InterruptedException ex2) - { - interrupted = true; - } - } - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - } - else - { - throw ex; - } - } - - boolean timedOut = false; - synchronized(this) - { - // - // If the handler says it's not finished, we wait until we're done. - // - - int invocationTimeout = _proxy.__reference().getInvocationTimeout(); - if(invocationTimeout > 0) - { - long now = Time.currentMonotonicTimeMillis(); - long deadline = now + invocationTimeout; - while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) - { - try - { - wait(deadline - now); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - if((_state == StateInProgress || !_sent) && _state != StateFailed) - { - now = Time.currentMonotonicTimeMillis(); - timedOut = now >= deadline; - } - } - } - else - { - while((_state == StateInProgress || !_sent) && _state != StateFailed) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - throw new Ice.OperationInterruptedException(); - } - } - } - } - - if(timedOut) - { - Ice.InvocationTimeoutException ex = new Ice.InvocationTimeoutException(); - if(_handler.requestCanceled(this, ex)) - { - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. - // In this case, the exception should be set on the Outgoing. - // - synchronized(this) - { - boolean interrupted = false; - while(_exception == null) - { - try - { - wait(); - } - catch(InterruptedException e) - { - interrupted = true; - } - } - if(interrupted) - { - Thread.currentThread().interrupt(); - } - } - } - else - { - throw ex; - } - } - - if(_exception != null) - { - throw (Ice.Exception)_exception.fillInStackTrace(); - } - else - { - assert(_state != StateInProgress); - return _state == StateOK; - } - } - catch(RetryException ex) - { - _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. - } - catch(Ice.Exception ex) - { - try - { - Ice.Holder<Integer> interval = new Ice.Holder<Integer>(); - cnt = _proxy.__handleException(ex, _handler, _mode, _sent, interval, cnt); - if(_observer != null) - { - _observer.retried(); // Invocation is being retried. - } - if(interval.value > 0) - { - try - { - Thread.sleep(interval.value); - } - catch(InterruptedException exi) - { - throw new Ice.OperationInterruptedException(); - } - } - } - catch(Ice.Exception exc) - { - if(_observer != null) - { - _observer.failed(exc.ice_name()); - } - throw exc; - } - } - } - } - - public void - abort(Ice.LocalException ex) - { - assert(_state == StateUnsent); - - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy.__reference().getMode(); - if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) - { - _handler.abortBatchRequest(); - } - - throw ex; - } - - @Override - public boolean - send(Ice.ConnectionI connection, boolean compress, boolean response) - throws RetryException - { - return connection.sendRequest(this, compress, response); - } - - @Override - public void - invokeCollocated(CollocatedRequestHandler handler) - { - handler.invokeRequest(this); - } - - @Override - synchronized public void - sent() - { - if(_proxy.__reference().getMode() != Reference.ModeTwoway) - { - if(_childObserver != null) - { - _childObserver.detach(); - _childObserver = null; - } - _state = StateOK; - } - _sent = true; - notify(); - } - - public synchronized void - finished(BasicStream is) - { - assert(_proxy.__reference().getMode() == Reference.ModeTwoway); // Only for twoways. - - assert(_state <= StateInProgress); - - if(_childObserver != null) - { - _childObserver.reply(is.size() - Protocol.headerSize - 4); - _childObserver.detach(); - _childObserver = null; - } - - if(_is == null) - { - _is = new IceInternal.BasicStream(_proxy.__reference().getInstance(), Protocol.currentProtocolEncoding); - } - _is.swap(is); - byte replyStatus = _is.readByte(); - - switch(replyStatus) - { - case ReplyStatus.replyOK: - { - _state = StateOK; // The state must be set last, in case there is an exception. - break; - } - - case ReplyStatus.replyUserException: - { - if(_observer != null) - { - _observer.userException(); - } - _state = StateUserException; // The state must be set last, in case there is an exception. - break; - } - - case ReplyStatus.replyObjectNotExist: - case ReplyStatus.replyFacetNotExist: - case ReplyStatus.replyOperationNotExist: - { - Ice.RequestFailedException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } - - case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } - - case ReplyStatus.replyOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } - - default: - { - assert(false); - break; - } - } - - ex.id = new Ice.Identity(); - ex.id.__read(_is); - - // - // For compatibility with the old FacetPath. - // - String[] facetPath = _is.readStringSeq(); - if(facetPath.length > 0) - { - if(facetPath.length > 1) - { - throw new Ice.MarshalException(); - } - ex.facet = facetPath[0]; - } - else - { - ex.facet = ""; - } - - ex.operation = _is.readString(); - _exception = ex; - - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - case ReplyStatus.replyUnknownException: - case ReplyStatus.replyUnknownLocalException: - case ReplyStatus.replyUnknownUserException: - { - Ice.UnknownException ex = null; - switch(replyStatus) - { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } - - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } - - case ReplyStatus.replyUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } - - default: - { - assert(false); - break; - } - } - - ex.unknown = _is.readString(); - _exception = ex; - - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - default: - { - _exception = new Ice.UnknownReplyStatusException(); - _state = StateLocalException; - break; - } - } - - notify(); - } - - @Override - public synchronized void - finished(Ice.Exception ex) - { - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - notify(); - return; - } - - if(_childObserver != null) - { - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - _childObserver = null; - } - _state = StateFailed; - _exception = ex; - notify(); - } - - public BasicStream - os() - { - return _os; - } - - public BasicStream - startReadParams() - { - _is.startReadEncaps(); - return _is; - } - - public void - endReadParams() - { - _is.endReadEncaps(); - } - - public void - readEmptyParams() - { - _is.skipEmptyEncaps(null); - } - - public byte[] - readParamEncaps() - { - return _is.readEncaps(null); - } - - public BasicStream - startWriteParams(Ice.FormatType format) - { - _os.startWriteEncaps(_encoding, format); - return _os; - } - - public void - endWriteParams() - { - _os.endWriteEncaps(); - } - - public void - writeEmptyParams() - { - _os.writeEmptyEncaps(_encoding); - } - - public void - writeParamEncaps(byte[] encaps) - { - if(encaps == null || encaps.length == 0) - { - _os.writeEmptyEncaps(_encoding); - } - else - { - _os.writeEncaps(encaps); - } - } - - public boolean - hasResponse() - { - return _is != null && !_is.isEmpty(); - } - - public void - throwUserException() - throws Ice.UserException - { - try - { - _is.startReadEncaps(); - _is.throwException(null); - } - catch(Ice.UserException ex) - { - _is.endReadEncaps(); - throw ex; - } - } - - public void - attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size) - { - if(_observer != null) - { - _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - public void - attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId) - { - if(_observer != null) - { - _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4); - if(_childObserver != null) - { - _childObserver.attach(); - } - } - } - - private void - writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx) - { - if(explicitCtx && context == null) - { - context = _emptyContext; - } - - switch(_proxy.__reference().getMode()) - { - case Reference.ModeTwoway: - case Reference.ModeOneway: - case Reference.ModeDatagram: - { - _os.writeBlob(IceInternal.Protocol.requestHdr); - break; - } - - case Reference.ModeBatchOneway: - case Reference.ModeBatchDatagram: - { - while(true) - { - try - { - _handler = _proxy.__getRequestHandler(); - _handler.prepareBatchRequest(_os); - break; - } - catch(RetryException ex) - { - _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. - } - catch(Ice.LocalException ex) - { - if(_observer != null) - { - _observer.failed(ex.ice_name()); - } - _proxy.__setRequestHandler(_handler, null); // Clear request handler - throw ex; - } - } - break; - } - } - - try - { - _proxy.__reference().getIdentity().__write(_os); - - // - // For compatibility with the old FacetPath. - // - String facet = _proxy.__reference().getFacet(); - if(facet == null || facet.length() == 0) - { - _os.writeStringSeq(null); - } - else - { - String[] facetPath = { facet }; - _os.writeStringSeq(facetPath); - } - - _os.writeString(operation); - - _os.writeByte((byte)mode.value()); - - if(context != null) - { - // - // Explicit context - // - Ice.ContextHelper.write(_os, context); - } - else - { - // - // Implicit context - // - Ice.ImplicitContextI implicitContext = _proxy.__reference().getInstance().getImplicitContext(); - java.util.Map<String, String> prxContext = _proxy.__reference().getContext(); - - if(implicitContext == null) - { - Ice.ContextHelper.write(_os, prxContext); - } - else - { - implicitContext.write(prxContext, _os); - } - } - } - catch(Ice.LocalException ex) - { - abort(ex); - } - } - - private Ice.ObjectPrxHelperBase _proxy; - private Ice.OperationMode _mode; - private RequestHandler _handler; - private Ice.EncodingVersion _encoding; - private BasicStream _is; - private BasicStream _os; - private boolean _sent; - private Ice.Exception _exception; - - private static final int StateUnsent = 0; - private static final int StateInProgress = 1; - private static final int StateOK = 2; - private static final int StateUserException = 3; - private static final int StateLocalException = 4; - private static final int StateFailed = 5; - private int _state; - - private InvocationObserver _observer; - private ChildInvocationObserver _childObserver; - - public Outgoing next; // For use by Ice.ObjectPrxHelperBase - - private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); -} diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index dbb6ac37ab9..0618ffb93e9 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -9,23 +9,33 @@ package IceInternal; -public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable +public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback { public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb) { - super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, cb); - _proxy = (Ice.ObjectPrxHelperBase)prx; + super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb); + _proxy = (Ice.ObjectPrxHelperBase) prx; _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); } - - public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, - boolean explicitCtx) + + public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is, + IceInternal.BasicStream os) + { + super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb, + is, os); + _proxy = (Ice.ObjectPrxHelperBase) prx; + _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding()); + } + + public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx, + boolean explicitCtx, boolean synchronous) { _handler = null; _cnt = 0; _sent = false; _mode = mode; _sentSynchronously = false; + _synchronous = synchronous; Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol())); @@ -36,15 +46,47 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _observer = ObserverHelper.get(_proxy, operation, ctx); - // - // Can't call async via a batch proxy. - // - if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram()) + switch(_proxy.__reference().getMode()) { - throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI"); - } + case Reference.ModeTwoway: + case Reference.ModeOneway: + case Reference.ModeDatagram: + { + _os.writeBlob(IceInternal.Protocol.requestHdr); + break; + } - _os.writeBlob(IceInternal.Protocol.requestHdr); + case Reference.ModeBatchOneway: + case Reference.ModeBatchDatagram: + { + while(true) + { + try + { + _handler = _proxy.__getRequestHandler(); + _handler.prepareBatchRequest(_os); + break; + } + catch(RetryException ex) + { + // Clear request handler and retry. + _proxy.__setRequestHandler(_handler, null); + } + catch(Ice.LocalException ex) + { + if(_observer != null) + { + _observer.failed(ex.ice_name()); + } + // Clear request handler + _proxy.__setRequestHandler(_handler, null); + _handler = null; + throw ex; + } + } + break; + } + } Reference ref = _proxy.__reference(); @@ -66,7 +108,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _os.writeString(operation); - _os.writeByte((byte)mode.value()); + _os.writeByte((byte) mode.value()); if(ctx != null) { @@ -94,39 +136,44 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } - @Override public Ice.ObjectPrx - getProxy() + @Override + public Ice.ObjectPrx getProxy() { return _proxy; } @Override - public int - __send(Ice.ConnectionI connection, boolean compress, boolean response) - throws RetryException + public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { _cachedConnection = connection; return connection.sendAsyncRequest(this, compress, response); } @Override - public int - __invokeCollocated(CollocatedRequestHandler handler) + public int invokeCollocated(CollocatedRequestHandler handler) { - return handler.invokeAsyncRequest(this); + // The BasicStream cannot be cached if background io is enabled, + // the proxy is not a twoway or there is an invocation timeout set. + if(_proxy.__reference().getInstance().queueRequests() || !_proxy.ice_isTwoway() || + _proxy.__reference().getInvocationTimeout() > 0) + { + // Disable caching by marking the streams as cached! + _state |= StateCachedBuffers; + } + handler.invokeAsyncRequest(this, _synchronous); + return AsyncStatus.Queued; } @Override - public boolean - __sent() + public boolean sent() { synchronized(_monitor) { - boolean alreadySent = (_state & Sent) != 0; - _state |= Sent; + boolean alreadySent = (_state & StateSent) != 0; + _state |= StateSent; _sent = true; - assert((_state & Done) == 0); + assert ((_state & StateDone) == 0); if(!_proxy.ice_isTwoway()) { @@ -135,34 +182,40 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _childObserver.detach(); _childObserver = null; } + if(_observer != null && (_callback == null || !_callback.__hasSentCallback())) + { + _observer.detach(); + _observer = null; + } if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - _state |= Done | OK; - //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization + _state |= StateDone | StateOK; + // _os.resize(0, false); // Don't clear the buffer now, it's + // needed for the collocation optimization } _monitor.notifyAll(); - return !alreadySent; // Don't call the sent call is already sent. + + // Don't call the sent call is already sent. + return !alreadySent && _callback != null && _callback.__hasSentCallback(); } } @Override - public void - __invokeSent() + public void invokeSent() { - __invokeSentInternal(); + invokeSentInternal(); } @Override - public void - __finished(Ice.Exception exc) + public void finished(Ice.Exception exc) { synchronized(_monitor) { - assert((_state & Done) == 0); + assert ((_state & StateDone) == 0); if(_childObserver != null) { _childObserver.failed(exc.ice_name()); @@ -178,8 +231,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. + // NOTE: at this point, synchronization isn't needed, no other threads + // should be calling on the callback. // try { @@ -188,41 +241,37 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa return; // Can't be retried immediately. } - __invoke(false); // Retry the invocation + invoke(false); // Retry the invocation } catch(Ice.Exception ex) { - __invokeException(ex); + invokeException(ex); } } @Override - public void - __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) + public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection) { - threadPool.dispatch( - new DispatchWorkItem(connection) + threadPool.dispatch(new DispatchWorkItem(connection) + { + @Override + public void run() { - @Override - public void - run() - { - OutgoingAsync.this.__finished(ex); - } - }); + OutgoingAsync.this.finished(ex); + } + }); } - public final void - __finished(BasicStream is) + public final void finished(BasicStream is) { - assert(_proxy.ice_isTwoway()); // Can only be called for twoways. + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; try { synchronized(_monitor) { - assert(_exception == null && (_state & Done) == 0); + assert (_exception == null && (_state & StateDone) == 0); if(_childObserver != null) { _childObserver.reply(is.size() - Protocol.headerSize - 4); @@ -237,7 +286,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _timeoutRequestHandler = null; } - if(_is == null) // _is can already be initialized if the invocation is retried + // _is can already be initialized if the invocation is retried + if(_is == null) { _is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding); } @@ -290,29 +340,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa Ice.RequestFailedException ex = null; switch(replyStatus) { - case ReplyStatus.replyObjectNotExist: - { - ex = new Ice.ObjectNotExistException(); - break; - } + case ReplyStatus.replyObjectNotExist: + { + ex = new Ice.ObjectNotExistException(); + break; + } - case ReplyStatus.replyFacetNotExist: - { - ex = new Ice.FacetNotExistException(); - break; - } + case ReplyStatus.replyFacetNotExist: + { + ex = new Ice.FacetNotExistException(); + break; + } - case ReplyStatus.replyOperationNotExist: - { - ex = new Ice.OperationNotExistException(); - break; - } + case ReplyStatus.replyOperationNotExist: + { + ex = new Ice.OperationNotExistException(); + break; + } - default: - { - assert(false); - break; - } + default: + { + assert (false); + break; + } } ex.id = id; @@ -330,29 +380,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa Ice.UnknownException ex = null; switch(replyStatus) { - case ReplyStatus.replyUnknownException: - { - ex = new Ice.UnknownException(); - break; - } + case ReplyStatus.replyUnknownException: + { + ex = new Ice.UnknownException(); + break; + } - case ReplyStatus.replyUnknownLocalException: - { - ex = new Ice.UnknownLocalException(); - break; - } + case ReplyStatus.replyUnknownLocalException: + { + ex = new Ice.UnknownLocalException(); + break; + } - case ReplyStatus.replyUnknownUserException: - { - ex = new Ice.UnknownUserException(); - break; - } + case ReplyStatus.replyUnknownUserException: + { + ex = new Ice.UnknownUserException(); + break; + } - default: - { - assert(false); - break; - } + default: + { + assert (false); + break; + } } ex.unknown = unknown; @@ -365,34 +415,48 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } } - _state |= Done; - _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation + _state |= StateDone; + // Clear buffer now, instead of waiting for AsyncResult + // deallocation + // _os.resize(0, false); if(replyStatus == ReplyStatus.replyOK) { - _state |= OK; + _state |= StateOK; } _monitor.notifyAll(); } } catch(Ice.LocalException ex) { - __finished(ex); + finished(ex); return; } - assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - __invokeCompleted(); + assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); + invokeCompleted(); } - public final boolean - __invoke(boolean synchronous) + public final boolean invoke(boolean synchronous) { + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) + { + _state |= StateDone | StateOK; + _handler.finishBatchRequest(_os); + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return true; + } + while(true) { - _handler = _proxy.__getRequestHandler(); try { _sent = false; + _handler = _proxy.__getRequestHandler(); int status = _handler.sendAsyncRequest(this); if((status & AsyncStatus.Sent) > 0) { @@ -401,29 +465,36 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _sentSynchronously = true; if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSent(); // Call from the user thread. + invokeSent(); // Call from the user thread. } } else { if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSentAsync(); // Call from a client thread pool thread. + // Call from a client thread pool thread. + invokeSentAsync(); } } } - if(_proxy.ice_isTwoway() || (status & AsyncStatus.Sent) == 0) + if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0) { synchronized(_monitor) { - if((_state & Done) == 0) + if((_state & StateDone) == 0) { int invocationTimeout = _handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { - _future = _instance.timer().schedule(this, invocationTimeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + _future = _instance.timer().schedule(new Runnable() + { + @Override + public void run() + { + timeout(); + } + }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); _timeoutRequestHandler = _handler; } } @@ -431,22 +502,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa } break; } - catch(Ice.OperationInterruptedException ex) - { - // - // Clear the request handler, and cancel the outgoing request. - // - _proxy.__setRequestHandler(_handler, null); - _handler.asyncRequestCanceled(this, ex); - break; - } catch(RetryException ex) { - _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry. + // Clear request handler and retry. + _proxy.__setRequestHandler(_handler, null); } catch(Ice.Exception ex) { - if(!handleException(ex)) // This will throw if the invocation can't be retried. + // This will throw if the invocation can't be retried. + if(!handleException(ex)) { break; // Can't be retried immediately. } @@ -455,27 +519,23 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa return _sentSynchronously; } - public BasicStream - __startWriteParams(Ice.FormatType format) + public BasicStream startWriteParams(Ice.FormatType format) { _os.startWriteEncaps(_encoding, format); return _os; } - public void - __endWriteParams() + public void endWriteParams() { _os.endWriteEncaps(); } - public void - __writeEmptyParams() + public void writeEmptyParams() { _os.writeEmptyEncaps(_encoding); } - public void - __writeParamEncaps(byte[] encaps) + public void writeParamEncaps(byte[] encaps) { if(encaps == null || encaps.length == 0) { @@ -486,22 +546,50 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa _os.writeEncaps(encaps); } } - - BasicStream - __getIs() + + public void cacheMessageBuffers() { - return _is; - } + if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0) + { + synchronized(_monitor) + { + if((_state & StateCachedBuffers) > 0) { + return; + } + _state |= StateCachedBuffers; + } + if(_is != null) + { + _is.reset(); + } + _os.reset(); + + _proxy.cacheMessageBuffers(_is, _os); + } + } + @Override - public void - run() + public void invokeExceptionAsync(final Ice.Exception ex) { - __runTimerTask(); - } + if((_state & StateDone) == 0 && _handler != null) + { + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + int mode = _proxy.__reference().getMode(); + if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram) + { + _handler.abortBatchRequest(); + } + } - private boolean - handleException(Ice.Exception exc) + super.invokeExceptionAsync(ex); + } + + private boolean handleException(Ice.Exception exc) { try { @@ -514,7 +602,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa if(interval.value > 0) { _instance.retryQueue().add(this, interval.value); - return false; // Don't retry immediately, the retry queue will take care of the retry. + return false; // Don't retry immediately, the retry queue will + // take care of the retry. } else { @@ -538,6 +627,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa private int _cnt; private Ice.OperationMode _mode; private boolean _sent; + // + // If true this AMI request is being used for a generated synchronous invocation. + // + private boolean _synchronous; + private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>(); } diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java index 6c2259b35be..95c5fd2bb45 100644 --- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java +++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java @@ -18,32 +18,33 @@ public interface OutgoingAsyncMessageCallback // // Called by the request handler to send the request over the connection. // - int __send(Ice.ConnectionI conection, boolean compress, boolean response) + int send(Ice.ConnectionI conection, boolean compress, boolean response) throws RetryException; - int __invokeCollocated(CollocatedRequestHandler handler); + int invokeCollocated(CollocatedRequestHandler handler); // - // Called by the connection when the message is confirmed sent. The connection is locked - // when this is called so this method can call the sent callback. Instead, this method - // returns true if there's a sent callback and false otherwise. If true is returned, the - // connection will call the __invokeSent() method bellow (which in turn should call the - // sent callback). + // Called by the connection when the message is confirmed sent. The + // connection is locked when this is called so this method can't call the + // sent callback. Instead, this method returns true if there's a sent + // callback and false otherwise. If true is returned, the connection will + // call the __invokeSent() method bellow (which in turn should call the sent + // callback). // - boolean __sent(); + boolean sent(); // // Called by the connection to call the user sent callback. // - void __invokeSent(); + void invokeSent(); // // Called by the connection when the request failed. // - void __finished(Ice.Exception ex); + void finished(Ice.Exception ex); // // Helper to dispatch the cancellation exception. // - void __dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection); + void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection); } diff --git a/java/src/IceInternal/OutgoingMessageCallback.java b/java/src/IceInternal/OutgoingMessageCallback.java deleted file mode 100644 index 1a8426b2e19..00000000000 --- a/java/src/IceInternal/OutgoingMessageCallback.java +++ /dev/null @@ -1,22 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -package IceInternal; - -public interface OutgoingMessageCallback -{ - boolean send(Ice.ConnectionI conection, boolean compress, boolean response) - throws RetryException; - - void invokeCollocated(CollocatedRequestHandler handler); - - void sent(); - - void finished(Ice.Exception ex); -} diff --git a/java/src/IceInternal/Protocol.java b/java/src/IceInternal/Protocol.java index a129e406a22..6e68fd1cf7f 100644 --- a/java/src/IceInternal/Protocol.java +++ b/java/src/IceInternal/Protocol.java @@ -181,12 +181,6 @@ final public class Protocol } static public boolean - isSupported(Ice.ProtocolVersion version, Ice.ProtocolVersion supported) - { - return version.major == supported.major && version.minor <= supported.minor; - } - - static public boolean isSupported(Ice.EncodingVersion version, Ice.EncodingVersion supported) { return version.major == supported.major && version.minor <= supported.minor; diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java index 4f24a1919e6..009319110fe 100644 --- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java +++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java @@ -32,20 +32,26 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync _sentSynchronously = true; if((status & AsyncStatus.InvokeSentCallback) > 0) { - __invokeSent(); + invokeSent(); } } else { synchronized(_monitor) { - if((_state & Done) == 0) + if((_state & StateDone) == 0) { int invocationTimeout = handler.getReference().getInvocationTimeout(); if(invocationTimeout > 0) { - _future = _instance.timer().schedule(this, invocationTimeout, - java.util.concurrent.TimeUnit.MILLISECONDS); + _future = _instance.timer().schedule(new Runnable() + { + @Override + public void run() + { + timeout(); + } + }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS); _timeoutRequestHandler = handler; } } diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java index 6c2a5b55f2c..f9d90e3e72e 100644 --- a/java/src/IceInternal/QueueRequestHandler.java +++ b/java/src/IceInternal/QueueRequestHandler.java @@ -21,12 +21,9 @@ import Ice.ConnectionI; public class QueueRequestHandler implements RequestHandler { public - QueueRequestHandler(Instance instance, RequestHandler delegate) { + QueueRequestHandler(Instance instance, RequestHandler delegate) + { _executor = instance.getQueueExecutor(); - if(_executor == null) - { - throw new CommunicatorDestroyedException(); - } assert(delegate != null); _delegate = delegate; } @@ -37,7 +34,8 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() { + Future<Void> future = _executor.submit(new Callable<Void>() + { @Override public Void call() throws RetryException { @@ -52,11 +50,11 @@ public class QueueRequestHandler implements RequestHandler { throw new CommunicatorDestroyedException(); } - catch (InterruptedException e) + catch(InterruptedException e) { throw new Ice.OperationInterruptedException(); } - catch (ExecutionException e) + catch(ExecutionException e) { try { @@ -83,7 +81,8 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() { + Future<Void> future = _executor.submit(new Callable<Void>() + { @Override public Void call() { @@ -97,11 +96,11 @@ public class QueueRequestHandler implements RequestHandler { throw new CommunicatorDestroyedException(); } - catch (InterruptedException e) + catch(InterruptedException e) { throw new Ice.OperationInterruptedException(); } - catch (ExecutionException e) + catch(ExecutionException e) { try { @@ -124,7 +123,8 @@ public class QueueRequestHandler implements RequestHandler { try { - Future<Void> future = _executor.submit(new Callable<Void>() { + Future<Void> future = _executor.submit(new Callable<Void>() + { @Override public Void call() { @@ -138,11 +138,11 @@ public class QueueRequestHandler implements RequestHandler { throw new CommunicatorDestroyedException(); } - catch (InterruptedException e) + catch(InterruptedException e) { throw new Ice.OperationInterruptedException(); } - catch (ExecutionException e) + catch(ExecutionException e) { try { @@ -160,74 +160,42 @@ public class QueueRequestHandler implements RequestHandler } @Override - public boolean - sendRequest(final OutgoingMessageCallback out) throws RetryException + public int + sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException { try { - Future<Boolean> future = _executor.submit(new Callable<Boolean>() { + Future<Integer> future = _executor.submit(new Callable<Integer>() + { @Override - public Boolean call() throws RetryException + public Integer call() throws RetryException { - return _delegate.sendRequest(out); + return _delegate.sendAsyncRequest(out); } }); return future.get(); } - catch (InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } catch(RejectedExecutionException e) { throw new CommunicatorDestroyedException(); } - catch (ExecutionException e) + catch(InterruptedException e) { + // If the request cannot be canceled (or is itself interrupted) then + // restore the interrupt state. try { - throw e.getCause(); - } - catch(RetryException ex) - { - throw ex; - } - catch(RuntimeException ex) - { - throw ex; + if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException())) + { + Thread.currentThread().interrupt(); + } } - catch(Throwable ex) + catch(Ice.OperationInterruptedException ex) { - assert(false); + Thread.currentThread().interrupt(); } } - return false; - } - - @Override - public int - sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException - { - try - { - Future<Integer> future = _executor.submit(new Callable<Integer>() { - @Override - public Integer call() throws RetryException - { - return _delegate.sendAsyncRequest(out); - } - }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch (InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch (ExecutionException e) + catch(ExecutionException e) { try { @@ -251,52 +219,12 @@ public class QueueRequestHandler implements RequestHandler @Override public boolean - requestCanceled(final OutgoingMessageCallback out, final Ice.LocalException ex) - { - try - { - Future<Boolean> future = _executor.submit(new Callable<Boolean>() { - @Override - public Boolean call() - { - return _delegate.requestCanceled(out, ex); - } - }); - return future.get(); - } - catch(RejectedExecutionException e) - { - throw new CommunicatorDestroyedException(); - } - catch (InterruptedException e) - { - throw new Ice.OperationInterruptedException(); - } - catch (ExecutionException e) - { - try - { - throw e.getCause(); - } - catch(RuntimeException exc) - { - throw exc; - } - catch(Throwable exc) - { - assert(false); - } - } - return false; - } - - @Override - public boolean asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex) { try { - Future<Boolean> future = _executor.submit(new Callable<Boolean>() { + Future<Boolean> future = _executor.submit(new Callable<Boolean>() + { @Override public Boolean call() { @@ -309,11 +237,11 @@ public class QueueRequestHandler implements RequestHandler { throw new CommunicatorDestroyedException(); } - catch (InterruptedException e) + catch(InterruptedException e) { throw new Ice.OperationInterruptedException(); } - catch (ExecutionException e) + catch(ExecutionException e) { try { @@ -347,7 +275,7 @@ public class QueueRequestHandler implements RequestHandler @Override public ConnectionI - waitForConnection() throws InterruptedException + waitForConnection() throws InterruptedException, RetryException { return _delegate.waitForConnection(); } diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java index 894c986a84d..cff4cd6a8a4 100644 --- a/java/src/IceInternal/RequestHandler.java +++ b/java/src/IceInternal/RequestHandler.java @@ -16,19 +16,14 @@ public interface RequestHandler void finishBatchRequest(BasicStream out); void abortBatchRequest(); - boolean sendRequest(OutgoingMessageCallback out) - throws RetryException; - int sendAsyncRequest(OutgoingAsyncMessageCallback out) throws RetryException; - boolean requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex); boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex); Reference getReference(); Ice.ConnectionI getConnection(); Ice.ConnectionI waitForConnection() - throws InterruptedException; - + throws InterruptedException, RetryException; } diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java index 850aea259be..e4f3a6c26fe 100644 --- a/java/src/IceInternal/RetryTask.java +++ b/java/src/IceInternal/RetryTask.java @@ -25,11 +25,11 @@ class RetryTask implements Runnable { try { - _outAsync.__invoke(false); + _outAsync.invoke(false); } catch(Ice.LocalException ex) { - _outAsync.__invokeExceptionAsync(ex); + _outAsync.invokeExceptionAsync(ex); } } } @@ -38,7 +38,7 @@ class RetryTask implements Runnable destroy() { _future.cancel(false); - _outAsync.__invokeExceptionAsync(new Ice.CommunicatorDestroyedException()); + _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException()); } public void setFuture(java.util.concurrent.Future<?> future) diff --git a/java/src/IceInternal/RouterInfo.java b/java/src/IceInternal/RouterInfo.java index f6c13dbfd00..5eecff2298c 100644 --- a/java/src/IceInternal/RouterInfo.java +++ b/java/src/IceInternal/RouterInfo.java @@ -133,24 +133,6 @@ public final class RouterInfo return setServerEndpoints(_router.getServerProxy()); } - public void - addProxy(Ice.ObjectPrx proxy) - { - assert(proxy != null); - synchronized(this) - { - if(_identities.contains(proxy.ice_getIdentity())) - { - // - // Only add the proxy to the router if it's not already in our local map. - // - return; - } - } - - addAndEvictProxies(proxy, _router.addProxies(new Ice.ObjectPrx[] { proxy })); - } - public boolean addProxy(final Ice.ObjectPrx proxy, final AddProxyCallback callback) { diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 56836072f54..c5c26eea65a 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -389,12 +389,6 @@ public final class ThreadPool _selector.destroy(); } - public String - prefix() - { - return _prefix; - } - private void run(EventHandlerThread thread) { diff --git a/java/src/IceInternal/TwowayCallback.java b/java/src/IceInternal/TwowayCallback.java index 86c6afb6202..55c9044e6d6 100644 --- a/java/src/IceInternal/TwowayCallback.java +++ b/java/src/IceInternal/TwowayCallback.java @@ -26,4 +26,10 @@ public abstract class TwowayCallback extends CallbackBase implements Ice.TwowayC { sent(__result.sentSynchronously()); } + + @Override + public final boolean __hasSentCallback() + { + return true; + } }; diff --git a/java/src/IceInternal/Util.java b/java/src/IceInternal/Util.java index 647262fe90b..1e32d032771 100644 --- a/java/src/IceInternal/Util.java +++ b/java/src/IceInternal/Util.java @@ -14,7 +14,8 @@ import java.util.concurrent.ThreadFactory; public final class Util { static String - createThreadName(final Ice.Properties properties, final String name) { + createThreadName(final Ice.Properties properties, final String name) + { String threadName = properties.getProperty("Ice.ProgramName"); if(threadName.length() > 0) { @@ -26,7 +27,8 @@ public final class Util } static ThreadFactory - createThreadFactory(final Ice.Properties properties, final String name) { + createThreadFactory(final Ice.Properties properties, final String name) + { return new java.util.concurrent.ThreadFactory() { @Override diff --git a/java/test/Ice/adapterDeactivation/Collocated.java b/java/test/Ice/adapterDeactivation/Collocated.java index 9f8f90bdda6..77c320bc3d3 100644 --- a/java/test/Ice/adapterDeactivation/Collocated.java +++ b/java/test/Ice/adapterDeactivation/Collocated.java @@ -28,6 +28,10 @@ public class Collocated extends test.Util.Application { Ice.InitializationData initData = new Ice.InitializationData(); initData.properties = Ice.Util.createProperties(argsH); + if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0) + { + initData.properties.setProperty("Ice.ThreadPool.Server.Size", "2"); + } initData.properties.setProperty("Ice.Package.Test", "test.Ice.adapterDeactivation"); initData.properties.setProperty("TestAdapter.Endpoints", "default -p 12010"); return initData; diff --git a/java/test/Ice/exceptions/AllTests.java b/java/test/Ice/exceptions/AllTests.java index 282add94c37..f11f576c2fb 100644 --- a/java/test/Ice/exceptions/AllTests.java +++ b/java/test/Ice/exceptions/AllTests.java @@ -1,6 +1,5 @@ // ********************************************************************** // -// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. // // This copy of Ice is licensed to you under the terms described in the // ICE_LICENSE file included in this distribution. @@ -2281,18 +2280,18 @@ public class AllTests { final Callback_Thrower_throwLocalExceptionI cb = new Callback_Thrower_throwLocalExceptionI(); thrower.begin_throwLocalExceptionIdempotent(new Callback_Thrower_throwLocalExceptionIdempotent() - { - @Override - public void response() - { - cb.response(); - } - - @Override - public void exception(Ice.LocalException exc) - { - cb.exception(exc); - } + { + @Override + public void response() + { + cb.response(); + } + + @Override + public void exception(Ice.LocalException exc) + { + cb.exception(exc); + } }); cb.check(); } diff --git a/java/test/Ice/interrupt/AllTests.java b/java/test/Ice/interrupt/AllTests.java index 1c205b80ac8..0467444cfb9 100644 --- a/java/test/Ice/interrupt/AllTests.java +++ b/java/test/Ice/interrupt/AllTests.java @@ -56,31 +56,7 @@ public class AllTests notify(); } - public synchronized void checkResponse() - { - while(!_response) - { - try - { - wait(); - } - catch(InterruptedException ex) - { - } - } - - _response = false; - } - - public synchronized void response() - { - assert(!_response); - _response = true; - notify(); - } - private boolean _called = false; - private boolean _response = false; } private static void @@ -91,6 +67,18 @@ public class AllTests throw new RuntimeException(); } } + + private static void failIfNotInterrupted() + { + if(Thread.currentThread().isInterrupted()) + { + Thread.interrupted(); + } + else + { + test(false); + } + } public static void allTests(test.Util.Application app, Ice.Communicator communicator, PrintWriter out) @@ -232,48 +220,52 @@ public class AllTests test(false); } - // - // Test interrupt of waitForSent. Here hold the adapter and send a large payload. The - // thread is interrupted in 500ms which should result in a operation interrupted exception. - // - executor.submit(new Runnable() { - @Override - public void run() - { - try - { - Thread.sleep(500); - } - catch(InterruptedException e) + // This section of the test doesn't run when collocated. + if(p.ice_getConnection() != null) + { + // + // Test interrupt of waitForSent. Here hold the adapter and send a large payload. The + // thread is interrupted in 500ms which should result in a operation interrupted exception. + // + executor.submit(new Runnable() { + @Override + public void run() { - test(false); + try + { + Thread.sleep(500); + } + catch(InterruptedException e) + { + test(false); + } + mainThread.interrupt(); } - mainThread.interrupt(); - } - }); + }); - testController.holdAdapter(); - Ice.AsyncResult r = null; - try - { - // The sequence needs to be large enough to fill the write/recv buffers - byte[] seq = new byte[20000000]; - r = p.begin_opWithPayload(seq); + testController.holdAdapter(); + Ice.AsyncResult r = null; + try + { + // The sequence needs to be large enough to fill the write/recv buffers + byte[] seq = new byte[20000000]; + r = p.begin_opWithPayload(seq); + r.waitForSent(); + test(false); + } + catch(Ice.OperationInterruptedException ex) + { + // Expected + } + // + // Resume the adapter. + // + testController.resumeAdapter(); + r.waitForSent(); - test(false); - } - catch(Ice.OperationInterruptedException ex) - { - // Expected + r.waitForCompleted(); + p.end_opWithPayload(r); } - // - // Resume the adapter. - // - testController.resumeAdapter(); - - r.waitForSent(); - r.waitForCompleted(); - p.end_opWithPayload(r); // // The executor is all done. @@ -297,7 +289,7 @@ public class AllTests try { ic.destroy(); - test(false); + failIfNotInterrupted(); } catch(Ice.OperationInterruptedException ex) { diff --git a/java/test/Ice/interrupt/Collocated.java b/java/test/Ice/interrupt/Collocated.java new file mode 100644 index 00000000000..a377024f44b --- /dev/null +++ b/java/test/Ice/interrupt/Collocated.java @@ -0,0 +1,72 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.interrupt; + +public class Collocated extends test.Util.Application +{ + @Override + public int run(String[] args) + { + Ice.Communicator communicator = communicator(); + Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TestAdapter"); + Ice.ObjectAdapter adapter2 = communicator().createObjectAdapter("ControllerAdapter"); + TestControllerI controller = new TestControllerI(adapter); + adapter.add(new TestI(controller), communicator().stringToIdentity("test")); + adapter.activate(); + adapter2.add(controller, communicator().stringToIdentity("testController")); + adapter2.activate(); + + try + { + AllTests.allTests(this, communicator(), getWriter()); + } + catch (InterruptedException e) + { + e.printStackTrace(); + throw new RuntimeException(); + } + + return 0; + } + + @Override + protected Ice.InitializationData getInitData(Ice.StringSeqHolder argsH) + { + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(argsH); + initData.properties.setProperty("Ice.Package.Test", "test.Ice.interrupt"); + // + // We need to enable the background IO so that Ice is interrupt + // safe for this test. + // + initData.properties.setProperty("Ice.BackgroundIO", "1"); + // + // We need to send messages large enough to cause the transport + // buffers to fill up. + // + initData.properties.setProperty("Ice.MessageSizeMax", "20000"); + + initData.properties.setProperty("TestAdapter.Endpoints", "default -p 12010"); + initData.properties.setProperty("ControllerAdapter.Endpoints", "tcp -p 12011"); + initData.properties.setProperty("ControllerAdapter.ThreadPool.Size", "1"); + + return initData; + } + + public static void main(String[] args) + { + Collocated c = new Collocated(); + int status = c.main("Collocated", args); + + System.gc(); + System.exit(status); + } + +} diff --git a/java/test/Ice/interrupt/run.py b/java/test/Ice/interrupt/run.py index 3cbc4137c5e..012b74ea130 100644 --- a/java/test/Ice/interrupt/run.py +++ b/java/test/Ice/interrupt/run.py @@ -22,3 +22,6 @@ import TestUtil print("tests with regular server.") TestUtil.clientServerTest() + +print("tests with collocated server.") +TestUtil.collocatedTest()
\ No newline at end of file diff --git a/java/test/Ice/metrics/AllTests.java b/java/test/Ice/metrics/AllTests.java index 1a8724a91ab..c24a65b3f9e 100644 --- a/java/test/Ice/metrics/AllTests.java +++ b/java/test/Ice/metrics/AllTests.java @@ -143,17 +143,20 @@ public class AllTests _serverProps = serverProps; } - public synchronized void + public void waitForUpdate() { - while(!_updated) + synchronized(this) { - try - { - wait(); - } - catch(InterruptedException ex) + while(!_updated) { + try + { + wait(); + } + catch(InterruptedException ex) + { + } } } // Ensure that the previous updates were committed, the setProperties call returns before @@ -161,7 +164,10 @@ public class AllTests // a second time, this will block until all the notifications from the first update have // completed. _serverProps.setProperties(new java.util.HashMap<String, String>()); - _updated = false; + synchronized(this) + { + _updated = false; + } } @Override @@ -418,6 +424,12 @@ public class AllTests MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010")); boolean collocated = metrics.ice_getConnection() == null; + int threadCount = 3; + if(collocated && communicator.getProperties().getPropertyAsInt("Ice.BackgroundIO") > 0) + { + threadCount = 5; + } + out.print("testing metrics admin facet checkedCast... "); out.flush(); Ice.ObjectPrx admin = communicator.getAdmin(); @@ -449,7 +461,8 @@ public class AllTests test(view.get("Connection").length == 1 && view.get("Connection")[0].current == 1 && view.get("Connection")[0].total == 1); } - test(view.get("Thread").length == 1 && view.get("Thread")[0].current == 3 && view.get("Thread")[0].total == 3); + test(view.get("Thread").length == 1 && view.get("Thread")[0].current == threadCount && + view.get("Thread")[0].total == threadCount); out.println("ok"); out.print("testing group by id..."); @@ -465,7 +478,7 @@ public class AllTests metrics.ice_connectionId("Con1").ice_ping(); view = clientMetrics.getMetricsView("View", timestamp); - test(view.get("Thread").length == 3); + test(view.get("Thread").length == threadCount); if(!collocated) { test(view.get("Connection").length == 2); @@ -711,17 +724,20 @@ public class AllTests checkFailure(clientMetrics, "ConnectionEstablishment", m1.id, "Ice::ConnectTimeoutException", 2, out); Connect c = new Connect(metrics); - testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "parent", "Communicator", c, out); - testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "id", "127.0.0.1:12010", c, out); + testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "parent", "Communicator", c, + out); + testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "id", "127.0.0.1:12010", c, + out); testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpoint", "tcp -h 127.0.0.1 -p 12010 -t 60000", c, out); testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointType", "1", c, out); - testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsDatagram", "false", c, - out); + testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsDatagram", "false", + c, out); testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsSecure", "false", c, out); - testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointTimeout", "60000", c, out); + testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointTimeout", "60000", c, + out); testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointCompress", "false", c, out); testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointHost", "127.0.0.1", c, @@ -921,6 +937,9 @@ public class AllTests Callback cb = new Callback(); + // + // Twoway tests + // metrics.op(); metrics.end_op(metrics.begin_op()); metrics.begin_op(cb); @@ -1096,6 +1115,54 @@ public class AllTests testAttribute(clientMetrics, clientProps, update, "Invocation", "context.entry2", "", op, out); testAttribute(clientMetrics, clientProps, update, "Invocation", "context.entry3", "", op, out); + // + // Oneway tests + // + clearView(clientProps, serverProps, update); + props.put("IceMX.Metrics.View.Map.Invocation.GroupBy", "operation"); + props.put("IceMX.Metrics.View.Map.Invocation.Map.Remote.GroupBy", "localPort"); + updateProps(clientProps, serverProps, update, props, "Invocation"); + + MetricsPrx metricsOneway = (MetricsPrx)metrics.ice_oneway(); + metricsOneway.op(); + metricsOneway.end_op(metricsOneway.begin_op()); + metricsOneway.begin_op(cb).waitForSent(); + + map = toMap(clientMetrics.getMetricsView("View", timestamp).get("Invocation")); + test(map.size() == 1); + + im1 = (IceMX.InvocationMetrics)map.get("op"); + test(im1.current <= 1 && im1.total == 3 && im1.failures == 0 && im1.retry == 0); + test(!collocated ? (im1.remotes.length == 1) : (im1.collocated.length == 1)); + rim1 = (IceMX.ChildInvocationMetrics)(!collocated ? im1.remotes[0] : im1.collocated[0]); + test(rim1.current <= 1 && rim1.total == 3 && rim1.failures == 0); + test(rim1.size == 63 && rim1.replySize == 0); + + testAttribute(clientMetrics, clientProps, update, "Invocation", "mode", "oneway", new InvokeOp(metricsOneway), + out); + + // + // Batch oneway tests + // + props.put("IceMX.Metrics.View.Map.Invocation.GroupBy", "operation"); + props.put("IceMX.Metrics.View.Map.Invocation.Map.Remote.GroupBy", "localPort"); + updateProps(clientProps, serverProps, update, props, "Invocation"); + + MetricsPrx metricsBatchOneway = (MetricsPrx)metrics.ice_batchOneway(); + metricsBatchOneway.op(); + metricsBatchOneway.end_op(metricsBatchOneway.begin_op()); + //metricsBatchOneway.begin_op(cb).waitForSent(); + + map = toMap(clientMetrics.getMetricsView("View", timestamp).get("Invocation")); + test(map.size() == 1); + + im1 = (IceMX.InvocationMetrics)map.get("op"); + test(im1.current == 0 && im1.total == 2 && im1.failures == 0 && im1.retry == 0); + test(im1.remotes.length == 0); + + testAttribute(clientMetrics, clientProps, update, "Invocation", "mode", "batch-oneway", + new InvokeOp(metricsBatchOneway), out); + out.println("ok"); out.print("testing metrics view enable/disable..."); diff --git a/java/test/Ice/metrics/Collocated.java b/java/test/Ice/metrics/Collocated.java index 69a8896e1b5..6fbfa9f4154 100644 --- a/java/test/Ice/metrics/Collocated.java +++ b/java/test/Ice/metrics/Collocated.java @@ -46,6 +46,13 @@ public class Collocated extends test.Util.Application { Ice.InitializationData initData = new Ice.InitializationData(); initData.properties = Ice.Util.createProperties(argsH); + if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0) + { + // With background IO, collocated invocations are + // dispatched on the server thread pool. This test needs + // at least 3 threads in the server thread pool to work. + initData.properties.setProperty("Ice.ThreadPool.Server.Size", "3"); + } initData.properties.setProperty("Ice.Package.Test", "test.Ice.metrics"); initData.properties.setProperty("Ice.Admin.Endpoints", "tcp"); initData.properties.setProperty("Ice.Admin.InstanceName", "client"); diff --git a/java/test/Ice/operations/AllTests.java b/java/test/Ice/operations/AllTests.java index a22474bbfd6..63866c9e4c0 100644 --- a/java/test/Ice/operations/AllTests.java +++ b/java/test/Ice/operations/AllTests.java @@ -128,6 +128,11 @@ public class AllTests BatchOneways.batchOneways(derived, out); out.println("ok"); + out.print("testing batch AMI oneway operations... "); + out.flush(); + BatchOnewaysAMI.batchOneways(cl, out); + BatchOnewaysAMI.batchOneways(derived, out); + out.println("ok"); return cl; } } diff --git a/java/test/Ice/operations/BatchOneways.java b/java/test/Ice/operations/BatchOneways.java index 8415f3d6ece..fc20955e9d2 100644 --- a/java/test/Ice/operations/BatchOneways.java +++ b/java/test/Ice/operations/BatchOneways.java @@ -61,7 +61,6 @@ class BatchOneways MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); batch.ice_flushBatchRequests(); - batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); for(int i = 0 ; i < 30 ; ++i) { diff --git a/java/test/Ice/operations/BatchOnewaysAMI.java b/java/test/Ice/operations/BatchOnewaysAMI.java new file mode 100644 index 00000000000..f11df72b019 --- /dev/null +++ b/java/test/Ice/operations/BatchOnewaysAMI.java @@ -0,0 +1,208 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package test.Ice.operations; + +import java.io.PrintWriter; + +import Ice.LocalException; +import test.Ice.operations.Test.Callback_MyClass_opByteSOneway; +import test.Ice.operations.Test.MyClassPrx; +import test.Ice.operations.Test.MyClassPrxHelper; + +class BatchOnewaysAMI +{ + private static class Callback + { + Callback() + { + _called = false; + } + + public synchronized void check() + { + while(!_called) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + _called = false; + } + + public synchronized void called() + { + assert (!_called); + _called = true; + notify(); + } + + private boolean _called; + } + + private static void test(boolean b) + { + if(!b) + { + throw new RuntimeException(); + } + } + + static void batchOneways(MyClassPrx p, PrintWriter out) + { + final byte[] bs1 = new byte[10 * 1024]; + final byte[] bs2 = new byte[99 * 1024]; + final byte[] bs3 = new byte[100 * 1024]; + + final Callback cb = new Callback(); + p.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway() + { + @Override + public void exception(LocalException ex) + { + test(false); + } + + @Override + public void response() + { + cb.called(); + } + }); + cb.check(); + p.begin_opByteSOneway(bs2, new Callback_MyClass_opByteSOneway() + { + @Override + public void exception(LocalException ex) + { + test(false); + } + + @Override + public void response() + { + cb.called(); + } + }); + cb.check(); + + p.begin_opByteSOneway(bs3, new Callback_MyClass_opByteSOneway() + { + @Override + public void exception(LocalException ex) + { + test(ex instanceof Ice.MemoryLimitException); + cb.called(); + } + + @Override + public void response() + { + test(false); + } + }); + cb.check(); + + MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); + batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); + + for(int i = 0; i < 30; ++i) + { + batch.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway() + { + @Override + public void exception(LocalException ex) + { + test(false); + } + + @Override + public void response() + { + } + }); + } + + if(batch.ice_getConnection() != null) + { + batch.ice_getConnection().end_flushBatchRequests(batch.ice_getConnection().begin_flushBatchRequests()); + + MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway()); + + batch.begin_ice_ping(); + batch2.begin_ice_ping(); + batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); + batch.ice_getConnection().close(false); + batch.begin_ice_ping(); + batch2.begin_ice_ping(); + + batch.ice_getConnection(); + batch2.ice_getConnection(); + + batch.begin_ice_ping(); + batch.ice_getConnection().close(false); + batch.begin_ice_ping(new Ice.Callback_Object_ice_ping() + { + + @Override + public void response() + { + test(false); + } + + @Override + public void exception(LocalException ex) + { + test(ex instanceof Ice.CloseConnectionException); + cb.called(); + } + + }); + cb.check(); + batch2.begin_ice_ping(new Ice.Callback_Object_ice_ping() + { + + @Override + public void response() + { + test(false); + } + + @Override + public void exception(LocalException ex) + { + test(ex instanceof Ice.CloseConnectionException); + cb.called(); + } + + }); + cb.check(); + batch.begin_ice_ping(); + batch2.begin_ice_ping(); + } + + Ice.Identity identity = new Ice.Identity(); + identity.name = "invalid"; + Ice.ObjectPrx batch3 = batch.ice_identity(identity); + batch3.begin_ice_ping(); + batch3.end_ice_flushBatchRequests(batch3.begin_ice_flushBatchRequests()); + + // Make sure that a bogus batch request doesn't cause troubles to other + // ones. + batch3.begin_ice_ping(); + batch.begin_ice_ping(); + batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests()); + batch.begin_ice_ping(); + } +} diff --git a/java/test/Ice/operations/Collocated.java b/java/test/Ice/operations/Collocated.java index c34d07d5f11..278e81a9b65 100644 --- a/java/test/Ice/operations/Collocated.java +++ b/java/test/Ice/operations/Collocated.java @@ -35,6 +35,10 @@ public class Collocated extends test.Util.Application { Ice.InitializationData initData = new Ice.InitializationData(); initData.properties = Ice.Util.createProperties(argsH); + if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0) + { + initData.properties.setProperty("Ice.ThreadPool.Server.Size", "2"); + } initData.properties.setProperty("Ice.Package.Test", "test.Ice.operations"); // |