summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java81
1 files changed, 51 insertions, 30 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 01f86f523f6..f1cbcd0723d 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -324,7 +324,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean sent = false;
try
{
- sent = sendMessage(new OutgoingMessage(out, out.os(), compress, response));
+ sent = sendMessage(new OutgoingMessage(out, out.os(), compress, requestId));
}
catch(Ice.LocalException ex)
{
@@ -392,7 +392,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean sent;
try
{
- sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, response));
+ sent = sendMessage(new OutgoingMessage(out, out.__os(), compress, requestId));
}
catch(Ice.LocalException ex)
{
@@ -652,7 +652,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean sent = false;
try
{
- OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, false);
+ OutgoingMessage message = new OutgoingMessage(out, out.os(), _batchRequestCompress, 0);
sent = sendMessage(message);
}
catch(Ice.LocalException ex)
@@ -708,7 +708,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
boolean sent;
try
{
- OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, false);
+ OutgoingMessage message = new OutgoingMessage(outAsync, outAsync.__os(), _batchRequestCompress, 0);
sent = sendMessage(message);
}
catch(Ice.LocalException ex)
@@ -1081,7 +1081,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
for(OutgoingMessage msg : sentCBs)
{
- msg.outAsync.__sent(_instance);
+ msg.outAsync.__sent();
}
}
@@ -1136,21 +1136,39 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_startCallback = null;
}
+ //
+ // NOTE: for twoway requests which are not sent, finished can be called twice: the
+ // first time because the outgoing is in the _sendStreams set and the second time
+ // because it's either in the _requests/_asyncRequests set. This is fine, only the
+ // first call should be taken into account by the implementation of finished.
+ //
+
for(OutgoingMessage p : _sendStreams)
{
+ if(p.requestId > 0)
+ {
+ if(p.out != null) // Make sure finished isn't called twice.
+ {
+ _requests.remove(p.requestId);
+ }
+ else
+ {
+ _asyncRequests.remove(p.requestId);
+ }
+ }
p.finished(_exception);
}
_sendStreams.clear();
for(IceInternal.Outgoing p : _requests.values())
{
- p.finished(_exception);
+ p.finished(_exception, true);
}
_requests.clear();
for(IceInternal.OutgoingAsync p : _asyncRequests.values())
{
- p.__finished(_exception);
+ p.__finished(_exception, true);
}
_asyncRequests.clear();
@@ -1769,8 +1787,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
OutgoingMessage message = _sendStreams.getFirst();
_writeStream.swap(message.stream);
- message.sent(this, true);
- if(message.outAsync instanceof Ice.AMISentCallback)
+ if(message.sent(this, true))
{
callbacks.add(message);
}
@@ -2388,24 +2405,28 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.stream = stream;
this.compress = compress;
this.adopt = adopt;
+ this.isSent = false;
+ this.requestId = 0;
}
OutgoingMessage(IceInternal.OutgoingMessageCallback out, IceInternal.BasicStream stream, boolean compress,
- boolean resp)
+ int requestId)
{
this.stream = stream;
this.compress = compress;
this.out = out;
- this.response = resp;
+ this.requestId = requestId;
+ this.isSent = false;
}
OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress,
- boolean resp)
+ int requestId)
{
this.stream = stream;
this.compress = compress;
this.outAsync = out;
- this.response = resp;
+ this.requestId = requestId;
+ this.isSent = false;
}
public void
@@ -2420,37 +2441,36 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
- public void
+ public boolean
sent(ConnectionI connection, boolean notify)
{
+ isSent = true; // The message is sent.
+
if(out != null)
{
out.sent(notify); // true = notify the waiting thread that the request was sent.
+ return false;
}
else if(outAsync != null)
{
- outAsync.__sent(connection);
+ return outAsync.__sent(connection);
+ }
+ else
+ {
+ return false;
}
}
public void
finished(Ice.LocalException ex)
{
- //
- // Only notify oneway requests. The connection keeps track of twoway
- // requests in the _requests/_asyncRequests maps and will notify them
- // of the connection exceptions.
- //
- if(!response)
+ if(out != null)
{
- if(out != null)
- {
- out.finished(ex);
- }
- else if(outAsync != null)
- {
- outAsync.__finished(ex);
- }
+ out.finished(ex, isSent);
+ }
+ else if(outAsync != null)
+ {
+ outAsync.__finished(ex, isSent);
}
}
@@ -2458,9 +2478,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public IceInternal.OutgoingMessageCallback out;
public IceInternal.OutgoingAsyncMessageCallback outAsync;
public boolean compress;
- public boolean response;
+ public int requestId;
boolean adopt;
boolean prepared;
+ boolean isSent;
}
private final IceInternal.Instance _instance;