summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-09-03 11:01:11 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-09-03 11:01:11 -0230
commit3b0588532354adf7bf3b86e611a8ae4996bfe6ad (patch)
tree253961cb83af7bc3b1dfc7633a8f934789476cd1 /java
parentMore work on ICE-2400: the send log thread now uses a separate communicator t... (diff)
downloadice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.tar.bz2
ice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.tar.xz
ice-3b0588532354adf7bf3b86e611a8ae4996bfe6ad.zip
- C#, Java: Removed Outgoing, fixed generated code to make synchronous
requests using AMI. - Java: AsyncResult is now an interface. - Added --arg to allTests.py. - Fixed operations, adapterDeactivation and metrics test to work with background IO. - Added Collocated interrupt test. - Added support for batch oneway requests using AMI. - Added test in operations for batch oneway requests using AMI.
Diffstat (limited to 'java')
-rw-r--r--java/.classpath2
-rw-r--r--java/.settings/org.eclipse.jdt.core.prefs10
-rw-r--r--java/config/build.properties2
-rw-r--r--java/demo/Ice/latency/config.client2
-rw-r--r--java/src/Ice/AsyncResult.java471
-rw-r--r--java/src/Ice/Callback.java6
-rw-r--r--java/src/Ice/Callback_Communicator_flushBatchRequests.java6
-rw-r--r--java/src/Ice/Callback_Connection_flushBatchRequests.java6
-rw-r--r--java/src/Ice/CommunicatorI.java5
-rw-r--r--java/src/Ice/ConnectionI.java1107
-rw-r--r--java/src/Ice/ObjectPrxHelper.java3
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java720
-rw-r--r--java/src/Ice/OnewayCallback.java6
-rw-r--r--java/src/IceInternal/AsyncResultI.java578
-rw-r--r--java/src/IceInternal/BatchOutgoing.java220
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java55
-rw-r--r--java/src/IceInternal/Buffer.java5
-rw-r--r--java/src/IceInternal/CallbackBase.java1
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java421
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java43
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java139
-rw-r--r--java/src/IceInternal/ConnectionBatchOutgoingAsync.java2
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java33
-rw-r--r--java/src/IceInternal/DispatchObserverI.java2
-rw-r--r--java/src/IceInternal/EndpointHostResolver.java13
-rw-r--r--java/src/IceInternal/Functional_CallbackBase.java6
-rw-r--r--java/src/IceInternal/HttpParser.java20
-rw-r--r--java/src/IceInternal/IncomingBase.java2
-rw-r--r--java/src/IceInternal/Instance.java34
-rw-r--r--java/src/IceInternal/InvocationObserverI.java19
-rw-r--r--java/src/IceInternal/Network.java104
-rw-r--r--java/src/IceInternal/OpaqueEndpointI.java8
-rw-r--r--java/src/IceInternal/Outgoing.java736
-rw-r--r--java/src/IceInternal/OutgoingAsync.java378
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java23
-rw-r--r--java/src/IceInternal/OutgoingMessageCallback.java22
-rw-r--r--java/src/IceInternal/Protocol.java6
-rw-r--r--java/src/IceInternal/ProxyBatchOutgoingAsync.java14
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java142
-rw-r--r--java/src/IceInternal/RequestHandler.java7
-rw-r--r--java/src/IceInternal/RetryTask.java6
-rw-r--r--java/src/IceInternal/RouterInfo.java18
-rw-r--r--java/src/IceInternal/ThreadPool.java6
-rw-r--r--java/src/IceInternal/TwowayCallback.java6
-rw-r--r--java/src/IceInternal/Util.java6
-rw-r--r--java/test/Ice/adapterDeactivation/Collocated.java4
-rw-r--r--java/test/Ice/exceptions/AllTests.java25
-rw-r--r--java/test/Ice/interrupt/AllTests.java116
-rw-r--r--java/test/Ice/interrupt/Collocated.java72
-rw-r--r--java/test/Ice/interrupt/run.py3
-rw-r--r--java/test/Ice/metrics/AllTests.java97
-rw-r--r--java/test/Ice/metrics/Collocated.java7
-rw-r--r--java/test/Ice/operations/AllTests.java5
-rw-r--r--java/test/Ice/operations/BatchOneways.java1
-rw-r--r--java/test/Ice/operations/BatchOnewaysAMI.java208
-rw-r--r--java/test/Ice/operations/Collocated.java4
56 files changed, 2449 insertions, 3514 deletions
diff --git a/java/.classpath b/java/.classpath
index 4755711bc41..4a4d89b8e9e 100644
--- a/java/.classpath
+++ b/java/.classpath
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry excluding="ant/|test/**/lambda/*|test/android/|test/ejb/|test/Ice/translator/|test/Ice/translator/Test/" kind="src" path="src"/>
+ <classpathentry excluding="ant/|test/**/lambda/*|test/android/|test/ejb/|test/Ice/translator/|test/Ice/translator/Test/|test/Slice/generation/classes/test/Slice/generation/Test/" kind="src" path="src"/>
<classpathentry kind="src" path="generated"/>
<classpathentry kind="src" path="generated.test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
diff --git a/java/.settings/org.eclipse.jdt.core.prefs b/java/.settings/org.eclipse.jdt.core.prefs
index 9060ba01e38..f19526f9df4 100644
--- a/java/.settings/org.eclipse.jdt.core.prefs
+++ b/java/.settings/org.eclipse.jdt.core.prefs
@@ -95,7 +95,7 @@ org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_c
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
+org.eclipse.jdt.core.formatter.alignment_for_binary_expression=18
org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
@@ -167,7 +167,7 @@ org.eclipse.jdt.core.formatter.indent_empty_lines=false
org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false
+org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true
org.eclipse.jdt.core.formatter.indentation.size=8
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
@@ -334,8 +334,8 @@ org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=do no
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
+org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=do not insert
+org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
@@ -368,6 +368,6 @@ org.eclipse.jdt.core.formatter.tabulation.char=space
org.eclipse.jdt.core.formatter.tabulation.size=4
org.eclipse.jdt.core.formatter.use_on_off_tags=false
org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=true
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
+org.eclipse.jdt.core.formatter.wrap_before_binary_operator=false
org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true
diff --git a/java/config/build.properties b/java/config/build.properties
index 5249125ce55..3da87e59437 100644
--- a/java/config/build.properties
+++ b/java/config/build.properties
@@ -41,7 +41,7 @@ jgoodies-looks.version = 2.5.2
#
# Define debug as on if you want to build with debug information.
#
-debug = on
+#debug = on
#
# Define lint with your preferred -Xlint options.
diff --git a/java/demo/Ice/latency/config.client b/java/demo/Ice/latency/config.client
index 9beb0c1f2f7..3bb7b2067e7 100644
--- a/java/demo/Ice/latency/config.client
+++ b/java/demo/Ice/latency/config.client
@@ -25,4 +25,4 @@ Ice.ACM.Client.Timeout=0
#Ice.Admin.Endpoints=tcp -h localhost -p 10003
Ice.Admin.InstanceName=client
IceMX.Metrics.Debug.GroupBy=id
-IceMX.Metrics.ByParent.GroupBy=parent
+IceMX.Metrics.ByParent.GroupBy=parent \ No newline at end of file
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index eed405ad00c..7c015d67c8c 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -14,50 +14,29 @@ package Ice;
* With this object, an application can obtain several attributes of the
* invocation and discover its outcome.
**/
-public class AsyncResult
+public interface AsyncResult
{
- protected AsyncResult(Communicator communicator, IceInternal.Instance instance, String op,
- IceInternal.CallbackBase del)
- {
- _communicator = communicator;
- _instance = instance;
- _operation = op;
- _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding, false, false);
- _state = 0;
- _sentSynchronously = false;
- _exception = null;
- _callback = del;
- }
/**
* Returns the communicator that sent the invocation.
*
* @return The communicator.
**/
- public Communicator getCommunicator()
- {
- return _communicator;
- }
+ public Communicator getCommunicator();
/**
* Returns the connection that was used for the invocation.
*
* @return The connection.
**/
- public Connection getConnection()
- {
- return null;
- }
+ public Connection getConnection();
/**
* Returns the proxy that was used to call the <code>begin_</code> method.
*
* @return The proxy.
**/
- public ObjectPrx getProxy()
- {
- return null;
- }
+ public ObjectPrx getProxy();
/**
* Indicates whether the result of an invocation is available.
@@ -65,34 +44,12 @@ public class AsyncResult
* @return True if the result is available, which means a call to the <code>end_</code>
* method will not block. The method returns false if the result is not yet available.
**/
- public final boolean isCompleted()
- {
- synchronized(_monitor)
- {
- return (_state & Done) > 0;
- }
- }
+ public boolean isCompleted();
/**
* Blocks the caller until the result of the invocation is available.
**/
- public final void waitForCompleted()
- {
- synchronized(_monitor)
- {
- while((_state & Done) == 0)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
- }
+ public void waitForCompleted();
/**
* When you call the <code>begin_</code> method, the Ice run time attempts to
@@ -105,48 +62,17 @@ public class AsyncResult
*
* @return True if the request has been sent, or false if the request is queued.
**/
- public final boolean isSent()
- {
- synchronized(_monitor)
- {
- return (_state & Sent) > 0;
- }
- }
+ public boolean isSent();
/**
* Blocks the caller until the request has been written to the client-side transport.
**/
- public final void waitForSent()
- {
- synchronized(_monitor)
- {
- while((_state & Sent) == 0 && _exception == null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
- }
+ public void waitForSent();
/**
* If the invocation failed with a local exception, throws the local exception.
**/
- public final void throwLocalException()
- {
- synchronized(_monitor)
- {
- if(_exception != null)
- {
- throw _exception;
- }
- }
- }
+ public void throwLocalException();
/**
* This method returns true if a request was written to the client-side
@@ -157,385 +83,12 @@ public class AsyncResult
* @return True if the request was sent without being queued, or false
* otherwise.
**/
- public final boolean sentSynchronously()
- {
- return _sentSynchronously; // No lock needed, immutable once __send() is called
- }
+ public boolean sentSynchronously();
/**
* Returns the name of the operation.
*
* @return The operation name.
**/
- public final String getOperation()
- {
- return _operation;
- }
-
- public final IceInternal.BasicStream __getOs()
- {
- return _os;
- }
-
- public IceInternal.BasicStream
- __startReadParams()
- {
- _is.startReadEncaps();
- return _is;
- }
-
- public void
- __endReadParams()
- {
- _is.endReadEncaps();
- }
-
- public void
- __readEmptyParams()
- {
- _is.skipEmptyEncaps(null);
- }
-
- public byte[]
- __readParamEncaps()
- {
- return _is.readEncaps(null);
- }
-
- public final boolean __wait()
- {
- synchronized(_monitor)
- {
- if((_state & EndCalled) > 0)
- {
- throw new java.lang.IllegalArgumentException("end_ method called more than once");
- }
- _state |= EndCalled;
- while((_state & Done) == 0)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- //
- // Remove the EndCalled flag since it should be possible to
- // call end_* again on the AsyncResult.
- //
- _state &= ~EndCalled;
- throw new Ice.OperationInterruptedException();
- }
- }
- if(_exception != null)
- {
- //throw (LocalException)_exception.fillInStackTrace();
- throw _exception;
- }
- return (_state & OK) > 0;
- }
- }
-
- public final void __throwUserException()
- throws UserException
- {
- try
- {
- _is.startReadEncaps();
- _is.throwException(null);
- }
- catch(UserException ex)
- {
- _is.endReadEncaps();
- throw ex;
- }
- }
-
- public final void __invokeExceptionAsync(final Ice.Exception ex)
- {
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void
- run()
- {
- __invokeException(ex);
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly.
- }
- }
-
- public final void __invokeException(Ice.Exception ex)
- {
- synchronized(_monitor)
- {
- _state |= Done;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception = ex;
- _monitor.notifyAll();
- }
-
- __invokeCompleted();
- }
-
- protected final void __invokeSentInternal()
- {
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__sent(this);
- }
- catch(RuntimeException ex)
- {
- __warning(ex);
- }
- catch(AssertionError exc)
- {
- __error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- __error(exc);
- }
- finally
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
- }
-
- if(_observer != null)
- {
- Ice.ObjectPrx proxy = getProxy();
- if(proxy == null || !proxy.ice_isTwoway())
- {
- _observer.detach();
- }
- }
- }
-
- public void
- __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- public void __attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getCollocatedObserver(adapter,
- requestId,
- _os.size() - IceInternal.Protocol.headerSize - 4);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- public final void __invokeSentAsync()
- {
- //
- // This is called when it's not safe to call the sent callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void
- run()
- {
- __invokeSentInternal();
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- }
- }
-
- public static void __check(AsyncResult r, ObjectPrx prx, String operation)
- {
- __check(r, operation);
- if(r.getProxy() != prx)
- {
- throw new IllegalArgumentException("Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
- }
- }
-
- public static void __check(AsyncResult r, Connection con, String operation)
- {
- __check(r, operation);
- if(r.getConnection() != con)
- {
- throw new IllegalArgumentException("Connection for call to end_" + operation +
- " does not match connection that was used to call corresponding begin_" +
- operation + " method");
- }
- }
-
- public static void __check(AsyncResult r, Communicator com, String operation)
- {
- __check(r, operation);
- if(r.getCommunicator() != com)
- {
- throw new IllegalArgumentException("Communicator for call to end_" + operation +
- " does not match communicator that was used to call corresponding " +
- "begin_" + operation + " method");
- }
- }
-
- protected static void __check(AsyncResult r, String operation)
- {
- if(r == null)
- {
- throw new IllegalArgumentException("AsyncResult == null");
- }
- else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality
- {
- throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " +
- r.getOperation());
- }
- }
-
- protected final void __invokeCompleted()
- {
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__completed(this);
- }
- catch(RuntimeException ex)
- {
- __warning(ex);
- }
- catch(AssertionError exc)
- {
- __error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- __error(exc);
- }
- finally
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
- }
-
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
- }
-
- protected void
- __runTimerTask()
- {
- IceInternal.RequestHandler handler;
- synchronized(_monitor)
- {
- handler = _timeoutRequestHandler;
- _timeoutRequestHandler = null;
- }
-
- if(handler != null)
- {
- handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this,
- new Ice.InvocationTimeoutException());
- }
- }
-
- protected final void __warning(RuntimeException ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
- {
- String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex);
- _instance.initializationData().logger.warning(s);
- }
- }
-
- protected final void __error(Error error)
- {
- String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
- _instance.initializationData().logger.error(s);
- }
-
- protected Communicator _communicator;
- protected IceInternal.Instance _instance;
- protected String _operation;
- protected Ice.Connection _cachedConnection;
-
- protected java.lang.Object _monitor = new java.lang.Object();
- protected IceInternal.BasicStream _is;
- protected IceInternal.BasicStream _os;
-
- protected IceInternal.RequestHandler _timeoutRequestHandler;
- protected java.util.concurrent.Future<?> _future;
-
- protected static final byte OK = 0x1;
- protected static final byte Done = 0x2;
- protected static final byte Sent = 0x4;
- protected static final byte EndCalled = 0x8;
-
- protected byte _state;
- protected boolean _sentSynchronously;
- protected Ice.Exception _exception;
-
- protected Ice.Instrumentation.InvocationObserver _observer;
- protected Ice.Instrumentation.ChildInvocationObserver _childObserver;
-
- private IceInternal.CallbackBase _callback;
-}
+ public String getOperation();
+} \ No newline at end of file
diff --git a/java/src/Ice/Callback.java b/java/src/Ice/Callback.java
index f0e8eb86c7d..b4cabf9a30e 100644
--- a/java/src/Ice/Callback.java
+++ b/java/src/Ice/Callback.java
@@ -48,4 +48,10 @@ public abstract class Callback extends IceInternal.CallbackBase
{
sent(r);
}
+
+ @Override
+ public final boolean __hasSentCallback()
+ {
+ return true;
+ }
}
diff --git a/java/src/Ice/Callback_Communicator_flushBatchRequests.java b/java/src/Ice/Callback_Communicator_flushBatchRequests.java
index 7e9e8e2f1ff..3ebab69b4e4 100644
--- a/java/src/Ice/Callback_Communicator_flushBatchRequests.java
+++ b/java/src/Ice/Callback_Communicator_flushBatchRequests.java
@@ -46,4 +46,10 @@ public abstract class Callback_Communicator_flushBatchRequests extends IceIntern
{
sent(__result.sentSynchronously());
}
+
+ @Override
+ public final boolean __hasSentCallback()
+ {
+ return true;
+ }
}
diff --git a/java/src/Ice/Callback_Connection_flushBatchRequests.java b/java/src/Ice/Callback_Connection_flushBatchRequests.java
index e9e0976024e..ceea4e06140 100644
--- a/java/src/Ice/Callback_Connection_flushBatchRequests.java
+++ b/java/src/Ice/Callback_Connection_flushBatchRequests.java
@@ -46,4 +46,10 @@ public abstract class Callback_Connection_flushBatchRequests extends IceInternal
{
sent(__result.sentSynchronously());
}
+
+ @Override
+ public final boolean __hasSentCallback()
+ {
+ return true;
+ }
}
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 757fc85daf5..bb95a3583f7 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -283,8 +283,9 @@ public final class CommunicatorI implements Communicator
public void
end_flushBatchRequests(AsyncResult r)
{
- AsyncResult.__check(r, this, __flushBatchRequests_name);
- r.__wait();
+ IceInternal.AsyncResultI ri = (IceInternal.AsyncResultI)r;
+ IceInternal.AsyncResultI.check(ri, this, __flushBatchRequests_name);
+ ri.__wait();
}
@Override
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index b304efa82b5..00645ce8dc9 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -14,30 +14,31 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public interface StartCallback
{
void connectionStartCompleted(ConnectionI connection);
+
void connectionStartFailed(ConnectionI connection, Ice.LocalException ex);
}
private class TimeoutCallback implements Runnable
{
@Override
- public void
- run()
+ public void run()
{
timedOut();
}
}
- public void
- start(StartCallback callback)
+ public void start(StartCallback callback)
{
try
{
synchronized(this)
{
- if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
+ // The connection might already be closed if the communicator
+ // was destroyed.
+ if(_state >= StateClosed)
{
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
@@ -62,18 +63,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
callback.connectionStartCompleted(this);
}
- public void
- startAndWait()
- throws InterruptedException
+ public void startAndWait() throws InterruptedException
{
try
{
synchronized(this)
{
- if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
+ // The connection might already be closed if the communicator
+ // was destroyed.
+ if(_state >= StateClosed)
{
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
@@ -85,8 +86,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_state >= StateClosing)
{
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
}
@@ -104,8 +105,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public synchronized void
- activate()
+ public synchronized void activate()
{
if(_state <= StateNotValidated)
{
@@ -120,8 +120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateActive);
}
- public synchronized void
- hold()
+ public synchronized void hold()
{
if(_state <= StateNotValidated)
{
@@ -135,8 +134,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public final static int ObjectAdapterDeactivated = 0;
public final static int CommunicatorDestroyed = 1;
- synchronized public void
- destroy(int reason)
+ synchronized public void destroy(int reason)
{
switch(reason)
{
@@ -155,8 +153,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- synchronized public void
- close(boolean force)
+ synchronized public void close(boolean force)
{
if(force)
{
@@ -171,7 +168,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// requests to be retried, regardless of whether the
// server has processed them or not.
//
- while(!_requests.isEmpty() || !_asyncRequests.isEmpty())
+ while(!_asyncRequests.isEmpty())
{
try
{
@@ -187,37 +184,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public synchronized boolean
- isActiveOrHolding()
+ public synchronized boolean isActiveOrHolding()
{
return _state > StateNotValidated && _state < StateClosing;
}
- public synchronized boolean
- isFinished()
+ public synchronized boolean isFinished()
{
if(_state != StateFinished || _dispatchCount != 0)
{
return false;
}
- assert(_state == StateFinished);
+ assert (_state == StateFinished);
return true;
}
- public synchronized void
- throwException()
+ public synchronized void throwException()
{
if(_exception != null)
{
- assert(_state >= StateClosing);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_state >= StateClosing);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
}
- public synchronized void
- waitUntilHolding()
- throws InterruptedException
+ public synchronized void waitUntilHolding() throws InterruptedException
{
while(_state < StateHolding || _dispatchCount > 0)
{
@@ -225,9 +217,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public synchronized void
- waitUntilFinished()
- throws InterruptedException
+ public synchronized void waitUntilFinished() throws InterruptedException
{
//
// We wait indefinitely until the connection is finished and all
@@ -240,7 +230,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
wait();
}
- assert(_state == StateFinished);
+ assert (_state == StateFinished);
//
// Clear the OA. See bug 1673 for the details of why this is necessary.
@@ -248,19 +238,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_adapter = null;
}
- synchronized public void
- updateObserver()
+ synchronized public void updateObserver()
{
if(_state < StateNotValidated || _state > StateClosed)
{
return;
}
- assert(_instance.getObserver() != null);
- _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
- _endpoint,
- toConnectionState(_state),
- _observer);
+ assert (_instance.getObserver() != null);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint,
+ toConnectionState(_state), _observer);
if(_observer != null)
{
_observer.attach();
@@ -272,8 +259,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- synchronized public void
- monitor(long now, IceInternal.ACMConfig acm)
+ synchronized public void monitor(long now, IceInternal.ACMConfig acm)
{
if(_state != StateActive)
{
@@ -285,7 +271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// If writing or reading, nothing to do, the connection
// timeout will kick-in if writes or reads don't progress.
- // This check is necessary because the actitivy timer is
+ // This check is necessary because the activity timer is
// only set when a message is fully read/written.
//
return;
@@ -317,7 +303,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(acm.close != ACMClose.CloseOff && now >= (_acmLastActivity + acm.timeout))
{
if(acm.close == ACMClose.CloseOnIdleForceful ||
- (acm.close != ACMClose.CloseOnIdle && (!_requests.isEmpty() || !_asyncRequests.isEmpty())))
+ (acm.close != ACMClose.CloseOnIdle && (!_asyncRequests.isEmpty())))
{
//
// Close the connection if we didn't receive a heartbeat in
@@ -325,8 +311,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
setState(StateClosed, new ConnectionTimeoutException());
}
- else if(acm.close != ACMClose.CloseOnInvocation &&
- _dispatchCount == 0 && _batchStream.isEmpty() && _requests.isEmpty() && _asyncRequests.isEmpty())
+ else if(acm.close != ACMClose.CloseOnInvocation && _dispatchCount == 0 && _batchStream.isEmpty() &&
+ _asyncRequests.isEmpty())
{
//
// The connection is idle, close it.
@@ -336,88 +322,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- synchronized public boolean
- sendRequest(IceInternal.Outgoing out, boolean compress, boolean response)
- throws IceInternal.RetryException
- {
- final IceInternal.BasicStream os = out.os();
-
- if(_exception != null)
- {
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace());
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Ensure the message isn't bigger than what we can send with the
- // transport.
- //
- _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax());
-
- int requestId = 0;
- if(response)
- {
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- os.pos(IceInternal.Protocol.headerSize);
- os.writeInt(requestId);
- }
-
- out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- os.size() - IceInternal.Protocol.headerSize - 4);
-
- //
- // Send the message. If it can't be sent without blocking the message is added
- // to _sendStreams and it will be sent by the selector thread or by this thread
- // if flush is true.
- //
- boolean sent = false;
- try
- {
- OutgoingMessage message = new OutgoingMessage(out, out.os(), compress, requestId);
- sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0;
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
- }
-
- if(response)
- {
- //
- // Add to the requests map.
- //
- _requests.put(requestId, out);
- }
-
- return sent; // The request was sent.
- }
-
- synchronized public int
- sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
- throws IceInternal.RetryException
+ synchronized public int sendAsyncRequest(IceInternal.OutgoingAsync out, boolean compress, boolean response)
+ throws IceInternal.RetryException
{
- final IceInternal.BasicStream os = out.__getOs();
+ final IceInternal.BasicStream os = out.getOs();
if(_exception != null)
{
@@ -426,11 +334,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// to send our request, we always try to send the request
// again.
//
- throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace());
+ throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ assert (_state > StateNotValidated);
+ assert (_state < StateClosing);
//
// Ensure the message isn't bigger than what we can send with the
@@ -458,8 +366,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- out.__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId,
- os.size() - IceInternal.Protocol.headerSize - 4);
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() -
+ IceInternal.Protocol.headerSize - 4);
int status;
try
@@ -469,8 +377,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
if(response)
@@ -483,31 +391,30 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return status;
}
- public synchronized void
- prepareBatchRequest(IceInternal.BasicStream os)
- throws IceInternal.RetryException
+ public synchronized void prepareBatchRequest(IceInternal.BasicStream os) throws IceInternal.RetryException
{
waitBatchStreamInUse();
if(_exception != null)
{
//
- // If there were no batch requests queued when the connection failed, we can safely
- // retry with a new connection. Otherwise, we must throw to notify the caller that
- // some previous batch requests were not sent.
+ // If there were no batch requests queued when the connection
+ // failed, we can safely retry with a new connection. Otherwise, we
+ // must throw to notify the caller that some previous batch requests
+ // were not sent.
//
if(_batchStream.isEmpty())
{
- throw new IceInternal.RetryException((Ice.LocalException)_exception.fillInStackTrace());
+ throw new IceInternal.RetryException((Ice.LocalException) _exception.fillInStackTrace());
}
else
{
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
}
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
+ assert (_state > StateNotValidated);
+ assert (_state < StateClosing);
if(_batchStream.isEmpty())
{
@@ -532,8 +439,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
}
- public void
- finishBatchRequest(IceInternal.BasicStream os, boolean compress)
+ public void finishBatchRequest(IceInternal.BasicStream os, boolean compress)
{
try
{
@@ -553,9 +459,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_batchAutoFlush)
{
//
- // Throw memory limit exception if the first message added causes us to go over
- // limit. Otherwise put aside the marshalled message that caused limit to be
- // exceeded and rollback stream to the marker.
+ // Throw memory limit exception if the first message added
+ // causes us to go over limit. Otherwise put aside the
+ // marshalled message that caused limit to be exceeded and
+ // rollback stream to the marker.
+ //
try
{
_transceiver.checkSendSize(_batchStream.getBuffer(), _instance.messageSizeMax());
@@ -601,31 +509,32 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
//
// Reset the batch stream.
//
_batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
+ _batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
//
- // Check again if the last request doesn't exceed the maximum message size.
+ // Check again if the last request doesn't exceed the
+ // maximum message size.
//
- if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax())
+ if(IceInternal.Protocol.requestBatchHdr.length + lastRequest.length > _instance.messageSizeMax())
{
- IceInternal.Ex.throwMemoryLimitException(
- IceInternal.Protocol.requestBatchHdr.length + lastRequest.length,
- _instance.messageSizeMax());
+ IceInternal.Ex.throwMemoryLimitException(IceInternal.Protocol.requestBatchHdr.length +
+ lastRequest.length, _instance.messageSizeMax());
}
//
- // Start a new batch with the last message that caused us to go over the limit.
+ // Start a new batch with the last message that caused us to
+ // go over the limit.
//
_batchStream.writeBlob(IceInternal.Protocol.requestBatchHdr);
_batchStream.writeBlob(lastRequest);
@@ -637,7 +546,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
++_batchRequestNum;
//
- // We compress the whole batch if there is at least one compressed
+ // We compress the whole batch if there is at least one
+ // compressed
// message.
//
if(compress)
@@ -648,7 +558,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Notify about the batch stream not being in use anymore.
//
- assert(_batchStreamInUse);
+ assert (_batchStreamInUse);
_batchStreamInUse = false;
notifyAll();
}
@@ -660,173 +570,103 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public synchronized void
- abortBatchRequest()
+ public synchronized void abortBatchRequest()
{
_batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
+ _batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
- assert(_batchStreamInUse);
+ assert (_batchStreamInUse);
_batchStreamInUse = false;
notifyAll();
}
@Override
- public void
- flushBatchRequests()
+ public void flushBatchRequests()
{
- IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name);
- try
- {
- out.invoke();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
+ end_flushBatchRequests(begin_flushBatchRequests());
}
private static final String __flushBatchRequests_name = "flushBatchRequests";
@Override
- public Ice.AsyncResult
- begin_flushBatchRequests()
+ public Ice.AsyncResult begin_flushBatchRequests()
{
return begin_flushBatchRequestsInternal(null);
}
@Override
- public Ice.AsyncResult
- begin_flushBatchRequests(Callback cb)
+ public Ice.AsyncResult begin_flushBatchRequests(Callback cb)
{
return begin_flushBatchRequestsInternal(cb);
}
@Override
- public Ice.AsyncResult
- begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
+ public Ice.AsyncResult begin_flushBatchRequests(Callback_Connection_flushBatchRequests cb)
{
return begin_flushBatchRequestsInternal(cb);
}
@Override
- public AsyncResult
- begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb,
- IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
- IceInternal.Functional_BoolCallback __sentCb)
+ public AsyncResult begin_flushBatchRequests(IceInternal.Functional_VoidCallback __responseCb,
+ IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
+ IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_flushBatchRequestsInternal(
- new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb)
+ return begin_flushBatchRequestsInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb)
+ {
+ @Override
+ public final void __completed(AsyncResult __result)
+ {
+ try
{
- @Override
- public final void __completed(AsyncResult __result)
- {
- try
- {
- __result.getConnection().end_flushBatchRequests(__result);
- }
- catch(Exception __ex)
- {
- __exceptionCb.apply(__ex);
- }
- }
- });
+ __result.getConnection().end_flushBatchRequests(__result);
+ }
+ catch(Exception __ex)
+ {
+ __exceptionCb.apply(__ex);
+ }
+ }
+ });
}
- private Ice.AsyncResult
- begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
+ private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
{
- IceInternal.ConnectionBatchOutgoingAsync result =
- new IceInternal.ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb);
+ IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this,
+ _communicator, _instance, __flushBatchRequests_name, cb);
try
{
result.__invoke();
}
catch(LocalException __ex)
{
- result.__invokeExceptionAsync(__ex);
+ result.invokeExceptionAsync(__ex);
}
return result;
}
@Override
- public void
- end_flushBatchRequests(AsyncResult r)
+ public void end_flushBatchRequests(AsyncResult ir)
{
- AsyncResult.__check(r, this, __flushBatchRequests_name);
+ IceInternal.AsyncResultI r = (IceInternal.AsyncResultI) ir;
+ IceInternal.AsyncResultI.check(r, this, __flushBatchRequests_name);
r.__wait();
}
- synchronized public boolean
- flushBatchRequests(IceInternal.BatchOutgoing out)
- {
- waitBatchStreamInUse();
- if(_exception != null)
- {
- throw (Ice.LocalException)_exception.fillInStackTrace();
- }
-
- if(_batchRequestNum == 0)
- {
- out.sent();
- return true;
- }
-
- //
- // Fill in the number of requests in the batch.
- //
- _batchStream.pos(IceInternal.Protocol.headerSize);
- _batchStream.writeInt(_batchRequestNum);
-
- out.attachRemoteObserver(initConnectionInfo(), _endpoint,
- _batchStream.size() - IceInternal.Protocol.headerSize - 4);
-
- _batchStream.swap(out.os());
-
- //
- // Send the batch stream.
- //
- boolean sent = false;
- try
- {
- OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0);
- sent = (sendMessage(message) & IceInternal.AsyncStatus.Sent) > 0;
- }
- catch(Ice.LocalException ex)
- {
- setState(StateClosed, ex);
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
- }
-
- //
- // Reset the batch stream.
- //
- _batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- return sent;
- }
-
- synchronized public int
- flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
+ synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
{
waitBatchStreamInUse();
if(_exception != null)
{
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
if(_batchRequestNum == 0)
{
int status = IceInternal.AsyncStatus.Sent;
- if(outAsync.__sent())
+ if(outAsync.sent())
{
status |= IceInternal.AsyncStatus.InvokeSentCallback;
}
@@ -839,10 +679,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.pos(IceInternal.Protocol.headerSize);
_batchStream.writeInt(_batchRequestNum);
- outAsync.__attachRemoteObserver(initConnectionInfo(), _endpoint, 0,
- _batchStream.size() - IceInternal.Protocol.headerSize - 4);
+ outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() -
+ IceInternal.Protocol.headerSize - 4);
- _batchStream.swap(outAsync.__getOs());
+ _batchStream.swap(outAsync.getOs());
//
// Send the batch stream.
@@ -850,21 +690,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
int status;
try
{
- OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__getOs(), _batchRequestCompress, 0);
+ OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.getOs(), _batchRequestCompress, 0);
status = sendMessage(message);
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
//
// Reset the batch stream.
//
_batchStream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
+ _batchAutoFlush);
_batchRequestNum = 0;
_batchRequestCompress = false;
_batchMarker = 0;
@@ -872,8 +712,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- synchronized public void
- setCallback(ConnectionCallback callback)
+ synchronized public void setCallback(ConnectionCallback callback)
{
if(_state > StateClosing)
{
@@ -883,8 +722,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- synchronized public void
- setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat)
+ synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close,
+ Ice.Optional<ACMHeartbeat> heartbeat)
{
if(_monitor != null)
{
@@ -893,10 +732,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_monitor.remove(this);
}
_monitor = _monitor.acm(timeout, close, heartbeat);
-
+
if(_monitor.getACM().timeout <= 0)
{
- _acmLastActivity = -1; // Disable the recording of last activity.
+ _acmLastActivity = -1; // Disable the recording of last
+ // activity.
}
else if(_state == StateActive && _acmLastActivity == -1)
{
@@ -912,59 +752,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- synchronized public Ice.ACM
- getACM()
+ synchronized public Ice.ACM getACM()
{
return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
}
- synchronized public boolean
- requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex)
- {
- java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
- while(it.hasNext())
- {
- OutgoingMessage o = it.next();
- if(o.out == out)
- {
- if(o.requestId > 0)
- {
- _requests.remove(o.requestId);
- }
-
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- o.timedOut();
- if(o != _sendStreams.getFirst())
- {
- it.remove();
- }
- out.finished(ex);
- return true; // We're done.
- }
- }
-
- if(out instanceof IceInternal.Outgoing)
- {
- IceInternal.Outgoing o = (IceInternal.Outgoing)out;
- java.util.Iterator<IceInternal.Outgoing> it2 = _requests.values().iterator();
- while(it2.hasNext())
- {
- if(it2.next() == o)
- {
- o.finished(ex);
- it2.remove();
- return true; // We're done.
- }
- }
- }
- return false;
- }
-
- public boolean
- asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
+ synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync,
+ Ice.LocalException ex)
{
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
@@ -978,42 +772,44 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
+ // If the request is being sent, don't remove it from the send
+ // streams, it will be removed once the sending is finished.
+ //
+ // Note that since we swapped the message stream to _writeStream
+ // it's fine if the OutgoingAsync output stream is released (and
+ // as long as canceled requests cannot be retried).
//
o.timedOut();
if(o != _sendStreams.getFirst())
{
it.remove();
}
- outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ outAsync.dispatchInvocationCancel(ex, _threadPool, this);
return true; // We're done
}
}
if(outAsync instanceof IceInternal.OutgoingAsync)
{
- IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync;
+ IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync) outAsync;
java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
while(it2.hasNext())
{
if(it2.next() == o)
{
it2.remove();
- outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ outAsync.dispatchInvocationCancel(ex, _threadPool, this);
return true; // We're done.
}
}
}
-
return false;
}
@Override
- synchronized public void
- sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
+ synchronized public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
{
- assert(_state > StateNotValidated);
+ assert (_state > StateNotValidated);
try
{
@@ -1028,8 +824,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_state >= StateClosed)
{
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
sendMessage(new OutgoingMessage(os, compressFlag != 0, true));
@@ -1046,10 +842,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- synchronized public void
- sendNoResponse()
+ synchronized public void sendNoResponse()
{
- assert(_state > StateNotValidated);
+ assert (_state > StateNotValidated);
try
{
if(--_dispatchCount == 0)
@@ -1063,8 +858,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_state >= StateClosed)
{
- assert(_exception != null);
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ assert (_exception != null);
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
if(_state == StateClosing && _dispatchCount == 0)
@@ -1079,27 +874,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- public boolean
- systemException(int requestId, Ice.SystemException ex)
+ public boolean systemException(int requestId, Ice.SystemException ex)
{
return false; // System exceptions aren't marshalled.
}
- public IceInternal.EndpointI
- endpoint()
+ public IceInternal.EndpointI endpoint()
{
- return _endpoint; // No mutex protection necessary, _endpoint is immutable.
+ return _endpoint; // No mutex protection necessary, _endpoint is
+ // immutable.
}
- public IceInternal.Connector
- connector()
+ public IceInternal.Connector connector()
{
- return _connector; // No mutex protection necessary, _connector is immutable.
+ return _connector; // No mutex protection necessary, _connector is
+ // immutable.
}
@Override
- public synchronized void
- setAdapter(ObjectAdapter adapter)
+ public synchronized void setAdapter(ObjectAdapter adapter)
{
if(_state <= StateNotValidated || _state >= StateClosing)
{
@@ -1110,7 +903,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_adapter != null)
{
- _servantManager = ((ObjectAdapterI)_adapter).getServantManager();
+ _servantManager = ((ObjectAdapterI) _adapter).getServantManager();
if(_servantManager == null)
{
_adapter = null;
@@ -1129,22 +922,20 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- public synchronized ObjectAdapter
- getAdapter()
+ public synchronized ObjectAdapter getAdapter()
{
return _adapter;
}
@Override
- public Endpoint
- getEndpoint()
+ public Endpoint getEndpoint()
{
- return _endpoint; // No mutex protection necessary, _endpoint is immutable.
+ return _endpoint; // No mutex protection necessary, _endpoint is
+ // immutable.
}
@Override
- public ObjectPrx
- createProxy(Identity ident)
+ public ObjectPrx createProxy(Identity ident)
{
//
// Create a reference and return a reverse proxy for this
@@ -1157,8 +948,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Operations from EventHandler
//
@Override
- public void
- message(IceInternal.ThreadPoolCurrent current)
+ public void message(IceInternal.ThreadPoolCurrent current)
{
StartCallback startCB = null;
java.util.List<OutgoingMessage> sentCBs = null;
@@ -1213,7 +1003,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
if(_observer != null && !_readHeader)
{
- assert(!buf.b.hasRemaining());
+ assert (!buf.b.hasRemaining());
observerFinishRead(buf);
}
@@ -1277,7 +1067,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_endpoint.datagram())
{
- throw new Ice.DatagramLimitException(); // The message was truncated.
+ // The message was truncated.
+ throw new Ice.DatagramLimitException();
}
continue;
}
@@ -1286,7 +1077,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
int newOp = readOp | writeOp;
readyOp = readyOp & ~newOp;
- assert(readyOp != 0 || newOp != 0);
+ assert (readyOp != 0 || newOp != 0);
if(_state <= StateNotValidated)
{
@@ -1329,7 +1120,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else
{
- assert(_state <= StateClosingPending);
+ assert (_state <= StateClosingPending);
//
// We parse messages first, if we receive a close
@@ -1337,7 +1128,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if((readyOp & IceInternal.SocketOperation.Read) != 0)
{
- info = new MessageInfo(current.stream); // Optimization: use the thread's stream.
+ // Optimization: use the thread's stream.
+ info = new MessageInfo(current.stream);
newOp |= parseMessage(info);
}
@@ -1411,19 +1203,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
current.ioCompleted();
}
- if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
+ if(!_dispatcher) // Optimization, call dispatch() directly if there's no
+ // dispatcher.
{
dispatch(startCB, sentCBs, info);
}
else
{
- if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback
+ // No need for the stream if heartbeat callback
+ if(info != null && info.heartbeatCallback == null)
{
//
- // Create a new stream for the dispatch instead of using the thread
- // pool's thread stream.
+ // Create a new stream for the dispatch instead of using the
+ // thread pool's thread stream.
//
- assert(info.stream == current.stream);
+ assert (info.stream == current.stream);
IceInternal.BasicStream stream = info.stream;
info.stream = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
info.stream.swap(stream);
@@ -1432,21 +1226,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
final StartCallback finalStartCB = startCB;
final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
final MessageInfo finalInfo = info;
- _threadPool.dispatchFromThisThread(
- new IceInternal.DispatchWorkItem(this)
+ _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- dispatch(finalStartCB, finalSentCBs, finalInfo);
- }
- });
+ dispatch(finalStartCB, finalSentCBs, finalInfo);
+ }
+ });
}
}
- protected void
- dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
+ protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
{
int count = 0;
@@ -1467,7 +1258,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
for(OutgoingMessage msg : sentCBs)
{
- msg.outAsync.__invokeSent();
+ msg.outAsync.invokeSent();
}
++count;
}
@@ -1480,7 +1271,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(info.outAsync != null)
{
- info.outAsync.__finished(info.stream);
+ info.outAsync.finished(info.stream);
++count;
}
@@ -1504,8 +1295,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(info.invokeNum > 0)
{
- invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager,
- info.adapter);
+ invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
//
// Don't increase count, the dispatch count is
@@ -1552,19 +1342,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- public void
- finished(IceInternal.ThreadPoolCurrent current)
+ public void finished(IceInternal.ThreadPoolCurrent current)
{
synchronized(this)
{
- assert(_state == StateClosed);
+ assert (_state == StateClosed);
unscheduleTimeout(IceInternal.SocketOperation.Read | IceInternal.SocketOperation.Write);
}
//
- // If there are no callbacks to call, we don't call ioCompleted() since we're not going
- // to call code that will potentially block (this avoids promoting a new leader and
- // unecessary thread creation, especially if this is called on shutdown).
+ // If there are no callbacks to call, we don't call ioCompleted() since
+ // we're not going to call code that will potentially block (this avoids
+ // promoting a new leader and unecessary thread creation, especially if
+ // this is called on shutdown).
//
if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty() && _callback == null)
{
@@ -1573,27 +1363,25 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
current.ioCompleted();
- if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher.
+ if(!_dispatcher) // Optimization, call finish() directly if there's no
+ // dispatcher.
{
finish();
}
else
{
- _threadPool.dispatchFromThisThread(
- new IceInternal.DispatchWorkItem(this)
+ _threadPool.dispatchFromThisThread(new IceInternal.DispatchWorkItem(this)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- finish();
- }
- });
+ finish();
+ }
+ });
}
}
- public void
- finish()
+ public void finish()
{
if(_startCallback != null)
{
@@ -1618,28 +1406,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.finished(_exception);
if(p.requestId > 0) // Make sure finished isn't called twice.
{
- if(p.out != null)
- {
- _requests.remove(p.requestId);
- }
- else
- {
- _asyncRequests.remove(p.requestId);
- }
+ _asyncRequests.remove(p.requestId);
}
}
_sendStreams.clear();
}
- for(IceInternal.Outgoing p : _requests.values())
- {
- p.finished(_exception);
- }
- _requests.clear();
-
for(IceInternal.OutgoingAsync p : _asyncRequests.values())
{
- p.__finished(_exception);
+ p.finished(_exception);
}
_asyncRequests.clear();
@@ -1657,8 +1432,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // This must be done last as this will cause waitUntilFinished() to return (and communicator
- // objects such as the timer might be destroyed too).
+ // This must be done last as this will cause waitUntilFinished() to
+ // return (and communicator objects such as the timer might be destroyed
+ // too).
//
synchronized(this)
{
@@ -1671,21 +1447,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- public String
- toString()
+ public String toString()
{
return _toString();
}
@Override
- public java.nio.channels.SelectableChannel
- fd()
+ public java.nio.channels.SelectableChannel fd()
{
return _transceiver.fd();
}
- public synchronized void
- timedOut()
+ public synchronized void timedOut()
{
if(_state <= StateNotValidated)
{
@@ -1702,49 +1475,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- public String
- type()
+ public String type()
{
return _type; // No mutex lock, _type is immutable.
}
@Override
- public int
- timeout()
+ public int timeout()
{
- return _endpoint.timeout(); // No mutex protection necessary, _endpoint is immutable.
+ return _endpoint.timeout(); // No mutex protection necessary, _endpoint
+ // is immutable.
}
@Override
- public synchronized ConnectionInfo
- getInfo()
+ public synchronized ConnectionInfo getInfo()
{
if(_state >= StateClosed)
{
- throw (Ice.LocalException)_exception.fillInStackTrace();
+ throw (Ice.LocalException) _exception.fillInStackTrace();
}
return initConnectionInfo();
}
@Override
- public String
- _toString()
+ public String _toString()
{
return _desc; // No mutex lock, _desc is immutable.
}
- public synchronized void
- exception(LocalException ex)
+ public synchronized void exception(LocalException ex)
{
setState(StateClosed, ex);
}
@Override
- public synchronized void
- invokeException(int requestId, LocalException ex, int invokeNum)
+ public synchronized void invokeException(int requestId, LocalException ex, int invokeNum)
{
//
- // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
+ // Fatal exception while invoking a request. Since
+ // sendResponse/sendNoResponse isn't
// called in case of a fatal exception we decrement _dispatchCount here.
//
@@ -1752,9 +1521,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(invokeNum > 0)
{
- assert(_dispatchCount > 0);
+ assert (_dispatchCount > 0);
_dispatchCount -= invokeNum;
- assert(_dispatchCount >= 0);
+ assert (_dispatchCount >= 0);
if(_dispatchCount == 0)
{
if(_state == StateFinished)
@@ -1767,8 +1536,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
public ConnectionI(Communicator communicator, IceInternal.Instance instance, IceInternal.ACMMonitor monitor,
- IceInternal.Transceiver transceiver, IceInternal.Connector connector,
- IceInternal.EndpointI endpoint, ObjectAdapter adapter)
+ IceInternal.Transceiver transceiver, IceInternal.Connector connector, IceInternal.EndpointI endpoint,
+ ObjectAdapter adapter)
{
_communicator = communicator;
_instance = instance;
@@ -1780,7 +1549,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_endpoint = endpoint;
_adapter = adapter;
final Ice.InitializationData initData = instance.initializationData();
- _dispatcher = initData.dispatcher != null; // Cached for better performance.
+ // Cached for better performance.
+ _dispatcher = initData.dispatcher != null;
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
_timer = instance.timer();
@@ -1802,7 +1572,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_nextRequestId = 1;
_batchAutoFlush = initData.properties.getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0 ? true : false;
_batchStream = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding,
- _batchAutoFlush);
+ _batchAutoFlush);
_batchStreamInUse = false;
_batchRequestNum = 0;
_batchRequestCompress = false;
@@ -1828,7 +1598,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_adapter != null)
{
- _servantManager = ((ObjectAdapterI)_adapter).getServantManager();
+ _servantManager = ((ObjectAdapterI) _adapter).getServantManager();
}
else
{
@@ -1839,7 +1609,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_adapter != null)
{
- _threadPool = ((ObjectAdapterI)_adapter).getThreadPool();
+ _threadPool = ((ObjectAdapterI) _adapter).getThreadPool();
}
else
{
@@ -1858,9 +1628,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
@Override
- protected synchronized void
- finalize()
- throws Throwable
+ protected synchronized void finalize() throws Throwable
{
try
{
@@ -1868,7 +1636,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished);
IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0);
IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty());
- IceUtilInternal.Assert.FinalizerAssert(_requests.isEmpty());
IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty());
}
catch(java.lang.Exception ex)
@@ -1889,14 +1656,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private static final int StateClosed = 6;
private static final int StateFinished = 7;
- private void
- setState(int state, LocalException ex)
+ private void setState(int state, LocalException ex)
{
//
// If setState() is called with an exception, then only closed
// and closing states are permissible.
//
- assert(state >= StateClosing);
+ assert (state >= StateClosing);
if(_state == state) // Don't switch twice.
{
@@ -1908,7 +1674,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// If we are in closed state, an exception must be set.
//
- assert(_state != StateClosed);
+ assert (_state != StateClosed);
_exception = ex;
@@ -1924,8 +1690,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
warning("connection exception", _exception);
}
@@ -1940,8 +1705,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(state);
}
- private void
- setState(int state)
+ private void setState(int state)
{
//
// We don't want to send close connection messages if the endpoint
@@ -1969,83 +1733,83 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
switch(state)
{
- case StateNotInitialized:
- {
- assert(false);
- break;
- }
-
- case StateNotValidated:
- {
- if(_state != StateNotInitialized)
+ case StateNotInitialized:
{
- assert(_state == StateClosed);
- return;
+ assert (false);
+ break;
}
- break;
- }
- case StateActive:
- {
- //
- // Can only switch from holding or not validated to
- // active.
- //
- if(_state != StateHolding && _state != StateNotValidated)
+ case StateNotValidated:
{
- return;
+ if(_state != StateNotInitialized)
+ {
+ assert (_state == StateClosed);
+ return;
+ }
+ break;
}
- _threadPool.register(this, IceInternal.SocketOperation.Read);
- break;
- }
- case StateHolding:
- {
- //
- // Can only switch from active or not validated to
- // holding.
- //
- if(_state != StateActive && _state != StateNotValidated)
+ case StateActive:
{
- return;
+ //
+ // Can only switch from holding or not validated to
+ // active.
+ //
+ if(_state != StateHolding && _state != StateNotValidated)
+ {
+ return;
+ }
+ _threadPool.register(this, IceInternal.SocketOperation.Read);
+ break;
}
- if(_state == StateActive)
+
+ case StateHolding:
{
- _threadPool.unregister(this, IceInternal.SocketOperation.Read);
+ //
+ // Can only switch from active or not validated to
+ // holding.
+ //
+ if(_state != StateActive && _state != StateNotValidated)
+ {
+ return;
+ }
+ if(_state == StateActive)
+ {
+ _threadPool.unregister(this, IceInternal.SocketOperation.Read);
+ }
+ break;
}
- break;
- }
- case StateClosing:
- case StateClosingPending:
- {
- //
- // Can't change back from closing pending.
- //
- if(_state >= StateClosingPending)
+ case StateClosing:
+ case StateClosingPending:
{
- return;
+ //
+ // Can't change back from closing pending.
+ //
+ if(_state >= StateClosingPending)
+ {
+ return;
+ }
+ break;
}
- break;
- }
- case StateClosed:
- {
- if(_state == StateFinished)
+ case StateClosed:
{
- return;
+ if(_state == StateFinished)
+ {
+ return;
+ }
+ _threadPool.finish(this);
+ break;
}
- _threadPool.finish(this);
- break;
- }
- case StateFinished:
- {
- assert(_state == StateClosed);
- _transceiver.close();
- _communicator = null;
- break;
- }
+ case StateFinished:
+ {
+ assert (_state == StateClosed);
+ _transceiver.close();
+ _communicator = null;
+ break;
+ }
}
}
catch(Ice.LocalException ex)
@@ -2086,10 +1850,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
Ice.Instrumentation.ConnectionState newState = toConnectionState(state);
if(oldState != newState)
{
- _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(),
- _endpoint,
- newState,
- _observer);
+ _observer = _instance.getObserver().getConnectionObserver(initConnectionInfo(), _endpoint, newState,
+ _observer);
if(_observer != null)
{
_observer.attach();
@@ -2106,8 +1868,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_exception instanceof ForcedCloseConnectionException ||
_exception instanceof ConnectionTimeoutException ||
_exception instanceof CommunicatorDestroyedException ||
- _exception instanceof ObjectAdapterDeactivatedException ||
- (_exception instanceof ConnectionLostException && _state >= StateClosing)))
+ _exception instanceof ObjectAdapterDeactivatedException || (_exception instanceof ConnectionLostException && _state >= StateClosing)))
{
_observer.failed(_exception.ice_name());
}
@@ -2130,11 +1891,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private void
- initiateShutdown()
+ private void initiateShutdown()
{
- assert(_state == StateClosing);
- assert(_dispatchCount == 0);
+ assert (_state == StateClosing);
+ assert (_dispatchCount == 0);
if(_shutdownInitiated)
{
@@ -2148,12 +1908,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Before we shut down, we send a close connection message.
//
IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
- IceInternal.Protocol.currentProtocolEncoding);
+ IceInternal.Protocol.currentProtocolEncoding);
os.writeBlob(IceInternal.Protocol.magic);
IceInternal.Protocol.currentProtocol.__write(os);
IceInternal.Protocol.currentProtocolEncoding.__write(os);
os.writeByte(IceInternal.Protocol.closeConnectionMsg);
- os.writeByte((byte)0); // compression status: always report 0 for CloseConnection in Java.
+ os.writeByte((byte) 0); // compression status: always report 0 for
+ // CloseConnection in Java.
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
if((sendMessage(new OutgoingMessage(os, false, false)) & IceInternal.AsyncStatus.Sent) > 0)
@@ -2161,7 +1922,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateClosingPending);
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the the transceiver of the graceful connection
+ // closure.
//
int op = _transceiver.closing(true, _exception);
if(op != 0)
@@ -2173,20 +1935,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private void
- heartbeat()
+ private void heartbeat()
{
- assert(_state == StateActive);
+ assert (_state == StateActive);
if(!_endpoint.datagram())
{
IceInternal.BasicStream os = new IceInternal.BasicStream(_instance,
- IceInternal.Protocol.currentProtocolEncoding);
+ IceInternal.Protocol.currentProtocolEncoding);
os.writeBlob(IceInternal.Protocol.magic);
IceInternal.Protocol.currentProtocol.__write(os);
IceInternal.Protocol.currentProtocolEncoding.__write(os);
os.writeByte(IceInternal.Protocol.validateConnectionMsg);
- os.writeByte((byte)0);
+ os.writeByte((byte) 0);
os.writeInt(IceInternal.Protocol.headerSize); // Message size.
try
@@ -2197,13 +1958,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- assert(_exception != null);
+ assert (_exception != null);
}
}
}
- private boolean
- initialize(int operation)
+ private boolean initialize(int operation)
{
int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), _hasMoreData);
if(s != IceInternal.SocketOperation.None)
@@ -2214,7 +1974,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // Update the connection description once the transceiver is initialized.
+ // Update the connection description once the transceiver is
+ // initialized.
//
_desc = _transceiver.toString();
setState(StateNotValidated);
@@ -2222,12 +1983,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return true;
}
- private boolean
- validate(int operation)
+ private boolean validate(int operation)
{
- if(!_endpoint.datagram()) // Datagram connections are always implicitly validated.
+ if(!_endpoint.datagram()) // Datagram connections are always implicitly
+ // validated.
{
- if(_adapter != null) // The server side has the active role for connection validation.
+ if(_adapter != null) // The server side has the active role for
+ // connection validation.
{
if(_writeStream.isEmpty())
{
@@ -2235,8 +1997,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.Protocol.currentProtocol.__write(_writeStream);
IceInternal.Protocol.currentProtocolEncoding.__write(_writeStream);
_writeStream.writeByte(IceInternal.Protocol.validateConnectionMsg);
- _writeStream.writeByte((byte)0); // Compression status (always zero for validate connection).
- _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message size.
+ _writeStream.writeByte((byte) 0); // Compression status
+ // (always zero for
+ // validate connection).
+ _writeStream.writeInt(IceInternal.Protocol.headerSize); // Message
+ // size.
IceInternal.TraceUtil.traceSend(_writeStream, _logger, _traceLevels);
_writeStream.prepareWrite();
}
@@ -2262,7 +2027,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishWrite(_writeStream.getBuffer());
}
}
- else // The client side has the passive role for connection validation.
+ else
+ // The client side has the passive role for connection validation.
{
if(_readStream.isEmpty())
{
@@ -2291,7 +2057,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishRead(_readStream.getBuffer());
}
- assert(_readStream.pos() == IceInternal.Protocol.headerSize);
+ assert (_readStream.pos() == IceInternal.Protocol.headerSize);
_readStream.pos(0);
byte[] m = _readStream.readBlob(4);
if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] ||
@@ -2313,7 +2079,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
throw new ConnectionNotValidatedException();
}
- _readStream.readByte(); // Ignore compression status for validate connection.
+ _readStream.readByte(); // Ignore compression status for
+ // validate connection.
int size = _readStream.readInt();
if(size != IceInternal.Protocol.headerSize)
{
@@ -2335,8 +2102,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return true;
}
- private int
- sendNextMessage(java.util.List<OutgoingMessage> callbacks)
+ private int sendNextMessage(java.util.List<OutgoingMessage> callbacks)
{
if(_sendStreams.isEmpty())
{
@@ -2344,13 +2110,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
else if(_state == StateClosingPending && _writeStream.pos() == 0)
{
- // Message wasn't sent, empty the _writeStream, we're not going to send more data.
+ // Message wasn't sent, empty the _writeStream, we're not going to
+ // send more data.
OutgoingMessage message = _sendStreams.getFirst();
_writeStream.swap(message.stream);
return IceInternal.SocketOperation.None;
}
- assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
+ assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
try
{
while(true)
@@ -2390,7 +2157,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Otherwise, prepare the next message stream for writing.
//
message = _sendStreams.getFirst();
- assert(!message.prepared);
+ assert (!message.prepared);
IceInternal.BasicStream stream = message.stream;
message.stream = doCompress(stream, message.compress);
@@ -2435,8 +2202,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
+ // If all the messages were sent and we are in the closing state, we
+ // schedule the close timeout to wait for the peer to close the
+ // connection.
//
if(_state == StateClosing && _dispatchCount == 0)
{
@@ -2451,10 +2219,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return IceInternal.SocketOperation.None;
}
- private int
- sendMessage(OutgoingMessage message)
+ private int sendMessage(OutgoingMessage message)
{
- assert(_state < StateClosed);
+ assert (_state < StateClosed);
if(!_sendStreams.isEmpty())
{
@@ -2464,11 +2231,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
//
- // Attempt to send the message without blocking. If the send blocks, we register
- // the connection with the selector thread.
+ // Attempt to send the message without blocking. If the send blocks, we
+ // register the connection with the selector thread.
//
- assert(!message.prepared);
+ assert (!message.prepared);
IceInternal.BasicStream stream = message.stream;
@@ -2523,15 +2290,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return IceInternal.AsyncStatus.Queued;
}
- private IceInternal.BasicStream
- doCompress(IceInternal.BasicStream uncompressed, boolean compress)
+ private IceInternal.BasicStream doCompress(IceInternal.BasicStream uncompressed, boolean compress)
{
boolean compressionSupported = false;
if(compress)
{
//
- // Don't check whether compression support is available unless the proxy
- // is configured for compression.
+ // Don't check whether compression support is available unless the
+ // proxy is configured for compression.
//
compressionSupported = IceInternal.BasicStream.compressible();
}
@@ -2548,7 +2314,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Set compression status.
//
cstream.pos(9);
- cstream.writeByte((byte)2);
+ cstream.writeByte((byte) 2);
//
// Write the size of the compressed stream into the header.
@@ -2557,11 +2323,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
cstream.writeInt(cstream.size());
//
- // Write the compression status and size of the compressed stream into the header of the
- // uncompressed stream -- we need this to trace requests correctly.
+ // Write the compression status and size of the compressed
+ // stream into the header of the uncompressed stream -- we need
+ // this to trace requests correctly.
//
uncompressed.pos(9);
- uncompressed.writeByte((byte)2);
+ uncompressed.writeByte((byte) 2);
uncompressed.writeInt(cstream.size());
return cstream;
@@ -2569,7 +2336,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
uncompressed.pos(9);
- uncompressed.writeByte((byte)(compressionSupported ? 1 : 0));
+ uncompressed.writeByte((byte) (compressionSupported ? 1 : 0));
//
// Not compressed, fill in the message size.
@@ -2597,17 +2364,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ConnectionCallback heartbeatCallback;
}
- private int
- parseMessage(MessageInfo info)
+ private int parseMessage(MessageInfo info)
{
- assert(_state > StateNotValidated && _state < StateClosed);
+ assert (_state > StateNotValidated && _state < StateClosed);
_readStream.swap(info.stream);
_readStream.resize(IceInternal.Protocol.headerSize, true);
_readStream.pos(0);
_readHeader = true;
- assert(info.stream.pos() == info.stream.size());
+ assert (info.stream.pos() == info.stream.size());
//
// Connection is validated on first message. This is only used by
@@ -2626,7 +2392,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
info.stream.pos(8);
byte messageType = info.stream.readByte();
info.compress = info.stream.readByte();
- if(info.compress == (byte)2)
+ if(info.compress == (byte) 2)
{
if(IceInternal.BasicStream.compressible())
{
@@ -2636,7 +2402,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
FeatureNotSupportedException ex = new FeatureNotSupportedException();
ex.unsupportedFeature = "Cannot uncompress compressed message: "
- + "org.apache.tools.bzip2.CBZip2OutputStream was not found";
+ + "org.apache.tools.bzip2.CBZip2OutputStream was not found";
throw ex;
}
}
@@ -2659,7 +2425,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
setState(StateClosingPending, new CloseConnectionException());
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the the transceiver of the graceful connection
+ // closure.
//
int op = _transceiver.closing(false, _exception);
if(op != 0)
@@ -2675,9 +2442,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state >= StateClosing)
{
- IceInternal.TraceUtil.trace("received request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, _logger, _traceLevels);
+ IceInternal.TraceUtil.trace("received request during closing\n"
+ + "(ignored by server, client will retry)", info.stream, _logger,
+ _traceLevels);
}
else
{
@@ -2695,9 +2462,9 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(_state >= StateClosing)
{
- IceInternal.TraceUtil.trace("received batch request during closing\n" +
- "(ignored by server, client will retry)",
- info.stream, _logger, _traceLevels);
+ IceInternal.TraceUtil.trace("received batch request during closing\n"
+ + "(ignored by server, client will retry)", info.stream, _logger,
+ _traceLevels);
}
else
{
@@ -2719,18 +2486,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- IceInternal.Outgoing out = _requests.remove(info.requestId);
- if(out != null)
- {
- out.finished(info.stream);
- }
- else
+ info.outAsync = _asyncRequests.remove(info.requestId);
+ if(info.outAsync != null)
{
- info.outAsync = _asyncRequests.remove(info.requestId);
- if(info.outAsync != null)
- {
- ++_dispatchCount;
- }
+ ++_dispatchCount;
}
notifyAll(); // Notify threads blocked in close(false)
break;
@@ -2750,7 +2509,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
default:
{
IceInternal.TraceUtil.trace("received unknown message\n(invalid, closing connection)", info.stream,
- _logger, _traceLevels);
+ _logger, _traceLevels);
throw new UnknownMessageException();
}
}
@@ -2773,9 +2532,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _state == StateHolding ? IceInternal.SocketOperation.None : IceInternal.SocketOperation.Read;
}
- private void
- invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress,
- IceInternal.ServantManager servantManager, ObjectAdapter adapter)
+ private void invokeAll(IceInternal.BasicStream stream, int invokeNum, int requestId, byte compress,
+ IceInternal.ServantManager servantManager, ObjectAdapter adapter)
{
//
// Note: In contrast to other private or protected methods, this
@@ -2811,7 +2569,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
invokeException(requestId, ex, invokeNum);
}
- catch(java.lang.AssertionError ex) // Upon assertion, we print the stack trace.
+ catch(java.lang.AssertionError ex) // Upon assertion, we print the stack
+ // trace.
{
UnknownException uex = new UnknownException(ex);
java.io.StringWriter sw = new java.io.StringWriter();
@@ -2842,8 +2601,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private void
- scheduleTimeout(int status)
+ private void scheduleTimeout(int status)
{
int timeout;
if(_state < StateActive)
@@ -2892,8 +2650,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_readTimeoutFuture.cancel(false);
}
- _readTimeoutFuture = _timer.schedule(_readTimeout, timeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ _readTimeoutFuture = _timer.schedule(_readTimeout, timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
}
if((status & (IceInternal.SocketOperation.Write | IceInternal.SocketOperation.Connect)) != 0)
{
@@ -2902,17 +2659,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_writeTimeoutFuture.cancel(false);
}
_writeTimeoutFuture = _timer.schedule(_writeTimeout, timeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ java.util.concurrent.TimeUnit.MILLISECONDS);
}
}
catch(Throwable ex)
{
- assert(false);
+ assert (false);
}
}
- private void
- unscheduleTimeout(int status)
+ private void unscheduleTimeout(int status)
{
if((status & IceInternal.SocketOperation.Read) != 0 && _readTimeoutFuture != null)
{
@@ -2927,8 +2683,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private ConnectionInfo
- initConnectionInfo()
+ private ConnectionInfo initConnectionInfo()
{
if(_info != null)
{
@@ -2941,19 +2696,18 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
info.incoming = _connector == null;
if(_state > StateNotInitialized)
{
- _info = info; // Cache the connection information only if initialized.
+ _info = info; // Cache the connection information only if
+ // initialized.
}
return info;
}
- private Ice.Instrumentation.ConnectionState
- toConnectionState(int state)
+ private Ice.Instrumentation.ConnectionState toConnectionState(int state)
{
return connectionStateMap[state];
}
- private void
- warning(String msg, java.lang.Exception ex)
+ private void warning(String msg, java.lang.Exception ex)
{
java.io.StringWriter sw = new java.io.StringWriter();
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
@@ -2963,42 +2717,38 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_logger.warning(s);
}
- private void
- observerStartRead(IceInternal.Buffer buf)
+ private void observerStartRead(IceInternal.Buffer buf)
{
if(_readStreamPos >= 0)
{
- assert(!buf.empty());
+ assert (!buf.empty());
_observer.receivedBytes(buf.b.position() - _readStreamPos);
}
_readStreamPos = buf.empty() ? -1 : buf.b.position();
}
- private void
- observerFinishRead(IceInternal.Buffer buf)
+ private void observerFinishRead(IceInternal.Buffer buf)
{
if(_readStreamPos == -1)
{
return;
}
- assert(buf.b.position() >= _readStreamPos);
+ assert (buf.b.position() >= _readStreamPos);
_observer.receivedBytes(buf.b.position() - _readStreamPos);
_readStreamPos = -1;
}
- private void
- observerStartWrite(IceInternal.Buffer buf)
+ private void observerStartWrite(IceInternal.Buffer buf)
{
if(_writeStreamPos >= 0)
{
- assert(!buf.empty());
+ assert (!buf.empty());
_observer.sentBytes(buf.b.position() - _writeStreamPos);
}
_writeStreamPos = buf.empty() ? -1 : buf.b.position();
}
- private void
- observerFinishWrite(IceInternal.Buffer buf)
+ private void observerFinishWrite(IceInternal.Buffer buf)
{
if(_writeStreamPos == -1)
{
@@ -3011,8 +2761,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_writeStreamPos = -1;
}
- private IceInternal.Incoming
- getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
+ private IceInternal.Incoming getIncoming(ObjectAdapter adapter, boolean response, byte compress, int requestId)
{
IceInternal.Incoming in = null;
@@ -3041,8 +2790,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return in;
}
- private void
- reclaimIncoming(IceInternal.Incoming in)
+ private void reclaimIncoming(IceInternal.Incoming in)
{
if(_cacheBuffers > 0)
{
@@ -3058,8 +2806,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- private void
- reap()
+ private void reap()
{
if(_monitor != null)
{
@@ -3086,7 +2833,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
wait();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
interrupted = true;
}
@@ -3110,17 +2857,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = 0;
}
- OutgoingMessage(IceInternal.OutgoingMessageCallback out, IceInternal.BasicStream stream, boolean compress,
- int requestId)
- {
- this.stream = stream;
- this.compress = compress;
- this.out = out;
- this.requestId = requestId;
- }
-
OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress,
- int requestId)
+ int requestId)
{
this.stream = stream;
this.compress = compress;
@@ -3128,56 +2866,42 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = requestId;
}
- public void
- timedOut()
+ public void timedOut()
{
- assert((out != null || outAsync != null));
- out = null;
+ assert (outAsync != null);
outAsync = null;
}
- public void
- adopt()
+ public void adopt()
{
if(adopt)
{
- IceInternal.BasicStream stream =
- new IceInternal.BasicStream(this.stream.instance(), IceInternal.Protocol.currentProtocolEncoding);
+ IceInternal.BasicStream stream = new IceInternal.BasicStream(this.stream.instance(),
+ IceInternal.Protocol.currentProtocolEncoding);
stream.swap(this.stream);
this.stream = stream;
adopt = false;
}
}
- public boolean
- sent()
+ public boolean sent()
{
- if(out != null)
+ if(outAsync != null)
{
- out.sent();
- }
- else if(outAsync != null)
- {
- return outAsync.__sent();
+ return outAsync.sent();
}
return false;
}
- public void
- finished(Ice.LocalException ex)
+ public void finished(Ice.LocalException ex)
{
- if(out != null)
- {
- out.finished(ex);
- }
- else if(outAsync != null)
+ if(outAsync != null)
{
- outAsync.__finished(ex);
+ outAsync.finished(ex);
}
}
public IceInternal.BasicStream stream;
- public IceInternal.OutgoingMessageCallback out;
public IceInternal.OutgoingAsyncMessageCallback outAsync;
public boolean compress;
public int requestId;
@@ -3219,10 +2943,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private int _nextRequestId;
- private java.util.Map<Integer, IceInternal.Outgoing> _requests =
- new java.util.HashMap<Integer, IceInternal.Outgoing>();
- private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests =
- new java.util.HashMap<Integer, IceInternal.OutgoingAsync>();
+ private java.util.Map<Integer, IceInternal.OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, IceInternal.OutgoingAsync>();
private LocalException _exception;
@@ -3262,14 +2983,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private ConnectionCallback _callback;
private static Ice.Instrumentation.ConnectionState connectionStateMap[] = {
- Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
- Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated
- Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
- Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
- Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
- Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending
- Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
- Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotInitialized
+ Ice.Instrumentation.ConnectionState.ConnectionStateValidating, // StateNotValidated
+ Ice.Instrumentation.ConnectionState.ConnectionStateActive, // StateActive
+ Ice.Instrumentation.ConnectionState.ConnectionStateHolding, // StateHolding
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosing
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosing, // StateClosingPending
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
+ Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
};
}
diff --git a/java/src/Ice/ObjectPrxHelper.java b/java/src/Ice/ObjectPrxHelper.java
index 45c290df6e2..c390823e61f 100644
--- a/java/src/Ice/ObjectPrxHelper.java
+++ b/java/src/Ice/ObjectPrxHelper.java
@@ -152,7 +152,6 @@ public class ObjectPrxHelper extends ObjectPrxHelperBase
public static String
ice_staticId()
{
- return Ice.ObjectImpl.ice_staticId();
+ return Ice.ObjectImpl.ice_staticId();
}
-
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index cb7623ecbdd..edda748ea7a 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -9,8 +9,12 @@
package Ice;
+import java.util.LinkedList;
+import java.util.List;
+
import Ice.Instrumentation.InvocationObserver;
import IceInternal.QueueRequestHandler;
+import IceInternal.RetryException;
/**
* Base class for all proxies.
@@ -86,39 +90,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
ice_isA(String __id, java.util.Map<String, String> __context, boolean __explicitCtx)
{
__checkTwowayOnly(__ice_isA_name);
- IceInternal.Outgoing __og = getOutgoing(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx);
- try
- {
- try
- {
- IceInternal.BasicStream __os = __og.startWriteParams(Ice.FormatType.DefaultFormat);
- __os.writeString(__id);
- __og.endWriteParams();
- }
- catch(LocalException __ex)
- {
- __og.abort(__ex);
- }
- if(!__og.invoke())
- {
- try
- {
- __og.throwUserException();
- }
- catch(UserException __ex)
- {
- throw new UnknownUserException(__ex.ice_name(), __ex);
- }
- }
- IceInternal.BasicStream __is = __og.startReadParams();
- boolean __ret = __is.readBool();
- __og.endReadParams();
- return __ret;
- }
- finally
- {
- reclaimOutgoing(__og);
- }
+ return end_ice_isA(begin_ice_isA(__id, __context, __explicitCtx, true, null));
}
/**
@@ -131,7 +103,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id)
{
- return begin_ice_isA(__id, null, false, null);
+ return begin_ice_isA(__id, null, false, false, null);
}
/**
@@ -145,7 +117,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id, java.util.Map<String, String> __context)
{
- return begin_ice_isA(__id, __context, true, null);
+ return begin_ice_isA(__id, __context, true, false, null);
}
/**
@@ -159,7 +131,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id, Callback __cb)
{
- return begin_ice_isA(__id, null, false, __cb);
+ return begin_ice_isA(__id, null, false, false, __cb);
}
/**
@@ -174,7 +146,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id, java.util.Map<String, String> __context, Callback __cb)
{
- return begin_ice_isA(__id, __context, true, __cb);
+ return begin_ice_isA(__id, __context, true, false, __cb);
}
/**
@@ -188,7 +160,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id, Callback_Object_ice_isA __cb)
{
- return begin_ice_isA(__id, null, false, __cb);
+ return begin_ice_isA(__id, null, false, false, __cb);
}
/**
@@ -203,7 +175,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_isA(String __id, java.util.Map<String, String> __context, Callback_Object_ice_isA __cb)
{
- return begin_ice_isA(__id, __context, true, __cb);
+ return begin_ice_isA(__id, __context, true, false, __cb);
}
/**
@@ -220,7 +192,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_isA(__id, null, false, __responseCb, __exceptionCb, null);
+ return begin_ice_isA(__id, null, false, false, __responseCb, __exceptionCb, null);
}
/**
@@ -239,7 +211,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_isA(__id, null, false, __responseCb, __exceptionCb, __sentCb);
+ return begin_ice_isA(__id, null, false, false, __responseCb, __exceptionCb, __sentCb);
}
/**
@@ -258,7 +230,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_isA(__id, __context, true, __responseCb, __exceptionCb, null);
+ return begin_ice_isA(__id, __context, true, false, __responseCb, __exceptionCb, null);
}
/**
@@ -279,18 +251,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_isA(__id, __context, true, __responseCb, __exceptionCb, __sentCb);
+ return begin_ice_isA(__id, __context, true, false, __responseCb, __exceptionCb, __sentCb);
}
private final AsyncResult
begin_ice_isA(String __id,
java.util.Map<String, String> __context,
boolean __explicitCtx,
+ boolean __synchronous,
IceInternal.Functional_BoolCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_isA(__id, __context, __explicitCtx,
+ return begin_ice_isA(__id, __context, __explicitCtx, __synchronous,
new IceInternal.Functional_TwowayCallbackBool(__responseCb, __exceptionCb, __sentCb)
{
@Override
@@ -303,21 +276,21 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private AsyncResult
begin_ice_isA(String __id, java.util.Map<String, String> __context, boolean __explicitCtx,
- IceInternal.CallbackBase __cb)
+ boolean __synchronous, IceInternal.CallbackBase __cb)
{
__checkAsyncTwowayOnly(__ice_isA_name);
- IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_isA_name, __cb);
+ IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_isA_name, __cb);
try
{
- __result.__prepare(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx);
- IceInternal.BasicStream __os = __result.__startWriteParams(Ice.FormatType.DefaultFormat);
+ __result.prepare(__ice_isA_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
+ IceInternal.BasicStream __os = __result.startWriteParams(Ice.FormatType.DefaultFormat);
__os.writeString(__id);
- __result.__endWriteParams();
- __result.__invoke(true);
+ __result.endWriteParams();
+ __result.invoke(true);
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@@ -330,25 +303,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
**/
@Override
public final boolean
- end_ice_isA(AsyncResult __result)
+ end_ice_isA(AsyncResult __iresult)
{
- AsyncResult.__check(__result, this, __ice_isA_name);
- if(!__result.__wait())
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
+ IceInternal.AsyncResultI.check(__result, this, __ice_isA_name);
+ try
{
- try
+ if(!__result.__wait())
{
- __result.__throwUserException();
+ try
+ {
+ __result.throwUserException();
+ }
+ catch(UserException __ex)
+ {
+ throw new UnknownUserException(__ex.ice_name(), __ex);
+ }
}
- catch(UserException __ex)
+ boolean __ret;
+ IceInternal.BasicStream __is = __result.startReadParams();
+ __ret = __is.readBool();
+ __result.endReadParams();
+ return __ret;
+ }
+ finally
+ {
+ if(__result != null)
{
- throw new UnknownUserException(__ex.ice_name(), __ex);
+ __result.cacheMessageBuffers();
}
}
- boolean __ret;
- IceInternal.BasicStream __is = __result.__startReadParams();
- __ret = __is.readBool();
- __result.__endReadParams();
- return __ret;
}
static public final void __ice_isA_completed(TwowayCallbackBool __cb, Ice.AsyncResult __result)
@@ -398,16 +382,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
private void
ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx)
{
- IceInternal.Outgoing __og = getOutgoing(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx);
- try
- {
- __og.writeEmptyParams();
- __invoke(__og);
- }
- finally
- {
- reclaimOutgoing(__og);
- }
+ end_ice_ping(begin_ice_ping(__context, __explicitCtx, true, null));
}
/**
@@ -419,7 +394,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping()
{
- return begin_ice_ping(null, false, null);
+ return begin_ice_ping(null, false, false, null);
}
/**
@@ -432,7 +407,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping(java.util.Map<String, String> __context)
{
- return begin_ice_ping(__context, true, null);
+ return begin_ice_ping(__context, true, false, null);
}
/**
@@ -445,7 +420,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping(Callback __cb)
{
- return begin_ice_ping(null, false, __cb);
+ return begin_ice_ping(null, false, false, __cb);
}
/**
@@ -459,7 +434,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping(java.util.Map<String, String> __context, Callback __cb)
{
- return begin_ice_ping(__context, true, __cb);
+ return begin_ice_ping(__context, true, false, __cb);
}
/**
@@ -472,7 +447,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping(Callback_Object_ice_ping __cb)
{
- return begin_ice_ping(null, false, __cb);
+ return begin_ice_ping(null, false, false, __cb);
}
/**
@@ -486,7 +461,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ping(java.util.Map<String, String> __context, Callback_Object_ice_ping __cb)
{
- return begin_ice_ping(__context, true, __cb);
+ return begin_ice_ping(__context, true, false, __cb);
}
/**
@@ -501,7 +476,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_ping(IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_ping(null, false,
+ return begin_ice_ping(null, false, false,
new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, null));
}
@@ -519,8 +494,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_ping(null, false, new
- IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, __sentCb));
+ return begin_ice_ping(null, false, false, new IceInternal.Functional_OnewayCallback(__responseCb,
+ __exceptionCb, __sentCb));
}
/**
@@ -537,8 +512,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_VoidCallback __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_ping(__context, true,
- new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, null));
+ return begin_ice_ping(__context, true, false, new IceInternal.Functional_OnewayCallback(__responseCb,
+ __exceptionCb, null));
}
/**
@@ -557,23 +532,23 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_ping(__context, true,
+ return begin_ice_ping(__context, true, false,
new IceInternal.Functional_OnewayCallback(__responseCb, __exceptionCb, __sentCb));
}
- private AsyncResult
- begin_ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb)
+ private AsyncResult begin_ice_ping(java.util.Map<String, String> __context, boolean __explicitCtx,
+ boolean __synchronous, IceInternal.CallbackBase __cb)
{
- IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_ping_name, __cb);
+ IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_ping_name, __cb);
try
{
- __result.__prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx);
- __result.__writeEmptyParams();
- __result.__invoke(true);
+ __result.prepare(__ice_ping_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
+ __result.writeEmptyParams();
+ __result.invoke(true);
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@@ -623,30 +598,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx)
{
__checkTwowayOnly(__ice_id_name);
- IceInternal.Outgoing __og = getOutgoing(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx);
- try
- {
- __og.writeEmptyParams();
- if(!__og.invoke())
- {
- try
- {
- __og.throwUserException();
- }
- catch(UserException __ex)
- {
- throw new UnknownUserException(__ex.ice_name(), __ex);
- }
- }
- IceInternal.BasicStream __is = __og.startReadParams();
- String[] __ret = __is.readStringSeq();
- __og.endReadParams();
- return __ret;
- }
- finally
- {
- reclaimOutgoing(__og);
- }
+ return end_ice_ids(begin_ice_ids(__context, __explicitCtx, true, null));
}
/**
@@ -658,7 +610,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids()
{
- return begin_ice_ids(null, false, null);
+ return begin_ice_ids(null, false, false, null);
}
/**
@@ -671,7 +623,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids(java.util.Map<String, String> __context)
{
- return begin_ice_ids(__context, true, null);
+ return begin_ice_ids(__context, true, false, null);
}
/**
@@ -684,7 +636,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids(Callback __cb)
{
- return begin_ice_ids(null, false, __cb);
+ return begin_ice_ids(null, false, false,__cb);
}
/**
@@ -698,7 +650,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids(java.util.Map<String, String> __context, Callback __cb)
{
- return begin_ice_ids(__context, true, __cb);
+ return begin_ice_ids(__context, true, false,__cb);
}
/**
@@ -711,7 +663,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids(Callback_Object_ice_ids __cb)
{
- return begin_ice_ids(null, false, __cb);
+ return begin_ice_ids(null, false, false,__cb);
}
/**
@@ -725,10 +677,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_ids(java.util.Map<String, String> __context, Callback_Object_ice_ids __cb)
{
- return begin_ice_ids(__context, true, __cb);
+ return begin_ice_ids(__context, true, false,__cb);
}
- class FunctionalCallback_Object_ice_ids extends IceInternal.Functional_TwowayCallbackArg1<String[]>
+ private class FunctionalCallback_Object_ice_ids extends IceInternal.Functional_TwowayCallbackArg1<String[]>
{
FunctionalCallback_Object_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
@@ -756,7 +708,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_ids(IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_ids(null, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, null));
+ return begin_ice_ids(null, false, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb,
+ null));
}
/**
@@ -773,8 +726,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_ids(null, false,
- new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, __sentCb));
+ return begin_ice_ids(null, false, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb,
+ __sentCb));
}
/**
@@ -791,8 +744,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<String[]> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_ids(__context, true,
- new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, null));
+ return begin_ice_ids(__context, true, false, new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb,
+ null));
}
/**
@@ -811,24 +764,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_ids(__context, true,
+ return begin_ice_ids(__context, true, false,
new FunctionalCallback_Object_ice_ids(__responseCb, __exceptionCb, __sentCb));
}
- private AsyncResult
- begin_ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb)
+ private AsyncResult begin_ice_ids(java.util.Map<String, String> __context, boolean __explicitCtx,
+ boolean __synchronous, IceInternal.CallbackBase __cb)
{
__checkAsyncTwowayOnly(__ice_ids_name);
- IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_ids_name, __cb);
+ IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_ids_name, __cb);
try
{
- __result.__prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx);
- __result.__writeEmptyParams();
- __result.__invoke(true);
+ __result.prepare(__ice_ids_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
+ __result.writeEmptyParams();
+ __result.invoke(true);
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@@ -842,25 +795,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
**/
@Override
public final String[]
- end_ice_ids(AsyncResult __result)
+ end_ice_ids(AsyncResult __iresult)
{
- AsyncResult.__check(__result, this, __ice_ids_name);
- if(!__result.__wait())
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
+ IceInternal.AsyncResultI.check(__result, this, __ice_ids_name);
+ try
{
- try
+ if(!__result.__wait())
{
- __result.__throwUserException();
+ try
+ {
+ __result.throwUserException();
+ }
+ catch(UserException __ex)
+ {
+ throw new UnknownUserException(__ex.ice_name(), __ex);
+ }
}
- catch(UserException __ex)
+ String[] __ret = null;
+ IceInternal.BasicStream __is = __result.startReadParams();
+ __ret = StringSeqHelper.read(__is);
+ __result.endReadParams();
+ return __ret;
+ }
+ finally
+ {
+ if(__result != null)
{
- throw new UnknownUserException(__ex.ice_name(), __ex);
+ __result.cacheMessageBuffers();
}
}
- String[] __ret = null;
- IceInternal.BasicStream __is = __result.__startReadParams();
- __ret = StringSeqHelper.read(__is);
- __result.__endReadParams();
- return __ret;
}
static public final void __ice_ids_completed(TwowayCallbackArg1<String[]> __cb, AsyncResult __result)
@@ -914,30 +878,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
ice_id(java.util.Map<String, String> __context, boolean __explicitCtx)
{
__checkTwowayOnly(__ice_id_name);
- IceInternal.Outgoing __og = getOutgoing(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx);
- try
- {
- __og.writeEmptyParams();
- if(!__og.invoke())
- {
- try
- {
- __og.throwUserException();
- }
- catch(UserException __ex)
- {
- throw new UnknownUserException(__ex.ice_name(), __ex);
- }
- }
- IceInternal.BasicStream __is = __og.startReadParams();
- String __ret = __is.readString();
- __og.endReadParams();
- return __ret;
- }
- finally
- {
- reclaimOutgoing(__og);
- }
+ return end_ice_id(begin_ice_id(__context, __explicitCtx, true, null));
}
/**
@@ -949,7 +890,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id()
{
- return begin_ice_id(null, false, null);
+ return begin_ice_id(null, false, false, null);
}
/**
@@ -962,7 +903,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id(java.util.Map<String, String> __context)
{
- return begin_ice_id(__context, true, null);
+ return begin_ice_id(__context, true, false, null);
}
/**
@@ -975,7 +916,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id(Callback __cb)
{
- return begin_ice_id(null, false, __cb);
+ return begin_ice_id(null, false, false, __cb);
}
/**
@@ -989,7 +930,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id(java.util.Map<String, String> __context, Callback __cb)
{
- return begin_ice_id(__context, true, __cb);
+ return begin_ice_id(__context, true, false, __cb);
}
/**
@@ -1002,7 +943,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id(Callback_Object_ice_id __cb)
{
- return begin_ice_id(null, false, __cb);
+ return begin_ice_id(null, false, false, __cb);
}
/**
@@ -1016,10 +957,10 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_id(java.util.Map<String, String> __context, Callback_Object_ice_id __cb)
{
- return begin_ice_id(__context, true, __cb);
+ return begin_ice_id(__context, true, false, __cb);
}
- class FunctionalCallback_Object_ice_id extends IceInternal.Functional_TwowayCallbackArg1<String>
+ private class FunctionalCallback_Object_ice_id extends IceInternal.Functional_TwowayCallbackArg1<String>
{
FunctionalCallback_Object_ice_id(IceInternal.Functional_GenericCallback1<String> responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
@@ -1047,7 +988,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_id(IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_id(null, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null));
+ return begin_ice_id(null, false, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null));
}
/**
@@ -1064,7 +1005,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_id(null, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb));
+ return begin_ice_id(null, false, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb));
}
/**
@@ -1081,7 +1022,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<String> __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb)
{
- return begin_ice_id(__context, true, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null));
+ return begin_ice_id(__context, true, false, new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, null));
}
/**
@@ -1100,24 +1041,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
{
- return begin_ice_id(__context, true,
+ return begin_ice_id(__context, true, false,
new FunctionalCallback_Object_ice_id(__responseCb, __exceptionCb, __sentCb));
}
- private AsyncResult
- begin_ice_id(java.util.Map<String, String> __context, boolean __explicitCtx, IceInternal.CallbackBase __cb)
+ private AsyncResult begin_ice_id(java.util.Map<String, String> __context, boolean __explicitCtx,
+ boolean __synchronous, IceInternal.CallbackBase __cb)
{
__checkAsyncTwowayOnly(__ice_id_name);
- IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_id_name, __cb);
+ IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_id_name, __cb);
try
{
- __result.__prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx);
- __result.__writeEmptyParams();
- __result.__invoke(true);
+ __result.prepare(__ice_id_name, OperationMode.Nonmutating, __context, __explicitCtx, __synchronous);
+ __result.writeEmptyParams();
+ __result.invoke(true);
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@@ -1130,25 +1071,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
**/
@Override
public final String
- end_ice_id(AsyncResult __result)
+ end_ice_id(AsyncResult __iresult)
{
- AsyncResult.__check(__result, this, __ice_id_name);
- if(!__result.__wait())
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
+ IceInternal.AsyncResultI.check(__result, this, __ice_id_name);
+ try
{
- try
+ if(!__result.__wait())
{
- __result.__throwUserException();
+ try
+ {
+ __result.throwUserException();
+ }
+ catch(UserException __ex)
+ {
+ throw new UnknownUserException(__ex.ice_name(), __ex);
+ }
}
- catch(UserException __ex)
+ String __ret = null;
+ IceInternal.BasicStream __is = __result.startReadParams();
+ __ret = __is.readString();
+ __result.endReadParams();
+ return __ret;
+ }
+ finally
+ {
+ if(__result != null)
{
- throw new UnknownUserException(__ex.ice_name(), __ex);
+ __result.cacheMessageBuffers();
}
}
- String __ret = null;
- IceInternal.BasicStream __is = __result.__startReadParams();
- __ret = __is.readString();
- __result.__endReadParams();
- return __ret;
}
static public final void __ice_id_completed(TwowayCallbackArg1<String> __cb, AsyncResult __result)
@@ -1223,24 +1175,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
ice_invoke(String operation, OperationMode mode, byte[] inParams, ByteSeqHolder outParams,
java.util.Map<String, String> context, boolean explicitCtx)
{
- IceInternal.Outgoing __og = getOutgoing(operation, mode, context, explicitCtx);
- try
- {
- __og.writeParamEncaps(inParams);
- boolean ok = __og.invoke();
- if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
- {
- if(outParams != null)
- {
- outParams.value = __og.readParamEncaps();
- }
- }
- return ok;
- }
- finally
- {
- reclaimOutgoing(__og);
- }
+ return end_ice_invoke(outParams, begin_ice_invoke(operation, mode, inParams, context, explicitCtx, true, null));
}
private static final String __ice_invoke_name = "ice_invoke";
@@ -1260,7 +1195,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams)
{
- return begin_ice_invoke(operation, mode, inParams, null, false, null);
+ return begin_ice_invoke(operation, mode, inParams, null, false, false, null);
}
/**
@@ -1281,7 +1216,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams,
java.util.Map<String, String> __context)
{
- return begin_ice_invoke(operation, mode, inParams, __context, true, null);
+ return begin_ice_invoke(operation, mode, inParams, __context, true, false, null);
}
/**
@@ -1301,7 +1236,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, Callback __cb)
{
- return begin_ice_invoke(operation, mode, inParams, null, false, __cb);
+ return begin_ice_invoke(operation, mode, inParams, null, false, false, __cb);
}
/**
@@ -1323,7 +1258,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context,
Callback __cb)
{
- return begin_ice_invoke(operation, mode, inParams, __context, true, __cb);
+ return begin_ice_invoke(operation, mode, inParams, __context, true, false, __cb);
}
/**
@@ -1343,7 +1278,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final AsyncResult
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, Callback_Object_ice_invoke __cb)
{
- return begin_ice_invoke(operation, mode, inParams, null, false, __cb);
+ return begin_ice_invoke(operation, mode, inParams, null, false, false, __cb);
}
/**
@@ -1365,7 +1300,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context,
Callback_Object_ice_invoke __cb)
{
- return begin_ice_invoke(operation, mode, inParams, __context, true, __cb);
+ return begin_ice_invoke(operation, mode, inParams, __context, true, false, __cb);
}
/**
@@ -1389,7 +1324,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
IceInternal.Functional_BoolCallback sentCb)
{
- return begin_ice_invoke(operation, mode, inParams, null, false, responseCb, exceptionCb, sentCb);
+ return begin_ice_invoke(operation, mode, inParams, null, false, false, responseCb, exceptionCb, sentCb);
}
/**
@@ -1411,7 +1346,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
FunctionalCallback_Object_ice_invoke_Response responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb)
{
- return begin_ice_invoke(operation, mode, inParams, null, false, responseCb, exceptionCb, null);
+ return begin_ice_invoke(operation, mode, inParams, null, false, false, responseCb, exceptionCb, null);
}
/**
@@ -1437,7 +1372,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb,
IceInternal.Functional_BoolCallback sentCb)
{
- return begin_ice_invoke(operation, mode, inParams, context, true, responseCb, exceptionCb, sentCb);
+ return begin_ice_invoke(operation, mode, inParams, context, true, false, responseCb, exceptionCb, sentCb);
}
/**
@@ -1461,12 +1396,13 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
FunctionalCallback_Object_ice_invoke_Response responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> exceptionCb)
{
- return begin_ice_invoke(operation, mode, inParams, context, true, responseCb, exceptionCb, null);
+ return begin_ice_invoke(operation, mode, inParams, context, true, false, responseCb, exceptionCb, null);
}
private final AsyncResult begin_ice_invoke(String operation, OperationMode mode, byte[] inParams,
java.util.Map<String, String> __context,
boolean __explicitCtx,
+ boolean __synchronous,
FunctionalCallback_Object_ice_invoke_Response __responseCb,
IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb,
IceInternal.Functional_BoolCallback __sentCb)
@@ -1495,24 +1431,24 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
FunctionalCallback_Object_ice_invoke_Response __responseCb;
}
- return begin_ice_invoke(operation, mode, inParams, __context, __explicitCtx,
+ return begin_ice_invoke(operation, mode, inParams, __context, __explicitCtx, __synchronous,
new CB(__responseCb, __exceptionCb, __sentCb));
}
private AsyncResult
begin_ice_invoke(String operation, OperationMode mode, byte[] inParams, java.util.Map<String, String> __context,
- boolean __explicitCtx, IceInternal.CallbackBase __cb)
+ boolean __explicitCtx, boolean __synchronous, IceInternal.CallbackBase __cb)
{
- IceInternal.OutgoingAsync __result = new IceInternal.OutgoingAsync(this, __ice_invoke_name, __cb);
+ IceInternal.OutgoingAsync __result = getOutgoingAsync(__ice_invoke_name, __cb);
try
{
- __result.__prepare(operation, mode, __context, __explicitCtx);
- __result.__writeParamEncaps(inParams);
- __result.__invoke(true);
+ __result.prepare(operation, mode, __context, __explicitCtx, __synchronous);
+ __result.writeParamEncaps(inParams);
+ __result.invoke(true);
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@@ -1530,18 +1466,29 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
**/
@Override
public final boolean
- end_ice_invoke(ByteSeqHolder outParams, AsyncResult __result)
+ end_ice_invoke(ByteSeqHolder outParams, AsyncResult __iresult)
{
- AsyncResult.__check(__result, this, __ice_invoke_name);
- boolean ok = __result.__wait();
- if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
+ IceInternal.AsyncResultI.check(__result, this, __ice_invoke_name);
+ try
+ {
+ boolean ok = __result.__wait();
+ if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
+ {
+ if(outParams != null)
+ {
+ outParams.value = __result.readParamEncaps();
+ }
+ }
+ return ok;
+ }
+ finally
{
- if(outParams != null)
+ if(__result != null)
{
- outParams.value = __result.__readParamEncaps();
+ __result.cacheMessageBuffers();
}
}
- return ok;
}
public static void __ice_invoke_completed(_Callback_Object_ice_invoke __cb, AsyncResult __result)
@@ -2389,13 +2336,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
handler = __getRequestHandler();
try
{
+ // Wait for the connection to be established.
return handler.waitForConnection();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
}
+ catch(RetryException e)
+ {
+ // Clear request handler and retry.
+ __setRequestHandler(handler, null);
+ }
catch(Ice.Exception ex)
{
try
@@ -2479,15 +2432,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public void
ice_flushBatchRequests()
{
- IceInternal.BatchOutgoing __og = new IceInternal.BatchOutgoing(this, __ice_flushBatchRequests_name);
- try
- {
- __og.invoke();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
+ end_ice_flushBatchRequests(begin_ice_flushBatchRequests());
}
/**
@@ -2577,16 +2522,17 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
catch(Exception __ex)
{
- __result.__invokeExceptionAsync(__ex);
+ __result.invokeExceptionAsync(__ex);
}
return __result;
}
@Override
public void
- end_ice_flushBatchRequests(AsyncResult __result)
+ end_ice_flushBatchRequests(AsyncResult __iresult)
{
- AsyncResult.__check(__result, this, __ice_flushBatchRequests_name);
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
+ IceInternal.AsyncResultI.check(__result, this, __ice_flushBatchRequests_name);
__result.__wait();
}
@@ -2708,50 +2654,36 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
}
- public void
- __invoke(IceInternal.Outgoing __og)
+ public final void
+ __end(AsyncResult __iresult, String operation)
{
- //
- // Helper for operations without out/return parameters and user
- // exceptions.
- //
- boolean __ok = __og.invoke();
- if(__og.hasResponse())
+ IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
+ IceInternal.AsyncResultI.check(__result, this, operation);
+ try
{
- if(!__ok)
+ boolean ok = __result.__wait();
+ if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
{
- try
- {
- __og.throwUserException();
- }
- catch(UserException __ex)
+ if(!ok)
{
- throw new UnknownUserException(__ex.ice_name(), __ex);
+ try
+ {
+ __result.throwUserException();
+ }
+ catch(UserException __ex)
+ {
+ throw new UnknownUserException(__ex.ice_name(), __ex);
+ }
}
+ __result.readEmptyParams();
}
- __og.readEmptyParams();
}
- }
-
- public final void
- __end(AsyncResult __result, String operation)
- {
- AsyncResult.__check(__result, this, operation);
- boolean ok = __result.__wait();
- if(_reference.getMode() == IceInternal.Reference.ModeTwoway)
+ finally
{
- if(!ok)
+ if(__result != null)
{
- try
- {
- __result.__throwUserException();
- }
- catch(UserException __ex)
- {
- throw new UnknownUserException(__ex.ice_name(), __ex);
- }
+ __result.cacheMessageBuffers();
}
- __result.__readEmptyParams();
}
}
@@ -2835,6 +2767,19 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
}
}
+
+ public void
+ cacheMessageBuffers(IceInternal.BasicStream is, IceInternal.BasicStream os)
+ {
+ synchronized(this)
+ {
+ if(_streamCache == null)
+ {
+ _streamCache = new LinkedList<StreamCacheEntry>();
+ }
+ _streamCache.add(new StreamCacheEntry(is, os));
+ }
+ }
private IceInternal.RequestHandler
createRequestHandler()
@@ -2873,6 +2818,150 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
_reference = ref;
}
+ protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String id, Class<T> proxyCls, Class<?> helperCls)
+ {
+ return checkedCastImpl(obj, null, false, null, false, id, proxyCls, helperCls);
+ }
+
+ protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, java.util.Map<String, String> ctx, String id,
+ Class<T> proxyCls, Class<?> helperCls)
+ {
+ return checkedCastImpl(obj, ctx, true, null, false, id, proxyCls, helperCls);
+ }
+
+ protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String facet, String id, Class<T> proxyCls,
+ Class<?> helperCls)
+ {
+ return checkedCastImpl(obj, null, false, facet, true, id, proxyCls, helperCls);
+ }
+
+ protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, String facet, java.util.Map<String, String> ctx,
+ String id, Class<T> proxyCls, Class<?> helperCls)
+ {
+ return checkedCastImpl(obj, ctx, true, facet, true, id, proxyCls, helperCls);
+ }
+
+ protected static <T> T checkedCastImpl(Ice.ObjectPrx obj, java.util.Map<String, String> ctx, boolean explicitCtx,
+ String facet, boolean explicitFacet, String id, Class<T> proxyCls,
+ Class<?> helperCls)
+ {
+ T d = null;
+ if(obj != null)
+ {
+ if(explicitFacet)
+ {
+ obj = obj.ice_facet(facet);
+ }
+ if(proxyCls.isInstance(obj))
+ {
+ d = proxyCls.cast(obj);
+ }
+ else
+ {
+ try
+ {
+ final boolean b = explicitCtx ? obj.ice_isA(id, ctx) : obj.ice_isA(id);
+ if(b)
+ {
+ ObjectPrxHelperBase h = null;
+ try
+ {
+ h = ObjectPrxHelperBase.class.cast(helperCls.newInstance());
+ }
+ catch(InstantiationException ex)
+ {
+ throw new SyscallException(ex);
+ }
+ catch(IllegalAccessException ex)
+ {
+ throw new SyscallException(ex);
+ }
+ h.__copyFrom(obj);
+ d = proxyCls.cast(h);
+ }
+ }
+ catch(FacetNotExistException ex)
+ {
+ }
+ }
+ }
+ return d;
+ }
+
+ protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, Class<T> proxyCls, Class<?> helperCls)
+ {
+ return uncheckedCastImpl(obj, null, false, proxyCls, helperCls);
+ }
+
+ protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, String facet, Class<T> proxyCls, Class<?> helperCls)
+ {
+ return uncheckedCastImpl(obj, facet, true, proxyCls, helperCls);
+ }
+
+ protected static <T> T uncheckedCastImpl(Ice.ObjectPrx obj, String facet, boolean explicitFacet, Class<T> proxyCls,
+ Class<?> helperCls)
+ {
+ T d = null;
+ if(obj != null)
+ {
+ try
+ {
+ if(explicitFacet)
+ {
+ ObjectPrxHelperBase h = ObjectPrxHelperBase.class.cast(helperCls.newInstance());
+ h.__copyFrom(obj.ice_facet(facet));
+ d = proxyCls.cast(h);
+ }
+ else
+ {
+ if(proxyCls.isInstance(obj))
+ {
+ d = proxyCls.cast(obj);
+ }
+ else
+ {
+ ObjectPrxHelperBase h = ObjectPrxHelperBase.class.cast(helperCls.newInstance());
+ h.__copyFrom(obj);
+ d = proxyCls.cast(h);
+ }
+ }
+ }
+ catch(InstantiationException ex)
+ {
+ throw new SyscallException(ex);
+ }
+ catch(IllegalAccessException ex)
+ {
+ throw new SyscallException(ex);
+ }
+ }
+ return d;
+ }
+
+ protected IceInternal.OutgoingAsync
+ getOutgoingAsync(String operation, IceInternal.CallbackBase cb)
+ {
+ StreamCacheEntry cacheEntry = null;
+ if(_reference.getInstance().cacheMessageBuffers() > 0)
+ {
+ synchronized(this)
+ {
+ if(_streamCache != null && !_streamCache.isEmpty())
+ {
+ cacheEntry = _streamCache.remove(0);
+ }
+ }
+ }
+ if(cacheEntry == null)
+ {
+ return new IceInternal.OutgoingAsync(this, operation, cb);
+ }
+ else
+ {
+ return new IceInternal.OutgoingAsync(this, operation, cb, cacheEntry.is, cacheEntry.os);
+ }
+ }
+
private final ObjectPrxHelperBase
newInstance(IceInternal.Reference ref)
{
@@ -2938,55 +3027,20 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
}
- protected IceInternal.Outgoing
- getOutgoing(String operation, OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx)
+ private static class StreamCacheEntry
{
- IceInternal.Outgoing out = null;
- if(_reference.getInstance().cacheMessageBuffers() > 0)
- {
- synchronized(this)
- {
- if(_outgoingCache != null)
- {
- out = _outgoingCache;
- _outgoingCache = _outgoingCache.next;
- out.next = null;
- }
- }
- }
- if(out == null)
- {
- out = new IceInternal.Outgoing(this, operation, mode, context, explicitCtx);
- }
- else
+ StreamCacheEntry(IceInternal.BasicStream is, IceInternal.BasicStream os)
{
- out.reset(this, operation, mode, context, explicitCtx);
+ this.is = is;
+ this.os = os;
}
- return out;
- }
-
- protected void
- reclaimOutgoing(IceInternal.Outgoing out)
- {
- out.detach();
- if(_reference.getInstance().cacheMessageBuffers() > 0)
- {
- //
- // Clear references to Ice objects as soon as possible.
- //
- out.reclaim();
-
- synchronized(this)
- {
- out.next = _outgoingCache;
- _outgoingCache = out;
- }
- }
+ IceInternal.BasicStream is;
+ IceInternal.BasicStream os;
}
private transient IceInternal.Reference _reference;
private transient IceInternal.RequestHandler _requestHandler;
- private transient IceInternal.Outgoing _outgoingCache;
+ private transient List<StreamCacheEntry> _streamCache;
public static final long serialVersionUID = 0L;
}
diff --git a/java/src/Ice/OnewayCallback.java b/java/src/Ice/OnewayCallback.java
index f7403e60526..3401c98fc9d 100644
--- a/java/src/Ice/OnewayCallback.java
+++ b/java/src/Ice/OnewayCallback.java
@@ -50,6 +50,12 @@ public abstract class OnewayCallback extends IceInternal.CallbackBase
}
@Override
+ public final boolean __hasSentCallback()
+ {
+ return true;
+ }
+
+ @Override
public final void __completed(AsyncResult __result)
{
try
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
new file mode 100644
index 00000000000..9b211f0bb72
--- /dev/null
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -0,0 +1,578 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import Ice.AsyncResult;
+import Ice.Communicator;
+import Ice.CommunicatorDestroyedException;
+import Ice.Connection;
+import Ice.ObjectPrx;
+import Ice.UserException;
+
+/**
+ * An AsyncResult object is the return value of an asynchronous invocation.
+ * With this object, an application can obtain several attributes of the
+ * invocation and discover its outcome.
+ **/
+public class AsyncResultI implements Ice.AsyncResult
+{
+ protected AsyncResultI(Communicator communicator, IceInternal.Instance instance, String op,
+ IceInternal.CallbackBase del)
+ {
+ _communicator = communicator;
+ _instance = instance;
+ _operation = op;
+ _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
+ _state = 0;
+ _sentSynchronously = false;
+ _exception = null;
+ _callback = del;
+ }
+
+ protected AsyncResultI(Communicator communicator, Instance instance, String op, CallbackBase del, BasicStream is,
+ BasicStream os)
+ {
+ _communicator = communicator;
+ _instance = instance;
+ _operation = op;
+ _os = os;
+ _is = is;
+ _state = 0;
+ _sentSynchronously = false;
+ _exception = null;
+ _callback = del;
+ }
+
+ /**
+ * Returns the communicator that sent the invocation.
+ *
+ * @return The communicator.
+ **/
+ @Override
+ public Communicator getCommunicator()
+ {
+ return _communicator;
+ }
+
+ /**
+ * Returns the connection that was used for the invocation.
+ *
+ * @return The connection.
+ **/
+ @Override
+ public Connection getConnection()
+ {
+ return null;
+ }
+
+ /**
+ * Returns the proxy that was used to call the <code>begin_</code> method.
+ *
+ * @return The proxy.
+ **/
+ @Override
+ public ObjectPrx getProxy()
+ {
+ return null;
+ }
+
+ /**
+ * Indicates whether the result of an invocation is available.
+ *
+ * @return True if the result is available, which means a call to the <code>end_</code>
+ * method will not block. The method returns false if the result is not yet available.
+ **/
+ @Override
+ public final boolean isCompleted()
+ {
+ synchronized(_monitor)
+ {
+ return (_state & StateDone) > 0;
+ }
+ }
+
+ /**
+ * Blocks the caller until the result of the invocation is available.
+ **/
+ @Override
+ public final void waitForCompleted()
+ {
+ synchronized(_monitor)
+ {
+ while((_state & StateDone) == 0)
+ {
+ try
+ {
+ _monitor.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+ }
+ }
+
+ /**
+ * When you call the <code>begin_</code> method, the Ice run time attempts to
+ * write the corresponding request to the client-side transport. If the
+ * transport cannot accept the request, the Ice run time queues the request
+ * for later transmission. This method returns true if, at the time it is called,
+ * the request has been written to the local transport (whether it was initially
+ * queued or not). Otherwise, if the request is still queued, this method returns
+ * false.
+ *
+ * @return True if the request has been sent, or false if the request is queued.
+ **/
+ @Override
+ public final boolean isSent()
+ {
+ synchronized(_monitor)
+ {
+ return (_state & StateSent) > 0;
+ }
+ }
+
+ /**
+ * Blocks the caller until the request has been written to the client-side transport.
+ **/
+ @Override
+ public final void waitForSent()
+ {
+ synchronized(_monitor)
+ {
+ while((_state & StateSent) == 0 && _exception == null)
+ {
+ try
+ {
+ _monitor.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+ }
+ }
+
+ /**
+ * If the invocation failed with a local exception, throws the local exception.
+ **/
+ @Override
+ public final void throwLocalException()
+ {
+ synchronized(_monitor)
+ {
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ }
+ }
+
+ /**
+ * This method returns true if a request was written to the client-side
+ * transport without first being queued. If the request was initially
+ * queued, this method returns false (independent of whether the request
+ * is still in the queue or has since been written to the client-side transport).
+ *
+ * @return True if the request was sent without being queued, or false
+ * otherwise.
+ **/
+ @Override
+ public final boolean sentSynchronously()
+ {
+ return _sentSynchronously; // No lock needed, immutable once __send() is called
+ }
+
+ /**
+ * Returns the name of the operation.
+ *
+ * @return The operation name.
+ **/
+ @Override
+ public final String getOperation()
+ {
+ return _operation;
+ }
+
+ public final IceInternal.BasicStream getOs()
+ {
+ return _os;
+ }
+
+ public IceInternal.BasicStream
+ startReadParams()
+ {
+ _is.startReadEncaps();
+ return _is;
+ }
+
+ public void
+ endReadParams()
+ {
+ _is.endReadEncaps();
+ }
+
+ public void
+ readEmptyParams()
+ {
+ _is.skipEmptyEncaps(null);
+ }
+
+ public byte[]
+ readParamEncaps()
+ {
+ return _is.readEncaps(null);
+ }
+
+ public final boolean __wait()
+ {
+ synchronized(_monitor)
+ {
+ if((_state & StateEndCalled) > 0)
+ {
+ throw new java.lang.IllegalArgumentException("end_ method called more than once");
+ }
+ _state |= StateEndCalled;
+ while((_state & StateDone) == 0)
+ {
+ try
+ {
+ _monitor.wait();
+ }
+ catch(InterruptedException ex)
+ {
+ //
+ // Remove the EndCalled flag since it should be possible to
+ // call end_* again on the AsyncResult.
+ //
+ _state &= ~StateEndCalled;
+ throw new Ice.OperationInterruptedException();
+ }
+ }
+ if(_exception != null)
+ {
+ //throw (LocalException)_exception.fillInStackTrace();
+ throw _exception;
+ }
+ return (_state & StateOK) > 0;
+ }
+ }
+
+ public final void throwUserException()
+ throws UserException
+ {
+ try
+ {
+ _is.startReadEncaps();
+ _is.throwException(null);
+ }
+ catch(UserException ex)
+ {
+ _is.endReadEncaps();
+ throw ex;
+ }
+ }
+
+ public void invokeExceptionAsync(final Ice.Exception ex)
+ {
+ //
+ // This is called when it's not safe to call the exception callback synchronously
+ // from this thread. Instead the exception callback is called asynchronously from
+ // the client thread pool.
+ //
+ try
+ {
+ _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
+ {
+ @Override
+ public void
+ run()
+ {
+ invokeException(ex);
+ }
+ });
+ }
+ catch(CommunicatorDestroyedException exc)
+ {
+ throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly.
+ }
+ }
+
+ public final void invokeException(Ice.Exception ex)
+ {
+ synchronized(_monitor)
+ {
+ _state |= StateDone;
+ //_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _exception = ex;
+ _monitor.notifyAll();
+ }
+
+ invokeCompleted();
+ }
+
+ protected final void invokeSentInternal()
+ {
+ //
+ // Note: no need to change the _state here, specializations are responsible for
+ // changing the state.
+ //
+
+ if(_callback != null)
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
+ }
+
+ try
+ {
+ _callback.__sent(this);
+ }
+ catch(RuntimeException ex)
+ {
+ warning(ex);
+ }
+ catch(AssertionError exc)
+ {
+ error(exc);
+ }
+ catch(OutOfMemoryError exc)
+ {
+ error(exc);
+ }
+ finally
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(null);
+ }
+ }
+ }
+
+ if(_observer != null)
+ {
+ Ice.ObjectPrx proxy = getProxy();
+ if(proxy == null || !proxy.ice_isTwoway())
+ {
+ _observer.detach();
+ }
+ }
+ }
+
+ public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
+ {
+ if(_observer != null)
+ {
+ _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size);
+ if(_childObserver != null)
+ {
+ _childObserver.attach();
+ }
+ }
+ }
+
+ void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
+ {
+ if(_observer != null)
+ {
+ _childObserver = _observer.getCollocatedObserver(adapter,
+ requestId,
+ _os.size() - IceInternal.Protocol.headerSize - 4);
+ if(_childObserver != null)
+ {
+ _childObserver.attach();
+ }
+ }
+ }
+
+ final void invokeSentAsync()
+ {
+ //
+ // This is called when it's not safe to call the sent callback synchronously
+ // from this thread. Instead the exception callback is called asynchronously from
+ // the client thread pool.
+ //
+ try
+ {
+ _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
+ {
+ @Override
+ public void
+ run()
+ {
+ invokeSentInternal();
+ }
+ });
+ }
+ catch(CommunicatorDestroyedException exc)
+ {
+ }
+ }
+
+ public static void check(AsyncResult r, ObjectPrx prx, String operation)
+ {
+ check(r, operation);
+ if(r.getProxy() != prx)
+ {
+ throw new IllegalArgumentException("Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding begin_" +
+ operation + " method");
+ }
+ }
+
+ public static void check(AsyncResult r, Connection con, String operation)
+ {
+ check(r, operation);
+ if(r.getConnection() != con)
+ {
+ throw new IllegalArgumentException("Connection for call to end_" + operation +
+ " does not match connection that was used to call corresponding begin_" +
+ operation + " method");
+ }
+ }
+
+ public static void check(AsyncResult r, Communicator com, String operation)
+ {
+ check(r, operation);
+ if(r.getCommunicator() != com)
+ {
+ throw new IllegalArgumentException("Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+ }
+
+ public void cacheMessageBuffers()
+ {
+ }
+
+ protected final void invokeCompleted()
+ {
+ //
+ // Note: no need to change the _state here, specializations are responsible for
+ // changing the state.
+ //
+
+ if(_callback != null)
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
+ }
+
+ try
+ {
+ _callback.__completed(this);
+ }
+ catch(RuntimeException ex)
+ {
+ warning(ex);
+ }
+ catch(AssertionError exc)
+ {
+ error(exc);
+ }
+ catch(OutOfMemoryError exc)
+ {
+ error(exc);
+ }
+ finally
+ {
+ if(_instance.useApplicationClassLoader())
+ {
+ Thread.currentThread().setContextClassLoader(null);
+ }
+ }
+
+ cacheMessageBuffers();
+ }
+
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ }
+
+ protected void
+ timeout()
+ {
+ IceInternal.RequestHandler handler;
+ synchronized(_monitor)
+ {
+ handler = _timeoutRequestHandler;
+ _timeoutRequestHandler = null;
+ }
+
+ if(handler != null)
+ {
+ handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this,
+ new Ice.InvocationTimeoutException());
+ }
+ }
+
+ private static void check(AsyncResult r, String operation)
+ {
+ if(r == null)
+ {
+ throw new IllegalArgumentException("AsyncResult == null");
+ }
+ else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality
+ {
+ throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " +
+ r.getOperation());
+ }
+ }
+
+ private final void warning(RuntimeException ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex);
+ _instance.initializationData().logger.warning(s);
+ }
+ }
+
+ private final void error(Error error)
+ {
+ String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
+ _instance.initializationData().logger.error(s);
+ }
+
+ protected Communicator _communicator;
+ protected IceInternal.Instance _instance;
+ protected String _operation;
+ protected Ice.Connection _cachedConnection;
+
+ protected java.lang.Object _monitor = new java.lang.Object();
+ protected IceInternal.BasicStream _is;
+ protected IceInternal.BasicStream _os;
+
+ protected IceInternal.RequestHandler _timeoutRequestHandler;
+ protected java.util.concurrent.Future<?> _future;
+
+ protected static final byte StateOK = 0x1;
+ protected static final byte StateDone = 0x2;
+ protected static final byte StateSent = 0x4;
+ protected static final byte StateEndCalled = 0x8;
+ protected static final byte StateCachedBuffers = 0x10;
+
+ protected byte _state;
+ protected boolean _sentSynchronously;
+ protected Ice.Exception _exception;
+
+ protected Ice.Instrumentation.InvocationObserver _observer;
+ protected Ice.Instrumentation.ChildInvocationObserver _childObserver;
+
+ protected IceInternal.CallbackBase _callback;
+}
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
deleted file mode 100644
index a5f958eb10b..00000000000
--- a/java/src/IceInternal/BatchOutgoing.java
+++ /dev/null
@@ -1,220 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-import Ice.Instrumentation.Observer;
-import Ice.Instrumentation.InvocationObserver;
-
-public final class BatchOutgoing implements OutgoingMessageCallback
-{
- public
- BatchOutgoing(Ice.ConnectionI connection, Instance instance, String op)
- {
- _connection = connection;
- _sent = false;
- _os = new BasicStream(instance, Protocol.currentProtocolEncoding);
- _observer = IceInternal.ObserverHelper.get(instance, op);
- }
-
- public
- BatchOutgoing(Ice.ObjectPrxHelperBase proxy, String op)
- {
- _proxy = proxy;
- _sent = false;
- _os = new BasicStream(proxy.__reference().getInstance(), Protocol.currentProtocolEncoding);
- _observer = IceInternal.ObserverHelper.get(proxy, op);
- Protocol.checkSupportedProtocol(_proxy.__reference().getProtocol());
- }
-
- public void
- invoke()
- throws InterruptedException
- {
- assert(_proxy != null || _connection != null);
-
- if(_connection != null)
- {
- if(_connection.flushBatchRequests(this))
- {
- return;
- }
-
- synchronized(this)
- {
- while(_exception == null && !_sent)
- {
- wait();
- }
- if(_exception != null)
- {
- throw _exception;
- }
- }
- return;
- }
-
- RequestHandler handler = null;
- try
- {
- handler = _proxy.__getRequestHandler();
- if(handler.sendRequest(this))
- {
- return;
- }
-
- boolean timedOut = false;
- synchronized(this)
- {
- int timeout = _proxy.__reference().getInvocationTimeout();
- if(timeout > 0)
- {
- long now = Time.currentMonotonicTimeMillis();
- long deadline = now + timeout;
- while(_exception == null && !_sent && !timedOut)
- {
- wait(deadline - now);
- if(_exception == null && !_sent)
- {
- now = Time.currentMonotonicTimeMillis();
- timedOut = now >= deadline;
- }
- }
- }
- else
- {
- while(_exception == null && !_sent)
- {
- wait();
- }
- }
- }
-
- if(timedOut)
- {
- if(handler.requestCanceled(this, new Ice.InvocationTimeoutException()))
- {
- synchronized(this)
- {
- while(_exception == null)
- {
- wait();
- }
- }
- }
- }
-
- if(_exception != null)
- {
- throw (Ice.Exception)_exception.fillInStackTrace();
- }
- }
- catch(RetryException ex)
- {
- //
- // Clear request handler but don't retry or throw. Retrying
- // isn't useful, there were no batch requests associated with
- // the proxy's request handler.
- //
- _proxy.__setRequestHandler(handler, null);
- }
- catch(Ice.Exception ex)
- {
- _proxy.__setRequestHandler(handler, null); // Clear request handler
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- throw ex; // Throw to notify the user that batch requests were potentially lost.
- }
- }
-
- @Override
- public boolean
- send(Ice.ConnectionI connection, boolean compress, boolean response)
- {
- return connection.flushBatchRequests(this);
- }
-
- @Override
- public void
- invokeCollocated(CollocatedRequestHandler handler)
- {
- handler.invokeBatchRequests(this);
- }
-
- @Override
- synchronized public void
- sent()
- {
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- _sent = true;
- notify();
- }
-
- @Override
- public synchronized void
- finished(Ice.Exception ex)
- {
- if(_childObserver != null)
- {
- _childObserver.failed(ex.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- _exception = ex;
- notify();
- }
-
- public BasicStream
- os()
- {
- return _os;
- }
-
- public void
- attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int size)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getRemoteObserver(info, endpt, 0, size);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- public void
- attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- private Ice.ObjectPrxHelperBase _proxy;
- private Ice.ConnectionI _connection;
- private BasicStream _os;
- private boolean _sent;
- private Ice.Exception _exception;
-
- private InvocationObserver _observer;
- private Observer _childObserver;
-
-}
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index 74777db0d4b..d5953310639 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -9,16 +9,16 @@
package IceInternal;
-public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable
+public class BatchOutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback
{
- public BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback)
+ BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback)
{
super(communicator, instance, operation, callback);
}
@Override
public int
- __send(Ice.ConnectionI connection, boolean compress, boolean response)
+ send(Ice.ConnectionI connection, boolean compress, boolean response)
{
_cachedConnection = connection;
return connection.flushAsyncBatchRequests(this);
@@ -26,18 +26,18 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
@Override
public int
- __invokeCollocated(CollocatedRequestHandler handler)
+ invokeCollocated(CollocatedRequestHandler handler)
{
return handler.invokeAsyncBatchRequests(this);
}
@Override
public boolean
- __sent()
+ sent()
{
synchronized(_monitor)
{
- _state |= Done | OK | Sent;
+ _state |= StateDone | StateOK | StateSent;
//_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization
if(_childObserver != null)
{
@@ -51,20 +51,30 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
_timeoutRequestHandler = null;
}
_monitor.notifyAll();
+
+ if(_callback == null || !_callback.__hasSentCallback())
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
return true;
}
}
@Override
public void
- __invokeSent()
+ invokeSent()
{
- __invokeSentInternal();
+ invokeSentInternal();
}
@Override
public void
- __finished(Ice.Exception exc)
+ finished(Ice.Exception exc)
{
synchronized(_monitor)
{
@@ -81,29 +91,20 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
_timeoutRequestHandler = null;
}
}
- __invokeException(exc);
+ invokeException(exc);
}
@Override
public void
- __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
+ dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
- threadPool.dispatch(
- new DispatchWorkItem(connection)
+ threadPool.dispatch(new DispatchWorkItem(connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- BatchOutgoingAsync.this.__finished(ex);
- }
- });
- }
-
- @Override
- public void
- run()
- {
- __runTimerTask();
+ BatchOutgoingAsync.this.finished(ex);
+ }
+ });
}
}
diff --git a/java/src/IceInternal/Buffer.java b/java/src/IceInternal/Buffer.java
index b14d732c643..fc7f88966c1 100644
--- a/java/src/IceInternal/Buffer.java
+++ b/java/src/IceInternal/Buffer.java
@@ -21,7 +21,6 @@ public class Buffer
this(maxCapacity, direct, java.nio.ByteOrder.LITTLE_ENDIAN);
}
- public
Buffer(int maxCapacity, boolean direct, java.nio.ByteOrder order)
{
b = _emptyBuffer;
@@ -32,13 +31,11 @@ public class Buffer
_order = order;
}
- public
Buffer(byte[] data)
{
this(data, java.nio.ByteOrder.LITTLE_ENDIAN);
}
- public
Buffer(byte[] data, java.nio.ByteOrder order)
{
b = java.nio.ByteBuffer.wrap(data);
@@ -50,13 +47,11 @@ public class Buffer
_order = order;
}
- public
Buffer(java.nio.ByteBuffer data)
{
this(data, java.nio.ByteOrder.LITTLE_ENDIAN);
}
- public
Buffer(java.nio.ByteBuffer data, java.nio.ByteOrder order)
{
b = data;
diff --git a/java/src/IceInternal/CallbackBase.java b/java/src/IceInternal/CallbackBase.java
index 61c711f0308..13c9e40f708 100644
--- a/java/src/IceInternal/CallbackBase.java
+++ b/java/src/IceInternal/CallbackBase.java
@@ -13,6 +13,7 @@ public abstract class CallbackBase
{
public abstract void __completed(Ice.AsyncResult r);
public abstract void __sent(Ice.AsyncResult r);
+ public abstract boolean __hasSentCallback();
public static void check(boolean cb)
{
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 4c79b2e3c58..1d97dc9a8d6 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -11,39 +11,10 @@ package IceInternal;
public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
- class InvokeAll extends DispatchWorkItem
+ private class InvokeAllAsync extends DispatchWorkItem
{
- public
- InvokeAll(OutgoingMessageCallback out, BasicStream os, int requestId, int invokeNum, boolean batch)
- {
- _out = out;
- _os = os;
- _requestId = requestId;
- _invokeNum = invokeNum;
- _batch = batch;
- }
-
- @Override
- public void
- run()
- {
- if(sent(_out))
- {
- invokeAll(_os, _requestId, _invokeNum, _batch);
- }
- }
-
- private final OutgoingMessageCallback _out;
- private final BasicStream _os;
- private final int _requestId;
- private final int _invokeNum;
- private final boolean _batch;
- };
-
- class InvokeAllAsync extends DispatchWorkItem
- {
- public InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
- boolean batch)
+ private InvokeAllAsync(OutgoingAsyncMessageCallback outAsync, BasicStream os, int requestId, int invokeNum,
+ boolean batch)
{
_outAsync = outAsync;
_os = os;
@@ -53,8 +24,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- public void
- run()
+ public void run()
{
if(sentAsync(_outAsync))
{
@@ -63,18 +33,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
private final OutgoingAsyncMessageCallback _outAsync;
- private final BasicStream _os;
+ private BasicStream _os;
private final int _requestId;
private final int _invokeNum;
private final boolean _batch;
};
- private void
- fillInValue(BasicStream os, int pos, int value)
- {
- os.rewriteInt(pos, value);
- }
-
public
CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter)
{
@@ -207,50 +171,10 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- public boolean
- sendRequest(OutgoingMessageCallback out)
- {
- out.invokeCollocated(this);
- return !_response && _reference.getInvocationTimeout() == 0;
- }
-
- @Override
public int
sendAsyncRequest(OutgoingAsyncMessageCallback outAsync)
{
- return outAsync.__invokeCollocated(this);
- }
-
- @Override
- synchronized public boolean
- requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
- {
- Integer requestId = _sendRequests.get(out);
- if(requestId != null)
- {
- if(requestId > 0)
- {
- _requests.remove(requestId);
- }
- out.finished(ex);
- _sendRequests.remove(out);
- return true;
- }
- else if(out instanceof Outgoing)
- {
- Outgoing o = (Outgoing)out;
- assert(o != null);
- for(java.util.Map.Entry<Integer, Outgoing> e : _requests.entrySet())
- {
- if(e.getValue() == o)
- {
- out.finished(ex);
- _requests.remove(e.getKey());
- return true; // We're done.
- }
- }
- }
- return false;
+ return outAsync.invokeCollocated(this);
}
@Override
@@ -265,7 +189,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
return true; // We're done
}
@@ -278,7 +202,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
return true; // We're done
}
}
@@ -286,175 +210,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return false;
}
- public void
- invokeRequest(Outgoing out)
- {
- int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
- {
- synchronized(this)
- {
- if(_response)
- {
- requestId = ++_requestId;
- _requests.put(requestId, out);
- }
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendRequests.put(out, requestId);
- }
- }
- }
-
- out.attachCollocatedObserver(_adapter, requestId);
-
- if(_reference.getInvocationTimeout() > 0)
- {
- // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
- _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false));
- }
- else if(_dispatcher)
- {
- _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false));
- }
- else // Optimization: directly call invokeAll if there's no dispatcher.
- {
- out.sent();
- invokeAll(out.os(), requestId, 1, false);
- }
- }
-
- public int
- invokeAsyncRequest(OutgoingAsync outAsync)
- {
- int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
- {
- synchronized(this)
- {
- if(_response)
- {
- requestId = ++_requestId;
- _asyncRequests.put(requestId, outAsync);
- }
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, requestId);
- }
- }
- }
-
- outAsync.__attachCollocatedObserver(_adapter, requestId);
-
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false));
-
- return AsyncStatus.Queued;
- }
-
- public void
- invokeBatchRequests(BatchOutgoing out)
- {
- int invokeNum;
- synchronized(this)
- {
- waitStreamInUse();
- invokeNum = _batchRequestNum;
-
- if(_batchRequestNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendRequests.put(out, 0);
- }
-
- assert(!_batchStream.isEmpty());
- _batchStream.swap(out.os());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- out.attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true));
- }
- else if(_dispatcher)
- {
- _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true));
- }
- else // Optimization: directly call invokeAll if there's no dispatcher.
- {
- out.sent();
- invokeAll(out.os(), 0, invokeNum, true);
- }
- }
- else
- {
- out.sent();
- }
- }
-
- public int
- invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
- {
- int invokeNum;
- synchronized(this)
- {
- waitStreamInUse();
-
- invokeNum = _batchRequestNum;
- if(_batchRequestNum > 0)
- {
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, 0);
- }
-
- assert(!_batchStream.isEmpty());
- _batchStream.swap(outAsync.__getOs());
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
- _batchAutoFlush);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchMarker = 0;
- }
- }
-
- outAsync.__attachCollocatedObserver(_adapter, 0);
-
- if(invokeNum > 0)
- {
- _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true));
- return AsyncStatus.Queued;
- }
- else if(outAsync.__sent())
- {
- return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
- }
- else
- {
- return AsyncStatus.Sent;
- }
- }
-
@Override
public void
- sendResponse(int requestId, BasicStream os, byte status)
+ sendResponse(int requestId, final BasicStream os, byte status)
{
OutgoingAsync outAsync = null;
synchronized(this)
@@ -469,25 +227,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
TraceUtil.traceRecv(os, _logger, _traceLevels);
}
- Outgoing out = _requests.get(requestId);
- if(out != null)
- {
- out.finished(os);
- _requests.remove(requestId);
- }
- else
+ outAsync = _asyncRequests.get(requestId);
+ if(outAsync != null)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ _asyncRequests.remove(requestId);
}
}
if(outAsync != null)
{
- outAsync.__finished(os);
+ outAsync.finished(os);
}
_adapter.decDirectCount();
}
@@ -517,19 +266,11 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- Outgoing out = _requests.remove(requestId);
- if(out != null)
- {
- out.finished(ex);
- }
- else
- {
- outAsync = _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
if(outAsync != null)
{
- outAsync.__finished(ex);
+ outAsync.finished(ex);
}
}
_adapter.decDirectCount();
@@ -556,25 +297,104 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
return null;
}
- boolean
- sent(OutgoingMessageCallback out)
+ void invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
- if(_reference.getInvocationTimeout() > 0)
+ int requestId = 0;
+ if(_reference.getInvocationTimeout() > 0 || _response)
{
synchronized(this)
{
- if(_sendRequests.remove(out) == null)
+ if(_response)
{
- return false; // The request timed-out.
+ requestId = ++_requestId;
+ _asyncRequests.put(requestId, outAsync);
+ }
+ if(_reference.getInvocationTimeout() > 0)
+ {
+ _sendAsyncRequests.put(outAsync, requestId);
}
}
}
- out.sent();
- return true;
+
+ outAsync.attachCollocatedObserver(_adapter, requestId);
+
+ if(synchronous)
+ {
+ //
+ // Treat this collocated call as if it is a synchronous invocation.
+ //
+ if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0 || !_response)
+ {
+ // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ else if(_dispatcher)
+ {
+ _adapter.getThreadPool().dispatchFromThisThread(
+ new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
+ {
+ if(sentAsync(outAsync))
+ {
+ invokeAll(outAsync.getOs(), requestId, 1, false);
+ }
+ }
+ }
+ else
+ {
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), requestId, 1, false));
+ }
+ }
+
+ int invokeAsyncBatchRequests(BatchOutgoingAsync outAsync)
+ {
+ int invokeNum;
+ synchronized(this)
+ {
+ waitStreamInUse();
+
+ invokeNum = _batchRequestNum;
+ if(_batchRequestNum > 0)
+ {
+ if(_reference.getInvocationTimeout() > 0)
+ {
+ _sendAsyncRequests.put(outAsync, 0);
+ }
+
+ assert(!_batchStream.isEmpty());
+ _batchStream.swap(outAsync.getOs());
+
+ //
+ // Reset the batch stream.
+ //
+ BasicStream dummy = new BasicStream(_reference.getInstance(), Protocol.currentProtocolEncoding,
+ _batchAutoFlush);
+ _batchStream.swap(dummy);
+ _batchRequestNum = 0;
+ _batchMarker = 0;
+ }
+ }
+
+ outAsync.attachCollocatedObserver(_adapter, 0);
+
+ if(invokeNum > 0)
+ {
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.getOs(), 0, invokeNum, true));
+ return AsyncStatus.Queued;
+ }
+ else if(outAsync.sent())
+ {
+ return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
+ }
+ else
+ {
+ return AsyncStatus.Sent;
+ }
}
- boolean
- sentAsync(OutgoingAsyncMessageCallback outAsync)
+ private boolean
+ sentAsync(final OutgoingAsyncMessageCallback outAsync)
{
if(_reference.getInvocationTimeout() > 0)
{
@@ -586,14 +406,15 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
}
- if(outAsync.__sent())
+
+ if(outAsync.sent())
{
- outAsync.__invokeSent();
+ outAsync.invokeSent();
}
return true;
}
- void
+ private void
invokeAll(BasicStream os, int requestId, int invokeNum, boolean batch)
{
if(batch)
@@ -668,7 +489,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- void
+ private void
handleException(int requestId, Ice.Exception ex)
{
if(requestId == 0)
@@ -679,25 +500,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- Outgoing out = _requests.get(requestId);
- if(out != null)
- {
- out.finished(ex);
- _requests.remove(requestId);
- }
- else
+ outAsync = _asyncRequests.get(requestId);
+ if(outAsync != null)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ _asyncRequests.remove(requestId);
}
}
if(outAsync != null)
{
- outAsync.__finished(ex);
+ outAsync.finished(ex);
}
}
@@ -731,6 +543,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
+ private void
+ fillInValue(BasicStream os, int pos, int value)
+ {
+ os.rewriteInt(pos, value);
+ }
+
private final Reference _reference;
private final boolean _dispatcher;
private final boolean _response;
@@ -741,12 +559,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private int _requestId;
- private java.util.Map<OutgoingMessageCallback, Integer> _sendRequests =
- new java.util.HashMap<OutgoingMessageCallback, Integer>();
private java.util.Map<OutgoingAsyncMessageCallback, Integer> _sendAsyncRequests =
new java.util.HashMap<OutgoingAsyncMessageCallback, Integer>();
- private java.util.Map<Integer, Outgoing> _requests = new java.util.HashMap<Integer, Outgoing>();
private java.util.Map<Integer, OutgoingAsync> _asyncRequests = new java.util.HashMap<Integer, OutgoingAsync>();
private BasicStream _batchStream;
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
index 3c3d07bcc4e..6ebd6fb3a3e 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
+public class CommunicatorBatchOutgoingAsync extends IceInternal.AsyncResultI
{
public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
CallbackBase callback)
@@ -49,21 +49,21 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
@Override
public boolean
- __sent()
+ sent()
{
if(_childObserver != null)
{
_childObserver.detach();
_childObserver = null;
}
- check(false);
+ doCheck(false);
return false;
}
// TODO: MJN: This is missing a test.
@Override
public void
- __finished(Ice.Exception ex)
+ finished(Ice.Exception ex)
{
if(_childObserver != null)
{
@@ -71,12 +71,12 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
_childObserver.detach();
_childObserver = null;
}
- check(false);
+ doCheck(false);
}
@Override
public void
- __attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
+ attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
{
if(CommunicatorBatchOutgoingAsync.this._observer != null)
{
@@ -105,17 +105,17 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
}
catch(Ice.LocalException ex)
{
- check(false);
+ doCheck(false);
throw ex;
}
}
public void ready()
{
- check(true);
+ doCheck(true);
}
- private void check(boolean userThread)
+ private void doCheck(boolean userThread)
{
synchronized(_monitor)
{
@@ -124,21 +124,32 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
{
return;
}
- _state |= Done | OK | Sent;
+ _state |= StateDone | StateOK | StateSent;
_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
_monitor.notifyAll();
}
- //
- // sentSynchronously_ is immutable here.
- //
- if(!_sentSynchronously || !userThread)
+ if(_callback == null || !_callback.__hasSentCallback())
{
- __invokeSentAsync();
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
}
else
{
- __invokeSentInternal();
+ //
+ // sentSynchronously_ is immutable here.
+ //
+ if(!_sentSynchronously || !userThread)
+ {
+ invokeSentAsync();
+ }
+ else
+ {
+ invokeSentInternal();
+ }
}
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 9cc70b6a76d..25d9859b997 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -14,7 +14,7 @@ import Ice.ConnectionI;
public class ConnectRequestHandler
implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback
{
- static class Request
+ static private class Request
{
Request(BasicStream os)
{
@@ -22,17 +22,11 @@ public class ConnectRequestHandler
this.os.swap(os);
}
- Request(OutgoingMessageCallback out)
- {
- this.out = out;
- }
-
Request(OutgoingAsyncMessageCallback out)
{
this.outAsync = out;
}
- OutgoingMessageCallback out = null;
OutgoingAsyncMessageCallback outAsync = null;
BasicStream os = null;
}
@@ -135,29 +129,6 @@ public class ConnectRequestHandler
}
@Override
- public boolean
- sendRequest(OutgoingMessageCallback out)
- throws RetryException
- {
- synchronized(this)
- {
- try
- {
- if(!initialized())
- {
- _requests.add(new Request(out));
- return false; // Not sent
- }
- }
- catch(Ice.LocalException ex)
- {
- throw new RetryException(ex);
- }
- }
- return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
- }
-
- @Override
public int
sendAsyncRequest(OutgoingAsyncMessageCallback out)
throws RetryException
@@ -177,37 +148,7 @@ public class ConnectRequestHandler
throw new RetryException(ex);
}
}
- return out.__send(_connection, _compress, _response);
- }
-
- @Override
- public boolean
- requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
- {
- synchronized(this)
- {
- if(_exception != null)
- {
- return false; // The request has been notified of a failure already.
- }
-
- if(!initialized())
- {
- java.util.Iterator<Request> it = _requests.iterator();
- while(it.hasNext())
- {
- Request request = it.next();
- if(request.out == out)
- {
- out.finished(ex);
- it.remove();
- return true;
- }
- }
- assert(false); // The request has to be queued if it timed out and we're not initialized yet.
- }
- }
- return _connection.requestCanceled(out, ex);
+ return out.send(_connection, _compress, _response);
}
@Override
@@ -230,7 +171,7 @@ public class ConnectRequestHandler
if(request.outAsync == outAsync)
{
it.remove();
- outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ outAsync.dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
return true; // We're done
}
}
@@ -249,7 +190,8 @@ public class ConnectRequestHandler
@Override
synchronized public ConnectionI
- getConnection() {
+ getConnection()
+ {
if(_exception != null)
{
throw (Ice.LocalException)_exception.fillInStackTrace();
@@ -259,12 +201,17 @@ public class ConnectRequestHandler
return _connection;
}
}
-
+
@Override
synchronized public
ConnectionI waitForConnection()
- throws InterruptedException
+ throws InterruptedException, RetryException
{
+ if(_exception != null)
+ {
+ throw new RetryException(_exception);
+ }
+
//
// Wait for the connection establishment to complete or fail.
//
@@ -440,13 +387,9 @@ public class ConnectRequestHandler
while(p.hasNext())
{
Request request = p.next();
- if(request.out != null)
- {
- request.out.send(_connection, _compress, _response);
- }
- else if(request.outAsync != null)
+ if(request.outAsync != null)
{
- if((request.outAsync.__send(_connection, _compress, _response) &
+ if((request.outAsync.send(_connection, _compress, _response) &
AsyncStatus.InvokeSentCallback) > 0)
{
sentCallbacks.add(request.outAsync);
@@ -485,14 +428,13 @@ public class ConnectRequestHandler
assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
_reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- flushRequestsWithException();
- };
- });
+ flushRequestsWithException();
+ };
+ });
}
}
catch(final Ice.LocalException ex)
@@ -502,32 +444,29 @@ public class ConnectRequestHandler
assert(_exception == null && !_requests.isEmpty());
_exception = ex;
_reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- flushRequestsWithException();
- };
- });
+ flushRequestsWithException();
+ };
+ });
}
}
if(!sentCallbacks.isEmpty())
{
- _reference.getInstance().clientThreadPool().dispatch(
- new DispatchWorkItem(_connection)
+ _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
+ for(OutgoingAsyncMessageCallback callback : sentCallbacks)
{
- for(OutgoingAsyncMessageCallback callback : sentCallbacks)
- {
- callback.__invokeSent();
- }
- };
- });
+ callback.invokeSent();
+ }
+ };
+ });
}
//
@@ -590,13 +529,9 @@ public class ConnectRequestHandler
{
for(Request request : _requests)
{
- if(request.out != null)
- {
- request.out.finished(_exception);
- }
- else if(request.outAsync != null)
+ if(request.outAsync != null)
{
- request.outAsync.__finished(_exception);
+ request.outAsync.finished(_exception);
}
}
_requests.clear();
diff --git a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
index 47870ff2494..1b9fd2e0e3c 100644
--- a/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ConnectionBatchOutgoingAsync.java
@@ -26,7 +26,7 @@ public class ConnectionBatchOutgoingAsync extends BatchOutgoingAsync
_sentSynchronously = true;
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSent();
+ invokeSent();
}
}
}
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index 4dcf9db63e8..5494b6f70e3 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -13,44 +13,31 @@ public class ConnectionRequestHandler implements RequestHandler
{
@Override
public void
- prepareBatchRequest(BasicStream out) throws RetryException {
+ prepareBatchRequest(BasicStream out)
+ throws RetryException
+ {
_connection.prepareBatchRequest(out);
}
@Override
public void
- finishBatchRequest(BasicStream out) {
+ finishBatchRequest(BasicStream out)
+ {
_connection.finishBatchRequest(out, _compress);
}
@Override
public void
- abortBatchRequest() {
+ abortBatchRequest()
+ {
_connection.abortBatchRequest();
}
@Override
- public boolean
- sendRequest(OutgoingMessageCallback out)
- throws RetryException {
- //
- // Finished if sent and no response.
- //
- return out.send(_connection, _compress, _response) && !_response;
- }
-
- @Override
- public int
- sendAsyncRequest(OutgoingAsyncMessageCallback out)
- throws RetryException {
- return out.__send(_connection, _compress, _response);
- }
-
- @Override
- public boolean
- requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
+ public int sendAsyncRequest(OutgoingAsyncMessageCallback out)
+ throws RetryException
{
- return _connection.requestCanceled(out, ex);
+ return out.send(_connection, _compress, _response);
}
@Override
diff --git a/java/src/IceInternal/DispatchObserverI.java b/java/src/IceInternal/DispatchObserverI.java
index 4aab7c449bb..6dbc20c2066 100644
--- a/java/src/IceInternal/DispatchObserverI.java
+++ b/java/src/IceInternal/DispatchObserverI.java
@@ -43,7 +43,7 @@ public class DispatchObserverI
}
}
- final MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>()
+ final private MetricsUpdate<IceMX.DispatchMetrics> _userException = new MetricsUpdate<IceMX.DispatchMetrics>()
{
@Override
public void
diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java
index ee49e23ddf2..532880174bd 100644
--- a/java/src/IceInternal/EndpointHostResolver.java
+++ b/java/src/IceInternal/EndpointHostResolver.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class EndpointHostResolver
+class EndpointHostResolver
{
EndpointHostResolver(Instance instance)
{
@@ -31,8 +31,7 @@ public class EndpointHostResolver
}
}
- public java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType selType,
- IPEndpointI endpoint)
+ java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint)
{
//
// Try to get the addresses without DNS lookup. If this doesn't
@@ -89,7 +88,7 @@ public class EndpointHostResolver
return connectors;
}
- synchronized public void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint,
+ synchronized void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint,
final EndpointI_connectors callback)
{
//
@@ -172,7 +171,7 @@ public class EndpointHostResolver
});
}
- synchronized public void destroy()
+ synchronized void destroy()
{
assert(!_destroyed);
_destroyed = true;
@@ -184,7 +183,7 @@ public class EndpointHostResolver
_executor.shutdown();
}
- public void joinWithThread()
+ void joinWithThread()
throws InterruptedException
{
// Wait for the executor to terminate.
@@ -201,7 +200,7 @@ public class EndpointHostResolver
}
}
- synchronized public void updateObserver()
+ synchronized void updateObserver()
{
Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
diff --git a/java/src/IceInternal/Functional_CallbackBase.java b/java/src/IceInternal/Functional_CallbackBase.java
index 4665f8b49a6..5e344f32141 100644
--- a/java/src/IceInternal/Functional_CallbackBase.java
+++ b/java/src/IceInternal/Functional_CallbackBase.java
@@ -37,6 +37,12 @@ public abstract class Functional_CallbackBase extends IceInternal.CallbackBase
}
@Override
+ public final boolean __hasSentCallback()
+ {
+ return __sentCb != null;
+ }
+
+ @Override
public abstract void __completed(Ice.AsyncResult __result);
protected final Functional_GenericCallback1<Ice.Exception> __exceptionCb;
diff --git a/java/src/IceInternal/HttpParser.java b/java/src/IceInternal/HttpParser.java
index b5567b080b6..c3532393cbd 100644
--- a/java/src/IceInternal/HttpParser.java
+++ b/java/src/IceInternal/HttpParser.java
@@ -20,7 +20,7 @@ final class HttpParser
_state = State.Init;
}
- enum Type
+ private enum Type
{
Unknown,
Request,
@@ -627,17 +627,6 @@ final class HttpParser
return _state == State.Complete;
}
- Type type()
- {
- return _type;
- }
-
- String method()
- {
- assert(_type == Type.Request);
- return _method.toString();
- }
-
String uri()
{
assert(_type == Type.Request);
@@ -675,11 +664,6 @@ final class HttpParser
return null;
}
- java.util.Map<String, String> headers()
- {
- return _headers;
- }
-
private Type _type;
private StringBuffer _method = new StringBuffer();
@@ -694,7 +678,7 @@ final class HttpParser
private int _status;
private String _reason;
- enum State
+ private enum State
{
Init,
Type,
diff --git a/java/src/IceInternal/IncomingBase.java b/java/src/IceInternal/IncomingBase.java
index e02c15565c6..6879e8cdf2f 100644
--- a/java/src/IceInternal/IncomingBase.java
+++ b/java/src/IceInternal/IncomingBase.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class IncomingBase
+class IncomingBase
{
protected
IncomingBase(Instance instance, ResponseHandler handler, Ice.ConnectionI connection, Ice.ObjectAdapter adapter,
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index d091c0fd45c..e52546b26fa 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -13,6 +13,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import Ice.CommunicatorDestroyedException;
+
public final class Instance
{
private class ObserverUpdaterI implements Ice.Instrumentation.ObserverUpdater
@@ -605,12 +607,16 @@ public final class Instance
public boolean
queueRequests()
{
- return _hasQueueExecutor;
+ return _queueExecutor != null;
}
- public ExecutorService
+ synchronized public ExecutorService
getQueueExecutor()
{
+ if(_state == StateDestroyed)
+ {
+ throw new Ice.CommunicatorDestroyedException();
+ }
return _queueExecutor;
}
@@ -853,19 +859,11 @@ public final class Instance
if(_initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0)
{
- _hasQueueExecutor = true;
_queueExecutor = Executors.newFixedThreadPool(1,
Util.createThreadFactory(_initData.properties,
Util.createThreadName(_initData.properties, "Ice.BackgroundIO")));
- //
- // If background IO is enabled message buffers cannot be cached.
- //
- _cacheMessageBuffers = 0;
- }
- else
- {
- _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2);
}
+ _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2);
}
catch(Ice.LocalException ex)
{
@@ -1072,7 +1070,7 @@ public final class Instance
ThreadPool serverThreadPool = null;
ThreadPool clientThreadPool = null;
EndpointHostResolver endpointHostResolver = null;
-
+ ExecutorService queueExecutor = null;
synchronized(this)
{
_objectAdapterFactory = null;
@@ -1152,6 +1150,9 @@ public final class Instance
_adminAdapter = null;
_adminFacets.clear();
+ queueExecutor = _queueExecutor;
+ _queueExecutor = null;
+
_typeToClassMap.clear();
_state = StateDestroyed;
@@ -1180,18 +1181,18 @@ public final class Instance
throw new Ice.OperationInterruptedException();
}
- if(_queueExecutor != null)
+ if(queueExecutor != null)
{
- _queueExecutor.shutdown();
+ queueExecutor.shutdown();
try
{
- _queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
catch (InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- _queueExecutor = null;
+ queueExecutor = null;
}
if(_initData.properties.getPropertyAsInt("Ice.Warn.UnusedProperties") > 0)
@@ -1334,6 +1335,5 @@ public final class Instance
final private boolean _useApplicationClassLoader;
private static boolean _oneOffDone = false;
- private boolean _hasQueueExecutor = false;
private ExecutorService _queueExecutor;
}
diff --git a/java/src/IceInternal/InvocationObserverI.java b/java/src/IceInternal/InvocationObserverI.java
index 8c9ffc57d32..bf96a9c0d0d 100644
--- a/java/src/IceInternal/InvocationObserverI.java
+++ b/java/src/IceInternal/InvocationObserverI.java
@@ -205,10 +205,10 @@ public class InvocationObserverI
delegate = _delegate.getRemoteObserver(con, edpt, requestId, sz);
}
return getObserver("Remote",
- new RemoteInvocationHelper(con, edpt, requestId, sz),
- RemoteMetrics.class,
- RemoteObserverI.class,
- delegate);
+ new RemoteInvocationHelper(con, edpt, requestId, sz),
+ RemoteMetrics.class,
+ RemoteObserverI.class,
+ delegate);
}
@Override
@@ -220,12 +220,11 @@ public class InvocationObserverI
{
delegate = _delegate.getCollocatedObserver(adapter, requestId, sz);
}
- return getObserver(
- "Collocated",
- new CollocatedInvocationHelper(adapter, requestId, sz),
- CollocatedMetrics.class,
- CollocatedObserverI.class,
- delegate);
+ return getObserver("Collocated",
+ new CollocatedInvocationHelper(adapter, requestId, sz),
+ CollocatedMetrics.class,
+ CollocatedObserverI.class,
+ delegate);
}
final MetricsUpdate<InvocationMetrics> _incrementRetry = new MetricsUpdate<InvocationMetrics>()
diff --git a/java/src/IceInternal/Network.java b/java/src/IceInternal/Network.java
index c06e7bf9550..324ccbe124d 100644
--- a/java/src/IceInternal/Network.java
+++ b/java/src/IceInternal/Network.java
@@ -487,94 +487,6 @@ public final class Network
}
}
- public static java.nio.channels.SocketChannel
- doAccept(java.nio.channels.ServerSocketChannel fd, int timeout)
- {
- java.nio.channels.SocketChannel result = null;
- while(result == null)
- {
- try
- {
- result = fd.accept();
- if(result == null)
- {
- java.nio.channels.Selector selector = java.nio.channels.Selector.open();
-
- try
- {
- while(true)
- {
- try
- {
- fd.register(selector, java.nio.channels.SelectionKey.OP_ACCEPT);
- int n;
- if(timeout > 0)
- {
- n = selector.select(timeout);
- }
- else if(timeout == 0)
- {
- n = selector.selectNow();
- }
- else
- {
- n = selector.select();
- }
-
- if(n == 0)
- {
- throw new Ice.TimeoutException();
- }
-
- break;
- }
- catch(java.io.IOException ex)
- {
- if(interrupted(ex))
- {
- continue;
- }
- throw new Ice.SocketException(ex);
- }
- }
- }
- finally
- {
- try
- {
- selector.close();
- }
- catch(java.io.IOException ex)
- {
- // Ignore
- }
- }
- }
- }
- catch(java.io.IOException ex)
- {
- if(interrupted(ex))
- {
- continue;
- }
- throw new Ice.SocketException(ex);
- }
- }
-
- try
- {
- java.net.Socket socket = result.socket();
- socket.setTcpNoDelay(true);
- socket.setKeepAlive(true);
- }
- catch(java.io.IOException ex)
- {
- throw new Ice.SocketException(ex);
- }
-
- return result;
- }
-
public static void
setSendBufferSize(java.nio.channels.SocketChannel fd, int size)
{
@@ -1189,22 +1101,6 @@ public final class Network
}
public static String
- fdToString(java.net.Socket fd)
- {
- if(fd == null)
- {
- return "<closed>";
- }
-
- java.net.InetAddress localAddr = fd.getLocalAddress();
- int localPort = fd.getLocalPort();
- java.net.InetAddress remoteAddr = fd.getInetAddress();
- int remotePort = fd.getPort();
-
- return addressesToString(localAddr, localPort, remoteAddr, remotePort);
- }
-
- public static String
addressesToString(java.net.InetAddress localAddr, int localPort, java.net.InetAddress remoteAddr, int remotePort,
NetworkProxy proxy, java.net.InetSocketAddress target)
{
diff --git a/java/src/IceInternal/OpaqueEndpointI.java b/java/src/IceInternal/OpaqueEndpointI.java
index b4f291f7751..b14beda4036 100644
--- a/java/src/IceInternal/OpaqueEndpointI.java
+++ b/java/src/IceInternal/OpaqueEndpointI.java
@@ -174,14 +174,6 @@ final class OpaqueEndpointI extends EndpointI
}
//
- // Get the encoded endpoint.
- //
- public byte[] rawBytes()
- {
- return _rawBytes;
- }
-
- //
// Return a server side transceiver for this endpoint, or null if a
// transceiver can only be created by an acceptor. In case a
// transceiver is created, this operation also returns a new
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
deleted file mode 100644
index eec1e4eb8f0..00000000000
--- a/java/src/IceInternal/Outgoing.java
+++ /dev/null
@@ -1,736 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-import Ice.Instrumentation.ChildInvocationObserver;
-import Ice.Instrumentation.InvocationObserver;
-
-public final class Outgoing implements OutgoingMessageCallback
-{
- public
- Outgoing(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context,
- boolean explicitCtx)
- {
- Reference ref = proxy.__reference();
- _state = StateUnsent;
- _sent = false;
- _proxy = proxy;
- _mode = mode;
- _handler = null;
- _observer = IceInternal.ObserverHelper.get(proxy, op, context);
- _encoding = Protocol.getCompatibleEncoding(ref.getEncoding());
- _os = new BasicStream(ref.getInstance(), Protocol.currentProtocolEncoding);
-
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol()));
-
- writeHeader(op, mode, context, explicitCtx);
- }
-
- //
- // These functions allow this object to be reused, rather than reallocated.
- //
- public void
- reset(Ice.ObjectPrxHelperBase proxy, String op, Ice.OperationMode mode, java.util.Map<String, String> context,
- boolean explicitCtx)
- {
- Reference ref = proxy.__reference();
- _state = StateUnsent;
- _exception = null;
- _sent = false;
- _proxy = proxy;
- _mode = mode;
- _handler = null;
- _observer = IceInternal.ObserverHelper.get(proxy, op, context);
- _encoding = Protocol.getCompatibleEncoding(ref.getEncoding());
-
- Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol()));
-
- writeHeader(op, mode, context, explicitCtx);
- }
-
- public void
- detach()
- {
- if(_observer != null)
- {
- _observer.detach();
- }
- }
-
- public void
- reclaim()
- {
- if(_is != null)
- {
- _is.reset();
- }
- _os.reset();
- }
-
- // Returns true if ok, false if user exception.
- public boolean
- invoke()
- {
- assert(_state == StateUnsent);
-
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _state = StateInProgress;
- _handler.finishBatchRequest(_os);
- return true;
- }
-
- int cnt = 0;
- while(true)
- {
- try
- {
- _state = StateInProgress;
- _exception = null;
- _sent = false;
-
- _handler = _proxy.__getRequestHandler();
- try
- {
- if(_handler.sendRequest(this)) // Request sent and no response expected, we're done.
- {
- return true;
- }
- }
- catch(Ice.OperationInterruptedException ex)
- {
- if(_handler.requestCanceled(this, ex))
- {
- //
- // Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
- // In this case, the exception should be set on the Outgoing.
- //
- synchronized(this)
- {
- boolean interrupted = false;
- while(_exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex2)
- {
- interrupted = true;
- }
- }
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- else
- {
- throw ex;
- }
- }
-
- boolean timedOut = false;
- synchronized(this)
- {
- //
- // If the handler says it's not finished, we wait until we're done.
- //
-
- int invocationTimeout = _proxy.__reference().getInvocationTimeout();
- if(invocationTimeout > 0)
- {
- long now = Time.currentMonotonicTimeMillis();
- long deadline = now + invocationTimeout;
- while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
- {
- try
- {
- wait(deadline - now);
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- if((_state == StateInProgress || !_sent) && _state != StateFailed)
- {
- now = Time.currentMonotonicTimeMillis();
- timedOut = now >= deadline;
- }
- }
- }
- else
- {
- while((_state == StateInProgress || !_sent) && _state != StateFailed)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
- }
-
- if(timedOut)
- {
- Ice.InvocationTimeoutException ex = new Ice.InvocationTimeoutException();
- if(_handler.requestCanceled(this, ex))
- {
- //
- // Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
- // In this case, the exception should be set on the Outgoing.
- //
- synchronized(this)
- {
- boolean interrupted = false;
- while(_exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException e)
- {
- interrupted = true;
- }
- }
- if(interrupted)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- else
- {
- throw ex;
- }
- }
-
- if(_exception != null)
- {
- throw (Ice.Exception)_exception.fillInStackTrace();
- }
- else
- {
- assert(_state != StateInProgress);
- return _state == StateOK;
- }
- }
- catch(RetryException ex)
- {
- _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
- }
- catch(Ice.Exception ex)
- {
- try
- {
- Ice.Holder<Integer> interval = new Ice.Holder<Integer>();
- cnt = _proxy.__handleException(ex, _handler, _mode, _sent, interval, cnt);
- if(_observer != null)
- {
- _observer.retried(); // Invocation is being retried.
- }
- if(interval.value > 0)
- {
- try
- {
- Thread.sleep(interval.value);
- }
- catch(InterruptedException exi)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
- catch(Ice.Exception exc)
- {
- if(_observer != null)
- {
- _observer.failed(exc.ice_name());
- }
- throw exc;
- }
- }
- }
- }
-
- public void
- abort(Ice.LocalException ex)
- {
- assert(_state == StateUnsent);
-
- //
- // If we didn't finish a batch oneway or datagram request, we
- // must notify the connection about that we give up ownership
- // of the batch stream.
- //
- int mode = _proxy.__reference().getMode();
- if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
- {
- _handler.abortBatchRequest();
- }
-
- throw ex;
- }
-
- @Override
- public boolean
- send(Ice.ConnectionI connection, boolean compress, boolean response)
- throws RetryException
- {
- return connection.sendRequest(this, compress, response);
- }
-
- @Override
- public void
- invokeCollocated(CollocatedRequestHandler handler)
- {
- handler.invokeRequest(this);
- }
-
- @Override
- synchronized public void
- sent()
- {
- if(_proxy.__reference().getMode() != Reference.ModeTwoway)
- {
- if(_childObserver != null)
- {
- _childObserver.detach();
- _childObserver = null;
- }
- _state = StateOK;
- }
- _sent = true;
- notify();
- }
-
- public synchronized void
- finished(BasicStream is)
- {
- assert(_proxy.__reference().getMode() == Reference.ModeTwoway); // Only for twoways.
-
- assert(_state <= StateInProgress);
-
- if(_childObserver != null)
- {
- _childObserver.reply(is.size() - Protocol.headerSize - 4);
- _childObserver.detach();
- _childObserver = null;
- }
-
- if(_is == null)
- {
- _is = new IceInternal.BasicStream(_proxy.__reference().getInstance(), Protocol.currentProtocolEncoding);
- }
- _is.swap(is);
- byte replyStatus = _is.readByte();
-
- switch(replyStatus)
- {
- case ReplyStatus.replyOK:
- {
- _state = StateOK; // The state must be set last, in case there is an exception.
- break;
- }
-
- case ReplyStatus.replyUserException:
- {
- if(_observer != null)
- {
- _observer.userException();
- }
- _state = StateUserException; // The state must be set last, in case there is an exception.
- break;
- }
-
- case ReplyStatus.replyObjectNotExist:
- case ReplyStatus.replyFacetNotExist:
- case ReplyStatus.replyOperationNotExist:
- {
- Ice.RequestFailedException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
-
- case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
-
- case ReplyStatus.replyOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
-
- default:
- {
- assert(false);
- break;
- }
- }
-
- ex.id = new Ice.Identity();
- ex.id.__read(_is);
-
- //
- // For compatibility with the old FacetPath.
- //
- String[] facetPath = _is.readStringSeq();
- if(facetPath.length > 0)
- {
- if(facetPath.length > 1)
- {
- throw new Ice.MarshalException();
- }
- ex.facet = facetPath[0];
- }
- else
- {
- ex.facet = "";
- }
-
- ex.operation = _is.readString();
- _exception = ex;
-
- _state = StateLocalException; // The state must be set last, in case there is an exception.
- break;
- }
-
- case ReplyStatus.replyUnknownException:
- case ReplyStatus.replyUnknownLocalException:
- case ReplyStatus.replyUnknownUserException:
- {
- Ice.UnknownException ex = null;
- switch(replyStatus)
- {
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
-
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
-
- case ReplyStatus.replyUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
-
- default:
- {
- assert(false);
- break;
- }
- }
-
- ex.unknown = _is.readString();
- _exception = ex;
-
- _state = StateLocalException; // The state must be set last, in case there is an exception.
- break;
- }
-
- default:
- {
- _exception = new Ice.UnknownReplyStatusException();
- _state = StateLocalException;
- break;
- }
- }
-
- notify();
- }
-
- @Override
- public synchronized void
- finished(Ice.Exception ex)
- {
- //assert(_state <= StateInProgress);
- if(_state > StateInProgress)
- {
- //
- // Response was already received but message
- // didn't get removed first from the connection
- // send message queue so it's possible we can be
- // notified of failures. In this case, ignore the
- // failure and assume the outgoing has been sent.
- //
- assert(_state != StateFailed);
- _sent = true;
- notify();
- return;
- }
-
- if(_childObserver != null)
- {
- _childObserver.failed(ex.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- _state = StateFailed;
- _exception = ex;
- notify();
- }
-
- public BasicStream
- os()
- {
- return _os;
- }
-
- public BasicStream
- startReadParams()
- {
- _is.startReadEncaps();
- return _is;
- }
-
- public void
- endReadParams()
- {
- _is.endReadEncaps();
- }
-
- public void
- readEmptyParams()
- {
- _is.skipEmptyEncaps(null);
- }
-
- public byte[]
- readParamEncaps()
- {
- return _is.readEncaps(null);
- }
-
- public BasicStream
- startWriteParams(Ice.FormatType format)
- {
- _os.startWriteEncaps(_encoding, format);
- return _os;
- }
-
- public void
- endWriteParams()
- {
- _os.endWriteEncaps();
- }
-
- public void
- writeEmptyParams()
- {
- _os.writeEmptyEncaps(_encoding);
- }
-
- public void
- writeParamEncaps(byte[] encaps)
- {
- if(encaps == null || encaps.length == 0)
- {
- _os.writeEmptyEncaps(_encoding);
- }
- else
- {
- _os.writeEncaps(encaps);
- }
- }
-
- public boolean
- hasResponse()
- {
- return _is != null && !_is.isEmpty();
- }
-
- public void
- throwUserException()
- throws Ice.UserException
- {
- try
- {
- _is.startReadEncaps();
- _is.throwException(null);
- }
- catch(Ice.UserException ex)
- {
- _is.endReadEncaps();
- throw ex;
- }
- }
-
- public void
- attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- public void
- attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getCollocatedObserver(adapter, requestId, _os.size() - Protocol.headerSize - 4);
- if(_childObserver != null)
- {
- _childObserver.attach();
- }
- }
- }
-
- private void
- writeHeader(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, boolean explicitCtx)
- {
- if(explicitCtx && context == null)
- {
- context = _emptyContext;
- }
-
- switch(_proxy.__reference().getMode())
- {
- case Reference.ModeTwoway:
- case Reference.ModeOneway:
- case Reference.ModeDatagram:
- {
- _os.writeBlob(IceInternal.Protocol.requestHdr);
- break;
- }
-
- case Reference.ModeBatchOneway:
- case Reference.ModeBatchDatagram:
- {
- while(true)
- {
- try
- {
- _handler = _proxy.__getRequestHandler();
- _handler.prepareBatchRequest(_os);
- break;
- }
- catch(RetryException ex)
- {
- _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
- }
- catch(Ice.LocalException ex)
- {
- if(_observer != null)
- {
- _observer.failed(ex.ice_name());
- }
- _proxy.__setRequestHandler(_handler, null); // Clear request handler
- throw ex;
- }
- }
- break;
- }
- }
-
- try
- {
- _proxy.__reference().getIdentity().__write(_os);
-
- //
- // For compatibility with the old FacetPath.
- //
- String facet = _proxy.__reference().getFacet();
- if(facet == null || facet.length() == 0)
- {
- _os.writeStringSeq(null);
- }
- else
- {
- String[] facetPath = { facet };
- _os.writeStringSeq(facetPath);
- }
-
- _os.writeString(operation);
-
- _os.writeByte((byte)mode.value());
-
- if(context != null)
- {
- //
- // Explicit context
- //
- Ice.ContextHelper.write(_os, context);
- }
- else
- {
- //
- // Implicit context
- //
- Ice.ImplicitContextI implicitContext = _proxy.__reference().getInstance().getImplicitContext();
- java.util.Map<String, String> prxContext = _proxy.__reference().getContext();
-
- if(implicitContext == null)
- {
- Ice.ContextHelper.write(_os, prxContext);
- }
- else
- {
- implicitContext.write(prxContext, _os);
- }
- }
- }
- catch(Ice.LocalException ex)
- {
- abort(ex);
- }
- }
-
- private Ice.ObjectPrxHelperBase _proxy;
- private Ice.OperationMode _mode;
- private RequestHandler _handler;
- private Ice.EncodingVersion _encoding;
- private BasicStream _is;
- private BasicStream _os;
- private boolean _sent;
- private Ice.Exception _exception;
-
- private static final int StateUnsent = 0;
- private static final int StateInProgress = 1;
- private static final int StateOK = 2;
- private static final int StateUserException = 3;
- private static final int StateLocalException = 4;
- private static final int StateFailed = 5;
- private int _state;
-
- private InvocationObserver _observer;
- private ChildInvocationObserver _childObserver;
-
- public Outgoing next; // For use by Ice.ObjectPrxHelperBase
-
- private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
-}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index dbb6ac37ab9..0618ffb93e9 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,23 +9,33 @@
package IceInternal;
-public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessageCallback, Runnable
+public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback
{
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb)
{
- super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase)prx).__reference().getInstance(), operation, cb);
- _proxy = (Ice.ObjectPrxHelperBase)prx;
+ super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
+ _proxy = (Ice.ObjectPrxHelperBase) prx;
_encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
}
-
- public void __prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
- boolean explicitCtx)
+
+ public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb, IceInternal.BasicStream is,
+ IceInternal.BasicStream os)
+ {
+ super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb,
+ is, os);
+ _proxy = (Ice.ObjectPrxHelperBase) prx;
+ _encoding = Protocol.getCompatibleEncoding(_proxy.__reference().getEncoding());
+ }
+
+ public void prepare(String operation, Ice.OperationMode mode, java.util.Map<String, String> ctx,
+ boolean explicitCtx, boolean synchronous)
{
_handler = null;
_cnt = 0;
_sent = false;
_mode = mode;
_sentSynchronously = false;
+ _synchronous = synchronous;
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(_proxy.__reference().getProtocol()));
@@ -36,15 +46,47 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_observer = ObserverHelper.get(_proxy, operation, ctx);
- //
- // Can't call async via a batch proxy.
- //
- if(_proxy.ice_isBatchOneway() || _proxy.ice_isBatchDatagram())
+ switch(_proxy.__reference().getMode())
{
- throw new Ice.FeatureNotSupportedException("can't send batch requests with AMI");
- }
+ case Reference.ModeTwoway:
+ case Reference.ModeOneway:
+ case Reference.ModeDatagram:
+ {
+ _os.writeBlob(IceInternal.Protocol.requestHdr);
+ break;
+ }
- _os.writeBlob(IceInternal.Protocol.requestHdr);
+ case Reference.ModeBatchOneway:
+ case Reference.ModeBatchDatagram:
+ {
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy.__getRequestHandler();
+ _handler.prepareBatchRequest(_os);
+ break;
+ }
+ catch(RetryException ex)
+ {
+ // Clear request handler and retry.
+ _proxy.__setRequestHandler(_handler, null);
+ }
+ catch(Ice.LocalException ex)
+ {
+ if(_observer != null)
+ {
+ _observer.failed(ex.ice_name());
+ }
+ // Clear request handler
+ _proxy.__setRequestHandler(_handler, null);
+ _handler = null;
+ throw ex;
+ }
+ }
+ break;
+ }
+ }
Reference ref = _proxy.__reference();
@@ -66,7 +108,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_os.writeString(operation);
- _os.writeByte((byte)mode.value());
+ _os.writeByte((byte) mode.value());
if(ctx != null)
{
@@ -94,39 +136,44 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
- @Override public Ice.ObjectPrx
- getProxy()
+ @Override
+ public Ice.ObjectPrx getProxy()
{
return _proxy;
}
@Override
- public int
- __send(Ice.ConnectionI connection, boolean compress, boolean response)
- throws RetryException
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
_cachedConnection = connection;
return connection.sendAsyncRequest(this, compress, response);
}
@Override
- public int
- __invokeCollocated(CollocatedRequestHandler handler)
+ public int invokeCollocated(CollocatedRequestHandler handler)
{
- return handler.invokeAsyncRequest(this);
+ // The BasicStream cannot be cached if background io is enabled,
+ // the proxy is not a twoway or there is an invocation timeout set.
+ if(_proxy.__reference().getInstance().queueRequests() || !_proxy.ice_isTwoway() ||
+ _proxy.__reference().getInvocationTimeout() > 0)
+ {
+ // Disable caching by marking the streams as cached!
+ _state |= StateCachedBuffers;
+ }
+ handler.invokeAsyncRequest(this, _synchronous);
+ return AsyncStatus.Queued;
}
@Override
- public boolean
- __sent()
+ public boolean sent()
{
synchronized(_monitor)
{
- boolean alreadySent = (_state & Sent) != 0;
- _state |= Sent;
+ boolean alreadySent = (_state & StateSent) != 0;
+ _state |= StateSent;
_sent = true;
- assert((_state & Done) == 0);
+ assert ((_state & StateDone) == 0);
if(!_proxy.ice_isTwoway())
{
@@ -135,34 +182,40 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_childObserver.detach();
_childObserver = null;
}
+ if(_observer != null && (_callback == null || !_callback.__hasSentCallback()))
+ {
+ _observer.detach();
+ _observer = null;
+ }
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
- _state |= Done | OK;
- //_os.resize(0, false); // Don't clear the buffer now, it's needed for the collocation optimization
+ _state |= StateDone | StateOK;
+ // _os.resize(0, false); // Don't clear the buffer now, it's
+ // needed for the collocation optimization
}
_monitor.notifyAll();
- return !alreadySent; // Don't call the sent call is already sent.
+
+ // Don't call the sent call is already sent.
+ return !alreadySent && _callback != null && _callback.__hasSentCallback();
}
}
@Override
- public void
- __invokeSent()
+ public void invokeSent()
{
- __invokeSentInternal();
+ invokeSentInternal();
}
@Override
- public void
- __finished(Ice.Exception exc)
+ public void finished(Ice.Exception exc)
{
synchronized(_monitor)
{
- assert((_state & Done) == 0);
+ assert ((_state & StateDone) == 0);
if(_childObserver != null)
{
_childObserver.failed(exc.ice_name());
@@ -178,8 +231,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
//
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
+ // NOTE: at this point, synchronization isn't needed, no other threads
+ // should be calling on the callback.
//
try
{
@@ -188,41 +241,37 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
return; // Can't be retried immediately.
}
- __invoke(false); // Retry the invocation
+ invoke(false); // Retry the invocation
}
catch(Ice.Exception ex)
{
- __invokeException(ex);
+ invokeException(ex);
}
}
@Override
- public void
- __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
+ public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
- threadPool.dispatch(
- new DispatchWorkItem(connection)
+ threadPool.dispatch(new DispatchWorkItem(connection)
+ {
+ @Override
+ public void run()
{
- @Override
- public void
- run()
- {
- OutgoingAsync.this.__finished(ex);
- }
- });
+ OutgoingAsync.this.finished(ex);
+ }
+ });
}
- public final void
- __finished(BasicStream is)
+ public final void finished(BasicStream is)
{
- assert(_proxy.ice_isTwoway()); // Can only be called for twoways.
+ assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
try
{
synchronized(_monitor)
{
- assert(_exception == null && (_state & Done) == 0);
+ assert (_exception == null && (_state & StateDone) == 0);
if(_childObserver != null)
{
_childObserver.reply(is.size() - Protocol.headerSize - 4);
@@ -237,7 +286,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_timeoutRequestHandler = null;
}
- if(_is == null) // _is can already be initialized if the invocation is retried
+ // _is can already be initialized if the invocation is retried
+ if(_is == null)
{
_is = new IceInternal.BasicStream(_instance, IceInternal.Protocol.currentProtocolEncoding);
}
@@ -290,29 +340,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
Ice.RequestFailedException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyObjectNotExist:
- {
- ex = new Ice.ObjectNotExistException();
- break;
- }
+ case ReplyStatus.replyObjectNotExist:
+ {
+ ex = new Ice.ObjectNotExistException();
+ break;
+ }
- case ReplyStatus.replyFacetNotExist:
- {
- ex = new Ice.FacetNotExistException();
- break;
- }
+ case ReplyStatus.replyFacetNotExist:
+ {
+ ex = new Ice.FacetNotExistException();
+ break;
+ }
- case ReplyStatus.replyOperationNotExist:
- {
- ex = new Ice.OperationNotExistException();
- break;
- }
+ case ReplyStatus.replyOperationNotExist:
+ {
+ ex = new Ice.OperationNotExistException();
+ break;
+ }
- default:
- {
- assert(false);
- break;
- }
+ default:
+ {
+ assert (false);
+ break;
+ }
}
ex.id = id;
@@ -330,29 +380,29 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
Ice.UnknownException ex = null;
switch(replyStatus)
{
- case ReplyStatus.replyUnknownException:
- {
- ex = new Ice.UnknownException();
- break;
- }
+ case ReplyStatus.replyUnknownException:
+ {
+ ex = new Ice.UnknownException();
+ break;
+ }
- case ReplyStatus.replyUnknownLocalException:
- {
- ex = new Ice.UnknownLocalException();
- break;
- }
+ case ReplyStatus.replyUnknownLocalException:
+ {
+ ex = new Ice.UnknownLocalException();
+ break;
+ }
- case ReplyStatus.replyUnknownUserException:
- {
- ex = new Ice.UnknownUserException();
- break;
- }
+ case ReplyStatus.replyUnknownUserException:
+ {
+ ex = new Ice.UnknownUserException();
+ break;
+ }
- default:
- {
- assert(false);
- break;
- }
+ default:
+ {
+ assert (false);
+ break;
+ }
}
ex.unknown = unknown;
@@ -365,34 +415,48 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
- _state |= Done;
- _os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _state |= StateDone;
+ // Clear buffer now, instead of waiting for AsyncResult
+ // deallocation
+ // _os.resize(0, false);
if(replyStatus == ReplyStatus.replyOK)
{
- _state |= OK;
+ _state |= StateOK;
}
_monitor.notifyAll();
}
}
catch(Ice.LocalException ex)
{
- __finished(ex);
+ finished(ex);
return;
}
- assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- __invokeCompleted();
+ assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
+ invokeCompleted();
}
- public final boolean
- __invoke(boolean synchronous)
+ public final boolean invoke(boolean synchronous)
{
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
+ {
+ _state |= StateDone | StateOK;
+ _handler.finishBatchRequest(_os);
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return true;
+ }
+
while(true)
{
- _handler = _proxy.__getRequestHandler();
try
{
_sent = false;
+ _handler = _proxy.__getRequestHandler();
int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
@@ -401,29 +465,36 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_sentSynchronously = true;
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSent(); // Call from the user thread.
+ invokeSent(); // Call from the user thread.
}
}
else
{
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSentAsync(); // Call from a client thread pool thread.
+ // Call from a client thread pool thread.
+ invokeSentAsync();
}
}
}
- if(_proxy.ice_isTwoway() || (status & AsyncStatus.Sent) == 0)
+ if(mode == IceInternal.Reference.ModeTwoway || (status & AsyncStatus.Sent) == 0)
{
synchronized(_monitor)
{
- if((_state & Done) == 0)
+ if((_state & StateDone) == 0)
{
int invocationTimeout = _handler.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
- _future = _instance.timer().schedule(this, invocationTimeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ _future = _instance.timer().schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ timeout();
+ }
+ }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
_timeoutRequestHandler = _handler;
}
}
@@ -431,22 +502,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
break;
}
- catch(Ice.OperationInterruptedException ex)
- {
- //
- // Clear the request handler, and cancel the outgoing request.
- //
- _proxy.__setRequestHandler(_handler, null);
- _handler.asyncRequestCanceled(this, ex);
- break;
- }
catch(RetryException ex)
{
- _proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
+ // Clear request handler and retry.
+ _proxy.__setRequestHandler(_handler, null);
}
catch(Ice.Exception ex)
{
- if(!handleException(ex)) // This will throw if the invocation can't be retried.
+ // This will throw if the invocation can't be retried.
+ if(!handleException(ex))
{
break; // Can't be retried immediately.
}
@@ -455,27 +519,23 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
return _sentSynchronously;
}
- public BasicStream
- __startWriteParams(Ice.FormatType format)
+ public BasicStream startWriteParams(Ice.FormatType format)
{
_os.startWriteEncaps(_encoding, format);
return _os;
}
- public void
- __endWriteParams()
+ public void endWriteParams()
{
_os.endWriteEncaps();
}
- public void
- __writeEmptyParams()
+ public void writeEmptyParams()
{
_os.writeEmptyEncaps(_encoding);
}
- public void
- __writeParamEncaps(byte[] encaps)
+ public void writeParamEncaps(byte[] encaps)
{
if(encaps == null || encaps.length == 0)
{
@@ -486,22 +546,50 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
_os.writeEncaps(encaps);
}
}
-
- BasicStream
- __getIs()
+
+ public void cacheMessageBuffers()
{
- return _is;
- }
+ if(_proxy.__reference().getInstance().cacheMessageBuffers() > 0)
+ {
+ synchronized(_monitor)
+ {
+ if((_state & StateCachedBuffers) > 0) {
+ return;
+ }
+ _state |= StateCachedBuffers;
+ }
+ if(_is != null)
+ {
+ _is.reset();
+ }
+ _os.reset();
+
+ _proxy.cacheMessageBuffers(_is, _os);
+ }
+ }
+
@Override
- public void
- run()
+ public void invokeExceptionAsync(final Ice.Exception ex)
{
- __runTimerTask();
- }
+ if((_state & StateDone) == 0 && _handler != null)
+ {
+ //
+ // If we didn't finish a batch oneway or datagram request, we
+ // must notify the connection about that we give up ownership
+ // of the batch stream.
+ //
+ int mode = _proxy.__reference().getMode();
+ if(mode == Reference.ModeBatchOneway || mode == Reference.ModeBatchDatagram)
+ {
+ _handler.abortBatchRequest();
+ }
+ }
- private boolean
- handleException(Ice.Exception exc)
+ super.invokeExceptionAsync(ex);
+ }
+
+ private boolean handleException(Ice.Exception exc)
{
try
{
@@ -514,7 +602,8 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
if(interval.value > 0)
{
_instance.retryQueue().add(this, interval.value);
- return false; // Don't retry immediately, the retry queue will take care of the retry.
+ return false; // Don't retry immediately, the retry queue will
+ // take care of the retry.
}
else
{
@@ -538,6 +627,11 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
private int _cnt;
private Ice.OperationMode _mode;
private boolean _sent;
+ //
+ // If true this AMI request is being used for a generated synchronous invocation.
+ //
+ private boolean _synchronous;
+
private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
}
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 6c2259b35be..95c5fd2bb45 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -18,32 +18,33 @@ public interface OutgoingAsyncMessageCallback
//
// Called by the request handler to send the request over the connection.
//
- int __send(Ice.ConnectionI conection, boolean compress, boolean response)
+ int send(Ice.ConnectionI conection, boolean compress, boolean response)
throws RetryException;
- int __invokeCollocated(CollocatedRequestHandler handler);
+ int invokeCollocated(CollocatedRequestHandler handler);
//
- // Called by the connection when the message is confirmed sent. The connection is locked
- // when this is called so this method can call the sent callback. Instead, this method
- // returns true if there's a sent callback and false otherwise. If true is returned, the
- // connection will call the __invokeSent() method bellow (which in turn should call the
- // sent callback).
+ // Called by the connection when the message is confirmed sent. The
+ // connection is locked when this is called so this method can't call the
+ // sent callback. Instead, this method returns true if there's a sent
+ // callback and false otherwise. If true is returned, the connection will
+ // call the __invokeSent() method bellow (which in turn should call the sent
+ // callback).
//
- boolean __sent();
+ boolean sent();
//
// Called by the connection to call the user sent callback.
//
- void __invokeSent();
+ void invokeSent();
//
// Called by the connection when the request failed.
//
- void __finished(Ice.Exception ex);
+ void finished(Ice.Exception ex);
//
// Helper to dispatch the cancellation exception.
//
- void __dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
+ void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
}
diff --git a/java/src/IceInternal/OutgoingMessageCallback.java b/java/src/IceInternal/OutgoingMessageCallback.java
deleted file mode 100644
index 1a8426b2e19..00000000000
--- a/java/src/IceInternal/OutgoingMessageCallback.java
+++ /dev/null
@@ -1,22 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-package IceInternal;
-
-public interface OutgoingMessageCallback
-{
- boolean send(Ice.ConnectionI conection, boolean compress, boolean response)
- throws RetryException;
-
- void invokeCollocated(CollocatedRequestHandler handler);
-
- void sent();
-
- void finished(Ice.Exception ex);
-}
diff --git a/java/src/IceInternal/Protocol.java b/java/src/IceInternal/Protocol.java
index a129e406a22..6e68fd1cf7f 100644
--- a/java/src/IceInternal/Protocol.java
+++ b/java/src/IceInternal/Protocol.java
@@ -181,12 +181,6 @@ final public class Protocol
}
static public boolean
- isSupported(Ice.ProtocolVersion version, Ice.ProtocolVersion supported)
- {
- return version.major == supported.major && version.minor <= supported.minor;
- }
-
- static public boolean
isSupported(Ice.EncodingVersion version, Ice.EncodingVersion supported)
{
return version.major == supported.major && version.minor <= supported.minor;
diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
index 4f24a1919e6..009319110fe 100644
--- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
@@ -32,20 +32,26 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
_sentSynchronously = true;
if((status & AsyncStatus.InvokeSentCallback) > 0)
{
- __invokeSent();
+ invokeSent();
}
}
else
{
synchronized(_monitor)
{
- if((_state & Done) == 0)
+ if((_state & StateDone) == 0)
{
int invocationTimeout = handler.getReference().getInvocationTimeout();
if(invocationTimeout > 0)
{
- _future = _instance.timer().schedule(this, invocationTimeout,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ _future = _instance.timer().schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ timeout();
+ }
+ }, invocationTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
_timeoutRequestHandler = handler;
}
}
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
index 6c2a5b55f2c..f9d90e3e72e 100644
--- a/java/src/IceInternal/QueueRequestHandler.java
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -21,12 +21,9 @@ import Ice.ConnectionI;
public class QueueRequestHandler implements RequestHandler
{
public
- QueueRequestHandler(Instance instance, RequestHandler delegate) {
+ QueueRequestHandler(Instance instance, RequestHandler delegate)
+ {
_executor = instance.getQueueExecutor();
- if(_executor == null)
- {
- throw new CommunicatorDestroyedException();
- }
assert(delegate != null);
_delegate = delegate;
}
@@ -37,7 +34,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call() throws RetryException
{
@@ -52,11 +50,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -83,7 +81,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call()
{
@@ -97,11 +96,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -124,7 +123,8 @@ public class QueueRequestHandler implements RequestHandler
{
try
{
- Future<Void> future = _executor.submit(new Callable<Void>() {
+ Future<Void> future = _executor.submit(new Callable<Void>()
+ {
@Override
public Void call()
{
@@ -138,11 +138,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -160,74 +160,42 @@ public class QueueRequestHandler implements RequestHandler
}
@Override
- public boolean
- sendRequest(final OutgoingMessageCallback out) throws RetryException
+ public int
+ sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
{
try
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ Future<Integer> future = _executor.submit(new Callable<Integer>()
+ {
@Override
- public Boolean call() throws RetryException
+ public Integer call() throws RetryException
{
- return _delegate.sendRequest(out);
+ return _delegate.sendAsyncRequest(out);
}
});
return future.get();
}
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
catch(RejectedExecutionException e)
{
throw new CommunicatorDestroyedException();
}
- catch (ExecutionException e)
+ catch(InterruptedException e)
{
+ // If the request cannot be canceled (or is itself interrupted) then
+ // restore the interrupt state.
try
{
- throw e.getCause();
- }
- catch(RetryException ex)
- {
- throw ex;
- }
- catch(RuntimeException ex)
- {
- throw ex;
+ if(!asyncRequestCanceled(out, new Ice.OperationInterruptedException()))
+ {
+ Thread.currentThread().interrupt();
+ }
}
- catch(Throwable ex)
+ catch(Ice.OperationInterruptedException ex)
{
- assert(false);
+ Thread.currentThread().interrupt();
}
}
- return false;
- }
-
- @Override
- public int
- sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
- {
- try
- {
- Future<Integer> future = _executor.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws RetryException
- {
- return _delegate.sendAsyncRequest(out);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -251,52 +219,12 @@ public class QueueRequestHandler implements RequestHandler
@Override
public boolean
- requestCanceled(final OutgoingMessageCallback out, final Ice.LocalException ex)
- {
- try
- {
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call()
- {
- return _delegate.requestCanceled(out, ex);
- }
- });
- return future.get();
- }
- catch(RejectedExecutionException e)
- {
- throw new CommunicatorDestroyedException();
- }
- catch (InterruptedException e)
- {
- throw new Ice.OperationInterruptedException();
- }
- catch (ExecutionException e)
- {
- try
- {
- throw e.getCause();
- }
- catch(RuntimeException exc)
- {
- throw exc;
- }
- catch(Throwable exc)
- {
- assert(false);
- }
- }
- return false;
- }
-
- @Override
- public boolean
asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
{
try
{
- Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ Future<Boolean> future = _executor.submit(new Callable<Boolean>()
+ {
@Override
public Boolean call()
{
@@ -309,11 +237,11 @@ public class QueueRequestHandler implements RequestHandler
{
throw new CommunicatorDestroyedException();
}
- catch (InterruptedException e)
+ catch(InterruptedException e)
{
throw new Ice.OperationInterruptedException();
}
- catch (ExecutionException e)
+ catch(ExecutionException e)
{
try
{
@@ -347,7 +275,7 @@ public class QueueRequestHandler implements RequestHandler
@Override
public ConnectionI
- waitForConnection() throws InterruptedException
+ waitForConnection() throws InterruptedException, RetryException
{
return _delegate.waitForConnection();
}
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index 894c986a84d..cff4cd6a8a4 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -16,19 +16,14 @@ public interface RequestHandler
void finishBatchRequest(BasicStream out);
void abortBatchRequest();
- boolean sendRequest(OutgoingMessageCallback out)
- throws RetryException;
-
int sendAsyncRequest(OutgoingAsyncMessageCallback out)
throws RetryException;
- boolean requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex);
boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex);
Reference getReference();
Ice.ConnectionI getConnection();
Ice.ConnectionI waitForConnection()
- throws InterruptedException;
-
+ throws InterruptedException, RetryException;
}
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
index 850aea259be..e4f3a6c26fe 100644
--- a/java/src/IceInternal/RetryTask.java
+++ b/java/src/IceInternal/RetryTask.java
@@ -25,11 +25,11 @@ class RetryTask implements Runnable
{
try
{
- _outAsync.__invoke(false);
+ _outAsync.invoke(false);
}
catch(Ice.LocalException ex)
{
- _outAsync.__invokeExceptionAsync(ex);
+ _outAsync.invokeExceptionAsync(ex);
}
}
}
@@ -38,7 +38,7 @@ class RetryTask implements Runnable
destroy()
{
_future.cancel(false);
- _outAsync.__invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
}
public void setFuture(java.util.concurrent.Future<?> future)
diff --git a/java/src/IceInternal/RouterInfo.java b/java/src/IceInternal/RouterInfo.java
index f6c13dbfd00..5eecff2298c 100644
--- a/java/src/IceInternal/RouterInfo.java
+++ b/java/src/IceInternal/RouterInfo.java
@@ -133,24 +133,6 @@ public final class RouterInfo
return setServerEndpoints(_router.getServerProxy());
}
- public void
- addProxy(Ice.ObjectPrx proxy)
- {
- assert(proxy != null);
- synchronized(this)
- {
- if(_identities.contains(proxy.ice_getIdentity()))
- {
- //
- // Only add the proxy to the router if it's not already in our local map.
- //
- return;
- }
- }
-
- addAndEvictProxies(proxy, _router.addProxies(new Ice.ObjectPrx[] { proxy }));
- }
-
public boolean
addProxy(final Ice.ObjectPrx proxy, final AddProxyCallback callback)
{
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 56836072f54..c5c26eea65a 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -389,12 +389,6 @@ public final class ThreadPool
_selector.destroy();
}
- public String
- prefix()
- {
- return _prefix;
- }
-
private void
run(EventHandlerThread thread)
{
diff --git a/java/src/IceInternal/TwowayCallback.java b/java/src/IceInternal/TwowayCallback.java
index 86c6afb6202..55c9044e6d6 100644
--- a/java/src/IceInternal/TwowayCallback.java
+++ b/java/src/IceInternal/TwowayCallback.java
@@ -26,4 +26,10 @@ public abstract class TwowayCallback extends CallbackBase implements Ice.TwowayC
{
sent(__result.sentSynchronously());
}
+
+ @Override
+ public final boolean __hasSentCallback()
+ {
+ return true;
+ }
};
diff --git a/java/src/IceInternal/Util.java b/java/src/IceInternal/Util.java
index 647262fe90b..1e32d032771 100644
--- a/java/src/IceInternal/Util.java
+++ b/java/src/IceInternal/Util.java
@@ -14,7 +14,8 @@ import java.util.concurrent.ThreadFactory;
public final class Util
{
static String
- createThreadName(final Ice.Properties properties, final String name) {
+ createThreadName(final Ice.Properties properties, final String name)
+ {
String threadName = properties.getProperty("Ice.ProgramName");
if(threadName.length() > 0)
{
@@ -26,7 +27,8 @@ public final class Util
}
static ThreadFactory
- createThreadFactory(final Ice.Properties properties, final String name) {
+ createThreadFactory(final Ice.Properties properties, final String name)
+ {
return new java.util.concurrent.ThreadFactory()
{
@Override
diff --git a/java/test/Ice/adapterDeactivation/Collocated.java b/java/test/Ice/adapterDeactivation/Collocated.java
index 9f8f90bdda6..77c320bc3d3 100644
--- a/java/test/Ice/adapterDeactivation/Collocated.java
+++ b/java/test/Ice/adapterDeactivation/Collocated.java
@@ -28,6 +28,10 @@ public class Collocated extends test.Util.Application
{
Ice.InitializationData initData = new Ice.InitializationData();
initData.properties = Ice.Util.createProperties(argsH);
+ if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0)
+ {
+ initData.properties.setProperty("Ice.ThreadPool.Server.Size", "2");
+ }
initData.properties.setProperty("Ice.Package.Test", "test.Ice.adapterDeactivation");
initData.properties.setProperty("TestAdapter.Endpoints", "default -p 12010");
return initData;
diff --git a/java/test/Ice/exceptions/AllTests.java b/java/test/Ice/exceptions/AllTests.java
index 282add94c37..f11f576c2fb 100644
--- a/java/test/Ice/exceptions/AllTests.java
+++ b/java/test/Ice/exceptions/AllTests.java
@@ -1,6 +1,5 @@
// **********************************************************************
//
-// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -2281,18 +2280,18 @@ public class AllTests
{
final Callback_Thrower_throwLocalExceptionI cb = new Callback_Thrower_throwLocalExceptionI();
thrower.begin_throwLocalExceptionIdempotent(new Callback_Thrower_throwLocalExceptionIdempotent()
- {
- @Override
- public void response()
- {
- cb.response();
- }
-
- @Override
- public void exception(Ice.LocalException exc)
- {
- cb.exception(exc);
- }
+ {
+ @Override
+ public void response()
+ {
+ cb.response();
+ }
+
+ @Override
+ public void exception(Ice.LocalException exc)
+ {
+ cb.exception(exc);
+ }
});
cb.check();
}
diff --git a/java/test/Ice/interrupt/AllTests.java b/java/test/Ice/interrupt/AllTests.java
index 1c205b80ac8..0467444cfb9 100644
--- a/java/test/Ice/interrupt/AllTests.java
+++ b/java/test/Ice/interrupt/AllTests.java
@@ -56,31 +56,7 @@ public class AllTests
notify();
}
- public synchronized void checkResponse()
- {
- while(!_response)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
- _response = false;
- }
-
- public synchronized void response()
- {
- assert(!_response);
- _response = true;
- notify();
- }
-
private boolean _called = false;
- private boolean _response = false;
}
private static void
@@ -91,6 +67,18 @@ public class AllTests
throw new RuntimeException();
}
}
+
+ private static void failIfNotInterrupted()
+ {
+ if(Thread.currentThread().isInterrupted())
+ {
+ Thread.interrupted();
+ }
+ else
+ {
+ test(false);
+ }
+ }
public static void
allTests(test.Util.Application app, Ice.Communicator communicator, PrintWriter out)
@@ -232,48 +220,52 @@ public class AllTests
test(false);
}
- //
- // Test interrupt of waitForSent. Here hold the adapter and send a large payload. The
- // thread is interrupted in 500ms which should result in a operation interrupted exception.
- //
- executor.submit(new Runnable() {
- @Override
- public void run()
- {
- try
- {
- Thread.sleep(500);
- }
- catch(InterruptedException e)
+ // This section of the test doesn't run when collocated.
+ if(p.ice_getConnection() != null)
+ {
+ //
+ // Test interrupt of waitForSent. Here hold the adapter and send a large payload. The
+ // thread is interrupted in 500ms which should result in a operation interrupted exception.
+ //
+ executor.submit(new Runnable() {
+ @Override
+ public void run()
{
- test(false);
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ mainThread.interrupt();
}
- mainThread.interrupt();
- }
- });
+ });
- testController.holdAdapter();
- Ice.AsyncResult r = null;
- try
- {
- // The sequence needs to be large enough to fill the write/recv buffers
- byte[] seq = new byte[20000000];
- r = p.begin_opWithPayload(seq);
+ testController.holdAdapter();
+ Ice.AsyncResult r = null;
+ try
+ {
+ // The sequence needs to be large enough to fill the write/recv buffers
+ byte[] seq = new byte[20000000];
+ r = p.begin_opWithPayload(seq);
+ r.waitForSent();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+ //
+ // Resume the adapter.
+ //
+ testController.resumeAdapter();
+
r.waitForSent();
- test(false);
- }
- catch(Ice.OperationInterruptedException ex)
- {
- // Expected
+ r.waitForCompleted();
+ p.end_opWithPayload(r);
}
- //
- // Resume the adapter.
- //
- testController.resumeAdapter();
-
- r.waitForSent();
- r.waitForCompleted();
- p.end_opWithPayload(r);
//
// The executor is all done.
@@ -297,7 +289,7 @@ public class AllTests
try
{
ic.destroy();
- test(false);
+ failIfNotInterrupted();
}
catch(Ice.OperationInterruptedException ex)
{
diff --git a/java/test/Ice/interrupt/Collocated.java b/java/test/Ice/interrupt/Collocated.java
new file mode 100644
index 00000000000..a377024f44b
--- /dev/null
+++ b/java/test/Ice/interrupt/Collocated.java
@@ -0,0 +1,72 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+public class Collocated extends test.Util.Application
+{
+ @Override
+ public int run(String[] args)
+ {
+ Ice.Communicator communicator = communicator();
+ Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TestAdapter");
+ Ice.ObjectAdapter adapter2 = communicator().createObjectAdapter("ControllerAdapter");
+ TestControllerI controller = new TestControllerI(adapter);
+ adapter.add(new TestI(controller), communicator().stringToIdentity("test"));
+ adapter.activate();
+ adapter2.add(controller, communicator().stringToIdentity("testController"));
+ adapter2.activate();
+
+ try
+ {
+ AllTests.allTests(this, communicator(), getWriter());
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException();
+ }
+
+ return 0;
+ }
+
+ @Override
+ protected Ice.InitializationData getInitData(Ice.StringSeqHolder argsH)
+ {
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = Ice.Util.createProperties(argsH);
+ initData.properties.setProperty("Ice.Package.Test", "test.Ice.interrupt");
+ //
+ // We need to enable the background IO so that Ice is interrupt
+ // safe for this test.
+ //
+ initData.properties.setProperty("Ice.BackgroundIO", "1");
+ //
+ // We need to send messages large enough to cause the transport
+ // buffers to fill up.
+ //
+ initData.properties.setProperty("Ice.MessageSizeMax", "20000");
+
+ initData.properties.setProperty("TestAdapter.Endpoints", "default -p 12010");
+ initData.properties.setProperty("ControllerAdapter.Endpoints", "tcp -p 12011");
+ initData.properties.setProperty("ControllerAdapter.ThreadPool.Size", "1");
+
+ return initData;
+ }
+
+ public static void main(String[] args)
+ {
+ Collocated c = new Collocated();
+ int status = c.main("Collocated", args);
+
+ System.gc();
+ System.exit(status);
+ }
+
+}
diff --git a/java/test/Ice/interrupt/run.py b/java/test/Ice/interrupt/run.py
index 3cbc4137c5e..012b74ea130 100644
--- a/java/test/Ice/interrupt/run.py
+++ b/java/test/Ice/interrupt/run.py
@@ -22,3 +22,6 @@ import TestUtil
print("tests with regular server.")
TestUtil.clientServerTest()
+
+print("tests with collocated server.")
+TestUtil.collocatedTest() \ No newline at end of file
diff --git a/java/test/Ice/metrics/AllTests.java b/java/test/Ice/metrics/AllTests.java
index 1a8724a91ab..c24a65b3f9e 100644
--- a/java/test/Ice/metrics/AllTests.java
+++ b/java/test/Ice/metrics/AllTests.java
@@ -143,17 +143,20 @@ public class AllTests
_serverProps = serverProps;
}
- public synchronized void
+ public void
waitForUpdate()
{
- while(!_updated)
+ synchronized(this)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
+ while(!_updated)
{
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
}
}
// Ensure that the previous updates were committed, the setProperties call returns before
@@ -161,7 +164,10 @@ public class AllTests
// a second time, this will block until all the notifications from the first update have
// completed.
_serverProps.setProperties(new java.util.HashMap<String, String>());
- _updated = false;
+ synchronized(this)
+ {
+ _updated = false;
+ }
}
@Override
@@ -418,6 +424,12 @@ public class AllTests
MetricsPrx metrics = MetricsPrxHelper.checkedCast(communicator.stringToProxy("metrics:default -p 12010"));
boolean collocated = metrics.ice_getConnection() == null;
+ int threadCount = 3;
+ if(collocated && communicator.getProperties().getPropertyAsInt("Ice.BackgroundIO") > 0)
+ {
+ threadCount = 5;
+ }
+
out.print("testing metrics admin facet checkedCast... ");
out.flush();
Ice.ObjectPrx admin = communicator.getAdmin();
@@ -449,7 +461,8 @@ public class AllTests
test(view.get("Connection").length == 1 && view.get("Connection")[0].current == 1 &&
view.get("Connection")[0].total == 1);
}
- test(view.get("Thread").length == 1 && view.get("Thread")[0].current == 3 && view.get("Thread")[0].total == 3);
+ test(view.get("Thread").length == 1 && view.get("Thread")[0].current == threadCount &&
+ view.get("Thread")[0].total == threadCount);
out.println("ok");
out.print("testing group by id...");
@@ -465,7 +478,7 @@ public class AllTests
metrics.ice_connectionId("Con1").ice_ping();
view = clientMetrics.getMetricsView("View", timestamp);
- test(view.get("Thread").length == 3);
+ test(view.get("Thread").length == threadCount);
if(!collocated)
{
test(view.get("Connection").length == 2);
@@ -711,17 +724,20 @@ public class AllTests
checkFailure(clientMetrics, "ConnectionEstablishment", m1.id, "Ice::ConnectTimeoutException", 2, out);
Connect c = new Connect(metrics);
- testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "parent", "Communicator", c, out);
- testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "id", "127.0.0.1:12010", c, out);
+ testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "parent", "Communicator", c,
+ out);
+ testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "id", "127.0.0.1:12010", c,
+ out);
testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpoint",
"tcp -h 127.0.0.1 -p 12010 -t 60000", c, out);
testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointType", "1", c, out);
- testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsDatagram", "false", c,
- out);
+ testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsDatagram", "false",
+ c, out);
testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointIsSecure", "false", c,
out);
- testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointTimeout", "60000", c, out);
+ testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointTimeout", "60000", c,
+ out);
testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointCompress", "false", c,
out);
testAttribute(clientMetrics, clientProps, update, "ConnectionEstablishment", "endpointHost", "127.0.0.1", c,
@@ -921,6 +937,9 @@ public class AllTests
Callback cb = new Callback();
+ //
+ // Twoway tests
+ //
metrics.op();
metrics.end_op(metrics.begin_op());
metrics.begin_op(cb);
@@ -1096,6 +1115,54 @@ public class AllTests
testAttribute(clientMetrics, clientProps, update, "Invocation", "context.entry2", "", op, out);
testAttribute(clientMetrics, clientProps, update, "Invocation", "context.entry3", "", op, out);
+ //
+ // Oneway tests
+ //
+ clearView(clientProps, serverProps, update);
+ props.put("IceMX.Metrics.View.Map.Invocation.GroupBy", "operation");
+ props.put("IceMX.Metrics.View.Map.Invocation.Map.Remote.GroupBy", "localPort");
+ updateProps(clientProps, serverProps, update, props, "Invocation");
+
+ MetricsPrx metricsOneway = (MetricsPrx)metrics.ice_oneway();
+ metricsOneway.op();
+ metricsOneway.end_op(metricsOneway.begin_op());
+ metricsOneway.begin_op(cb).waitForSent();
+
+ map = toMap(clientMetrics.getMetricsView("View", timestamp).get("Invocation"));
+ test(map.size() == 1);
+
+ im1 = (IceMX.InvocationMetrics)map.get("op");
+ test(im1.current <= 1 && im1.total == 3 && im1.failures == 0 && im1.retry == 0);
+ test(!collocated ? (im1.remotes.length == 1) : (im1.collocated.length == 1));
+ rim1 = (IceMX.ChildInvocationMetrics)(!collocated ? im1.remotes[0] : im1.collocated[0]);
+ test(rim1.current <= 1 && rim1.total == 3 && rim1.failures == 0);
+ test(rim1.size == 63 && rim1.replySize == 0);
+
+ testAttribute(clientMetrics, clientProps, update, "Invocation", "mode", "oneway", new InvokeOp(metricsOneway),
+ out);
+
+ //
+ // Batch oneway tests
+ //
+ props.put("IceMX.Metrics.View.Map.Invocation.GroupBy", "operation");
+ props.put("IceMX.Metrics.View.Map.Invocation.Map.Remote.GroupBy", "localPort");
+ updateProps(clientProps, serverProps, update, props, "Invocation");
+
+ MetricsPrx metricsBatchOneway = (MetricsPrx)metrics.ice_batchOneway();
+ metricsBatchOneway.op();
+ metricsBatchOneway.end_op(metricsBatchOneway.begin_op());
+ //metricsBatchOneway.begin_op(cb).waitForSent();
+
+ map = toMap(clientMetrics.getMetricsView("View", timestamp).get("Invocation"));
+ test(map.size() == 1);
+
+ im1 = (IceMX.InvocationMetrics)map.get("op");
+ test(im1.current == 0 && im1.total == 2 && im1.failures == 0 && im1.retry == 0);
+ test(im1.remotes.length == 0);
+
+ testAttribute(clientMetrics, clientProps, update, "Invocation", "mode", "batch-oneway",
+ new InvokeOp(metricsBatchOneway), out);
+
out.println("ok");
out.print("testing metrics view enable/disable...");
diff --git a/java/test/Ice/metrics/Collocated.java b/java/test/Ice/metrics/Collocated.java
index 69a8896e1b5..6fbfa9f4154 100644
--- a/java/test/Ice/metrics/Collocated.java
+++ b/java/test/Ice/metrics/Collocated.java
@@ -46,6 +46,13 @@ public class Collocated extends test.Util.Application
{
Ice.InitializationData initData = new Ice.InitializationData();
initData.properties = Ice.Util.createProperties(argsH);
+ if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0)
+ {
+ // With background IO, collocated invocations are
+ // dispatched on the server thread pool. This test needs
+ // at least 3 threads in the server thread pool to work.
+ initData.properties.setProperty("Ice.ThreadPool.Server.Size", "3");
+ }
initData.properties.setProperty("Ice.Package.Test", "test.Ice.metrics");
initData.properties.setProperty("Ice.Admin.Endpoints", "tcp");
initData.properties.setProperty("Ice.Admin.InstanceName", "client");
diff --git a/java/test/Ice/operations/AllTests.java b/java/test/Ice/operations/AllTests.java
index a22474bbfd6..63866c9e4c0 100644
--- a/java/test/Ice/operations/AllTests.java
+++ b/java/test/Ice/operations/AllTests.java
@@ -128,6 +128,11 @@ public class AllTests
BatchOneways.batchOneways(derived, out);
out.println("ok");
+ out.print("testing batch AMI oneway operations... ");
+ out.flush();
+ BatchOnewaysAMI.batchOneways(cl, out);
+ BatchOnewaysAMI.batchOneways(derived, out);
+ out.println("ok");
return cl;
}
}
diff --git a/java/test/Ice/operations/BatchOneways.java b/java/test/Ice/operations/BatchOneways.java
index 8415f3d6ece..fc20955e9d2 100644
--- a/java/test/Ice/operations/BatchOneways.java
+++ b/java/test/Ice/operations/BatchOneways.java
@@ -61,7 +61,6 @@ class BatchOneways
MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
batch.ice_flushBatchRequests();
- batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests());
for(int i = 0 ; i < 30 ; ++i)
{
diff --git a/java/test/Ice/operations/BatchOnewaysAMI.java b/java/test/Ice/operations/BatchOnewaysAMI.java
new file mode 100644
index 00000000000..f11df72b019
--- /dev/null
+++ b/java/test/Ice/operations/BatchOnewaysAMI.java
@@ -0,0 +1,208 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.operations;
+
+import java.io.PrintWriter;
+
+import Ice.LocalException;
+import test.Ice.operations.Test.Callback_MyClass_opByteSOneway;
+import test.Ice.operations.Test.MyClassPrx;
+import test.Ice.operations.Test.MyClassPrxHelper;
+
+class BatchOnewaysAMI
+{
+ private static class Callback
+ {
+ Callback()
+ {
+ _called = false;
+ }
+
+ public synchronized void check()
+ {
+ while(!_called)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _called = false;
+ }
+
+ public synchronized void called()
+ {
+ assert (!_called);
+ _called = true;
+ notify();
+ }
+
+ private boolean _called;
+ }
+
+ private static void test(boolean b)
+ {
+ if(!b)
+ {
+ throw new RuntimeException();
+ }
+ }
+
+ static void batchOneways(MyClassPrx p, PrintWriter out)
+ {
+ final byte[] bs1 = new byte[10 * 1024];
+ final byte[] bs2 = new byte[99 * 1024];
+ final byte[] bs3 = new byte[100 * 1024];
+
+ final Callback cb = new Callback();
+ p.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway()
+ {
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(false);
+ }
+
+ @Override
+ public void response()
+ {
+ cb.called();
+ }
+ });
+ cb.check();
+ p.begin_opByteSOneway(bs2, new Callback_MyClass_opByteSOneway()
+ {
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(false);
+ }
+
+ @Override
+ public void response()
+ {
+ cb.called();
+ }
+ });
+ cb.check();
+
+ p.begin_opByteSOneway(bs3, new Callback_MyClass_opByteSOneway()
+ {
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(ex instanceof Ice.MemoryLimitException);
+ cb.called();
+ }
+
+ @Override
+ public void response()
+ {
+ test(false);
+ }
+ });
+ cb.check();
+
+ MyClassPrx batch = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
+ batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests());
+
+ for(int i = 0; i < 30; ++i)
+ {
+ batch.begin_opByteSOneway(bs1, new Callback_MyClass_opByteSOneway()
+ {
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(false);
+ }
+
+ @Override
+ public void response()
+ {
+ }
+ });
+ }
+
+ if(batch.ice_getConnection() != null)
+ {
+ batch.ice_getConnection().end_flushBatchRequests(batch.ice_getConnection().begin_flushBatchRequests());
+
+ MyClassPrx batch2 = MyClassPrxHelper.uncheckedCast(p.ice_batchOneway());
+
+ batch.begin_ice_ping();
+ batch2.begin_ice_ping();
+ batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests());
+ batch.ice_getConnection().close(false);
+ batch.begin_ice_ping();
+ batch2.begin_ice_ping();
+
+ batch.ice_getConnection();
+ batch2.ice_getConnection();
+
+ batch.begin_ice_ping();
+ batch.ice_getConnection().close(false);
+ batch.begin_ice_ping(new Ice.Callback_Object_ice_ping()
+ {
+
+ @Override
+ public void response()
+ {
+ test(false);
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(ex instanceof Ice.CloseConnectionException);
+ cb.called();
+ }
+
+ });
+ cb.check();
+ batch2.begin_ice_ping(new Ice.Callback_Object_ice_ping()
+ {
+
+ @Override
+ public void response()
+ {
+ test(false);
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(ex instanceof Ice.CloseConnectionException);
+ cb.called();
+ }
+
+ });
+ cb.check();
+ batch.begin_ice_ping();
+ batch2.begin_ice_ping();
+ }
+
+ Ice.Identity identity = new Ice.Identity();
+ identity.name = "invalid";
+ Ice.ObjectPrx batch3 = batch.ice_identity(identity);
+ batch3.begin_ice_ping();
+ batch3.end_ice_flushBatchRequests(batch3.begin_ice_flushBatchRequests());
+
+ // Make sure that a bogus batch request doesn't cause troubles to other
+ // ones.
+ batch3.begin_ice_ping();
+ batch.begin_ice_ping();
+ batch.end_ice_flushBatchRequests(batch.begin_ice_flushBatchRequests());
+ batch.begin_ice_ping();
+ }
+}
diff --git a/java/test/Ice/operations/Collocated.java b/java/test/Ice/operations/Collocated.java
index c34d07d5f11..278e81a9b65 100644
--- a/java/test/Ice/operations/Collocated.java
+++ b/java/test/Ice/operations/Collocated.java
@@ -35,6 +35,10 @@ public class Collocated extends test.Util.Application
{
Ice.InitializationData initData = new Ice.InitializationData();
initData.properties = Ice.Util.createProperties(argsH);
+ if(initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0)
+ {
+ initData.properties.setProperty("Ice.ThreadPool.Server.Size", "2");
+ }
initData.properties.setProperty("Ice.Package.Test", "test.Ice.operations");
//