summaryrefslogtreecommitdiff
path: root/java/demo/Ice/async
diff options
context:
space:
mode:
Diffstat (limited to 'java/demo/Ice/async')
-rw-r--r--java/demo/Ice/async/Consumer.java55
-rw-r--r--java/demo/Ice/async/Queue.ice12
-rw-r--r--java/demo/Ice/async/QueueI.java60
-rw-r--r--java/demo/Ice/async/build.xml3
4 files changed, 123 insertions, 7 deletions
diff --git a/java/demo/Ice/async/Consumer.java b/java/demo/Ice/async/Consumer.java
index c3019837324..2d441b241d2 100644
--- a/java/demo/Ice/async/Consumer.java
+++ b/java/demo/Ice/async/Consumer.java
@@ -13,15 +13,49 @@ public class Consumer extends Ice.Application
{
public class AMI_Queue_getI extends AMI_Queue_get
{
+ public AMI_Queue_getI(String id)
+ {
+ _id = id;
+
+ synchronized(_requestMutex)
+ {
+ _requests.add(id);
+ }
+ }
+
public void ice_response(String message)
{
+ synchronized(_requestMutex)
+ {
+ _requests.remove(_id);
+ }
+
System.out.println(message);
}
public void ice_exception(Ice.LocalException ex)
{
+ synchronized(_requestMutex)
+ {
+ _requests.remove(_id);
+ }
+
ex.printStackTrace();
}
+
+ public void ice_exception(Ice.UserException ex)
+ {
+ if(ex instanceof Demo.RequestCanceledException)
+ {
+ System.out.println("Request canceled");
+ }
+ else
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ private String _id;
}
private static void
@@ -71,7 +105,8 @@ public class Consumer extends Ice.Application
}
if(line.equals("g"))
{
- queue.get_async(new AMI_Queue_getI());
+ String id = Ice.Util.generateUUID();
+ queue.get_async(new AMI_Queue_getI(id), id);
}
else if(line.equals("x"))
{
@@ -94,6 +129,21 @@ public class Consumer extends Ice.Application
}
while(!line.equals("x"));
+ synchronized(_requestMutex)
+ {
+ if(_requests.size() != 0)
+ {
+ try
+ {
+ queue.cancel((String[])_requests.toArray(new String[0]));
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Igmore
+ }
+ }
+ }
+
return 0;
}
@@ -104,4 +154,7 @@ public class Consumer extends Ice.Application
int status = app.main("Consumer", args, "config.client");
System.exit(status);
}
+
+ private java.lang.Object _requestMutex = new java.lang.Object();
+ private java.util.HashSet _requests = new java.util.HashSet();
}
diff --git a/java/demo/Ice/async/Queue.ice b/java/demo/Ice/async/Queue.ice
index d275116694c..337b51a8ad6 100644
--- a/java/demo/Ice/async/Queue.ice
+++ b/java/demo/Ice/async/Queue.ice
@@ -10,13 +10,23 @@
#ifndef QUEUE_ICE
#define QUEUE_ICE
+#include <Ice/BuiltinSequences.ice>
+
module Demo
{
+exception RequestCanceledException
+{
+};
+
interface Queue
{
- ["ami", "amd"] string get();
+ ["ami", "amd"] string get(string id)
+ throws RequestCanceledException;
+
void add(string message);
+
+ ["amd"] void cancel(Ice::StringSeq ids);
};
};
diff --git a/java/demo/Ice/async/QueueI.java b/java/demo/Ice/async/QueueI.java
index c6243869c4d..57216d2acbd 100644
--- a/java/demo/Ice/async/QueueI.java
+++ b/java/demo/Ice/async/QueueI.java
@@ -12,13 +12,18 @@ import Demo.*;
public class QueueI extends _QueueDisp
{
synchronized public void
- get_async(AMD_Queue_get getCB, Ice.Current current)
+ get_async(AMD_Queue_get cb, String id, Ice.Current current)
{
+ //
+ // If there is already a message in the message queue, send the
+ // response immediately. Otherwise add the callback to the
+ // request queue.
+ //
if(_messageQueue.size() != 0)
{
try
{
- getCB.ice_response((String)_messageQueue.getFirst());
+ cb.ice_response((String)_messageQueue.getFirst());
_messageQueue.removeFirst();
}
catch(Ice.LocalException ex)
@@ -28,19 +33,27 @@ public class QueueI extends _QueueDisp
}
else
{
- _requestQueue.add(getCB);
+ Request request = new Request();
+ request.id = id;
+ request.cb = cb;
+ _requestQueue.add(request);
}
}
synchronized public void
add(String message, Ice.Current current)
{
+ //
+ // If there is an outstanding request in the request queue,
+ // send a response. Otherwise add the message to the message
+ // queue.
+ //
if(_requestQueue.size() != 0)
{
try
{
- AMD_Queue_get cb = (AMD_Queue_get)_requestQueue.removeFirst();
- cb.ice_response(message);
+ Request request = (Request)_requestQueue.removeFirst();
+ request.cb.ice_response(message);
}
catch(Ice.LocalException ex)
{
@@ -53,6 +66,43 @@ public class QueueI extends _QueueDisp
}
}
+ synchronized public void
+ cancel_async(AMD_Queue_cancel cb, String[] ids, Ice.Current current)
+ {
+ //
+ // We send immediate response so that later call to ice_exception
+ // on queued requests will not cause deadlocks.
+ //
+ cb.ice_response();
+
+ for(int i = 0; i < ids.length; ++i)
+ {
+ java.util.Iterator p = _requestQueue.iterator();
+ while(p.hasNext())
+ {
+ Request request = (Request)p.next();
+ if(request.id.equals(ids[i]))
+ {
+ try
+ {
+ request.cb.ice_exception(new RequestCanceledException());
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Ignore
+ }
+ p.remove();
+ }
+ }
+ }
+ }
+
+ class Request
+ {
+ String id;
+ AMD_Queue_get cb;
+ }
+
private java.util.LinkedList _messageQueue = new java.util.LinkedList();
private java.util.LinkedList _requestQueue = new java.util.LinkedList();
}
diff --git a/java/demo/Ice/async/build.xml b/java/demo/Ice/async/build.xml
index eaa3d0c5a8a..3d4b0c36483 100644
--- a/java/demo/Ice/async/build.xml
+++ b/java/demo/Ice/async/build.xml
@@ -22,6 +22,9 @@
<mkdir dir="${generated.dir}"/>
<slice2java outputdir="${generated.dir}">
<meta value="${java5metadata}"/>
+ <includepath>
+ <pathelement path="${slice.dir}"/>
+ </includepath>
<fileset dir="." includes="Queue.ice"/>
</slice2java>
</target>