summaryrefslogtreecommitdiff
path: root/java/demo/Ice/interrupt
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
committerMatthew Newhook <matthew@zeroc.com>2014-08-07 14:36:07 -0230
commitb36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch)
treedfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/demo/Ice/interrupt
parentAdd @Override where possible, and remove trailing white space. (diff)
downloadice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz
ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user thread. This makes Ice for Java interrupt safe. This is implemented by the QueueRequestHanbler. - EndpointHostResolver now uses an executor instead of a thread. - Added java/demo/Ice/interrupt and java/test/Ice/interrupt. - Made several changes that must be ported to C++ & C#. - InvocationTimeout exceptions can hang forever. - Connection establishment is always asynchronous. - RequestHandler.requestTimeout and asyncRequestTimeout have been renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/demo/Ice/interrupt')
-rw-r--r--java/demo/Ice/interrupt/Client.java198
-rw-r--r--java/demo/Ice/interrupt/README36
-rw-r--r--java/demo/Ice/interrupt/Server.java91
-rw-r--r--java/demo/Ice/interrupt/TaskManager.ice21
-rw-r--r--java/demo/Ice/interrupt/TaskManagerI.java56
-rw-r--r--java/demo/Ice/interrupt/build.xml44
-rw-r--r--java/demo/Ice/interrupt/config.client46
-rw-r--r--java/demo/Ice/interrupt/config.server47
-rw-r--r--java/demo/Ice/interrupt/expect.py30
9 files changed, 569 insertions, 0 deletions
diff --git a/java/demo/Ice/interrupt/Client.java b/java/demo/Ice/interrupt/Client.java
new file mode 100644
index 00000000000..93af8923cd0
--- /dev/null
+++ b/java/demo/Ice/interrupt/Client.java
@@ -0,0 +1,198 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import Demo.*;
+import Ice.LocalException;
+
+public class Client extends Ice.Application
+{
+ class ShutdownHook extends Thread
+ {
+ public void
+ run()
+ {
+ try
+ {
+ communicator().destroy();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ private static void
+ menu()
+ {
+ System.out.println(
+ "usage:\n" +
+ "t: start a task\n" +
+ "b: start a blocking task\n" +
+ "i: interrupt the blocking task\n" +
+ "s: shutdown server\n" +
+ "x: exit\n" +
+ "?: help\n");
+ }
+
+ @Override
+ public int
+ run(String[] args)
+ {
+ if(args.length > 0)
+ {
+ System.err.println(appName() + ": too many arguments");
+ return 1;
+ }
+
+ //
+ // Since this is an interactive demo we want to clear the
+ // Application installed interrupt callback and install our
+ // own shutdown hook.
+ //
+ setInterruptHook(new ShutdownHook());
+
+ final TaskManagerPrx taskManager = TaskManagerPrxHelper.checkedCast(communicator().propertyToProxy("TaskManager.Proxy"));
+ if(taskManager == null)
+ {
+ System.err.println("invalid proxy");
+ return 1;
+ }
+
+ menu();
+
+ java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
+
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+ List<Future<?> > futures = new ArrayList<Future<?> >();
+ int nextId = 0;
+ String line = null;
+ do
+ {
+ try
+ {
+ System.out.print("==> ");
+ System.out.flush();
+ line = in.readLine();
+ if(line == null)
+ {
+ break;
+ }
+ if(line.equals("t"))
+ {
+ final int id = nextId++;
+ taskManager.begin_run(id, new Callback_TaskManager_run()
+ {
+ @Override
+ public void response()
+ {
+ System.out.println("task " + id + " completed running");
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ System.out.println("blocking task " + id + " failed");
+ ex.printStackTrace();
+ }
+ });
+ }
+ else if(line.equals("b"))
+ {
+ //
+ // Remove any completed tasks.
+ //
+ Iterator<Future<?> > iterator = futures.iterator();
+ while(iterator.hasNext()) {
+ Future<?> f = iterator.next();
+ if(f.isDone()) {
+ iterator.remove();
+ }
+ }
+
+ final int id = nextId++;
+ Future<?> future = executor.submit(new Runnable() {
+ @Override
+ public void
+ run()
+ {
+ try
+ {
+ taskManager.run(id);
+ System.out.println("task " + id + " completed running");
+ }
+ catch(Ice.OperationInterruptedException e)
+ {
+ System.out.println("blocking task " + id + " interrupted");
+ }
+ catch(Ice.Exception e)
+ {
+ System.out.println("blocking task " + id + " failed");
+ e.printStackTrace();
+ }
+ }
+ });
+ futures.add(future);
+ }
+ else if(line.equals("i"))
+ {
+ for(Future<?> f : futures)
+ {
+ f.cancel(true);
+ }
+ futures.clear();
+ }
+ else if(line.equals("s"))
+ {
+ taskManager.shutdown();
+ }
+ else if(line.equals("x"))
+ {
+ // Nothing to do
+ }
+ else if(line.equals("?"))
+ {
+ menu();
+ }
+ else
+ {
+ System.out.println("unknown command `" + line + "'");
+ menu();
+ }
+ }
+ catch(java.io.IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ while(!line.equals("x"));
+
+ return 0;
+ }
+
+ public static void
+ main(String[] args)
+ {
+ Client app = new Client();
+ int status = app.main("Client", args, "config.client");
+ System.exit(status);
+ }
+}
+
diff --git a/java/demo/Ice/interrupt/README b/java/demo/Ice/interrupt/README
new file mode 100644
index 00000000000..a28ae716ef2
--- /dev/null
+++ b/java/demo/Ice/interrupt/README
@@ -0,0 +1,36 @@
+This demo illustrates how to interrupt blocking servant dispatches on the server
+and interrupt blocking proxy invocations on the client by using
+java.lang.Thread.interrupt.
+
+To run the demo, first start the server:
+
+$ java Server
+
+In a separate window, start the client:
+
+$ java Client
+
+Calling TaskManager::run on the server simulates a long running task by sleeping
+10 seconds. Ordinarily a server will not shutdown until all executing dispatched
+requests are complete. By interrupting dispatch threads using
+java.lang.Thread.interrupt a server shutdown will proceed, as long as the
+servant implementation correctly handles the interrupt.
+
+The simplest way to interrupt method dispatch threads is through the use of an
+Ice.Dispatcher and ExecutorService. Calling shutdownNow on the ExecutorService
+interrupts any executing tasks.
+
+Pressing ^C in the server calls shutdownNow on the executor service, as does
+pressing 's' in the Client which calls TaskManager::shutdown, the implementation
+of which itself calls shutdownNow.
+
+It is also possible to interrupt blocking invocations on an Ice proxy through
+the use of java.lang.Thread.interrupt.
+
+In this demo, to interrupt a blocking proxy invocation on the client press 'b'
+to run the invocation, and 'i' to interrupt the invocation. Only a single
+blocking can be active at once.
+
+Pressing 't' in the client runs the task on the server using a non-blocking AMI
+invocation.
+
diff --git a/java/demo/Ice/interrupt/Server.java b/java/demo/Ice/interrupt/Server.java
new file mode 100644
index 00000000000..e4575ca77c1
--- /dev/null
+++ b/java/demo/Ice/interrupt/Server.java
@@ -0,0 +1,91 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class Server extends Ice.Application
+{
+ @Override
+ public int
+ run(String[] args)
+ {
+ if(args.length > 0)
+ {
+ System.err.println(appName() + ": too many arguments");
+ return 1;
+ }
+
+ //
+ // If ^C is pressed we want to interrupt all running upcalls from the
+ // dispatcher and destroy the communicator.
+ //
+ setInterruptHook(new Thread() {
+ @Override
+ public void
+ run()
+ {
+ //
+ // Call shutdownNow on the executor. This interrupts all
+ // executor threads causing any running servant dispatch threads
+ // to terminate quickly.
+ //
+ _executor.shutdownNow();
+ try
+ {
+ communicator().shutdown();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ });
+
+ Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TaskManager");
+ adapter.add(new TaskManagerI(_executor), communicator().stringToIdentity("manager"));
+ adapter.activate();
+ communicator().waitForShutdown();
+
+ return 0;
+ }
+
+ public static void
+ main(String[] args)
+ {
+ final Server app = new Server();
+
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = Ice.Util.createProperties();
+ initData.properties.load("config.server");
+
+ //
+ // This demo uses a dispatcher to execute any invocations on the server.
+ // By using an executor it is straightforward to interrupt any servant
+ // dispatch threads by using ExecutorService.shutdownNow.
+ //
+ initData.dispatcher = new Ice.Dispatcher() {
+ @Override
+ public void dispatch(Runnable runnable, Ice.Connection con)
+ {
+ app.getExecutor().submit(runnable);
+ }
+ };
+
+ int status = app.main("Server", args, initData);
+ System.exit(status);
+ }
+
+ ExecutorService getExecutor()
+ {
+ return _executor;
+ }
+
+ private ExecutorService _executor = Executors.newFixedThreadPool(5);
+}
diff --git a/java/demo/Ice/interrupt/TaskManager.ice b/java/demo/Ice/interrupt/TaskManager.ice
new file mode 100644
index 00000000000..f1829c81711
--- /dev/null
+++ b/java/demo/Ice/interrupt/TaskManager.ice
@@ -0,0 +1,21 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#pragma once
+
+module Demo
+{
+
+interface TaskManager
+{
+ void run(int id);
+ void shutdown();
+};
+
+};
diff --git a/java/demo/Ice/interrupt/TaskManagerI.java b/java/demo/Ice/interrupt/TaskManagerI.java
new file mode 100644
index 00000000000..a0cf5886d4c
--- /dev/null
+++ b/java/demo/Ice/interrupt/TaskManagerI.java
@@ -0,0 +1,56 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.concurrent.ExecutorService;
+
+import Demo.*;
+
+public class TaskManagerI extends _TaskManagerDisp
+{
+ public TaskManagerI(ExecutorService executor)
+ {
+ _executor = executor;
+ }
+
+ @Override
+ public void
+ run(int id, Ice.Current current)
+ {
+ System.out.println("starting task " + id);
+ // Sleep for 10 seconds.
+ try
+ {
+ Thread.sleep(10000);
+ System.out.println("stopping task " + id);
+ }
+ catch(InterruptedException ex)
+ {
+ //
+ // We are done, the server is shutting down.
+ //
+ System.out.println("interrupted task " + id);
+ }
+ }
+
+ @Override
+ public void
+ shutdown(Ice.Current current)
+ {
+ System.out.println("Shutting down...");
+ //
+ // Call shutdownNow on the executor. This interrupts all
+ // executor threads causing any running upcalls to terminate
+ // quickly.
+ //
+ _executor.shutdownNow();
+ current.adapter.getCommunicator().shutdown();
+ }
+
+ private ExecutorService _executor;
+}
diff --git a/java/demo/Ice/interrupt/build.xml b/java/demo/Ice/interrupt/build.xml
new file mode 100644
index 00000000000..3d2b43813ae
--- /dev/null
+++ b/java/demo/Ice/interrupt/build.xml
@@ -0,0 +1,44 @@
+<!--
+ **********************************************************************
+
+ Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+
+ This copy of Ice is licensed to you under the terms described in the
+ ICE_LICENSE file included in this distribution.
+
+ **********************************************************************
+-->
+
+<project name="demo_Ice_interrupt" default="all" basedir=".">
+
+ <!-- set global properties for this build -->
+ <property name="top.dir" value="../../.."/>
+
+ <!-- import common definitions -->
+ <import file="${top.dir}/config/common.xml"/>
+
+ <target name="generate" depends="init">
+ <!-- Create the output directory for generated code -->
+ <mkdir dir="${generated.dir}"/>
+ <slice2java outputdir="${generated.dir}">
+ <fileset dir="." includes="TaskManager.ice"/>
+ </slice2java>
+ </target>
+
+ <target name="compile" depends="generate">
+ <mkdir dir="${class.dir}"/>
+ <javac srcdir=".:${generated.dir}" destdir="${class.dir}" debug="${debug}">
+ <exclude name="${generated.dir}/**"/>
+ <classpath refid="ice.classpath"/>
+ <compilerarg value="${javac.lint}"/>
+ </javac>
+ </target>
+
+ <target name="all" depends="compile"/>
+
+ <target name="clean">
+ <delete dir="${generated.dir}"/>
+ <delete dir="${class.dir}"/>
+ </target>
+
+</project>
diff --git a/java/demo/Ice/interrupt/config.client b/java/demo/Ice/interrupt/config.client
new file mode 100644
index 00000000000..2c25349f5df
--- /dev/null
+++ b/java/demo/Ice/interrupt/config.client
@@ -0,0 +1,46 @@
+#
+# The client reads this property to create the reference to the
+# "task manager" object in the server.
+#
+TaskManager.Proxy=manager:tcp -p 10000
+
+#
+# BackgroundIO must be enabled to use interrupt.
+#
+Ice.BackgroundIO=1
+
+#
+# Only connect to the localhost interface by default.
+#
+Ice.Default.Host=localhost
+
+#
+# Warn about connection exceptions.
+#
+Ice.Warn.Connections=1
+
+#
+# Network Tracing
+#
+# 0 = no network tracing
+# 1 = trace connection establishment and closure
+# 2 = like 1, but more detailed
+# 3 = like 2, but also trace data transfer
+#
+#Ice.Trace.Network=1
+
+#
+# Protocol Tracing
+#
+# 0 = no protocol tracing
+# 1 = trace protocol messages
+#
+#Ice.Trace.Protocol=1
+
+#
+# IceMX configuration.
+#
+#Ice.Admin.Endpoints=tcp -h localhost -p 10003
+Ice.Admin.InstanceName=client
+IceMX.Metrics.Debug.GroupBy=id
+IceMX.Metrics.ByParent.GroupBy=parent
diff --git a/java/demo/Ice/interrupt/config.server b/java/demo/Ice/interrupt/config.server
new file mode 100644
index 00000000000..a03349cff26
--- /dev/null
+++ b/java/demo/Ice/interrupt/config.server
@@ -0,0 +1,47 @@
+#
+# The server creates one single object adapter with the name
+# "Hello". The following line sets the endpoints for this
+# adapter.
+#
+TaskManager.Endpoints=tcp -p 10000
+
+#
+# BackgroundIO must be enabled to use interrupt.
+#
+Ice.BackgroundIO=1
+
+#
+# Only listen on the localhost interface by default.
+#
+Ice.Default.Host=localhost
+
+#
+# Warn about connection exceptions
+#
+Ice.Warn.Connections=1
+
+#
+# Network Tracing
+#
+# 0 = no network tracing
+# 1 = trace connection establishment and closure
+# 2 = like 1, but more detailed
+# 3 = like 2, but also trace data transfer
+#
+#Ice.Trace.Network=1
+
+#
+# Protocol Tracing
+#
+# 0 = no protocol tracing
+# 1 = trace protocol messages
+#
+#Ice.Trace.Protocol=1
+
+#
+# IceMX configuration.
+#
+#Ice.Admin.Endpoints=tcp -h localhost -p 10002
+Ice.Admin.InstanceName=server
+IceMX.Metrics.Debug.GroupBy=id
+IceMX.Metrics.ByParent.GroupBy=parent
diff --git a/java/demo/Ice/interrupt/expect.py b/java/demo/Ice/interrupt/expect.py
new file mode 100644
index 00000000000..65797c9a5ba
--- /dev/null
+++ b/java/demo/Ice/interrupt/expect.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# **********************************************************************
+#
+# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+import sys, os
+
+path = [ ".", "..", "../..", "../../..", "../../../.." ]
+head = os.path.dirname(sys.argv[0])
+if len(head) > 0:
+ path = [os.path.join(head, p) for p in path]
+path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "demoscript")) ]
+if len(path) == 0:
+ raise RuntimeError("can't find toplevel directory!")
+sys.path.append(path[0])
+
+from demoscript import Util
+from demoscript.Ice import interrupt
+
+server = Util.spawn('java Server --Ice.PrintAdapterReady --Ice.Warn.Connections=0')
+server.expect('.* ready')
+client = Util.spawn('java Client --Ice.Warn.Connections=0')
+client.expect('.*==>')
+
+interrupt.run(client, server)