diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-05-23 11:59:44 +0200 |
commit | d81701ca8182942b7936f9fd84a019b695e9c890 (patch) | |
tree | dc036c9d701fbbe1afad67782bd78572c0f61974 /java/src/IceInternal/ConnectRequestHandler.java | |
parent | Fixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff) | |
download | ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2 ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip |
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 121 |
1 files changed, 80 insertions, 41 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 34d485d93f0..0cfe4e61aac 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -22,18 +22,18 @@ public class ConnectRequestHandler this.os.swap(os); } - Request(OutgoingAsync out) + Request(OutgoingMessageCallback out) { this.out = out; } - Request(BatchOutgoingAsync out) + Request(OutgoingAsyncMessageCallback out) { - this.batchOut = out; + this.outAsync = out; } - OutgoingAsync out = null; - BatchOutgoingAsync batchOut = null; + OutgoingMessageCallback out = null; + OutgoingAsyncMessageCallback outAsync = null; BasicStream os = null; } @@ -134,22 +134,23 @@ public class ConnectRequestHandler _connection.abortBatchRequest(); } - public Ice.ConnectionI - sendRequest(Outgoing out) + public boolean + sendRequest(OutgoingMessageCallback out) throws LocalExceptionWrapper { - if(!getConnection(true).sendRequest(out, _compress, _response) || _response) - { - return _connection; // The request has been sent or we're expecting a response. - } - else + synchronized(this) { - return null; // The request hasn't been sent yet. + if(!initialized()) + { + _requests.add(new Request(out)); + return false; // Not sent + } } + return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response. } public int - sendAsyncRequest(OutgoingAsync out) + sendAsyncRequest(OutgoingAsyncMessageCallback out) throws LocalExceptionWrapper { synchronized(this) @@ -160,29 +161,57 @@ public class ConnectRequestHandler return AsyncStatus.Queued; } } - return _connection.sendAsyncRequest(out, _compress, _response); + return out.__send(_connection, _compress, _response); } - public boolean - flushBatchRequests(BatchOutgoing out) + public void + requestTimedOut(OutgoingMessageCallback out) { - return getConnection(true).flushBatchRequests(out); + synchronized(this) + { + if(!initialized()) + { + java.util.Iterator<Request> it = _requests.iterator(); + while(it.hasNext()) + { + Request request = it.next(); + if(request.out == out) + { + out.finished(new Ice.InvocationTimeoutException(), false); + it.remove(); + return; + } + } + assert(false); // The request has to be queued if it timed out and we're not initialized yet. + } + } + _connection.requestTimedOut(out); } - public int - flushAsyncBatchRequests(BatchOutgoingAsync out) + public void + asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync) { synchronized(this) { if(!initialized()) { - _requests.add(new Request(out)); - return AsyncStatus.Queued; + java.util.Iterator<Request> it = _requests.iterator(); + while(it.hasNext()) + { + Request request = it.next(); + if(request.outAsync == outAsync) + { + outAsync.__finished(new Ice.InvocationTimeoutException(), false); + it.remove(); + return; + } + } + assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - return _connection.flushAsyncBatchRequests(out); + _connection.asyncRequestTimedOut(outAsync); } - + public Outgoing getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context, InvocationObserver observer) @@ -412,18 +441,14 @@ public class ConnectRequestHandler Request request = p.next(); if(request.out != null) { - if((_connection.sendAsyncRequest(request.out, _compress, _response) & - AsyncStatus.InvokeSentCallback) > 0) - { - sentCallbacks.add(request.out); - } + request.out.send(_connection, _compress, _response); } - else if(request.batchOut != null) + else if(request.outAsync != null) { - if((_connection.flushAsyncBatchRequests(request.batchOut) & + if((request.outAsync.__send(_connection, _compress, _response) & AsyncStatus.InvokeSentCallback) > 0) { - sentCallbacks.add(request.batchOut); + sentCallbacks.add(request.outAsync); } } else @@ -488,7 +513,7 @@ public class ConnectRequestHandler { for(OutgoingAsyncMessageCallback callback : sentCallbacks) { - callback.__sent(); + callback.__invokeSent(); } }; }); @@ -528,12 +553,12 @@ public class ConnectRequestHandler for(Request request : _requests) { if(request.out != null) - { - request.out.__finished(ex, false); + { + request.out.finished(ex, false); } - else if(request.batchOut != null) + else if(request.outAsync != null) { - request.batchOut.__finished(ex, false); + request.outAsync.__finished(ex, false); } } _requests.clear(); @@ -545,12 +570,26 @@ public class ConnectRequestHandler for(Request request : _requests) { if(request.out != null) - { - request.out.__finished(ex); + { + if(request.out instanceof Outgoing) + { + ((Outgoing)request.out).finished(ex); + } + else + { + request.out.finished(ex.get(), false); + } } - else if(request.batchOut != null) + else if(request.outAsync != null) { - request.batchOut.__finished(ex.get(), false); + if(request.outAsync instanceof OutgoingAsync) + { + ((OutgoingAsync)request.outAsync).__finished(ex); + } + else + { + request.outAsync.__finished(ex.get(), false); + } } } _requests.clear(); |