diff options
author | Marc Laukien <marc@zeroc.com> | 2004-02-20 20:07:25 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-02-20 20:07:25 +0000 |
commit | 20ce462d81e0885ef6c34e3aab08adb8bf53d82f (patch) | |
tree | 76da7f5f7c9964c2a9e3bc7e651f2e1cd109e377 /java/src | |
parent | more ami (diff) | |
download | ice-20ce462d81e0885ef6c34e3aab08adb8bf53d82f.tar.bz2 ice-20ce462d81e0885ef6c34e3aab08adb8bf53d82f.tar.xz ice-20ce462d81e0885ef6c34e3aab08adb8bf53d82f.zip |
more ami
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/AMI_Object_ice_invoke.java | 22 | ||||
-rw-r--r-- | java/src/Ice/_ObjectDelM.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/BasicStream.java | 37 | ||||
-rw-r--r-- | java/src/IceInternal/Connection.java | 21 | ||||
-rw-r--r-- | java/src/IceInternal/Incoming.java | 36 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingBase.java | 28 | ||||
-rw-r--r-- | java/src/IceInternal/Outgoing.java | 23 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 378 |
8 files changed, 306 insertions, 245 deletions
diff --git a/java/src/Ice/AMI_Object_ice_invoke.java b/java/src/Ice/AMI_Object_ice_invoke.java index 7d2d571850e..3627d90e5d3 100644 --- a/java/src/Ice/AMI_Object_ice_invoke.java +++ b/java/src/Ice/AMI_Object_ice_invoke.java @@ -17,20 +17,36 @@ package Ice; public abstract class AMI_Object_ice_invoke extends IceInternal.OutgoingAsync { public abstract void ice_response(boolean ok, byte[] outParams); - public abstract void ice_exception(Ice.LocalException ex); + public abstract void ice_exception(LocalException ex); + + public final void __invoke(IceInternal.Reference ref, String operation, OperationMode mode, + byte[] inParams, java.util.Map context) + { + try + { + __prepare(ref, operation, mode, context); + __os.writeBlob(inParams); + __os.endWriteEncaps(); + } + catch(LocalException ex) + { + __finished(ex); + return; + } + __send(); + } protected final void __response(boolean ok) // ok == true means no user exception. { byte[] outParams; try { - IceInternal.BasicStream __is = this.__is(); int sz = __is.getReadEncapsSize(); outParams = __is.readBlob(sz); } catch(LocalException ex) { - ice_exception(ex); + __finished(ex); return; } ice_response(ok, outParams); diff --git a/java/src/Ice/_ObjectDelM.java b/java/src/Ice/_ObjectDelM.java index 549859a7bbc..7481976ee3b 100644 --- a/java/src/Ice/_ObjectDelM.java +++ b/java/src/Ice/_ObjectDelM.java @@ -241,12 +241,6 @@ public class _ObjectDelM implements _ObjectDel protected void reclaimOutgoing(IceInternal.Outgoing out) { - // - // TODO: Is this code necessary? Shouldn't __outgoingCache be - // empty, i.e., shouldn't this be assert(__outgoingCache == - // null), just like for _incomingCache in - // IceInternal::Connection? - // synchronized(__outgoingMutex) { out.next = __outgoingCache; diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java index 92679446dcf..7226a5c296a 100644 --- a/java/src/IceInternal/BasicStream.java +++ b/java/src/IceInternal/BasicStream.java @@ -40,23 +40,22 @@ public class BasicStream _messageSizeMax = _instance.messageSizeMax(); // Cached for efficiency. } -/* - * Do NOT use a finalizer for BasicStream - this causes a - * severe performance penalty! - * - protected void - finalize() - throws Throwable + // + // Do NOT use a finalizer, this would cause a severe performance + // penalty! We must make sure that destroy() is called instead, to + // reclaim resources. + // + public void + destroy() { - if(_buf != null) - { - _bufferManager.reclaim(_buf); - } - - super.finalize(); + _bufferManager.reclaim(_buf); + _buf = null; } -*/ + // + // This function allows this object to be reused, rather than + // reallocated. + // public void reset() { @@ -73,16 +72,6 @@ public class BasicStream } } - // - // Must be called in order to reclaim the buffer - // - public void - destroy() - { - _bufferManager.reclaim(_buf); - _buf = null; - } - public IceInternal.Instance instance() { diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index 74041fc4621..cefe2cd7a79 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -1646,27 +1646,6 @@ public final class Connection extends EventHandler } } -// TODO: This function doesn't seem to be needed? -/* - private void - destroyIncomingCache() - { - Incoming in = null; - - synchronized(_incomingCacheMutex) - { - in = _incomingCache; - _incomingCache = null; - } - - while(in != null) - { - in.__destroy(); - in = in.next; - } - } -*/ - private Transceiver _transceiver; private final String _desc; private final Endpoint _endpoint; diff --git a/java/src/IceInternal/Incoming.java b/java/src/IceInternal/Incoming.java index 713614e086f..da0c92fe230 100644 --- a/java/src/IceInternal/Incoming.java +++ b/java/src/IceInternal/Incoming.java @@ -25,14 +25,29 @@ final public class Incoming extends IncomingBase } // + // Do NOT use a finalizer, this would cause a severe performance + // penalty! We must make sure that __destroy() is called instead, + // to reclaim resources. + // + public void + __destroy() + { + if(_is != null) + { + _is.destroy(); + _is = null; + } + + super.__destroy(); + } + + // // This function allows this object to be reused, rather than // reallocated. // public void reset(Instance instance, Connection connection, Ice.ObjectAdapter adapter, boolean response, byte compress) { - super.reset(instance, connection, adapter, response, compress); - if(_is == null) { _is = new BasicStream(instance); @@ -41,6 +56,8 @@ final public class Incoming extends IncomingBase { _is.reset(); } + + super.reset(instance, connection, adapter, response, compress); } public void @@ -300,21 +317,6 @@ final public class Incoming extends IncomingBase return _os; } - // - // Reclaim resources. - // - public void - __destroy() - { - super.__destroy(); - - if(_is != null) - { - _is.destroy(); - _is = null; - } - } - Incoming next; // For use by Connection. private BasicStream _is; diff --git a/java/src/IceInternal/IncomingBase.java b/java/src/IceInternal/IncomingBase.java index 64dc3639ff4..c9be07b844e 100644 --- a/java/src/IceInternal/IncomingBase.java +++ b/java/src/IceInternal/IncomingBase.java @@ -63,6 +63,21 @@ public class IncomingBase } // + // Do NOT use a finalizer, this would cause a severe performance + // penalty! We must make sure that __destroy() is called instead, + // to reclaim resources. + // + public void + __destroy() + { + if(_os != null) + { + _os.destroy(); + _os = null; + } + } + + // // This function allows this object to be reused, rather than // reallocated. // @@ -115,19 +130,6 @@ public class IncomingBase _connection = connection; } - // - // Reclaim resources. - // - public void - __destroy() - { - if(_os != null) - { - _os.destroy(); - _os = null; - } - } - final protected void __finishInvoke() { diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java index b58d004b76e..fcbf6b1b6a3 100644 --- a/java/src/IceInternal/Outgoing.java +++ b/java/src/IceInternal/Outgoing.java @@ -28,6 +28,22 @@ public final class Outgoing writeHeader(operation, mode, context); } + // + // Do NOT use a finalizer, this would cause a severe performance + // penalty! We must make sure that destroy() is called instead, + // to reclaim resources. + // + public void + destroy() + { + _os.destroy(); + _is.destroy(); + } + + // + // This function allows this object to be reused, rather than + // reallocated. + // public void reset(String operation, Ice.OperationMode mode, java.util.Map context) { @@ -40,13 +56,6 @@ public final class Outgoing writeHeader(operation, mode, context); } - public void - destroy() - { - _os.destroy(); - _is.destroy(); - } - // Returns true if ok, false if user exception. public boolean invoke() diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index dc7720c70eb..d205d90abf5 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -19,117 +19,42 @@ public abstract class OutgoingAsync public OutgoingAsync() { - _is = null; - _os = null; + __is = null; + __os = null; } public abstract void ice_exception(Ice.LocalException ex); - public void - __setup(Connection connection, Reference ref, String operation, Ice.OperationMode mode, java.util.Map context) + public final void + __finished(BasicStream is) { - try - { - _connection = connection; - if(_is == null) - { - _is = new BasicStream(ref.instance); - } - else - { - _is.reset(); - } - if(_os == null) - { - _os = new BasicStream(ref.instance); - } - else - { - _os.reset(); - } + // + // No mutex protection necessary, this function can only be + // called after __send() and __prepare() have completed. + // - _connection.prepareRequest(_os); + assert(_reference != null); + assert(_connection != null); - ref.identity.__write(_os); - _os.writeStringSeq(ref.facet); - _os.writeString(operation); - _os.writeByte((byte)mode.value()); - if(context == null) - { - _os.writeSize(0); - } - else - { - final int sz = context.size(); - _os.writeSize(sz); - if(sz > 0) - { - java.util.Iterator i = context.entrySet().iterator(); - while(i.hasNext()) - { - java.util.Map.Entry entry = (java.util.Map.Entry)i.next(); - _os.writeString((String)entry.getKey()); - _os.writeString((String)entry.getValue()); - } - } - } - - // - // Input and output parameters are always sent in an - // encapsulation, which makes it possible to forward - // requests as blobs. - // - _os.startWriteEncaps(); - } - catch(RuntimeException ex) - { - destroy(); - throw ex; - } - } + int status; - public void - __invoke() - { try { - _os.endWriteEncaps(); - - _connection.sendAsyncRequest(_os, this); - - if(_connection.timeout() >= 0) + if(__is != null) { - _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout(); + __is.destroy(); } - } - catch(RuntimeException ex) - { - destroy(); - throw ex; - } - } + __is = new BasicStream(_reference.instance); + __is.swap(is); - public void - __finished(BasicStream is) - { - try - { - _is.swap(is); - byte status = _is.readByte(); + status = (int)__is.readByte(); - switch((int)status) + switch(status) { case DispatchStatus._DispatchOK: - { - _is.startReadEncaps(); - __response(true); - break; - } - case DispatchStatus._DispatchUserException: { - _is.startReadEncaps(); - __response(false); + __is.startReadEncaps(); break; } @@ -137,96 +62,154 @@ public abstract class OutgoingAsync { Ice.ObjectNotExistException ex = new Ice.ObjectNotExistException(); ex.id = new Ice.Identity(); - ex.id.__read(_is); - ex.facet = _is.readStringSeq(); - ex.operation = _is.readString(); - ice_exception(ex); - break; + ex.id.__read(__is); + ex.facet = __is.readStringSeq(); + ex.operation = __is.readString(); + throw ex; } case DispatchStatus._DispatchFacetNotExist: { Ice.FacetNotExistException ex = new Ice.FacetNotExistException(); ex.id = new Ice.Identity(); - ex.id.__read(_is); - ex.facet = _is.readStringSeq(); - ex.operation = _is.readString(); - ice_exception(ex); - break; + ex.id.__read(__is); + ex.facet = __is.readStringSeq(); + ex.operation = __is.readString(); + throw ex; } case DispatchStatus._DispatchOperationNotExist: { Ice.OperationNotExistException ex = new Ice.OperationNotExistException(); ex.id = new Ice.Identity(); - ex.id.__read(_is); - ex.facet = _is.readStringSeq(); - ex.operation = _is.readString(); - ice_exception(ex); - break; + ex.id.__read(__is); + ex.facet = __is.readStringSeq(); + ex.operation = __is.readString(); + throw ex; } case DispatchStatus._DispatchUnknownException: { Ice.UnknownException ex = new Ice.UnknownException(); - ex.unknown = _is.readString(); - ice_exception(ex); - break; + ex.unknown = __is.readString(); + throw ex; } case DispatchStatus._DispatchUnknownLocalException: { Ice.UnknownLocalException ex = new Ice.UnknownLocalException(); - ex.unknown = _is.readString(); - ice_exception(ex); - break; + ex.unknown = __is.readString(); + throw ex; } case DispatchStatus._DispatchUnknownUserException: { Ice.UnknownUserException ex = new Ice.UnknownUserException(); - ex.unknown = _is.readString(); - ice_exception(ex); - break; + ex.unknown = __is.readString(); + throw ex; } default: { - ice_exception(new Ice.UnknownReplyStatusException()); - break; + throw new Ice.UnknownReplyStatusException(); } } } - catch(Exception ex) + catch(Ice.LocalException ex) { - warning(ex); + __finished(ex); + return; + } + + assert(status == DispatchStatus._DispatchOK || status == DispatchStatus._DispatchUserException); + + try + { + __response(status == DispatchStatus._DispatchOK); } - finally + catch(Exception ex) { - destroy(); + warning(ex); } } - public void + public final void __finished(Ice.LocalException exc) { - try + // + // No mutex protection necessary, this function can only be called + // after __send() and __prepare() have completed. + // + + assert(_reference != null); + //assert(_connection != null); // Might be null, if getConnection() failed. + + if(_reference.locatorInfo != null) { - ice_exception(exc); + _reference.locatorInfo.clearObjectCache(_reference); } - catch(Exception ex) + + boolean doRetry = false; + + // + // A CloseConnectionException indicates graceful server + // shutdown, and is therefore always repeatable without + // violating "at-most-once". That's because by sending a close + // connection message, the server guarantees that all + // outstanding requests can safely be repeated. Otherwise, we + // can also retry if the operation mode Nonmutating or + // Idempotent. + // + if(_mode == Ice.OperationMode.Nonmutating || _mode == Ice.OperationMode.Idempotent || + exc instanceof Ice.CloseConnectionException) { - warning(ex); + try + { + ProxyFactory proxyFactory = _reference.instance.proxyFactory(); + if(proxyFactory != null) + { + _cnt = proxyFactory.checkRetryAfterException(exc, _cnt); + } + else + { + throw exc; // The communicator is already destroyed, so we cannot retry. + } + + doRetry = true; + } + catch(Ice.LocalException ex) + { + } } - finally + + if(doRetry) { - destroy(); + _connection = null; + __send(); + } + else + { + try + { + ice_exception(exc); + } + catch(Exception ex) + { + warning(ex); + } } } - public boolean + public final boolean __timedOut() { + // + // No mutex protection necessary, this function can only be called + // after __send() and __prepare() have completed. + // + + assert(_connection != null); + if(_connection.timeout() >= 0) { return System.currentTimeMillis() >= _absoluteTimeoutMillis; @@ -237,39 +220,121 @@ public abstract class OutgoingAsync } } - public BasicStream - __is() + protected final void + __prepare(Reference ref, String operation, Ice.OperationMode mode, java.util.Map context) { - return _is; - } + // + // No mutex protection necessary, using this object for a new + // AMI call while another one is in progress is not allowed + // and leads to undefined behavior. + // + + _reference = ref; + _connection = _reference.getConnection(); + _cnt = 0; + _mode = mode; + + if(__os != null) + { + __os.destroy(); + } + __os = new BasicStream(_reference.instance); + + _connection.prepareRequest(__os); - public BasicStream - __os() - { - return _os; - } + ref.identity.__write(__os); + __os.writeStringSeq(ref.facet); + __os.writeString(operation); + __os.writeByte((byte)mode.value()); + if(context == null) + { + __os.writeSize(0); + } + else + { + final int sz = context.size(); + __os.writeSize(sz); + if(sz > 0) + { + java.util.Iterator i = context.entrySet().iterator(); + while(i.hasNext()) + { + java.util.Map.Entry entry = (java.util.Map.Entry)i.next(); + __os.writeString((String)entry.getKey()); + __os.writeString((String)entry.getValue()); + } + } + } - protected abstract void __response(boolean ok); + __os.startWriteEncaps(); + } - private void - destroy() + protected final void + __send() { - if(_is != null) + assert(_reference != null); + //assert(_connection != null); // Might be null, if called from __finished() for retry. + + try { - _is.destroy(); - _is = null; + while(true) + { + if(_connection == null) + { + _connection = _reference.getConnection(); + } + + if(_connection.timeout() >= 0) + { + _absoluteTimeoutMillis = System.currentTimeMillis() + _connection.timeout(); + } + + try + { + _connection.sendAsyncRequest(__os, this); + + // + // Don't do anything after sendAsyncRequest() returned + // without an exception. I such case, there will be + // callbacks, i.e., calls to the __finished() + // functions. Since there is no mutex protection, we + // cannot modify state here and in such callbacks. + // + return; + } + catch(Ice.LocalException ex) + { + if(_reference.locatorInfo != null) + { + _reference.locatorInfo.clearObjectCache(_reference); + } + + ProxyFactory proxyFactory = _reference.instance.proxyFactory(); + if(proxyFactory != null) + { + _cnt = proxyFactory.checkRetryAfterException(ex, _cnt); + } + else + { + throw ex; // The communicator is already destroyed, so we cannot retry. + } + } + + _connection = null; + } } - if(_os != null) + catch(Ice.LocalException ex) { - _os.destroy(); - _os = null; + __finished(ex); } } - private void + protected abstract void __response(boolean ok); + + private final void warning(Exception ex) { - if(_os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + if(__os.instance().properties().getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) { java.io.StringWriter sw = new java.io.StringWriter(); java.io.PrintWriter pw = new java.io.PrintWriter(sw); @@ -278,12 +343,17 @@ public abstract class OutgoingAsync out.print("exception raised by AMI callback:\n"); ex.printStackTrace(pw); pw.flush(); - _os.instance().logger().warning(sw.toString()); + __os.instance().logger().warning(sw.toString()); } } + protected BasicStream __is; + protected BasicStream __os; + + private Reference _reference; private Connection _connection; + private int _cnt; + private Ice.OperationMode _mode; + private long _absoluteTimeoutMillis; - private BasicStream _is; - private BasicStream _os; } |