diff options
Diffstat (limited to 'java-compat/src')
4 files changed, 232 insertions, 31 deletions
diff --git a/java-compat/src/Ice/src/main/java/Ice/Callback_Connection_heartbeat.java b/java-compat/src/Ice/src/main/java/Ice/Callback_Connection_heartbeat.java new file mode 100644 index 00000000000..e02ef2906c7 --- /dev/null +++ b/java-compat/src/Ice/src/main/java/Ice/Callback_Connection_heartbeat.java @@ -0,0 +1,55 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +/** + * Asynchronous callback base class for Connection.begin_heartbeat. + **/ +public abstract class Callback_Connection_heartbeat extends IceInternal.CallbackBase +{ + /** + * Called when the invocation raises an Ice run-time exception. + * + * @param ex The Ice run-time exception raised by the operation. + **/ + public abstract void exception(LocalException ex); + + /** + * Called when a queued invocation is sent successfully. + **/ + public void sent(boolean sentSynchronously) + { + } + + @Override + public final void _iceCompleted(AsyncResult __result) + { + try + { + __result.getConnection().end_heartbeat(__result); + } + catch(LocalException __ex) + { + exception(__ex); + } + } + + @Override + public final void _iceSent(AsyncResult __result) + { + sent(__result.sentSynchronously()); + } + + @Override + public final boolean _iceHasSentCallback() + { + return true; + } +} diff --git a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java index f4f1b32f4bf..84131a38cfc 100644 --- a/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java +++ b/java-compat/src/Ice/src/main/java/Ice/ConnectionI.java @@ -155,25 +155,27 @@ public final class ConnectionI extends IceInternal.EventHandler } @Override - synchronized public void close(boolean force) + synchronized public void close(ConnectionClose mode) { if(Thread.interrupted()) { throw new Ice.OperationInterruptedException(); } - if(force) + if(mode == ConnectionClose.CloseForcefully) { - setState(StateClosed, new ForcedCloseConnectionException()); + setState(StateClosed, new ConnectionManuallyClosedException(false)); + } + else if(mode == ConnectionClose.CloseGracefully) + { + setState(StateClosing, new ConnectionManuallyClosedException(true)); } else { + assert(mode == ConnectionClose.CloseGracefullyAndWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, - // the CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the - // server has processed them or not. + // Wait until all outstanding requests have been completed. // while(!_asyncRequests.isEmpty()) { @@ -187,7 +189,7 @@ public final class ConnectionI extends IceInternal.EventHandler } } - setState(StateClosing, new CloseConnectionException()); + setState(StateClosing, new ConnectionManuallyClosedException(true)); } } @@ -279,14 +281,14 @@ public final class ConnectionI extends IceInternal.EventHandler // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than // really needed is safer to ensure that the receiver will - // receive in time the heartbeat. Sending the heartbeat if + // receive the heartbeat in time. Sending the heartbeat if // there was no activity in the last (timeout / 2) period // isn't enough since monitor() is called only every (timeout // / 2) period. // // Note that this doesn't imply that we are sending 4 // heartbeats per timeout period because the monitor() method - // is sill only called every (timeout / 2) period. + // is still only called every (timeout / 2) period. // if(acm.heartbeat == ACMHeartbeat.HeartbeatAlways || (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() && @@ -518,6 +520,155 @@ public final class ConnectionI extends IceInternal.EventHandler } @Override + public void heartbeat() + { + end_heartbeat(begin_heartbeat()); + } + + private static final String __heartbeat_name = "heartbeat"; + + @Override + public AsyncResult begin_heartbeat() + { + return begin_heartbeatInternal(null); + } + + @Override + public AsyncResult begin_heartbeat(Callback cb) + { + return begin_heartbeatInternal(cb); + } + + @Override + public AsyncResult begin_heartbeat(Callback_Connection_heartbeat cb) + { + return begin_heartbeatInternal(cb); + } + + @Override + public AsyncResult begin_heartbeat(IceInternal.Functional_VoidCallback __responseCb, + final IceInternal.Functional_GenericCallback1<Ice.Exception> __exceptionCb, + IceInternal.Functional_BoolCallback __sentCb) + { + return begin_heartbeatInternal(new IceInternal.Functional_CallbackBase(false, __exceptionCb, __sentCb) + { + @Override + public final void _iceCompleted(AsyncResult __result) + { + try + { + __result.getConnection().end_heartbeat(__result); + } + catch(Exception __ex) + { + __exceptionCb.apply(__ex); + } + } + }); + } + + static class HeartbeatAsync extends IceInternal.OutgoingAsyncBase + { + public static HeartbeatAsync check(AsyncResult r, Connection con, String operation) + { + check(r, operation); + if(!(r instanceof HeartbeatAsync)) + { + throw new IllegalArgumentException("Incorrect AsyncResult object for end_" + operation + " method"); + } + if(r.getConnection() != con) + { + throw new IllegalArgumentException("Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } + return (HeartbeatAsync)r; + } + + public HeartbeatAsync(ConnectionI con, Communicator communicator, IceInternal.Instance instance, + String operation, IceInternal.CallbackBase callback) + { + super(communicator, instance, operation, callback); + _connection = con; + } + + @Override + public Connection getConnection() + { + return _connection; + } + + public void invoke() + { + try + { + _os.writeBlob(IceInternal.Protocol.magic); + ProtocolVersion.ice_write(_os, IceInternal.Protocol.currentProtocol); + EncodingVersion.ice_write(_os, IceInternal.Protocol.currentProtocolEncoding); + _os.writeByte(IceInternal.Protocol.validateConnectionMsg); + _os.writeByte((byte) 0); + _os.writeInt(IceInternal.Protocol.headerSize); // Message size. + + int status; + if(_instance.queueRequests()) + { + status = _instance.getQueueExecutor().execute(new Callable<Integer>() + { + @Override + public Integer call() throws IceInternal.RetryException + { + return _connection.sendAsyncRequest(HeartbeatAsync.this, false, false, 0); + } + }); + } + else + { + status = _connection.sendAsyncRequest(this, false, false, 0); + } + + if((status & IceInternal.AsyncStatus.Sent) > 0) + { + _sentSynchronously = true; + if((status & IceInternal.AsyncStatus.InvokeSentCallback) > 0) + { + invokeSent(); + } + } + } + catch(IceInternal.RetryException ex) + { + if(completed(ex.get())) + { + invokeCompletedAsync(); + } + } + catch(Ice.Exception ex) + { + if(completed(ex)) + { + invokeCompletedAsync(); + } + } + } + + private Ice.ConnectionI _connection; + } + + private AsyncResult begin_heartbeatInternal(IceInternal.CallbackBase cb) + { + HeartbeatAsync result = new HeartbeatAsync(this, _communicator, _instance, __heartbeat_name, cb); + result.invoke(); + return result; + } + + @Override + public void end_heartbeat(AsyncResult ir) + { + HeartbeatAsync r = HeartbeatAsync.check(ir, this, __heartbeat_name); + r.waitForResponseOrUserEx(); + } + + @Override synchronized public void setACM(Ice.IntOptional timeout, Ice.Optional<ACMClose> close, Ice.Optional<ACMHeartbeat> heartbeat) { @@ -703,8 +854,7 @@ public final class ConnectionI extends IceInternal.EventHandler public synchronized void invokeException(int requestId, LocalException ex, int invokeNum, boolean amd) { // - // Fatal exception while invoking a request. Since - // sendResponse/sendNoResponse isn't + // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't // called in case of a fatal exception we decrement _dispatchCount here. // @@ -968,7 +1118,7 @@ public final class ConnectionI extends IceInternal.EventHandler } else { - assert (_state <= StateClosingPending); + assert(_state <= StateClosingPending); // // We parse messages first, if we receive a close @@ -1144,7 +1294,8 @@ public final class ConnectionI extends IceInternal.EventHandler // if(info.invokeNum > 0) { - invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); + invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, + info.adapter); // // Don't increase dispatchedCount, the dispatch count is @@ -1301,7 +1452,7 @@ public final class ConnectionI extends IceInternal.EventHandler // Trace the cause of unexpected connection closures // if(!(_exception instanceof CloseConnectionException || - _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionManuallyClosedException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || _exception instanceof ObjectAdapterDeactivatedException)) @@ -1650,7 +1801,7 @@ public final class ConnectionI extends IceInternal.EventHandler // Don't warn about certain expected exceptions. // if(!(_exception instanceof CloseConnectionException || - _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionManuallyClosedException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || _exception instanceof ObjectAdapterDeactivatedException || @@ -1840,7 +1991,7 @@ public final class ConnectionI extends IceInternal.EventHandler if(_observer != null && state == StateClosed && _exception != null) { if(!(_exception instanceof CloseConnectionException || - _exception instanceof ForcedCloseConnectionException || + _exception instanceof ConnectionManuallyClosedException || _exception instanceof ConnectionTimeoutException || _exception instanceof CommunicatorDestroyedException || _exception instanceof ObjectAdapterDeactivatedException || @@ -1869,8 +2020,7 @@ public final class ConnectionI extends IceInternal.EventHandler private void initiateShutdown() { - assert (_state == StateClosing); - assert (_dispatchCount == 0); + assert(_state == StateClosing && _dispatchCount == 0); if(_shutdownInitiated) { @@ -1897,8 +2047,7 @@ public final class ConnectionI extends IceInternal.EventHandler setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection - // closure. + // Notify the the transceiver of the graceful connection closure. // int op = _transceiver.closing(true, _exception); if(op != 0) @@ -1910,7 +2059,7 @@ public final class ConnectionI extends IceInternal.EventHandler } } - private void heartbeat() + private void sendHeartbeatNow() { assert (_state == StateActive); @@ -1948,8 +2097,7 @@ public final class ConnectionI extends IceInternal.EventHandler } // - // Update the connection description once the transceiver is - // initialized. + // Update the connection description once the transceiver is initialized. // _desc = _transceiver.toString(); _initialized = true; @@ -2108,8 +2256,7 @@ public final class ConnectionI extends IceInternal.EventHandler } else if(_state == StateClosingPending && _writeStream.pos() == 0) { - // Message wasn't sent, empty the _writeStream, we're not going to - // send more data. + // Message wasn't sent, empty the _writeStream, we're not going to send more data. OutgoingMessage message = _sendStreams.getFirst(); _writeStream.swap(message.stream); return IceInternal.SocketOperation.None; @@ -2416,8 +2563,7 @@ public final class ConnectionI extends IceInternal.EventHandler setState(StateClosingPending, new CloseConnectionException()); // - // Notify the the transceiver of the graceful connection - // closure. + // Notify the the transceiver of the graceful connection closure. // int op = _transceiver.closing(false, _exception); if(op != 0) diff --git a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java index 9405e3dcc17..108df4607e0 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/IncomingConnectionFactory.java @@ -141,7 +141,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice // for(Ice.ConnectionI c : connections) { - c.close(true); + c.close(Ice.ConnectionClose.CloseForcefully); } throw e; } diff --git a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java index 94b80c0e2fb..b51856cf3e6 100644 --- a/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java +++ b/java-compat/src/Ice/src/main/java/IceInternal/OutgoingConnectionFactory.java @@ -133,7 +133,7 @@ public final class OutgoingConnectionFactory { for(Ice.ConnectionI c : l) { - c.close(true); + c.close(Ice.ConnectionClose.CloseForcefully); } } throw new Ice.OperationInterruptedException(); |