diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2007-01-17 18:11:11 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2007-01-17 18:11:11 +0000 |
commit | e124e693704faa56ecf32ce2a243d0c3fa3a5e6e (patch) | |
tree | 071dcaa6edc458f9371289976a4537880e5bb005 /java/demo/Ice/async | |
parent | Changed async demo (diff) | |
download | ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.bz2 ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.xz ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.zip |
Changed demo
Diffstat (limited to 'java/demo/Ice/async')
-rw-r--r-- | java/demo/Ice/async/Client.java (renamed from java/demo/Ice/async/Consumer.java) | 96 | ||||
-rw-r--r-- | java/demo/Ice/async/Hello.ice (renamed from java/demo/Ice/async/Queue.ice) | 14 | ||||
-rw-r--r-- | java/demo/Ice/async/HelloI.java | 52 | ||||
-rw-r--r-- | java/demo/Ice/async/Publisher.java | 83 | ||||
-rw-r--r-- | java/demo/Ice/async/QueueI.java | 109 | ||||
-rw-r--r-- | java/demo/Ice/async/README | 15 | ||||
-rw-r--r-- | java/demo/Ice/async/Server.java | 37 | ||||
-rw-r--r-- | java/demo/Ice/async/WorkQueue.java | 115 | ||||
-rw-r--r-- | java/demo/Ice/async/build.xml | 2 | ||||
-rw-r--r-- | java/demo/Ice/async/config.client | 2 | ||||
-rw-r--r-- | java/demo/Ice/async/config.server | 4 |
11 files changed, 265 insertions, 264 deletions
diff --git a/java/demo/Ice/async/Consumer.java b/java/demo/Ice/async/Client.java index 2bace272823..3a89d037dd2 100644 --- a/java/demo/Ice/async/Consumer.java +++ b/java/demo/Ice/async/Client.java @@ -9,37 +9,32 @@ import Demo.*; -public class Consumer extends Ice.Application +public class Client extends Ice.Application { - public class AMI_Queue_getI extends AMI_Queue_get + class ShutdownHook extends Thread { - public AMI_Queue_getI(String id) - { - _id = id; - - synchronized(_requests) - { - _requests.add(id); - } - } + public void + run() + { + try + { + communicator().destroy(); + } + catch(Ice.LocalException ex) + { + ex.printStackTrace(); + } + } + } - public void ice_response(String message) + public class AMI_Hello_sayHelloI extends AMI_Hello_sayHello + { + public void ice_response() { - synchronized(_requests) - { - _requests.remove(_id); - } - - System.out.println(message); } public void ice_exception(Ice.LocalException ex) { - synchronized(_requests) - { - _requests.remove(_id); - } - ex.printStackTrace(); } @@ -54,8 +49,6 @@ public class Consumer extends Ice.Application ex.printStackTrace(); } } - - private String _id; } private static void @@ -63,7 +56,9 @@ public class Consumer extends Ice.Application { System.out.println( "usage:\n" + - "g: get a message\n" + + "i: send immediate greeting\n" + + "d: send delayed greeting\n" + + "s: shutdown the server\n" + "x: exit\n" + "?: help\n"); } @@ -71,8 +66,15 @@ public class Consumer extends Ice.Application public int run(String[] args) { - QueuePrx queue = QueuePrxHelper.checkedCast(communicator().propertyToProxy("Queue.Proxy")); - if(queue == null) + // + // Since this is an interactive demo we want to clear the + // Application installed interrupt callback and install our + // own shutdown hook. + // + setInterruptHook(new ShutdownHook()); + + HelloPrx hello = HelloPrxHelper.checkedCast(communicator().propertyToProxy("Hello.Proxy")); + if(hello == null) { System.err.println("invalid proxy"); return 1; @@ -94,11 +96,18 @@ public class Consumer extends Ice.Application { break; } - if(line.equals("g")) + if(line.equals("i")) { - String id = Ice.Util.generateUUID(); - queue.get_async(new AMI_Queue_getI(id), id); + hello.sayHello(0); } + else if(line.equals("d")) + { + hello.sayHello_async(new AMI_Hello_sayHelloI(), 5000); + } + else if(line.equals("s")) + { + hello.shutdown(); + } else if(line.equals("x")) { // Nothing to do @@ -113,6 +122,10 @@ public class Consumer extends Ice.Application { ex.printStackTrace(); } + catch(Ice.UserException ex) + { + ex.printStackTrace(); + } catch(Ice.LocalException ex) { ex.printStackTrace(); @@ -120,31 +133,14 @@ public class Consumer extends Ice.Application } while(!line.equals("x")); - synchronized(_requests) - { - if(_requests.size() != 0) - { - try - { - queue.cancel((String[])_requests.toArray(new String[0])); - } - catch(Ice.LocalException ex) - { - // Igmore - } - } - } - return 0; } public static void main(String[] args) { - Consumer app = new Consumer(); - int status = app.main("Consumer", args, "config.client"); + Client app = new Client(); + int status = app.main("Client", args, "config.client"); System.exit(status); } - - private java.util.HashSet _requests = new java.util.HashSet(); } diff --git a/java/demo/Ice/async/Queue.ice b/java/demo/Ice/async/Hello.ice index 4408921ef18..8206d953fc7 100644 --- a/java/demo/Ice/async/Queue.ice +++ b/java/demo/Ice/async/Hello.ice @@ -7,10 +7,8 @@ // // ********************************************************************** -#ifndef QUEUE_ICE -#define QUEUE_ICE - -#include <Ice/BuiltinSequences.ice> +#ifndef HELLO_ICE +#define HELLO_ICE module Demo { @@ -19,14 +17,12 @@ exception RequestCanceledException { }; -interface Queue +interface Hello { - ["ami", "amd"] string get(string id) + ["ami", "amd"] void sayHello(int delay) throws RequestCanceledException; - void add(string message); - - ["amd"] void cancel(Ice::StringSeq ids); + idempotent void shutdown(); }; }; diff --git a/java/demo/Ice/async/HelloI.java b/java/demo/Ice/async/HelloI.java new file mode 100644 index 00000000000..ce555b3e9dd --- /dev/null +++ b/java/demo/Ice/async/HelloI.java @@ -0,0 +1,52 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +import Demo.*; + +public class HelloI extends _HelloDisp +{ + public + HelloI(WorkQueue workQueue) + { + _workQueue = workQueue; + } + + public void + sayHello_async(AMD_Hello_sayHello cb, int delay, Ice.Current current) + { + if(delay == 0) + { + System.out.println("Hello World!"); + cb.ice_response(); + } + else + { + _workQueue.add(cb, delay); + } + } + + public void + shutdown(Ice.Current current) + { + System.out.println("Shutting down..."); + + _workQueue.destroy(); + try + { + _workQueue.join(); + } + catch(java.lang.InterruptedException ex) + { + } + + current.adapter.getCommunicator().shutdown(); + } + + private WorkQueue _workQueue; +} diff --git a/java/demo/Ice/async/Publisher.java b/java/demo/Ice/async/Publisher.java deleted file mode 100644 index 292269d71d7..00000000000 --- a/java/demo/Ice/async/Publisher.java +++ /dev/null @@ -1,83 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -import Demo.*; - -public class Publisher extends Ice.Application -{ - private static void - menu() - { - System.out.println("Enter /quit to exit."); - } - - public int - run(String[] args) - { - QueuePrx queue = QueuePrxHelper.checkedCast(communicator().propertyToProxy("Queue.Proxy")); - if(queue == null) - { - System.err.println("invalid proxy"); - return 1; - } - - System.out.println("Type a message and hit return to queue a message."); - menu(); - - java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(System.in)); - - String line = null; - try - { - while(true) - { - System.out.print("==> "); - System.out.flush(); - line = in.readLine().trim(); - if(line == null) - { - break; - } - if(line.length() != 0) - { - if(line.charAt(0) == '/') - { - if(line.equals("/quit")) - { - break; - } - menu(); - } - else - { - queue.add(line); - } - } - } - } - catch(java.io.IOException ex) - { - ex.printStackTrace(); - } - catch(Ice.LocalException ex) - { - ex.printStackTrace(); - } - - return 0; - } - - public static void - main(String[] args) - { - Publisher app = new Publisher(); - int status = app.main("Publisher", args, "config.client"); - System.exit(status); - } -} diff --git a/java/demo/Ice/async/QueueI.java b/java/demo/Ice/async/QueueI.java deleted file mode 100644 index 3ad48eca475..00000000000 --- a/java/demo/Ice/async/QueueI.java +++ /dev/null @@ -1,109 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -import Demo.*; - -public class QueueI extends _QueueDisp -{ - synchronized public void - get_async(AMD_Queue_get cb, String id, Ice.Current current) - { - // - // If there is already a message in the message queue, send the - // response immediately. Otherwise add the callback to the - // request queue. - // - if(_messageQueue.size() != 0) - { - try - { - cb.ice_response((String)_messageQueue.getFirst()); - _messageQueue.removeFirst(); - } - catch(Ice.LocalException ex) - { - ex.printStackTrace(); - } - } - else - { - Request request = new Request(); - request.id = id; - request.cb = cb; - _requestQueue.add(request); - } - } - - synchronized public void - add(String message, Ice.Current current) - { - // - // If there is an outstanding request in the request queue, - // send a response. Otherwise add the message to the message - // queue. - // - if(_requestQueue.size() != 0) - { - try - { - Request request = (Request)_requestQueue.removeFirst(); - request.cb.ice_response(message); - } - catch(Ice.LocalException ex) - { - ex.printStackTrace(); - } - } - else - { - _messageQueue.add(message); - } - } - - synchronized public void - cancel_async(AMD_Queue_cancel cb, String[] ids, Ice.Current current) - { - // - // We send immediate response so that later call to ice_exception - // on queued requests will not cause deadlocks. - // - cb.ice_response(); - - for(int i = 0; i < ids.length; ++i) - { - java.util.Iterator p = _requestQueue.iterator(); - while(p.hasNext()) - { - Request request = (Request)p.next(); - if(request.id.equals(ids[i])) - { - try - { - request.cb.ice_exception(new RequestCanceledException()); - } - catch(Ice.LocalException ex) - { - // Ignore - } - p.remove(); - break; - } - } - } - } - - class Request - { - String id; - AMD_Queue_get cb; - } - - private java.util.LinkedList _messageQueue = new java.util.LinkedList(); - private java.util.LinkedList _requestQueue = new java.util.LinkedList(); -} diff --git a/java/demo/Ice/async/README b/java/demo/Ice/async/README index ff9e8c8ddde..8a47691dc39 100644 --- a/java/demo/Ice/async/README +++ b/java/demo/Ice/async/README @@ -5,12 +5,13 @@ To run the demo, first start the server: $ java Server -In a second window, start the publisher: +In a second window, start the client: -$ java Publisher +$ java Client -In a third window, start the consumer: - -$ java Consumer - -Multiple publishers and consumers can be used if so desired. +The demo invocation can either have a short response time or require a +significant amount of time to complete. For the long running request +the client uses AMI and the server uses AMD plus a worker thread to +process the request. While a long request is processing, short +requests are still able to processed and more long requests can be +queued for processing by the worker thread. diff --git a/java/demo/Ice/async/Server.java b/java/demo/Ice/async/Server.java index 48f9514c619..6077a8b9a34 100644 --- a/java/demo/Ice/async/Server.java +++ b/java/demo/Ice/async/Server.java @@ -11,12 +11,43 @@ import Demo.*; public class Server extends Ice.Application { + class ShutdownHook extends Thread + { + public void + run() + { + _workQueue.destroy(); + try + { + _workQueue.join(); + } + catch(java.lang.InterruptedException ex) + { + } + + try + { + communicator().destroy(); + } + catch(Ice.LocalException ex) + { + ex.printStackTrace(); + } + } + } + public int run(String[] args) { - Ice.ObjectAdapter adapter = communicator().createObjectAdapter("Queue"); - adapter.add(new QueueI(), communicator().stringToIdentity("queue")); + setInterruptHook(new ShutdownHook()); + + Ice.ObjectAdapter adapter = communicator().createObjectAdapter("Hello"); + _workQueue = new WorkQueue(); + adapter.add(new HelloI(_workQueue), communicator().stringToIdentity("hello")); + + _workQueue.start(); adapter.activate(); + communicator().waitForShutdown(); return 0; } @@ -28,4 +59,6 @@ public class Server extends Ice.Application int status = app.main("Server", args, "config.server"); System.exit(status); } + + private WorkQueue _workQueue; } diff --git a/java/demo/Ice/async/WorkQueue.java b/java/demo/Ice/async/WorkQueue.java new file mode 100644 index 00000000000..036a1978b49 --- /dev/null +++ b/java/demo/Ice/async/WorkQueue.java @@ -0,0 +1,115 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +import Demo.*; + +public class WorkQueue extends Thread +{ + class CallbackEntry + { + AMD_Hello_sayHello cb; + int delay; + } + + public synchronized void + run() + { + while(!_done) + { + if(_callbacks.size() == 0) + { + try + { + wait(); + } + catch(java.lang.InterruptedException ex) + { + } + } + + if(_callbacks.size() != 0) + { + // + // Get next work item. + // + CallbackEntry entry = (CallbackEntry)_callbacks.getFirst(); + + // + // Wait for the amount of time indicated in delay to + // emulate a process that takes a significant period of + // time to complete. + // + try + { + wait(entry.delay); + } + catch(java.lang.InterruptedException ex) + { + } + + if(!_done) + { + // + // Print greeting and send response. + // + _callbacks.removeFirst(); + System.err.println("Belated Hello World!"); + entry.cb.ice_response(); + } + } + } + + // + // Throw exception for any outstanding requests. + // + java.util.Iterator p = _callbacks.iterator(); + while(p.hasNext()) + { + CallbackEntry entry = (CallbackEntry)p.next(); + entry.cb.ice_exception(new RequestCanceledException()); + } + } + + public synchronized void + add(AMD_Hello_sayHello cb, int delay) + { + if(!_done) + { + // + // Add the work item. + // + CallbackEntry entry = new CallbackEntry(); + entry.cb = cb; + entry.delay = delay; + + if(_callbacks.size() == 0) + { + notify(); + } + _callbacks.add(entry); + } + else + { + // + // Destroyed, throw exception. + // + cb.ice_exception(new RequestCanceledException()); + } + } + + public synchronized void + destroy() + { + _done = true; + notify(); + } + + private java.util.LinkedList _callbacks = new java.util.LinkedList(); + private boolean _done = false; +} diff --git a/java/demo/Ice/async/build.xml b/java/demo/Ice/async/build.xml index dfdfeeb7954..789ca9af26d 100644 --- a/java/demo/Ice/async/build.xml +++ b/java/demo/Ice/async/build.xml @@ -25,7 +25,7 @@ <includepath> <pathelement path="${slice.dir}"/> </includepath> - <fileset dir="." includes="Queue.ice"/> + <fileset dir="." includes="Hello.ice"/> </slice2java> </target> diff --git a/java/demo/Ice/async/config.client b/java/demo/Ice/async/config.client index 9cca378e1e3..0cea3d3bc26 100644 --- a/java/demo/Ice/async/config.client +++ b/java/demo/Ice/async/config.client @@ -2,7 +2,7 @@ # The client reads this property to create the reference to the # "hello" object in the server. # -Queue.Proxy=queue:tcp -p 10000 +Hello.Proxy=hello:tcp -p 10000 # # Warn about connection exceptions diff --git a/java/demo/Ice/async/config.server b/java/demo/Ice/async/config.server index c7d7b368b1b..05c1345d404 100644 --- a/java/demo/Ice/async/config.server +++ b/java/demo/Ice/async/config.server @@ -1,9 +1,9 @@ # # The server creates one single object adapter with the name -# "Queue". The following line sets the endpoints for this +# "Hello". The following line sets the endpoints for this # adapter. # -Ice.OA.Queue.Endpoints=tcp -p 10000 +Ice.OA.Hello.Endpoints=tcp -p 10000 # # Warn about connection exceptions |