diff options
Diffstat (limited to 'java/demo/Ice/async')
-rw-r--r-- | java/demo/Ice/async/Consumer.java | 55 | ||||
-rw-r--r-- | java/demo/Ice/async/Queue.ice | 12 | ||||
-rw-r--r-- | java/demo/Ice/async/QueueI.java | 60 | ||||
-rw-r--r-- | java/demo/Ice/async/build.xml | 3 |
4 files changed, 123 insertions, 7 deletions
diff --git a/java/demo/Ice/async/Consumer.java b/java/demo/Ice/async/Consumer.java index c3019837324..2d441b241d2 100644 --- a/java/demo/Ice/async/Consumer.java +++ b/java/demo/Ice/async/Consumer.java @@ -13,15 +13,49 @@ public class Consumer extends Ice.Application { public class AMI_Queue_getI extends AMI_Queue_get { + public AMI_Queue_getI(String id) + { + _id = id; + + synchronized(_requestMutex) + { + _requests.add(id); + } + } + public void ice_response(String message) { + synchronized(_requestMutex) + { + _requests.remove(_id); + } + System.out.println(message); } public void ice_exception(Ice.LocalException ex) { + synchronized(_requestMutex) + { + _requests.remove(_id); + } + ex.printStackTrace(); } + + public void ice_exception(Ice.UserException ex) + { + if(ex instanceof Demo.RequestCanceledException) + { + System.out.println("Request canceled"); + } + else + { + ex.printStackTrace(); + } + } + + private String _id; } private static void @@ -71,7 +105,8 @@ public class Consumer extends Ice.Application } if(line.equals("g")) { - queue.get_async(new AMI_Queue_getI()); + String id = Ice.Util.generateUUID(); + queue.get_async(new AMI_Queue_getI(id), id); } else if(line.equals("x")) { @@ -94,6 +129,21 @@ public class Consumer extends Ice.Application } while(!line.equals("x")); + synchronized(_requestMutex) + { + if(_requests.size() != 0) + { + try + { + queue.cancel((String[])_requests.toArray(new String[0])); + } + catch(Ice.LocalException ex) + { + // Igmore + } + } + } + return 0; } @@ -104,4 +154,7 @@ public class Consumer extends Ice.Application int status = app.main("Consumer", args, "config.client"); System.exit(status); } + + private java.lang.Object _requestMutex = new java.lang.Object(); + private java.util.HashSet _requests = new java.util.HashSet(); } diff --git a/java/demo/Ice/async/Queue.ice b/java/demo/Ice/async/Queue.ice index d275116694c..337b51a8ad6 100644 --- a/java/demo/Ice/async/Queue.ice +++ b/java/demo/Ice/async/Queue.ice @@ -10,13 +10,23 @@ #ifndef QUEUE_ICE #define QUEUE_ICE +#include <Ice/BuiltinSequences.ice> + module Demo { +exception RequestCanceledException +{ +}; + interface Queue { - ["ami", "amd"] string get(); + ["ami", "amd"] string get(string id) + throws RequestCanceledException; + void add(string message); + + ["amd"] void cancel(Ice::StringSeq ids); }; }; diff --git a/java/demo/Ice/async/QueueI.java b/java/demo/Ice/async/QueueI.java index c6243869c4d..57216d2acbd 100644 --- a/java/demo/Ice/async/QueueI.java +++ b/java/demo/Ice/async/QueueI.java @@ -12,13 +12,18 @@ import Demo.*; public class QueueI extends _QueueDisp { synchronized public void - get_async(AMD_Queue_get getCB, Ice.Current current) + get_async(AMD_Queue_get cb, String id, Ice.Current current) { + // + // If there is already a message in the message queue, send the + // response immediately. Otherwise add the callback to the + // request queue. + // if(_messageQueue.size() != 0) { try { - getCB.ice_response((String)_messageQueue.getFirst()); + cb.ice_response((String)_messageQueue.getFirst()); _messageQueue.removeFirst(); } catch(Ice.LocalException ex) @@ -28,19 +33,27 @@ public class QueueI extends _QueueDisp } else { - _requestQueue.add(getCB); + Request request = new Request(); + request.id = id; + request.cb = cb; + _requestQueue.add(request); } } synchronized public void add(String message, Ice.Current current) { + // + // If there is an outstanding request in the request queue, + // send a response. Otherwise add the message to the message + // queue. + // if(_requestQueue.size() != 0) { try { - AMD_Queue_get cb = (AMD_Queue_get)_requestQueue.removeFirst(); - cb.ice_response(message); + Request request = (Request)_requestQueue.removeFirst(); + request.cb.ice_response(message); } catch(Ice.LocalException ex) { @@ -53,6 +66,43 @@ public class QueueI extends _QueueDisp } } + synchronized public void + cancel_async(AMD_Queue_cancel cb, String[] ids, Ice.Current current) + { + // + // We send immediate response so that later call to ice_exception + // on queued requests will not cause deadlocks. + // + cb.ice_response(); + + for(int i = 0; i < ids.length; ++i) + { + java.util.Iterator p = _requestQueue.iterator(); + while(p.hasNext()) + { + Request request = (Request)p.next(); + if(request.id.equals(ids[i])) + { + try + { + request.cb.ice_exception(new RequestCanceledException()); + } + catch(Ice.LocalException ex) + { + // Ignore + } + p.remove(); + } + } + } + } + + class Request + { + String id; + AMD_Queue_get cb; + } + private java.util.LinkedList _messageQueue = new java.util.LinkedList(); private java.util.LinkedList _requestQueue = new java.util.LinkedList(); } diff --git a/java/demo/Ice/async/build.xml b/java/demo/Ice/async/build.xml index eaa3d0c5a8a..3d4b0c36483 100644 --- a/java/demo/Ice/async/build.xml +++ b/java/demo/Ice/async/build.xml @@ -22,6 +22,9 @@ <mkdir dir="${generated.dir}"/> <slice2java outputdir="${generated.dir}"> <meta value="${java5metadata}"/> + <includepath> + <pathelement path="${slice.dir}"/> + </includepath> <fileset dir="." includes="Queue.ice"/> </slice2java> </target> |