summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/OutgoingAsyncBase.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/OutgoingAsyncBase.java')
-rw-r--r--java/src/IceInternal/OutgoingAsyncBase.java571
1 files changed, 45 insertions, 526 deletions
diff --git a/java/src/IceInternal/OutgoingAsyncBase.java b/java/src/IceInternal/OutgoingAsyncBase.java
index 2e6aa221f4d..f04b09ea941 100644
--- a/java/src/IceInternal/OutgoingAsyncBase.java
+++ b/java/src/IceInternal/OutgoingAsyncBase.java
@@ -9,385 +9,41 @@
package IceInternal;
-import Ice.AsyncResult;
-import Ice.Communicator;
-import Ice.CommunicatorDestroyedException;
-import Ice.Connection;
-import Ice.ObjectPrx;
-import Ice.UserException;
-
-/**
- * An AsyncResult object is the return value of an asynchronous invocation.
- * With this object, an application can obtain several attributes of the
- * invocation and discover its outcome.
- **/
-public abstract class OutgoingAsyncBase implements Ice.AsyncResult
+//
+// Base class for handling asynchronous invocations. This class is
+// responsible for the handling of the output stream and the child
+// invocation observer.
+//
+public abstract class OutgoingAsyncBase extends IceInternal.AsyncResultI
{
- protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op,
- IceInternal.CallbackBase del)
+ public int send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException
{
- _communicator = communicator;
- _instance = instance;
- _operation = op;
- _os = new IceInternal.BasicStream(instance, IceInternal.Protocol.currentProtocolEncoding);
- _state = 0;
- _sentSynchronously = false;
- _exception = null;
- _callback = del;
- }
-
- protected OutgoingAsyncBase(Communicator communicator, Instance instance, String op, CallbackBase del,
- BasicStream is, BasicStream os)
- {
- _communicator = communicator;
- _instance = instance;
- _operation = op;
- _os = os;
- _is = is;
- _state = 0;
- _sentSynchronously = false;
- _exception = null;
- _callback = del;
- }
-
- /**
- * Returns the communicator that sent the invocation.
- *
- * @return The communicator.
- **/
- @Override
- public Communicator getCommunicator()
- {
- return _communicator;
- }
-
- /**
- * Returns the connection that was used for the invocation.
- *
- * @return The connection.
- **/
- @Override
- public Connection getConnection()
- {
- return null;
- }
-
- /**
- * Returns the proxy that was used to call the <code>begin_</code> method.
- *
- * @return The proxy.
- **/
- @Override
- public ObjectPrx getProxy()
- {
- return null;
- }
-
- /**
- * Indicates whether the result of an invocation is available.
- *
- * @return True if the result is available, which means a call to the <code>end_</code>
- * method will not block. The method returns false if the result is not yet available.
- **/
- @Override
- public final boolean isCompleted()
- {
- synchronized(_monitor)
- {
- return (_state & StateDone) > 0;
- }
+ assert(false); // This should be overriden if this object is used with a request handler
+ return AsyncStatus.Queued;
}
- /**
- * Blocks the caller until the result of the invocation is available.
- **/
- @Override
- public final void waitForCompleted()
+ public int invokeCollocated(CollocatedRequestHandler handler)
{
- synchronized(_monitor)
- {
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
- while((_state & StateDone) == 0)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
+ assert(false); // This should be overriden if this object is used with a request handler
+ return AsyncStatus.Queued;
}
- /**
- * When you call the <code>begin_</code> method, the Ice run time attempts to
- * write the corresponding request to the client-side transport. If the
- * transport cannot accept the request, the Ice run time queues the request
- * for later transmission. This method returns true if, at the time it is called,
- * the request has been written to the local transport (whether it was initially
- * queued or not). Otherwise, if the request is still queued, this method returns
- * false.
- *
- * @return True if the request has been sent, or false if the request is queued.
- **/
- @Override
- public final boolean isSent()
+ public boolean sent()
{
- synchronized(_monitor)
- {
- return (_state & StateSent) > 0;
- }
+ return sent(true);
}
- /**
- * Blocks the caller until the request has been written to the client-side transport.
- **/
- @Override
- public final void waitForSent()
+ public boolean completed(Ice.Exception ex)
{
- synchronized(_monitor)
- {
- if(Thread.interrupted())
- {
- throw new Ice.OperationInterruptedException();
- }
- while((_state & StateSent) == 0 && _exception == null)
- {
- try
- {
- _monitor.wait();
- }
- catch(InterruptedException ex)
- {
- throw new Ice.OperationInterruptedException();
- }
- }
- }
+ return finished(ex);
}
- /**
- * If the invocation failed with a local exception, throws the local exception.
- **/
- @Override
- public final void throwLocalException()
+ public final void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
{
- synchronized(_monitor)
- {
- if(_exception != null)
- {
- throw _exception;
- }
- }
- }
-
- /**
- * This method returns true if a request was written to the client-side
- * transport without first being queued. If the request was initially
- * queued, this method returns false (independent of whether the request
- * is still in the queue or has since been written to the client-side transport).
- *
- * @return True if the request was sent without being queued, or false
- * otherwise.
- **/
- @Override
- public final boolean sentSynchronously()
- {
- return _sentSynchronously; // No lock needed, immutable once __send() is called
- }
-
- /**
- * Returns the name of the operation.
- *
- * @return The operation name.
- **/
- @Override
- public final String getOperation()
- {
- return _operation;
- }
-
- public final IceInternal.BasicStream getOs()
- {
- return _os;
- }
-
- public IceInternal.BasicStream
- startReadParams()
- {
- _is.startReadEncaps();
- return _is;
- }
-
- public void
- endReadParams()
- {
- _is.endReadEncaps();
- }
-
- public void
- readEmptyParams()
- {
- _is.skipEmptyEncaps(null);
- }
-
- public byte[]
- readParamEncaps()
- {
- return _is.readEncaps(null);
- }
-
- public final boolean __wait()
- {
- try
- {
- synchronized(_monitor)
- {
- if((_state & StateEndCalled) > 0)
- {
- throw new java.lang.IllegalArgumentException("end_ method called more than once");
- }
-
- _state |= StateEndCalled;
- if(Thread.interrupted())
- {
- throw new InterruptedException();
- }
- while((_state & StateDone) == 0)
- {
- _monitor.wait();
- }
-
- if(_exception != null)
- {
- throw (Ice.Exception)_exception.fillInStackTrace();
- }
-
- return (_state & StateOK) > 0;
- }
- }
- catch(InterruptedException ex)
- {
- // This must be called outside of the monitor as the
- // invocation will potentially want to lock the
- // connection (which in turn may want to lock the outgoing
- // to notify that the message has been sent).
- cancelRequest();
- throw new Ice.OperationInterruptedException();
- }
- }
-
- public final void throwUserException()
- throws UserException
- {
- try
- {
- _is.startReadEncaps();
- _is.throwException(null);
- }
- catch(UserException ex)
- {
- _is.endReadEncaps();
- throw ex;
- }
- }
-
- public void invokeExceptionAsync(final Ice.Exception ex)
- {
- //
- // This is called when it's not safe to call the exception callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void
- run()
- {
- invokeException(ex);
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- throw exc; // CommunicatorDestroyedException is the only exception that can propagate directly.
- }
- }
-
- public final void invokeException(Ice.Exception ex)
- {
- synchronized(_monitor)
- {
- _state |= StateDone;
- //_os.resize(0, false); // Clear buffer now, instead of waiting for AsyncResult deallocation
- _exception = ex;
- _monitor.notifyAll();
- }
-
- invokeCompleted();
- }
-
- protected final void invokeSentInternal()
- {
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__sent(this);
- }
- catch(RuntimeException ex)
- {
- warning(ex);
- }
- catch(AssertionError exc)
- {
- error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- error(exc);
- }
- finally
- {
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
- }
-
if(_observer != null)
{
- Ice.ObjectPrx proxy = getProxy();
- if(proxy == null || !proxy.ice_isTwoway())
- {
- _observer.detach();
- }
- }
- }
-
- public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId, int size)
- {
- if(_observer != null)
- {
- _childObserver = _observer.getRemoteObserver(info, endpt, requestId, size);
+ final int size = _os.size() - IceInternal.Protocol.headerSize - 4;
+ _childObserver = getObserver().getRemoteObserver(info, endpt, requestId, size);
if(_childObserver != null)
{
_childObserver.attach();
@@ -395,13 +51,12 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult
}
}
- void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
+ public final void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
{
if(_observer != null)
{
- _childObserver = _observer.getCollocatedObserver(adapter,
- requestId,
- _os.size() - IceInternal.Protocol.headerSize - 4);
+ final int size = _os.size() - IceInternal.Protocol.headerSize - 4;
+ _childObserver = getObserver().getCollocatedObserver(adapter, requestId, size);
if(_childObserver != null)
{
_childObserver.attach();
@@ -409,185 +64,49 @@ public abstract class OutgoingAsyncBase implements Ice.AsyncResult
}
}
- abstract void processRetry();
-
- final protected void invokeSentAsync()
- {
- //
- // This is called when it's not safe to call the sent callback synchronously
- // from this thread. Instead the exception callback is called asynchronously from
- // the client thread pool.
- //
- try
- {
- _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
- {
- @Override
- public void run()
- {
- invokeSentInternal();
- }
- });
- }
- catch(CommunicatorDestroyedException exc)
- {
- }
- }
-
- public static void check(AsyncResult r, ObjectPrx prx, String operation)
- {
- check(r, operation);
- if(r.getProxy() != prx)
- {
- throw new IllegalArgumentException("Proxy for call to end_" + operation +
- " does not match proxy that was used to call corresponding begin_" +
- operation + " method");
- }
- }
-
- public static void check(AsyncResult r, Connection con, String operation)
+ public final IceInternal.BasicStream getOs()
{
- check(r, operation);
- if(r.getConnection() != con)
- {
- throw new IllegalArgumentException("Connection for call to end_" + operation +
- " does not match connection that was used to call corresponding begin_" +
- operation + " method");
- }
+ return _os;
}
- public static void check(AsyncResult r, Communicator com, String operation)
+ protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del)
{
- check(r, operation);
- if(r.getCommunicator() != com)
- {
- throw new IllegalArgumentException("Communicator for call to end_" + operation +
- " does not match communicator that was used to call corresponding " +
- "begin_" + operation + " method");
- }
+ super(com, instance, op, del);
+ _os = new BasicStream(instance, Protocol.currentProtocolEncoding);
}
- public void cacheMessageBuffers()
+ protected OutgoingAsyncBase(Ice.Communicator com, Instance instance, String op, CallbackBase del, BasicStream os)
{
+ super(com, instance, op, del);
+ _os = os;
}
- public final void invokeCompleted()
+ @Override
+ protected boolean sent(boolean done)
{
- //
- // Note: no need to change the _state here, specializations are responsible for
- // changing the state.
- //
-
- if(_callback != null)
+ if(done)
{
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(_callback.getClass().getClassLoader());
- }
-
- try
- {
- _callback.__completed(this);
- }
- catch(RuntimeException ex)
- {
- warning(ex);
- }
- catch(AssertionError exc)
- {
- error(exc);
- }
- catch(OutOfMemoryError exc)
- {
- error(exc);
- }
- finally
+ if(_childObserver != null)
{
- if(_instance.useApplicationClassLoader())
- {
- Thread.currentThread().setContextClassLoader(null);
- }
+ _childObserver.detach();
+ _childObserver = null;
}
}
-
- if(_observer != null)
- {
- _observer.detach();
- _observer = null;
- }
+ return super.sent(done);
}
- protected void
- timeout()
- {
- IceInternal.RequestHandler handler;
- synchronized(_monitor)
- {
- handler = _timeoutRequestHandler;
- _timeoutRequestHandler = null;
- }
-
- if(handler != null)
- {
- handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this,
- new Ice.InvocationTimeoutException());
- }
- }
-
- private static void check(AsyncResult r, String operation)
- {
- if(r == null)
- {
- throw new IllegalArgumentException("AsyncResult == null");
- }
- else if(r.getOperation() != operation) // Do NOT use equals() here - we are comparing reference equality
- {
- throw new IllegalArgumentException("Incorrect operation for end_" + operation + " method: " +
- r.getOperation());
- }
- }
-
- private final void warning(RuntimeException ex)
+ @Override
+ protected boolean finished(Ice.Exception ex)
{
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ if(_childObserver != null)
{
- String s = "exception raised by AMI callback:\n" + IceInternal.Ex.toString(ex);
- _instance.initializationData().logger.warning(s);
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
}
+ return super.finished(ex);
}
- private final void error(Error error)
- {
- String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
- _instance.initializationData().logger.error(s);
- }
-
- abstract protected void cancelRequest();
-
- protected Communicator _communicator;
- protected IceInternal.Instance _instance;
- protected String _operation;
- protected Ice.Connection _cachedConnection;
-
- protected java.lang.Object _monitor = new java.lang.Object();
- protected IceInternal.BasicStream _is;
- protected IceInternal.BasicStream _os;
-
- protected IceInternal.RequestHandler _timeoutRequestHandler;
- protected java.util.concurrent.Future<?> _future;
-
- protected static final byte StateOK = 0x1;
- protected static final byte StateDone = 0x2;
- protected static final byte StateSent = 0x4;
- protected static final byte StateEndCalled = 0x8;
- protected static final byte StateCachedBuffers = 0x10;
-
- protected byte _state;
- protected boolean _sentSynchronously;
- protected Ice.Exception _exception;
-
- protected Ice.Instrumentation.InvocationObserver _observer;
+ protected BasicStream _os;
protected Ice.Instrumentation.ChildInvocationObserver _childObserver;
-
- protected IceInternal.CallbackBase _callback;
}