diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-08-07 14:36:07 -0230 |
commit | b36ae21c88735cbd2c39c5ccde2572a8fcc4e928 (patch) | |
tree | dfd5eee6e7d61a9c6efcbaabe916639009aaa9af /java/demo/Ice/interrupt | |
parent | Add @Override where possible, and remove trailing white space. (diff) | |
download | ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.bz2 ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.tar.xz ice-b36ae21c88735cbd2c39c5ccde2572a8fcc4e928.zip |
ICE-1593 Handling thread interrupts in Java
- Added Ice.BackgroundIO property to perform all IO in a non-user
thread. This makes Ice for Java interrupt safe. This is implemented
by the QueueRequestHanbler.
- EndpointHostResolver now uses an executor instead of a thread.
- Added java/demo/Ice/interrupt and java/test/Ice/interrupt.
- Made several changes that must be ported to C++ & C#.
- InvocationTimeout exceptions can hang forever.
- Connection establishment is always asynchronous.
- RequestHandler.requestTimeout and asyncRequestTimeout have been
renamed to requestCancel and asyncRequestCancel.
Diffstat (limited to 'java/demo/Ice/interrupt')
-rw-r--r-- | java/demo/Ice/interrupt/Client.java | 198 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/README | 36 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/Server.java | 91 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/TaskManager.ice | 21 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/TaskManagerI.java | 56 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/build.xml | 44 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/config.client | 46 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/config.server | 47 | ||||
-rw-r--r-- | java/demo/Ice/interrupt/expect.py | 30 |
9 files changed, 569 insertions, 0 deletions
diff --git a/java/demo/Ice/interrupt/Client.java b/java/demo/Ice/interrupt/Client.java new file mode 100644 index 00000000000..93af8923cd0 --- /dev/null +++ b/java/demo/Ice/interrupt/Client.java @@ -0,0 +1,198 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import Demo.*; +import Ice.LocalException; + +public class Client extends Ice.Application +{ + class ShutdownHook extends Thread + { + public void + run() + { + try + { + communicator().destroy(); + } + catch(Ice.LocalException ex) + { + ex.printStackTrace(); + } + } + } + + private static void + menu() + { + System.out.println( + "usage:\n" + + "t: start a task\n" + + "b: start a blocking task\n" + + "i: interrupt the blocking task\n" + + "s: shutdown server\n" + + "x: exit\n" + + "?: help\n"); + } + + @Override + public int + run(String[] args) + { + if(args.length > 0) + { + System.err.println(appName() + ": too many arguments"); + return 1; + } + + // + // Since this is an interactive demo we want to clear the + // Application installed interrupt callback and install our + // own shutdown hook. + // + setInterruptHook(new ShutdownHook()); + + final TaskManagerPrx taskManager = TaskManagerPrxHelper.checkedCast(communicator().propertyToProxy("TaskManager.Proxy")); + if(taskManager == null) + { + System.err.println("invalid proxy"); + return 1; + } + + menu(); + + java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(System.in)); + + ExecutorService executor = Executors.newFixedThreadPool(5); + List<Future<?> > futures = new ArrayList<Future<?> >(); + int nextId = 0; + String line = null; + do + { + try + { + System.out.print("==> "); + System.out.flush(); + line = in.readLine(); + if(line == null) + { + break; + } + if(line.equals("t")) + { + final int id = nextId++; + taskManager.begin_run(id, new Callback_TaskManager_run() + { + @Override + public void response() + { + System.out.println("task " + id + " completed running"); + } + + @Override + public void exception(LocalException ex) + { + System.out.println("blocking task " + id + " failed"); + ex.printStackTrace(); + } + }); + } + else if(line.equals("b")) + { + // + // Remove any completed tasks. + // + Iterator<Future<?> > iterator = futures.iterator(); + while(iterator.hasNext()) { + Future<?> f = iterator.next(); + if(f.isDone()) { + iterator.remove(); + } + } + + final int id = nextId++; + Future<?> future = executor.submit(new Runnable() { + @Override + public void + run() + { + try + { + taskManager.run(id); + System.out.println("task " + id + " completed running"); + } + catch(Ice.OperationInterruptedException e) + { + System.out.println("blocking task " + id + " interrupted"); + } + catch(Ice.Exception e) + { + System.out.println("blocking task " + id + " failed"); + e.printStackTrace(); + } + } + }); + futures.add(future); + } + else if(line.equals("i")) + { + for(Future<?> f : futures) + { + f.cancel(true); + } + futures.clear(); + } + else if(line.equals("s")) + { + taskManager.shutdown(); + } + else if(line.equals("x")) + { + // Nothing to do + } + else if(line.equals("?")) + { + menu(); + } + else + { + System.out.println("unknown command `" + line + "'"); + menu(); + } + } + catch(java.io.IOException ex) + { + ex.printStackTrace(); + } + catch(Ice.LocalException ex) + { + ex.printStackTrace(); + } + } + while(!line.equals("x")); + + return 0; + } + + public static void + main(String[] args) + { + Client app = new Client(); + int status = app.main("Client", args, "config.client"); + System.exit(status); + } +} + diff --git a/java/demo/Ice/interrupt/README b/java/demo/Ice/interrupt/README new file mode 100644 index 00000000000..a28ae716ef2 --- /dev/null +++ b/java/demo/Ice/interrupt/README @@ -0,0 +1,36 @@ +This demo illustrates how to interrupt blocking servant dispatches on the server +and interrupt blocking proxy invocations on the client by using +java.lang.Thread.interrupt. + +To run the demo, first start the server: + +$ java Server + +In a separate window, start the client: + +$ java Client + +Calling TaskManager::run on the server simulates a long running task by sleeping +10 seconds. Ordinarily a server will not shutdown until all executing dispatched +requests are complete. By interrupting dispatch threads using +java.lang.Thread.interrupt a server shutdown will proceed, as long as the +servant implementation correctly handles the interrupt. + +The simplest way to interrupt method dispatch threads is through the use of an +Ice.Dispatcher and ExecutorService. Calling shutdownNow on the ExecutorService +interrupts any executing tasks. + +Pressing ^C in the server calls shutdownNow on the executor service, as does +pressing 's' in the Client which calls TaskManager::shutdown, the implementation +of which itself calls shutdownNow. + +It is also possible to interrupt blocking invocations on an Ice proxy through +the use of java.lang.Thread.interrupt. + +In this demo, to interrupt a blocking proxy invocation on the client press 'b' +to run the invocation, and 'i' to interrupt the invocation. Only a single +blocking can be active at once. + +Pressing 't' in the client runs the task on the server using a non-blocking AMI +invocation. + diff --git a/java/demo/Ice/interrupt/Server.java b/java/demo/Ice/interrupt/Server.java new file mode 100644 index 00000000000..e4575ca77c1 --- /dev/null +++ b/java/demo/Ice/interrupt/Server.java @@ -0,0 +1,91 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class Server extends Ice.Application +{ + @Override + public int + run(String[] args) + { + if(args.length > 0) + { + System.err.println(appName() + ": too many arguments"); + return 1; + } + + // + // If ^C is pressed we want to interrupt all running upcalls from the + // dispatcher and destroy the communicator. + // + setInterruptHook(new Thread() { + @Override + public void + run() + { + // + // Call shutdownNow on the executor. This interrupts all + // executor threads causing any running servant dispatch threads + // to terminate quickly. + // + _executor.shutdownNow(); + try + { + communicator().shutdown(); + } + catch(Ice.LocalException ex) + { + ex.printStackTrace(); + } + } + }); + + Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TaskManager"); + adapter.add(new TaskManagerI(_executor), communicator().stringToIdentity("manager")); + adapter.activate(); + communicator().waitForShutdown(); + + return 0; + } + + public static void + main(String[] args) + { + final Server app = new Server(); + + Ice.InitializationData initData = new Ice.InitializationData(); + initData.properties = Ice.Util.createProperties(); + initData.properties.load("config.server"); + + // + // This demo uses a dispatcher to execute any invocations on the server. + // By using an executor it is straightforward to interrupt any servant + // dispatch threads by using ExecutorService.shutdownNow. + // + initData.dispatcher = new Ice.Dispatcher() { + @Override + public void dispatch(Runnable runnable, Ice.Connection con) + { + app.getExecutor().submit(runnable); + } + }; + + int status = app.main("Server", args, initData); + System.exit(status); + } + + ExecutorService getExecutor() + { + return _executor; + } + + private ExecutorService _executor = Executors.newFixedThreadPool(5); +} diff --git a/java/demo/Ice/interrupt/TaskManager.ice b/java/demo/Ice/interrupt/TaskManager.ice new file mode 100644 index 00000000000..f1829c81711 --- /dev/null +++ b/java/demo/Ice/interrupt/TaskManager.ice @@ -0,0 +1,21 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#pragma once + +module Demo +{ + +interface TaskManager +{ + void run(int id); + void shutdown(); +}; + +}; diff --git a/java/demo/Ice/interrupt/TaskManagerI.java b/java/demo/Ice/interrupt/TaskManagerI.java new file mode 100644 index 00000000000..a0cf5886d4c --- /dev/null +++ b/java/demo/Ice/interrupt/TaskManagerI.java @@ -0,0 +1,56 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +import java.util.concurrent.ExecutorService; + +import Demo.*; + +public class TaskManagerI extends _TaskManagerDisp +{ + public TaskManagerI(ExecutorService executor) + { + _executor = executor; + } + + @Override + public void + run(int id, Ice.Current current) + { + System.out.println("starting task " + id); + // Sleep for 10 seconds. + try + { + Thread.sleep(10000); + System.out.println("stopping task " + id); + } + catch(InterruptedException ex) + { + // + // We are done, the server is shutting down. + // + System.out.println("interrupted task " + id); + } + } + + @Override + public void + shutdown(Ice.Current current) + { + System.out.println("Shutting down..."); + // + // Call shutdownNow on the executor. This interrupts all + // executor threads causing any running upcalls to terminate + // quickly. + // + _executor.shutdownNow(); + current.adapter.getCommunicator().shutdown(); + } + + private ExecutorService _executor; +} diff --git a/java/demo/Ice/interrupt/build.xml b/java/demo/Ice/interrupt/build.xml new file mode 100644 index 00000000000..3d2b43813ae --- /dev/null +++ b/java/demo/Ice/interrupt/build.xml @@ -0,0 +1,44 @@ +<!-- + ********************************************************************** + + Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. + + This copy of Ice is licensed to you under the terms described in the + ICE_LICENSE file included in this distribution. + + ********************************************************************** +--> + +<project name="demo_Ice_interrupt" default="all" basedir="."> + + <!-- set global properties for this build --> + <property name="top.dir" value="../../.."/> + + <!-- import common definitions --> + <import file="${top.dir}/config/common.xml"/> + + <target name="generate" depends="init"> + <!-- Create the output directory for generated code --> + <mkdir dir="${generated.dir}"/> + <slice2java outputdir="${generated.dir}"> + <fileset dir="." includes="TaskManager.ice"/> + </slice2java> + </target> + + <target name="compile" depends="generate"> + <mkdir dir="${class.dir}"/> + <javac srcdir=".:${generated.dir}" destdir="${class.dir}" debug="${debug}"> + <exclude name="${generated.dir}/**"/> + <classpath refid="ice.classpath"/> + <compilerarg value="${javac.lint}"/> + </javac> + </target> + + <target name="all" depends="compile"/> + + <target name="clean"> + <delete dir="${generated.dir}"/> + <delete dir="${class.dir}"/> + </target> + +</project> diff --git a/java/demo/Ice/interrupt/config.client b/java/demo/Ice/interrupt/config.client new file mode 100644 index 00000000000..2c25349f5df --- /dev/null +++ b/java/demo/Ice/interrupt/config.client @@ -0,0 +1,46 @@ +# +# The client reads this property to create the reference to the +# "task manager" object in the server. +# +TaskManager.Proxy=manager:tcp -p 10000 + +# +# BackgroundIO must be enabled to use interrupt. +# +Ice.BackgroundIO=1 + +# +# Only connect to the localhost interface by default. +# +Ice.Default.Host=localhost + +# +# Warn about connection exceptions. +# +Ice.Warn.Connections=1 + +# +# Network Tracing +# +# 0 = no network tracing +# 1 = trace connection establishment and closure +# 2 = like 1, but more detailed +# 3 = like 2, but also trace data transfer +# +#Ice.Trace.Network=1 + +# +# Protocol Tracing +# +# 0 = no protocol tracing +# 1 = trace protocol messages +# +#Ice.Trace.Protocol=1 + +# +# IceMX configuration. +# +#Ice.Admin.Endpoints=tcp -h localhost -p 10003 +Ice.Admin.InstanceName=client +IceMX.Metrics.Debug.GroupBy=id +IceMX.Metrics.ByParent.GroupBy=parent diff --git a/java/demo/Ice/interrupt/config.server b/java/demo/Ice/interrupt/config.server new file mode 100644 index 00000000000..a03349cff26 --- /dev/null +++ b/java/demo/Ice/interrupt/config.server @@ -0,0 +1,47 @@ +# +# The server creates one single object adapter with the name +# "Hello". The following line sets the endpoints for this +# adapter. +# +TaskManager.Endpoints=tcp -p 10000 + +# +# BackgroundIO must be enabled to use interrupt. +# +Ice.BackgroundIO=1 + +# +# Only listen on the localhost interface by default. +# +Ice.Default.Host=localhost + +# +# Warn about connection exceptions +# +Ice.Warn.Connections=1 + +# +# Network Tracing +# +# 0 = no network tracing +# 1 = trace connection establishment and closure +# 2 = like 1, but more detailed +# 3 = like 2, but also trace data transfer +# +#Ice.Trace.Network=1 + +# +# Protocol Tracing +# +# 0 = no protocol tracing +# 1 = trace protocol messages +# +#Ice.Trace.Protocol=1 + +# +# IceMX configuration. +# +#Ice.Admin.Endpoints=tcp -h localhost -p 10002 +Ice.Admin.InstanceName=server +IceMX.Metrics.Debug.GroupBy=id +IceMX.Metrics.ByParent.GroupBy=parent diff --git a/java/demo/Ice/interrupt/expect.py b/java/demo/Ice/interrupt/expect.py new file mode 100644 index 00000000000..65797c9a5ba --- /dev/null +++ b/java/demo/Ice/interrupt/expect.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# ********************************************************************** +# +# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +import sys, os + +path = [ ".", "..", "../..", "../../..", "../../../.." ] +head = os.path.dirname(sys.argv[0]) +if len(head) > 0: + path = [os.path.join(head, p) for p in path] +path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "demoscript")) ] +if len(path) == 0: + raise RuntimeError("can't find toplevel directory!") +sys.path.append(path[0]) + +from demoscript import Util +from demoscript.Ice import interrupt + +server = Util.spawn('java Server --Ice.PrintAdapterReady --Ice.Warn.Connections=0') +server.expect('.* ready') +client = Util.spawn('java Client --Ice.Warn.Connections=0') +client.expect('.*==>') + +interrupt.run(client, server) |