summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/ConnectRequestHandler.java
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-05-23 11:59:44 +0200
commitd81701ca8182942b7936f9fd84a019b695e9c890 (patch)
treedc036c9d701fbbe1afad67782bd78572c0f61974 /java/src/IceInternal/ConnectRequestHandler.java
parentFixed bug ICE-5543: stringToIdentity bug with escaped escapes (diff)
downloadice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.bz2
ice-d81701ca8182942b7936f9fd84a019b695e9c890.tar.xz
ice-d81701ca8182942b7936f9fd84a019b695e9c890.zip
Added support for invocation timeouts and ACM heartbeats
Diffstat (limited to 'java/src/IceInternal/ConnectRequestHandler.java')
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java121
1 files changed, 80 insertions, 41 deletions
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 34d485d93f0..0cfe4e61aac 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -22,18 +22,18 @@ public class ConnectRequestHandler
this.os.swap(os);
}
- Request(OutgoingAsync out)
+ Request(OutgoingMessageCallback out)
{
this.out = out;
}
- Request(BatchOutgoingAsync out)
+ Request(OutgoingAsyncMessageCallback out)
{
- this.batchOut = out;
+ this.outAsync = out;
}
- OutgoingAsync out = null;
- BatchOutgoingAsync batchOut = null;
+ OutgoingMessageCallback out = null;
+ OutgoingAsyncMessageCallback outAsync = null;
BasicStream os = null;
}
@@ -134,22 +134,23 @@ public class ConnectRequestHandler
_connection.abortBatchRequest();
}
- public Ice.ConnectionI
- sendRequest(Outgoing out)
+ public boolean
+ sendRequest(OutgoingMessageCallback out)
throws LocalExceptionWrapper
{
- if(!getConnection(true).sendRequest(out, _compress, _response) || _response)
- {
- return _connection; // The request has been sent or we're expecting a response.
- }
- else
+ synchronized(this)
{
- return null; // The request hasn't been sent yet.
+ if(!initialized())
+ {
+ _requests.add(new Request(out));
+ return false; // Not sent
+ }
}
+ return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response.
}
public int
- sendAsyncRequest(OutgoingAsync out)
+ sendAsyncRequest(OutgoingAsyncMessageCallback out)
throws LocalExceptionWrapper
{
synchronized(this)
@@ -160,29 +161,57 @@ public class ConnectRequestHandler
return AsyncStatus.Queued;
}
}
- return _connection.sendAsyncRequest(out, _compress, _response);
+ return out.__send(_connection, _compress, _response);
}
- public boolean
- flushBatchRequests(BatchOutgoing out)
+ public void
+ requestTimedOut(OutgoingMessageCallback out)
{
- return getConnection(true).flushBatchRequests(out);
+ synchronized(this)
+ {
+ if(!initialized())
+ {
+ java.util.Iterator<Request> it = _requests.iterator();
+ while(it.hasNext())
+ {
+ Request request = it.next();
+ if(request.out == out)
+ {
+ out.finished(new Ice.InvocationTimeoutException(), false);
+ it.remove();
+ return;
+ }
+ }
+ assert(false); // The request has to be queued if it timed out and we're not initialized yet.
+ }
+ }
+ _connection.requestTimedOut(out);
}
- public int
- flushAsyncBatchRequests(BatchOutgoingAsync out)
+ public void
+ asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
{
synchronized(this)
{
if(!initialized())
{
- _requests.add(new Request(out));
- return AsyncStatus.Queued;
+ java.util.Iterator<Request> it = _requests.iterator();
+ while(it.hasNext())
+ {
+ Request request = it.next();
+ if(request.outAsync == outAsync)
+ {
+ outAsync.__finished(new Ice.InvocationTimeoutException(), false);
+ it.remove();
+ return;
+ }
+ }
+ assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- return _connection.flushAsyncBatchRequests(out);
+ _connection.asyncRequestTimedOut(outAsync);
}
-
+
public Outgoing
getOutgoing(String operation, Ice.OperationMode mode, java.util.Map<String, String> context,
InvocationObserver observer)
@@ -412,18 +441,14 @@ public class ConnectRequestHandler
Request request = p.next();
if(request.out != null)
{
- if((_connection.sendAsyncRequest(request.out, _compress, _response) &
- AsyncStatus.InvokeSentCallback) > 0)
- {
- sentCallbacks.add(request.out);
- }
+ request.out.send(_connection, _compress, _response);
}
- else if(request.batchOut != null)
+ else if(request.outAsync != null)
{
- if((_connection.flushAsyncBatchRequests(request.batchOut) &
+ if((request.outAsync.__send(_connection, _compress, _response) &
AsyncStatus.InvokeSentCallback) > 0)
{
- sentCallbacks.add(request.batchOut);
+ sentCallbacks.add(request.outAsync);
}
}
else
@@ -488,7 +513,7 @@ public class ConnectRequestHandler
{
for(OutgoingAsyncMessageCallback callback : sentCallbacks)
{
- callback.__sent();
+ callback.__invokeSent();
}
};
});
@@ -528,12 +553,12 @@ public class ConnectRequestHandler
for(Request request : _requests)
{
if(request.out != null)
- {
- request.out.__finished(ex, false);
+ {
+ request.out.finished(ex, false);
}
- else if(request.batchOut != null)
+ else if(request.outAsync != null)
{
- request.batchOut.__finished(ex, false);
+ request.outAsync.__finished(ex, false);
}
}
_requests.clear();
@@ -545,12 +570,26 @@ public class ConnectRequestHandler
for(Request request : _requests)
{
if(request.out != null)
- {
- request.out.__finished(ex);
+ {
+ if(request.out instanceof Outgoing)
+ {
+ ((Outgoing)request.out).finished(ex);
+ }
+ else
+ {
+ request.out.finished(ex.get(), false);
+ }
}
- else if(request.batchOut != null)
+ else if(request.outAsync != null)
{
- request.batchOut.__finished(ex.get(), false);
+ if(request.outAsync instanceof OutgoingAsync)
+ {
+ ((OutgoingAsync)request.outAsync).__finished(ex);
+ }
+ else
+ {
+ request.outAsync.__finished(ex.get(), false);
+ }
}
}
_requests.clear();