summaryrefslogtreecommitdiff
path: root/java/demo/Ice/async
diff options
context:
space:
mode:
authorDwayne Boone <dwayne@zeroc.com>2007-01-17 18:11:11 +0000
committerDwayne Boone <dwayne@zeroc.com>2007-01-17 18:11:11 +0000
commite124e693704faa56ecf32ce2a243d0c3fa3a5e6e (patch)
tree071dcaa6edc458f9371289976a4537880e5bb005 /java/demo/Ice/async
parentChanged async demo (diff)
downloadice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.bz2
ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.tar.xz
ice-e124e693704faa56ecf32ce2a243d0c3fa3a5e6e.zip
Changed demo
Diffstat (limited to 'java/demo/Ice/async')
-rw-r--r--java/demo/Ice/async/Client.java (renamed from java/demo/Ice/async/Consumer.java)96
-rw-r--r--java/demo/Ice/async/Hello.ice (renamed from java/demo/Ice/async/Queue.ice)14
-rw-r--r--java/demo/Ice/async/HelloI.java52
-rw-r--r--java/demo/Ice/async/Publisher.java83
-rw-r--r--java/demo/Ice/async/QueueI.java109
-rw-r--r--java/demo/Ice/async/README15
-rw-r--r--java/demo/Ice/async/Server.java37
-rw-r--r--java/demo/Ice/async/WorkQueue.java115
-rw-r--r--java/demo/Ice/async/build.xml2
-rw-r--r--java/demo/Ice/async/config.client2
-rw-r--r--java/demo/Ice/async/config.server4
11 files changed, 265 insertions, 264 deletions
diff --git a/java/demo/Ice/async/Consumer.java b/java/demo/Ice/async/Client.java
index 2bace272823..3a89d037dd2 100644
--- a/java/demo/Ice/async/Consumer.java
+++ b/java/demo/Ice/async/Client.java
@@ -9,37 +9,32 @@
import Demo.*;
-public class Consumer extends Ice.Application
+public class Client extends Ice.Application
{
- public class AMI_Queue_getI extends AMI_Queue_get
+ class ShutdownHook extends Thread
{
- public AMI_Queue_getI(String id)
- {
- _id = id;
-
- synchronized(_requests)
- {
- _requests.add(id);
- }
- }
+ public void
+ run()
+ {
+ try
+ {
+ communicator().destroy();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
- public void ice_response(String message)
+ public class AMI_Hello_sayHelloI extends AMI_Hello_sayHello
+ {
+ public void ice_response()
{
- synchronized(_requests)
- {
- _requests.remove(_id);
- }
-
- System.out.println(message);
}
public void ice_exception(Ice.LocalException ex)
{
- synchronized(_requests)
- {
- _requests.remove(_id);
- }
-
ex.printStackTrace();
}
@@ -54,8 +49,6 @@ public class Consumer extends Ice.Application
ex.printStackTrace();
}
}
-
- private String _id;
}
private static void
@@ -63,7 +56,9 @@ public class Consumer extends Ice.Application
{
System.out.println(
"usage:\n" +
- "g: get a message\n" +
+ "i: send immediate greeting\n" +
+ "d: send delayed greeting\n" +
+ "s: shutdown the server\n" +
"x: exit\n" +
"?: help\n");
}
@@ -71,8 +66,15 @@ public class Consumer extends Ice.Application
public int
run(String[] args)
{
- QueuePrx queue = QueuePrxHelper.checkedCast(communicator().propertyToProxy("Queue.Proxy"));
- if(queue == null)
+ //
+ // Since this is an interactive demo we want to clear the
+ // Application installed interrupt callback and install our
+ // own shutdown hook.
+ //
+ setInterruptHook(new ShutdownHook());
+
+ HelloPrx hello = HelloPrxHelper.checkedCast(communicator().propertyToProxy("Hello.Proxy"));
+ if(hello == null)
{
System.err.println("invalid proxy");
return 1;
@@ -94,11 +96,18 @@ public class Consumer extends Ice.Application
{
break;
}
- if(line.equals("g"))
+ if(line.equals("i"))
{
- String id = Ice.Util.generateUUID();
- queue.get_async(new AMI_Queue_getI(id), id);
+ hello.sayHello(0);
}
+ else if(line.equals("d"))
+ {
+ hello.sayHello_async(new AMI_Hello_sayHelloI(), 5000);
+ }
+ else if(line.equals("s"))
+ {
+ hello.shutdown();
+ }
else if(line.equals("x"))
{
// Nothing to do
@@ -113,6 +122,10 @@ public class Consumer extends Ice.Application
{
ex.printStackTrace();
}
+ catch(Ice.UserException ex)
+ {
+ ex.printStackTrace();
+ }
catch(Ice.LocalException ex)
{
ex.printStackTrace();
@@ -120,31 +133,14 @@ public class Consumer extends Ice.Application
}
while(!line.equals("x"));
- synchronized(_requests)
- {
- if(_requests.size() != 0)
- {
- try
- {
- queue.cancel((String[])_requests.toArray(new String[0]));
- }
- catch(Ice.LocalException ex)
- {
- // Igmore
- }
- }
- }
-
return 0;
}
public static void
main(String[] args)
{
- Consumer app = new Consumer();
- int status = app.main("Consumer", args, "config.client");
+ Client app = new Client();
+ int status = app.main("Client", args, "config.client");
System.exit(status);
}
-
- private java.util.HashSet _requests = new java.util.HashSet();
}
diff --git a/java/demo/Ice/async/Queue.ice b/java/demo/Ice/async/Hello.ice
index 4408921ef18..8206d953fc7 100644
--- a/java/demo/Ice/async/Queue.ice
+++ b/java/demo/Ice/async/Hello.ice
@@ -7,10 +7,8 @@
//
// **********************************************************************
-#ifndef QUEUE_ICE
-#define QUEUE_ICE
-
-#include <Ice/BuiltinSequences.ice>
+#ifndef HELLO_ICE
+#define HELLO_ICE
module Demo
{
@@ -19,14 +17,12 @@ exception RequestCanceledException
{
};
-interface Queue
+interface Hello
{
- ["ami", "amd"] string get(string id)
+ ["ami", "amd"] void sayHello(int delay)
throws RequestCanceledException;
- void add(string message);
-
- ["amd"] void cancel(Ice::StringSeq ids);
+ idempotent void shutdown();
};
};
diff --git a/java/demo/Ice/async/HelloI.java b/java/demo/Ice/async/HelloI.java
new file mode 100644
index 00000000000..ce555b3e9dd
--- /dev/null
+++ b/java/demo/Ice/async/HelloI.java
@@ -0,0 +1,52 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import Demo.*;
+
+public class HelloI extends _HelloDisp
+{
+ public
+ HelloI(WorkQueue workQueue)
+ {
+ _workQueue = workQueue;
+ }
+
+ public void
+ sayHello_async(AMD_Hello_sayHello cb, int delay, Ice.Current current)
+ {
+ if(delay == 0)
+ {
+ System.out.println("Hello World!");
+ cb.ice_response();
+ }
+ else
+ {
+ _workQueue.add(cb, delay);
+ }
+ }
+
+ public void
+ shutdown(Ice.Current current)
+ {
+ System.out.println("Shutting down...");
+
+ _workQueue.destroy();
+ try
+ {
+ _workQueue.join();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ current.adapter.getCommunicator().shutdown();
+ }
+
+ private WorkQueue _workQueue;
+}
diff --git a/java/demo/Ice/async/Publisher.java b/java/demo/Ice/async/Publisher.java
deleted file mode 100644
index 292269d71d7..00000000000
--- a/java/demo/Ice/async/Publisher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-import Demo.*;
-
-public class Publisher extends Ice.Application
-{
- private static void
- menu()
- {
- System.out.println("Enter /quit to exit.");
- }
-
- public int
- run(String[] args)
- {
- QueuePrx queue = QueuePrxHelper.checkedCast(communicator().propertyToProxy("Queue.Proxy"));
- if(queue == null)
- {
- System.err.println("invalid proxy");
- return 1;
- }
-
- System.out.println("Type a message and hit return to queue a message.");
- menu();
-
- java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
-
- String line = null;
- try
- {
- while(true)
- {
- System.out.print("==> ");
- System.out.flush();
- line = in.readLine().trim();
- if(line == null)
- {
- break;
- }
- if(line.length() != 0)
- {
- if(line.charAt(0) == '/')
- {
- if(line.equals("/quit"))
- {
- break;
- }
- menu();
- }
- else
- {
- queue.add(line);
- }
- }
- }
- }
- catch(java.io.IOException ex)
- {
- ex.printStackTrace();
- }
- catch(Ice.LocalException ex)
- {
- ex.printStackTrace();
- }
-
- return 0;
- }
-
- public static void
- main(String[] args)
- {
- Publisher app = new Publisher();
- int status = app.main("Publisher", args, "config.client");
- System.exit(status);
- }
-}
diff --git a/java/demo/Ice/async/QueueI.java b/java/demo/Ice/async/QueueI.java
deleted file mode 100644
index 3ad48eca475..00000000000
--- a/java/demo/Ice/async/QueueI.java
+++ /dev/null
@@ -1,109 +0,0 @@
-// **********************************************************************
-//
-// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
-//
-// This copy of Ice is licensed to you under the terms described in the
-// ICE_LICENSE file included in this distribution.
-//
-// **********************************************************************
-
-import Demo.*;
-
-public class QueueI extends _QueueDisp
-{
- synchronized public void
- get_async(AMD_Queue_get cb, String id, Ice.Current current)
- {
- //
- // If there is already a message in the message queue, send the
- // response immediately. Otherwise add the callback to the
- // request queue.
- //
- if(_messageQueue.size() != 0)
- {
- try
- {
- cb.ice_response((String)_messageQueue.getFirst());
- _messageQueue.removeFirst();
- }
- catch(Ice.LocalException ex)
- {
- ex.printStackTrace();
- }
- }
- else
- {
- Request request = new Request();
- request.id = id;
- request.cb = cb;
- _requestQueue.add(request);
- }
- }
-
- synchronized public void
- add(String message, Ice.Current current)
- {
- //
- // If there is an outstanding request in the request queue,
- // send a response. Otherwise add the message to the message
- // queue.
- //
- if(_requestQueue.size() != 0)
- {
- try
- {
- Request request = (Request)_requestQueue.removeFirst();
- request.cb.ice_response(message);
- }
- catch(Ice.LocalException ex)
- {
- ex.printStackTrace();
- }
- }
- else
- {
- _messageQueue.add(message);
- }
- }
-
- synchronized public void
- cancel_async(AMD_Queue_cancel cb, String[] ids, Ice.Current current)
- {
- //
- // We send immediate response so that later call to ice_exception
- // on queued requests will not cause deadlocks.
- //
- cb.ice_response();
-
- for(int i = 0; i < ids.length; ++i)
- {
- java.util.Iterator p = _requestQueue.iterator();
- while(p.hasNext())
- {
- Request request = (Request)p.next();
- if(request.id.equals(ids[i]))
- {
- try
- {
- request.cb.ice_exception(new RequestCanceledException());
- }
- catch(Ice.LocalException ex)
- {
- // Ignore
- }
- p.remove();
- break;
- }
- }
- }
- }
-
- class Request
- {
- String id;
- AMD_Queue_get cb;
- }
-
- private java.util.LinkedList _messageQueue = new java.util.LinkedList();
- private java.util.LinkedList _requestQueue = new java.util.LinkedList();
-}
diff --git a/java/demo/Ice/async/README b/java/demo/Ice/async/README
index ff9e8c8ddde..8a47691dc39 100644
--- a/java/demo/Ice/async/README
+++ b/java/demo/Ice/async/README
@@ -5,12 +5,13 @@ To run the demo, first start the server:
$ java Server
-In a second window, start the publisher:
+In a second window, start the client:
-$ java Publisher
+$ java Client
-In a third window, start the consumer:
-
-$ java Consumer
-
-Multiple publishers and consumers can be used if so desired.
+The demo invocation can either have a short response time or require a
+significant amount of time to complete. For the long running request
+the client uses AMI and the server uses AMD plus a worker thread to
+process the request. While a long request is processing, short
+requests are still able to processed and more long requests can be
+queued for processing by the worker thread.
diff --git a/java/demo/Ice/async/Server.java b/java/demo/Ice/async/Server.java
index 48f9514c619..6077a8b9a34 100644
--- a/java/demo/Ice/async/Server.java
+++ b/java/demo/Ice/async/Server.java
@@ -11,12 +11,43 @@ import Demo.*;
public class Server extends Ice.Application
{
+ class ShutdownHook extends Thread
+ {
+ public void
+ run()
+ {
+ _workQueue.destroy();
+ try
+ {
+ _workQueue.join();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ try
+ {
+ communicator().destroy();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+
public int
run(String[] args)
{
- Ice.ObjectAdapter adapter = communicator().createObjectAdapter("Queue");
- adapter.add(new QueueI(), communicator().stringToIdentity("queue"));
+ setInterruptHook(new ShutdownHook());
+
+ Ice.ObjectAdapter adapter = communicator().createObjectAdapter("Hello");
+ _workQueue = new WorkQueue();
+ adapter.add(new HelloI(_workQueue), communicator().stringToIdentity("hello"));
+
+ _workQueue.start();
adapter.activate();
+
communicator().waitForShutdown();
return 0;
}
@@ -28,4 +59,6 @@ public class Server extends Ice.Application
int status = app.main("Server", args, "config.server");
System.exit(status);
}
+
+ private WorkQueue _workQueue;
}
diff --git a/java/demo/Ice/async/WorkQueue.java b/java/demo/Ice/async/WorkQueue.java
new file mode 100644
index 00000000000..036a1978b49
--- /dev/null
+++ b/java/demo/Ice/async/WorkQueue.java
@@ -0,0 +1,115 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import Demo.*;
+
+public class WorkQueue extends Thread
+{
+ class CallbackEntry
+ {
+ AMD_Hello_sayHello cb;
+ int delay;
+ }
+
+ public synchronized void
+ run()
+ {
+ while(!_done)
+ {
+ if(_callbacks.size() == 0)
+ {
+ try
+ {
+ wait();
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+ }
+
+ if(_callbacks.size() != 0)
+ {
+ //
+ // Get next work item.
+ //
+ CallbackEntry entry = (CallbackEntry)_callbacks.getFirst();
+
+ //
+ // Wait for the amount of time indicated in delay to
+ // emulate a process that takes a significant period of
+ // time to complete.
+ //
+ try
+ {
+ wait(entry.delay);
+ }
+ catch(java.lang.InterruptedException ex)
+ {
+ }
+
+ if(!_done)
+ {
+ //
+ // Print greeting and send response.
+ //
+ _callbacks.removeFirst();
+ System.err.println("Belated Hello World!");
+ entry.cb.ice_response();
+ }
+ }
+ }
+
+ //
+ // Throw exception for any outstanding requests.
+ //
+ java.util.Iterator p = _callbacks.iterator();
+ while(p.hasNext())
+ {
+ CallbackEntry entry = (CallbackEntry)p.next();
+ entry.cb.ice_exception(new RequestCanceledException());
+ }
+ }
+
+ public synchronized void
+ add(AMD_Hello_sayHello cb, int delay)
+ {
+ if(!_done)
+ {
+ //
+ // Add the work item.
+ //
+ CallbackEntry entry = new CallbackEntry();
+ entry.cb = cb;
+ entry.delay = delay;
+
+ if(_callbacks.size() == 0)
+ {
+ notify();
+ }
+ _callbacks.add(entry);
+ }
+ else
+ {
+ //
+ // Destroyed, throw exception.
+ //
+ cb.ice_exception(new RequestCanceledException());
+ }
+ }
+
+ public synchronized void
+ destroy()
+ {
+ _done = true;
+ notify();
+ }
+
+ private java.util.LinkedList _callbacks = new java.util.LinkedList();
+ private boolean _done = false;
+}
diff --git a/java/demo/Ice/async/build.xml b/java/demo/Ice/async/build.xml
index dfdfeeb7954..789ca9af26d 100644
--- a/java/demo/Ice/async/build.xml
+++ b/java/demo/Ice/async/build.xml
@@ -25,7 +25,7 @@
<includepath>
<pathelement path="${slice.dir}"/>
</includepath>
- <fileset dir="." includes="Queue.ice"/>
+ <fileset dir="." includes="Hello.ice"/>
</slice2java>
</target>
diff --git a/java/demo/Ice/async/config.client b/java/demo/Ice/async/config.client
index 9cca378e1e3..0cea3d3bc26 100644
--- a/java/demo/Ice/async/config.client
+++ b/java/demo/Ice/async/config.client
@@ -2,7 +2,7 @@
# The client reads this property to create the reference to the
# "hello" object in the server.
#
-Queue.Proxy=queue:tcp -p 10000
+Hello.Proxy=hello:tcp -p 10000
#
# Warn about connection exceptions
diff --git a/java/demo/Ice/async/config.server b/java/demo/Ice/async/config.server
index c7d7b368b1b..05c1345d404 100644
--- a/java/demo/Ice/async/config.server
+++ b/java/demo/Ice/async/config.server
@@ -1,9 +1,9 @@
#
# The server creates one single object adapter with the name
-# "Queue". The following line sets the endpoints for this
+# "Hello". The following line sets the endpoints for this
# adapter.
#
-Ice.OA.Queue.Endpoints=tcp -p 10000
+Ice.OA.Hello.Endpoints=tcp -p 10000
#
# Warn about connection exceptions