summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Ice/Exception.cpp8
-rwxr-xr-xjava/allTests.py1
-rw-r--r--java/build.xml3
-rw-r--r--java/demo/Ice/README5
-rw-r--r--java/demo/Ice/build.xml2
-rw-r--r--java/demo/Ice/interrupt/Client.java198
-rw-r--r--java/demo/Ice/interrupt/README36
-rw-r--r--java/demo/Ice/interrupt/Server.java91
-rw-r--r--java/demo/Ice/interrupt/TaskManager.ice21
-rw-r--r--java/demo/Ice/interrupt/TaskManagerI.java56
-rw-r--r--java/demo/Ice/interrupt/build.xml44
-rw-r--r--java/demo/Ice/interrupt/config.client46
-rw-r--r--java/demo/Ice/interrupt/config.server47
-rw-r--r--java/demo/Ice/interrupt/expect.py30
-rw-r--r--java/src/Ice/Application.java12
-rw-r--r--java/src/Ice/AsyncResult.java11
-rw-r--r--java/src/Ice/ConnectionI.java191
-rw-r--r--java/src/Ice/ObjectAdapterI.java125
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java80
-rw-r--r--java/src/Ice/ThreadNotification.java2
-rw-r--r--java/src/Ice/Util.java1
-rw-r--r--java/src/IceInternal/BasicStream.java19
-rw-r--r--java/src/IceInternal/BatchOutgoing.java44
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java4
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java99
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java4
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java149
-rw-r--r--java/src/IceInternal/ConnectionRequestHandler.java63
-rw-r--r--java/src/IceInternal/EndpointHostResolver.java252
-rw-r--r--java/src/IceInternal/FixedReference.java112
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java39
-rw-r--r--java/src/IceInternal/Instance.java132
-rw-r--r--java/src/IceInternal/LocatorInfo.java121
-rw-r--r--java/src/IceInternal/ObjectAdapterFactory.java14
-rw-r--r--java/src/IceInternal/Outgoing.java76
-rw-r--r--java/src/IceInternal/OutgoingAsync.java15
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java4
-rw-r--r--java/src/IceInternal/OutgoingConnectionFactory.java295
-rw-r--r--java/src/IceInternal/PropertyNames.java1
-rw-r--r--java/src/IceInternal/ProxyBatchOutgoingAsync.java2
-rw-r--r--java/src/IceInternal/ProxyFactory.java10
-rw-r--r--java/src/IceInternal/QueueRequestHandler.java357
-rw-r--r--java/src/IceInternal/Reference.java1
-rw-r--r--java/src/IceInternal/RequestHandler.java9
-rw-r--r--java/src/IceInternal/RoutableReference.java141
-rw-r--r--java/src/IceInternal/Selector.java5
-rw-r--r--java/src/IceInternal/TcpTransceiver.java6
-rw-r--r--java/src/IceInternal/ThreadPool.java91
-rw-r--r--java/src/IceInternal/ThreadPoolWorkQueue.java36
-rw-r--r--java/src/IceInternal/UdpTransceiver.java4
-rw-r--r--java/src/IceInternal/Util.java33
-rw-r--r--java/test/Ice/ami/Server.java2
-rw-r--r--java/test/Ice/interrupt/AllTests.java444
-rw-r--r--java/test/Ice/interrupt/Client.java46
-rw-r--r--java/test/Ice/interrupt/Server.java51
-rw-r--r--java/test/Ice/interrupt/Test.ice43
-rw-r--r--java/test/Ice/interrupt/TestControllerI.java69
-rw-r--r--java/test/Ice/interrupt/TestI.java68
-rw-r--r--java/test/Ice/interrupt/run.py24
-rw-r--r--slice/Ice/LocalException.ice10
60 files changed, 2717 insertions, 1188 deletions
diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp
index af4e44e6087..a8bb8c6f81a 100644
--- a/cpp/src/Ice/Exception.cpp
+++ b/cpp/src/Ice/Exception.cpp
@@ -459,6 +459,14 @@ Ice::DNSException::ice_print(ostream& out) const
}
void
+Ice::OperationInterruptedException::ice_print(ostream& out) const
+{
+ Exception::ice_print(out);
+ out << ":\noperation interrupted";
+}
+
+
+void
Ice::TimeoutException::ice_print(ostream& out) const
{
Exception::ice_print(out);
diff --git a/java/allTests.py b/java/allTests.py
index 64e9a0eb036..4c1d9542b8e 100755
--- a/java/allTests.py
+++ b/java/allTests.py
@@ -49,6 +49,7 @@ tests = [
("Ice/custom", ["core"]),
("Ice/checksum", ["core"]),
("Ice/dispatcher", ["core"]),
+ ("Ice/interrupt", ["core"]),
("Ice/packagemd", ["core"]),
("Ice/stream", ["core"]),
("Ice/hold", ["core"]),
diff --git a/java/build.xml b/java/build.xml
index f097dde2048..4ea685cdc3a 100644
--- a/java/build.xml
+++ b/java/build.xml
@@ -492,6 +492,7 @@
<include name="test/Ice/info/Test.ice" />
<include name="test/Ice/inheritance/Test.ice" />
<include name="test/Ice/interceptor/Test.ice" />
+ <include name="test/Ice/interrupt/Test.ice" />
<include name="test/Ice/invoke/Test.ice" />
<include name="test/Ice/location/Test.ice" />
<include name="test/Ice/metrics/Test.ice" />
@@ -513,7 +514,7 @@
<include name="test/Ice/timeout/Test.ice" />
<include name="test/Ice/acm/Test.ice" />
<include name="test/Ice/throughput/Throughput.ice" />
- <include name="test/Ice/threadPoolPriority/Test.ice" />
+ <include name="test/Ice/threadPoolPriority/Test.ice" />
<include name="test/Ice/udp/Test.ice" />
<include name="test/Freeze/complex/Complex.ice" />
<include name="test/Glacier2/router/Callback.ice" />
diff --git a/java/demo/Ice/README b/java/demo/Ice/README
index 76ae67430dd..368c71269fc 100644
--- a/java/demo/Ice/README
+++ b/java/demo/Ice/README
@@ -96,3 +96,8 @@ Demos in this directory:
- optional
This demo shows the use of the optional keyword.
+
+- interrupt
+
+ This demo shows to use Thread.interrupt to interrupt blocking client
+ side invocations, and forcably shutdown servers.
diff --git a/java/demo/Ice/build.xml b/java/demo/Ice/build.xml
index 6a0dfca2513..dbe8d340960 100644
--- a/java/demo/Ice/build.xml
+++ b/java/demo/Ice/build.xml
@@ -18,6 +18,7 @@
<ant dir="callback"/>
<ant dir="hello"/>
<ant dir="invoke"/>
+ <ant dir="interrupt"/>
<ant dir="latency"/>
<ant dir="minimal"/>
<ant dir="multicast"/>
@@ -40,6 +41,7 @@
<ant dir="callback" target="clean"/>
<ant dir="hello" target="clean"/>
<ant dir="invoke" target="clean"/>
+ <ant dir="interrupt" target="clean"/>
<ant dir="latency" target="clean"/>
<ant dir="minimal" target="clean"/>
<ant dir="multicast" target="clean"/>
diff --git a/java/demo/Ice/interrupt/Client.java b/java/demo/Ice/interrupt/Client.java
new file mode 100644
index 00000000000..93af8923cd0
--- /dev/null
+++ b/java/demo/Ice/interrupt/Client.java
@@ -0,0 +1,198 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import Demo.*;
+import Ice.LocalException;
+
+public class Client extends Ice.Application
+{
+ class ShutdownHook extends Thread
+ {
+ public void
+ run()
+ {
+ try
+ {
+ communicator().destroy();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+
+ private static void
+ menu()
+ {
+ System.out.println(
+ "usage:\n" +
+ "t: start a task\n" +
+ "b: start a blocking task\n" +
+ "i: interrupt the blocking task\n" +
+ "s: shutdown server\n" +
+ "x: exit\n" +
+ "?: help\n");
+ }
+
+ @Override
+ public int
+ run(String[] args)
+ {
+ if(args.length > 0)
+ {
+ System.err.println(appName() + ": too many arguments");
+ return 1;
+ }
+
+ //
+ // Since this is an interactive demo we want to clear the
+ // Application installed interrupt callback and install our
+ // own shutdown hook.
+ //
+ setInterruptHook(new ShutdownHook());
+
+ final TaskManagerPrx taskManager = TaskManagerPrxHelper.checkedCast(communicator().propertyToProxy("TaskManager.Proxy"));
+ if(taskManager == null)
+ {
+ System.err.println("invalid proxy");
+ return 1;
+ }
+
+ menu();
+
+ java.io.BufferedReader in = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));
+
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+ List<Future<?> > futures = new ArrayList<Future<?> >();
+ int nextId = 0;
+ String line = null;
+ do
+ {
+ try
+ {
+ System.out.print("==> ");
+ System.out.flush();
+ line = in.readLine();
+ if(line == null)
+ {
+ break;
+ }
+ if(line.equals("t"))
+ {
+ final int id = nextId++;
+ taskManager.begin_run(id, new Callback_TaskManager_run()
+ {
+ @Override
+ public void response()
+ {
+ System.out.println("task " + id + " completed running");
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ System.out.println("blocking task " + id + " failed");
+ ex.printStackTrace();
+ }
+ });
+ }
+ else if(line.equals("b"))
+ {
+ //
+ // Remove any completed tasks.
+ //
+ Iterator<Future<?> > iterator = futures.iterator();
+ while(iterator.hasNext()) {
+ Future<?> f = iterator.next();
+ if(f.isDone()) {
+ iterator.remove();
+ }
+ }
+
+ final int id = nextId++;
+ Future<?> future = executor.submit(new Runnable() {
+ @Override
+ public void
+ run()
+ {
+ try
+ {
+ taskManager.run(id);
+ System.out.println("task " + id + " completed running");
+ }
+ catch(Ice.OperationInterruptedException e)
+ {
+ System.out.println("blocking task " + id + " interrupted");
+ }
+ catch(Ice.Exception e)
+ {
+ System.out.println("blocking task " + id + " failed");
+ e.printStackTrace();
+ }
+ }
+ });
+ futures.add(future);
+ }
+ else if(line.equals("i"))
+ {
+ for(Future<?> f : futures)
+ {
+ f.cancel(true);
+ }
+ futures.clear();
+ }
+ else if(line.equals("s"))
+ {
+ taskManager.shutdown();
+ }
+ else if(line.equals("x"))
+ {
+ // Nothing to do
+ }
+ else if(line.equals("?"))
+ {
+ menu();
+ }
+ else
+ {
+ System.out.println("unknown command `" + line + "'");
+ menu();
+ }
+ }
+ catch(java.io.IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ while(!line.equals("x"));
+
+ return 0;
+ }
+
+ public static void
+ main(String[] args)
+ {
+ Client app = new Client();
+ int status = app.main("Client", args, "config.client");
+ System.exit(status);
+ }
+}
+
diff --git a/java/demo/Ice/interrupt/README b/java/demo/Ice/interrupt/README
new file mode 100644
index 00000000000..a28ae716ef2
--- /dev/null
+++ b/java/demo/Ice/interrupt/README
@@ -0,0 +1,36 @@
+This demo illustrates how to interrupt blocking servant dispatches on the server
+and interrupt blocking proxy invocations on the client by using
+java.lang.Thread.interrupt.
+
+To run the demo, first start the server:
+
+$ java Server
+
+In a separate window, start the client:
+
+$ java Client
+
+Calling TaskManager::run on the server simulates a long running task by sleeping
+10 seconds. Ordinarily a server will not shutdown until all executing dispatched
+requests are complete. By interrupting dispatch threads using
+java.lang.Thread.interrupt a server shutdown will proceed, as long as the
+servant implementation correctly handles the interrupt.
+
+The simplest way to interrupt method dispatch threads is through the use of an
+Ice.Dispatcher and ExecutorService. Calling shutdownNow on the ExecutorService
+interrupts any executing tasks.
+
+Pressing ^C in the server calls shutdownNow on the executor service, as does
+pressing 's' in the Client which calls TaskManager::shutdown, the implementation
+of which itself calls shutdownNow.
+
+It is also possible to interrupt blocking invocations on an Ice proxy through
+the use of java.lang.Thread.interrupt.
+
+In this demo, to interrupt a blocking proxy invocation on the client press 'b'
+to run the invocation, and 'i' to interrupt the invocation. Only a single
+blocking can be active at once.
+
+Pressing 't' in the client runs the task on the server using a non-blocking AMI
+invocation.
+
diff --git a/java/demo/Ice/interrupt/Server.java b/java/demo/Ice/interrupt/Server.java
new file mode 100644
index 00000000000..e4575ca77c1
--- /dev/null
+++ b/java/demo/Ice/interrupt/Server.java
@@ -0,0 +1,91 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class Server extends Ice.Application
+{
+ @Override
+ public int
+ run(String[] args)
+ {
+ if(args.length > 0)
+ {
+ System.err.println(appName() + ": too many arguments");
+ return 1;
+ }
+
+ //
+ // If ^C is pressed we want to interrupt all running upcalls from the
+ // dispatcher and destroy the communicator.
+ //
+ setInterruptHook(new Thread() {
+ @Override
+ public void
+ run()
+ {
+ //
+ // Call shutdownNow on the executor. This interrupts all
+ // executor threads causing any running servant dispatch threads
+ // to terminate quickly.
+ //
+ _executor.shutdownNow();
+ try
+ {
+ communicator().shutdown();
+ }
+ catch(Ice.LocalException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ });
+
+ Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TaskManager");
+ adapter.add(new TaskManagerI(_executor), communicator().stringToIdentity("manager"));
+ adapter.activate();
+ communicator().waitForShutdown();
+
+ return 0;
+ }
+
+ public static void
+ main(String[] args)
+ {
+ final Server app = new Server();
+
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = Ice.Util.createProperties();
+ initData.properties.load("config.server");
+
+ //
+ // This demo uses a dispatcher to execute any invocations on the server.
+ // By using an executor it is straightforward to interrupt any servant
+ // dispatch threads by using ExecutorService.shutdownNow.
+ //
+ initData.dispatcher = new Ice.Dispatcher() {
+ @Override
+ public void dispatch(Runnable runnable, Ice.Connection con)
+ {
+ app.getExecutor().submit(runnable);
+ }
+ };
+
+ int status = app.main("Server", args, initData);
+ System.exit(status);
+ }
+
+ ExecutorService getExecutor()
+ {
+ return _executor;
+ }
+
+ private ExecutorService _executor = Executors.newFixedThreadPool(5);
+}
diff --git a/java/demo/Ice/interrupt/TaskManager.ice b/java/demo/Ice/interrupt/TaskManager.ice
new file mode 100644
index 00000000000..f1829c81711
--- /dev/null
+++ b/java/demo/Ice/interrupt/TaskManager.ice
@@ -0,0 +1,21 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#pragma once
+
+module Demo
+{
+
+interface TaskManager
+{
+ void run(int id);
+ void shutdown();
+};
+
+};
diff --git a/java/demo/Ice/interrupt/TaskManagerI.java b/java/demo/Ice/interrupt/TaskManagerI.java
new file mode 100644
index 00000000000..a0cf5886d4c
--- /dev/null
+++ b/java/demo/Ice/interrupt/TaskManagerI.java
@@ -0,0 +1,56 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+import java.util.concurrent.ExecutorService;
+
+import Demo.*;
+
+public class TaskManagerI extends _TaskManagerDisp
+{
+ public TaskManagerI(ExecutorService executor)
+ {
+ _executor = executor;
+ }
+
+ @Override
+ public void
+ run(int id, Ice.Current current)
+ {
+ System.out.println("starting task " + id);
+ // Sleep for 10 seconds.
+ try
+ {
+ Thread.sleep(10000);
+ System.out.println("stopping task " + id);
+ }
+ catch(InterruptedException ex)
+ {
+ //
+ // We are done, the server is shutting down.
+ //
+ System.out.println("interrupted task " + id);
+ }
+ }
+
+ @Override
+ public void
+ shutdown(Ice.Current current)
+ {
+ System.out.println("Shutting down...");
+ //
+ // Call shutdownNow on the executor. This interrupts all
+ // executor threads causing any running upcalls to terminate
+ // quickly.
+ //
+ _executor.shutdownNow();
+ current.adapter.getCommunicator().shutdown();
+ }
+
+ private ExecutorService _executor;
+}
diff --git a/java/demo/Ice/interrupt/build.xml b/java/demo/Ice/interrupt/build.xml
new file mode 100644
index 00000000000..3d2b43813ae
--- /dev/null
+++ b/java/demo/Ice/interrupt/build.xml
@@ -0,0 +1,44 @@
+<!--
+ **********************************************************************
+
+ Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+
+ This copy of Ice is licensed to you under the terms described in the
+ ICE_LICENSE file included in this distribution.
+
+ **********************************************************************
+-->
+
+<project name="demo_Ice_interrupt" default="all" basedir=".">
+
+ <!-- set global properties for this build -->
+ <property name="top.dir" value="../../.."/>
+
+ <!-- import common definitions -->
+ <import file="${top.dir}/config/common.xml"/>
+
+ <target name="generate" depends="init">
+ <!-- Create the output directory for generated code -->
+ <mkdir dir="${generated.dir}"/>
+ <slice2java outputdir="${generated.dir}">
+ <fileset dir="." includes="TaskManager.ice"/>
+ </slice2java>
+ </target>
+
+ <target name="compile" depends="generate">
+ <mkdir dir="${class.dir}"/>
+ <javac srcdir=".:${generated.dir}" destdir="${class.dir}" debug="${debug}">
+ <exclude name="${generated.dir}/**"/>
+ <classpath refid="ice.classpath"/>
+ <compilerarg value="${javac.lint}"/>
+ </javac>
+ </target>
+
+ <target name="all" depends="compile"/>
+
+ <target name="clean">
+ <delete dir="${generated.dir}"/>
+ <delete dir="${class.dir}"/>
+ </target>
+
+</project>
diff --git a/java/demo/Ice/interrupt/config.client b/java/demo/Ice/interrupt/config.client
new file mode 100644
index 00000000000..2c25349f5df
--- /dev/null
+++ b/java/demo/Ice/interrupt/config.client
@@ -0,0 +1,46 @@
+#
+# The client reads this property to create the reference to the
+# "task manager" object in the server.
+#
+TaskManager.Proxy=manager:tcp -p 10000
+
+#
+# BackgroundIO must be enabled to use interrupt.
+#
+Ice.BackgroundIO=1
+
+#
+# Only connect to the localhost interface by default.
+#
+Ice.Default.Host=localhost
+
+#
+# Warn about connection exceptions.
+#
+Ice.Warn.Connections=1
+
+#
+# Network Tracing
+#
+# 0 = no network tracing
+# 1 = trace connection establishment and closure
+# 2 = like 1, but more detailed
+# 3 = like 2, but also trace data transfer
+#
+#Ice.Trace.Network=1
+
+#
+# Protocol Tracing
+#
+# 0 = no protocol tracing
+# 1 = trace protocol messages
+#
+#Ice.Trace.Protocol=1
+
+#
+# IceMX configuration.
+#
+#Ice.Admin.Endpoints=tcp -h localhost -p 10003
+Ice.Admin.InstanceName=client
+IceMX.Metrics.Debug.GroupBy=id
+IceMX.Metrics.ByParent.GroupBy=parent
diff --git a/java/demo/Ice/interrupt/config.server b/java/demo/Ice/interrupt/config.server
new file mode 100644
index 00000000000..a03349cff26
--- /dev/null
+++ b/java/demo/Ice/interrupt/config.server
@@ -0,0 +1,47 @@
+#
+# The server creates one single object adapter with the name
+# "Hello". The following line sets the endpoints for this
+# adapter.
+#
+TaskManager.Endpoints=tcp -p 10000
+
+#
+# BackgroundIO must be enabled to use interrupt.
+#
+Ice.BackgroundIO=1
+
+#
+# Only listen on the localhost interface by default.
+#
+Ice.Default.Host=localhost
+
+#
+# Warn about connection exceptions
+#
+Ice.Warn.Connections=1
+
+#
+# Network Tracing
+#
+# 0 = no network tracing
+# 1 = trace connection establishment and closure
+# 2 = like 1, but more detailed
+# 3 = like 2, but also trace data transfer
+#
+#Ice.Trace.Network=1
+
+#
+# Protocol Tracing
+#
+# 0 = no protocol tracing
+# 1 = trace protocol messages
+#
+#Ice.Trace.Protocol=1
+
+#
+# IceMX configuration.
+#
+#Ice.Admin.Endpoints=tcp -h localhost -p 10002
+Ice.Admin.InstanceName=server
+IceMX.Metrics.Debug.GroupBy=id
+IceMX.Metrics.ByParent.GroupBy=parent
diff --git a/java/demo/Ice/interrupt/expect.py b/java/demo/Ice/interrupt/expect.py
new file mode 100644
index 00000000000..65797c9a5ba
--- /dev/null
+++ b/java/demo/Ice/interrupt/expect.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# **********************************************************************
+#
+# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+import sys, os
+
+path = [ ".", "..", "../..", "../../..", "../../../.." ]
+head = os.path.dirname(sys.argv[0])
+if len(head) > 0:
+ path = [os.path.join(head, p) for p in path]
+path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "demoscript")) ]
+if len(path) == 0:
+ raise RuntimeError("can't find toplevel directory!")
+sys.path.append(path[0])
+
+from demoscript import Util
+from demoscript.Ice import interrupt
+
+server = Util.spawn('java Server --Ice.PrintAdapterReady --Ice.Warn.Connections=0')
+server.expect('.* ready')
+client = Util.spawn('java Client --Ice.Warn.Connections=0')
+client.expect('.*==>')
+
+interrupt.run(client, server)
diff --git a/java/src/Ice/Application.java b/java/src/Ice/Application.java
index 3c649f72301..4190f39043e 100644
--- a/java/src/Ice/Application.java
+++ b/java/src/Ice/Application.java
@@ -240,16 +240,22 @@ public abstract class Application
synchronized(_mutex)
{
+ boolean interrupted = false;
while(_callbackInProgress)
{
try
{
_mutex.wait();
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
+ interrupted = true;
}
}
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
if(_destroyed)
{
@@ -504,7 +510,7 @@ public abstract class Application
}
//
- // Note that we let the IllegalStateException propogate
+ // Note that we let the IllegalStateException propagate
// out if necessary.
//
if(newHook != null)
@@ -590,6 +596,7 @@ public abstract class Application
}
catch(InterruptedException ex)
{
+ break;
}
}
}
@@ -625,6 +632,7 @@ public abstract class Application
}
catch(InterruptedException ex)
{
+ break;
}
}
}
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index 2cce84b373e..eed405ad00c 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -88,6 +88,7 @@ public class AsyncResult
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
}
@@ -127,6 +128,7 @@ public class AsyncResult
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
}
@@ -217,6 +219,12 @@ public class AsyncResult
}
catch(InterruptedException ex)
{
+ //
+ // Remove the EndCalled flag since it should be possible to
+ // call end_* again on the AsyncResult.
+ //
+ _state &= ~EndCalled;
+ throw new Ice.OperationInterruptedException();
}
}
if(_exception != null)
@@ -485,7 +493,8 @@ public class AsyncResult
if(handler != null)
{
- handler.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)this);
+ handler.asyncRequestCanceled((IceInternal.OutgoingAsyncMessageCallback)this,
+ new Ice.InvocationTimeoutException());
}
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 8d90a391253..3e8c735a99d 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -42,24 +42,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
{
- if(callback != null)
- {
- _startCallback = callback;
- return;
- }
+ _startCallback = callback;
+ return;
+ }
- //
- // Wait for the connection to be validated.
- //
+ //
+ // We start out in holding state.
+ //
+ setState(StateHolding);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ callback.connectionStartFailed(this, _exception);
+ return;
+ }
+
+ callback.connectionStartCompleted(this);
+ }
+
+ public void
+ startAndWait()
+ throws InterruptedException
+ {
+ try
+ {
+ synchronized(this)
+ {
+ if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed.
+ {
+ assert(_exception != null);
+ throw (Ice.LocalException)_exception.fillInStackTrace();
+ }
+
+ if(!initialize(IceInternal.SocketOperation.None) || !validate(IceInternal.SocketOperation.None))
+ {
while(_state <= StateNotValidated)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
if(_state >= StateClosing)
@@ -78,21 +99,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
catch(Ice.LocalException ex)
{
exception(ex);
- if(callback != null)
- {
- callback.connectionStartFailed(this, _exception);
- return;
- }
- else
- {
- waitUntilFinished();
- throw ex;
- }
- }
-
- if(callback != null)
- {
- callback.connectionStartCompleted(this);
+ waitUntilFinished();
+ return;
}
}
@@ -171,6 +179,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
@@ -208,21 +217,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public synchronized void
waitUntilHolding()
+ throws InterruptedException
{
while(_state < StateHolding || _dispatchCount > 0)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
}
public synchronized void
waitUntilFinished()
+ throws InterruptedException
{
//
// We wait indefinitely until the connection is finished and all
@@ -232,13 +237,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
while(_state < StateFinished || _dispatchCount > 0)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
assert(_state == StateFinished);
@@ -488,19 +487,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
prepareBatchRequest(IceInternal.BasicStream os)
throws IceInternal.RetryException
{
- //
- // Wait if flushing is currently in progress.
- //
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
+ waitBatchStreamInUse();
if(_exception != null)
{
@@ -692,7 +679,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
flushBatchRequests()
{
IceInternal.BatchOutgoing out = new IceInternal.BatchOutgoing(this, _instance, __flushBatchRequests_name);
- out.invoke();
+ try
+ {
+ out.invoke();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
}
private static final String __flushBatchRequests_name = "flushBatchRequests";
@@ -769,17 +763,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
synchronized public boolean
flushBatchRequests(IceInternal.BatchOutgoing out)
{
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
-
+ waitBatchStreamInUse();
if(_exception != null)
{
throw (Ice.LocalException)_exception.fillInStackTrace();
@@ -832,16 +816,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
synchronized public int
flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
{
- while(_batchStreamInUse && _exception == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
+ waitBatchStreamInUse();
if(_exception != null)
{
@@ -941,8 +916,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
}
- synchronized public void
- requestTimedOut(IceInternal.OutgoingMessageCallback out)
+ synchronized public boolean
+ requestCanceled(IceInternal.OutgoingMessageCallback out, Ice.LocalException ex)
{
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
@@ -964,8 +939,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
it.remove();
}
- out.finished(new InvocationTimeoutException());
- return; // We're done.
+ out.finished(ex);
+ return true; // We're done.
}
}
@@ -977,16 +952,17 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(it2.next() == o)
{
- o.finished(new InvocationTimeoutException());
+ o.finished(ex);
it2.remove();
- return; // We're done.
+ return true; // We're done.
}
}
}
+ return false;
}
- public void
- asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync)
+ public boolean
+ asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
@@ -1008,8 +984,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
it.remove();
}
- outAsync.__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ return true; // We're done
}
}
@@ -1022,13 +998,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(it2.next() == o)
{
it2.remove();
- outAsync.__dispatchInvocationTimeout(_threadPool, this);
- return; // We're done.
+ outAsync.__dispatchInvocationCancel(ex, _threadPool, this);
+ return true; // We're done.
}
}
}
- }
+ return false;
+ }
@Override
synchronized public void
@@ -3092,6 +3069,35 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
}
+ private void waitBatchStreamInUse()
+ {
+ //
+ // This is similar to a mutex lock in that the flag is
+ // only true for a short time period. As such we don't permit the
+ // wait to be interrupted. Instead the interrupted status is saved\
+ // and restored.
+ //
+ boolean interrupted = false;
+ while(_batchStreamInUse && _exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private static class OutgoingMessage
{
OutgoingMessage(IceInternal.BasicStream stream, boolean compress, boolean adopt)
@@ -3263,4 +3269,5 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateClosed
Ice.Instrumentation.ConnectionState.ConnectionStateClosed, // StateFinished
};
+
}
diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java
index 6e2c67ac852..c3ab836967c 100644
--- a/java/src/Ice/ObjectAdapterI.java
+++ b/java/src/Ice/ObjectAdapterI.java
@@ -9,6 +9,8 @@
package Ice;
+import java.util.Map;
+
public final class ObjectAdapterI implements ObjectAdapter
{
@Override
@@ -22,7 +24,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
@Override
- public synchronized Communicator
+ public Communicator
getCommunicator()
{
return _communicator;
@@ -108,7 +110,7 @@ public final class ObjectAdapterI implements ObjectAdapter
synchronized(this)
{
- assert(!_deactivated); // Not possible if _waitForActivate = true;
+ assert(_deactivated == DeactivatedState.Steady); // Not possible if _waitForActivate = true;
//
// Signal threads waiting for the activation.
@@ -156,7 +158,21 @@ public final class ObjectAdapterI implements ObjectAdapter
for(IceInternal.IncomingConnectionFactory factory : incomingConnectionFactories)
{
- factory.waitUntilHolding();
+ try
+ {
+ factory.waitUntilHolding();
+ }
+ catch(InterruptedException ex)
+ {
+ synchronized(this)
+ {
+ if(--_waitForHold == 0)
+ {
+ notifyAll();
+ }
+ }
+ throw new Ice.OperationInterruptedException();
+ }
}
synchronized(this)
@@ -169,7 +185,7 @@ public final class ObjectAdapterI implements ObjectAdapter
//
// If we don't need to retry, we're done. Otherwise, we wait until
// all the waiters finish waiting on the connections and we try
- // again waiting on all the conncetions. This is necessary in the
+ // again waiting on all the connections. This is necessary in the
// case activate() is called by another thread while waitForHold()
// waits on the some connection, if we didn't retry, waitForHold()
// could return only after waiting on a subset of the connections.
@@ -187,8 +203,9 @@ public final class ObjectAdapterI implements ObjectAdapter
{
wait();
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
_waitForHoldRetry = false;
@@ -210,7 +227,7 @@ public final class ObjectAdapterI implements ObjectAdapter
// Ignore deactivation requests if the object adapter has
// already been deactivated.
//
- if(_deactivated)
+ if(_deactivated != DeactivatedState.Steady)
{
return;
}
@@ -228,6 +245,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
@@ -249,9 +267,7 @@ public final class ObjectAdapterI implements ObjectAdapter
outgoingConnectionFactory = _instance.outgoingConnectionFactory();
locatorInfo = _locatorInfo;
- _deactivated = true;
-
- notifyAll();
+ _deactivated = DeactivatedState.Deactivating;
}
try
@@ -282,48 +298,54 @@ public final class ObjectAdapterI implements ObjectAdapter
// requests being dispatched.
//
outgoingConnectionFactory.removeAdapter(this);
+
+ synchronized(this)
+ {
+ _deactivated = DeactivatedState.Deactivated;
+ notifyAll();
+ }
}
@Override
public void
waitForDeactivate()
{
- IceInternal.IncomingConnectionFactory[] incomingConnectionFactories;
-
- synchronized(this)
+ try
{
- if(_destroyed)
- {
- return;
- }
-
- //
- // Wait for deactivation of the adapter itself, and
- // for the return of all direct method calls using this
- // adapter.
- //
- while(!_deactivated || _directCount > 0)
+ IceInternal.IncomingConnectionFactory[] incomingConnectionFactories;
+ synchronized(this)
{
- try
+ if(_destroyed)
{
- wait();
+ return;
}
- catch(InterruptedException ex)
+
+ //
+ // Wait for deactivation of the adapter itself, and
+ // for the return of all direct method calls using this
+ // adapter.
+ //
+ while((_deactivated != DeactivatedState.Deactivated) || _directCount > 0)
{
+ wait();
}
+
+ incomingConnectionFactories =
+ _incomingConnectionFactories.toArray(new IceInternal.IncomingConnectionFactory[0]);
}
- incomingConnectionFactories =
- _incomingConnectionFactories.toArray(new IceInternal.IncomingConnectionFactory[0]);
+ //
+ // Now we wait for until all incoming connection factories are
+ // finished.
+ //
+ for(IceInternal.IncomingConnectionFactory f : incomingConnectionFactories)
+ {
+ f.waitUntilFinished();
+ }
}
-
- //
- // Now we wait for until all incoming connection factories are
- // finished.
- //
- for(IceInternal.IncomingConnectionFactory f : incomingConnectionFactories)
+ catch (InterruptedException e)
{
- f.waitUntilFinished();
+ throw new Ice.OperationInterruptedException();
}
}
@@ -331,7 +353,7 @@ public final class ObjectAdapterI implements ObjectAdapter
public synchronized boolean
isDeactivated()
{
- return _deactivated;
+ return _deactivated == DeactivatedState.Deactivated;
}
@Override
@@ -352,6 +374,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
@@ -384,7 +407,18 @@ public final class ObjectAdapterI implements ObjectAdapter
if(_threadPool != null)
{
_threadPool.destroy();
- _threadPool.joinWithAllThreads();
+ try
+ {
+ _threadPool.joinWithAllThreads();
+ }
+ catch (InterruptedException e)
+ {
+ synchronized(this)
+ {
+ _destroying = false;
+ }
+ throw new Ice.OperationInterruptedException();
+ }
}
IceInternal.ObjectAdapterFactory objectAdapterFactory;
@@ -495,7 +529,7 @@ public final class ObjectAdapterI implements ObjectAdapter
}
@Override
- public synchronized java.util.Map<String, Ice.Object>
+ public synchronized Map<String, Object>
removeAllFacets(Identity ident)
{
checkForDeactivation();
@@ -877,7 +911,7 @@ public final class ObjectAdapterI implements ObjectAdapter
IceInternal.ObjectAdapterFactory objectAdapterFactory, String name,
RouterPrx router, boolean noConfig)
{
- _deactivated = false;
+ _deactivated = DeactivatedState.Steady;
_instance = instance;
_communicator = communicator;
_objectAdapterFactory = objectAdapterFactory;
@@ -929,7 +963,7 @@ public final class ObjectAdapterI implements ObjectAdapter
//
// These need to be set to prevent finalizer from complaining.
//
- _deactivated = true;
+ _deactivated = DeactivatedState.Deactivated;
_destroyed = true;
_instance = null;
_incomingConnectionFactories = null;
@@ -1089,7 +1123,7 @@ public final class ObjectAdapterI implements ObjectAdapter
{
try
{
- if(!_deactivated)
+ if(_deactivated != DeactivatedState.Deactivated)
{
_instance.initializationData().logger.warning("object adapter `" + getName() +
"' has not been deactivated");
@@ -1173,7 +1207,7 @@ public final class ObjectAdapterI implements ObjectAdapter
private void
checkForDeactivation()
{
- if(_deactivated)
+ if(_deactivated != DeactivatedState.Steady)
{
ObjectAdapterDeactivatedException ex = new ObjectAdapterDeactivatedException();
ex.name = getName();
@@ -1606,7 +1640,12 @@ public final class ObjectAdapterI implements ObjectAdapter
return noProps;
}
- private boolean _deactivated;
+ private enum DeactivatedState {
+ Steady,
+ Deactivating,
+ Deactivated
+ };
+ private DeactivatedState _deactivated;
private IceInternal.Instance _instance;
private Communicator _communicator;
private IceInternal.ObjectAdapterFactory _objectAdapterFactory;
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 27504e0d4c2..2f9d1cd67d4 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -10,6 +10,7 @@
package Ice;
import Ice.Instrumentation.InvocationObserver;
+import IceInternal.QueueRequestHandler;
/**
* Base class for all proxies.
@@ -2391,8 +2392,15 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
IceInternal.RequestHandler handler = null;
try
{
- handler = __getRequestHandler(false);
- return handler.getConnection(true);
+ handler = __getRequestHandler();
+ try
+ {
+ return handler.waitForConnection();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
}
catch(Ice.Exception ex)
{
@@ -2412,6 +2420,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
catch(InterruptedException ex1)
{
+ throw new Ice.OperationInterruptedException();
}
}
}
@@ -2460,7 +2469,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
try
{
- return handler.getConnection(false);
+ return handler.getConnection();
}
catch(LocalException ex)
{
@@ -2477,7 +2486,14 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
ice_flushBatchRequests()
{
IceInternal.BatchOutgoing __og = new IceInternal.BatchOutgoing(this, __ice_flushBatchRequests_name);
- __og.invoke();
+ try
+ {
+ __og.invoke();
+ }
+ catch(InterruptedException ex)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
}
/**
@@ -2746,7 +2762,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
public final IceInternal.RequestHandler
- __getRequestHandler(boolean async)
+ __getRequestHandler()
{
if(_reference.getCacheConnection())
{
@@ -2756,16 +2772,12 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
return _requestHandler;
}
- // async = true to avoid blocking with the proxy mutex locked.
- _requestHandler = createRequestHandler(true);
+ _requestHandler = createRequestHandler();
return _requestHandler;
}
}
- final int mode = _reference.getMode();
- return createRequestHandler(async ||
- mode == IceInternal.Reference.ModeBatchOneway ||
- mode == IceInternal.Reference.ModeBatchDatagram);
+ return createRequestHandler();
}
public void
@@ -2777,7 +2789,21 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
{
if(previous == _requestHandler)
{
- _requestHandler = handler;
+ if(handler != null)
+ {
+ if(_reference.getInstance().queueRequests())
+ {
+ _requestHandler = new QueueRequestHandler(_reference.getInstance(), handler);
+ }
+ else
+ {
+ _requestHandler = handler;
+ }
+ }
+ else
+ {
+ _requestHandler = null;
+ }
}
else if(previous != null && _requestHandler != null)
{
@@ -2788,9 +2814,23 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
// update the request handler. See bug ICE-5489 for reasons why
// this can be useful.
//
- if(previous.getConnection(false) == _requestHandler.getConnection(false))
+ if(previous.getConnection() == _requestHandler.getConnection())
{
- _requestHandler = handler;
+ if(handler != null)
+ {
+ if(_reference.getInstance().queueRequests())
+ {
+ _requestHandler = new QueueRequestHandler(_reference.getInstance(), handler);
+ }
+ else
+ {
+ _requestHandler = handler;
+ }
+ }
+ else
+ {
+ _requestHandler = null;
+ }
}
}
catch(Ice.Exception ex)
@@ -2803,7 +2843,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
private IceInternal.RequestHandler
- createRequestHandler(boolean async)
+ createRequestHandler()
{
if(_reference.getCollocationOptimized())
{
@@ -2814,14 +2854,12 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
}
}
- if(async)
- {
- return (new IceInternal.ConnectRequestHandler(_reference, this)).connect();
- }
- else
+ IceInternal.RequestHandler handler = (new IceInternal.ConnectRequestHandler(_reference, this)).connect();
+ if(_reference.getInstance().queueRequests())
{
- return new IceInternal.ConnectionRequestHandler(_reference, this);
+ handler = new QueueRequestHandler(_reference.getInstance(), handler);
}
+ return handler;
}
//
diff --git a/java/src/Ice/ThreadNotification.java b/java/src/Ice/ThreadNotification.java
index ef274ee9b41..f98e7570fa8 100644
--- a/java/src/Ice/ThreadNotification.java
+++ b/java/src/Ice/ThreadNotification.java
@@ -11,7 +11,7 @@ package Ice;
/**
* Interface for thread notification hooks. Applications can derive
- * a class tat implements the <code>start</code> and <code>stop</code>
+ * a class that implements the <code>start</code> and <code>stop</code>
* methods to intercept creation and destruction of threads created
* by the Ice run time.
*
diff --git a/java/src/Ice/Util.java b/java/src/Ice/Util.java
index ece779d6e06..e591f4b75af 100644
--- a/java/src/Ice/Util.java
+++ b/java/src/Ice/Util.java
@@ -736,7 +736,6 @@ public final class Util
public final static Ice.EncodingVersion Encoding_1_0 = new Ice.EncodingVersion((byte)1, (byte)0);
public final static Ice.EncodingVersion Encoding_1_1 = new Ice.EncodingVersion((byte)1, (byte)1);
- private static String _localAddress = null;
private static java.lang.Object _processLoggerMutex = new java.lang.Object();
private static Logger _processLogger = null;
}
diff --git a/java/src/IceInternal/BasicStream.java b/java/src/IceInternal/BasicStream.java
index 8d8a999e5af..efc602aa773 100644
--- a/java/src/IceInternal/BasicStream.java
+++ b/java/src/IceInternal/BasicStream.java
@@ -9,6 +9,8 @@
package IceInternal;
+import java.io.IOException;
+
public class BasicStream
{
public
@@ -941,16 +943,31 @@ public class BasicStream
{
return null;
}
+ ObjectInputStream in = null;
try
{
InputStreamWrapper w = new InputStreamWrapper(sz, this);
- ObjectInputStream in = new ObjectInputStream(_instance, w);
+ in = new ObjectInputStream(_instance, w);
return (java.io.Serializable)in.readObject();
}
catch(java.lang.Exception ex)
{
throw new Ice.MarshalException("cannot deserialize object", ex);
}
+ finally
+ {
+ if(in != null)
+ {
+ try
+ {
+ in.close();
+ }
+ catch (IOException ex)
+ {
+ throw new Ice.MarshalException("cannot deserialize object", ex);
+ }
+ }
+ }
}
public void
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
index a3294769e56..a5f958eb10b 100644
--- a/java/src/IceInternal/BatchOutgoing.java
+++ b/java/src/IceInternal/BatchOutgoing.java
@@ -35,6 +35,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback
public void
invoke()
+ throws InterruptedException
{
assert(_proxy != null || _connection != null);
@@ -49,13 +50,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback
{
while(_exception == null && !_sent)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
if(_exception != null)
{
@@ -68,7 +63,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback
RequestHandler handler = null;
try
{
- handler = _proxy.__getRequestHandler(false);
+ handler = _proxy.__getRequestHandler();
if(handler.sendRequest(this))
{
return;
@@ -84,17 +79,11 @@ public final class BatchOutgoing implements OutgoingMessageCallback
long deadline = now + timeout;
while(_exception == null && !_sent && !timedOut)
{
- try
- {
- wait(deadline - now);
- if(_exception == null && !_sent)
- {
- now = Time.currentMonotonicTimeMillis();
- timedOut = now >= deadline;
- }
- }
- catch(InterruptedException ex)
+ wait(deadline - now);
+ if(_exception == null && !_sent)
{
+ now = Time.currentMonotonicTimeMillis();
+ timedOut = now >= deadline;
}
}
}
@@ -102,32 +91,21 @@ public final class BatchOutgoing implements OutgoingMessageCallback
{
while(_exception == null && !_sent)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
}
}
if(timedOut)
{
- handler.requestTimedOut(this);
-
- synchronized(this)
+ if(handler.requestCanceled(this, new Ice.InvocationTimeoutException()))
{
- while(_exception == null)
+ synchronized(this)
{
- try
+ while(_exception == null)
{
wait();
}
- catch(InterruptedException ex)
- {
- }
}
}
}
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index 1d839796356..74777db0d4b 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -86,7 +86,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
@Override
public void
- __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection)
+ __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
threadPool.dispatch(
new DispatchWorkItem(connection)
@@ -95,7 +95,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
public void
run()
{
- BatchOutgoingAsync.this.__finished(new Ice.InvocationTimeoutException());
+ BatchOutgoingAsync.this.__finished(ex);
}
});
}
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 1b47feccbb9..4c79b2e3c58 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -97,17 +97,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized public void
prepareBatchRequest(BasicStream os)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitStreamInUse();
if(_batchStream.isEmpty())
{
try
@@ -232,8 +222,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
@Override
- synchronized public void
- requestTimedOut(OutgoingMessageCallback out)
+ synchronized public boolean
+ requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
{
Integer requestId = _sendRequests.get(out);
if(requestId != null)
@@ -242,8 +232,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_requests.remove(requestId);
}
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
_sendRequests.remove(out);
+ return true;
}
else if(out instanceof Outgoing)
{
@@ -253,17 +244,18 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
if(e.getValue() == o)
{
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
_requests.remove(e.getKey());
- return; // We're done.
+ return true; // We're done.
}
}
}
+ return false;
}
@Override
- synchronized public void
- asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
+ synchronized public boolean
+ asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
Integer requestId = _sendAsyncRequests.get(outAsync);
if(requestId != null)
@@ -273,8 +265,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_asyncRequests.remove(requestId);
}
_sendAsyncRequests.remove(outAsync);
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
if(outAsync instanceof OutgoingAsync)
@@ -286,11 +278,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(e.getValue() == o)
{
_asyncRequests.remove(e.getKey());
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
}
}
+ return false;
}
public void
@@ -364,17 +357,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeNum;
synchronized(this)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitStreamInUse();
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
@@ -428,16 +411,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeNum;
synchronized(this)
{
- while(_batchStreamInUse)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
+ waitStreamInUse();
invokeNum = _batchRequestNum;
if(_batchRequestNum > 0)
@@ -570,7 +544,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
@Override
public Ice.ConnectionI
- getConnection(boolean wait)
+ getConnection()
+ {
+ return null;
+ }
+
+ @Override
+ public Ice.ConnectionI
+ waitForConnection()
{
return null;
}
@@ -720,6 +701,36 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
+ private void
+ waitStreamInUse()
+ {
+ //
+ // This is similar to a mutex lock in that the stream is
+ // only "locked" while marshaling. As such we don't permit the wait
+ // to be interrupted. Instead the interrupted status is saved and
+ // restored.
+ //
+ boolean interrupted = false;
+ while(_batchStreamInUse)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private final Reference _reference;
private final boolean _dispatcher;
private final boolean _response;
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
index 6e96ea9f81f..3c3d07bcc4e 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -60,8 +60,10 @@ public class CommunicatorBatchOutgoingAsync extends Ice.AsyncResult
return false;
}
+ // TODO: MJN: This is missing a test.
+ @Override
public void
- __finished(Ice.LocalException ex, boolean sent)
+ __finished(Ice.Exception ex)
{
if(_childObserver != null)
{
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 276395ef3ef..9cc70b6a76d 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -9,6 +9,8 @@
package IceInternal;
+import Ice.ConnectionI;
+
public class ConnectRequestHandler
implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback
{
@@ -62,17 +64,7 @@ public class ConnectRequestHandler
{
synchronized(this)
{
- while(_batchRequestInProgress)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
-
+ waitBatchRequestInProgress();
try
{
if(!initialized())
@@ -189,14 +181,14 @@ public class ConnectRequestHandler
}
@Override
- public void
- requestTimedOut(OutgoingMessageCallback out)
+ public boolean
+ requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
{
synchronized(this)
{
if(_exception != null)
{
- return; // The request has been notified of a failure already.
+ return false; // The request has been notified of a failure already.
}
if(!initialized())
@@ -207,26 +199,26 @@ public class ConnectRequestHandler
Request request = it.next();
if(request.out == out)
{
- out.finished(new Ice.InvocationTimeoutException());
+ out.finished(ex);
it.remove();
- return;
+ return true;
}
}
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection.requestTimedOut(out);
+ return _connection.requestCanceled(out, ex);
}
@Override
- public void
- asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
+ public boolean
+ asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex)
{
synchronized(this)
{
if(_exception != null)
{
- return; // The request has been notified of a failure already.
+ return false; // The request has been notified of a failure already.
}
if(!initialized())
@@ -238,14 +230,14 @@ public class ConnectRequestHandler
if(request.outAsync == outAsync)
{
it.remove();
- outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
- return; // We're done
+ outAsync.__dispatchInvocationCancel(ex, _reference.getInstance().clientThreadPool(), null);
+ return true; // We're done
}
}
assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- _connection.asyncRequestTimedOut(outAsync);
+ return _connection.asyncRequestCanceled(outAsync, ex);
}
@Override
@@ -256,37 +248,33 @@ public class ConnectRequestHandler
}
@Override
- synchronized public Ice.ConnectionI
- getConnection(boolean waitInit)
- {
- if(waitInit)
- {
- //
- // Wait for the connection establishment to complete or fail.
- //
- while(!_initialized && _exception == null)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
- }
-
+ synchronized public ConnectionI
+ getConnection() {
if(_exception != null)
{
throw (Ice.LocalException)_exception.fillInStackTrace();
}
else
{
- assert(!waitInit || _initialized);
return _connection;
}
}
+ @Override
+ synchronized public
+ ConnectionI waitForConnection()
+ throws InterruptedException
+ {
+ //
+ // Wait for the connection establishment to complete or fail.
+ //
+ while(!_initialized && _exception == null)
+ {
+ wait();
+ }
+ return getConnection();
+ }
+
//
// Implementation of Reference.GetConnectionCallback
//
@@ -338,14 +326,14 @@ public class ConnectRequestHandler
if(!_requests.isEmpty())
{
_reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
- {
- @Override
- public void
- run()
- {
- flushRequestsWithException();
- };
- });
+ {
+ @Override
+ public void
+ run()
+ {
+ flushRequestsWithException();
+ };
+ });
}
notifyAll();
@@ -393,16 +381,29 @@ public class ConnectRequestHandler
}
else
{
+ //
+ // This is similar to a mutex lock in that the flag is
+ // only true for a short period of time.
+ //
+ boolean interrupted = false;
while(_flushing && _exception == null)
{
try
{
wait();
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
+ interrupted = true;
}
}
+ //
+ // Restore the interrupted status.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
if(_exception != null)
{
@@ -421,17 +422,7 @@ public class ConnectRequestHandler
synchronized(this)
{
assert(_connection != null && !_initialized);
-
- while(_batchRequestInProgress)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
+ waitBatchRequestInProgress();
//
// We set the _flushing flag to true to prevent any additional queuing. Callers
@@ -566,7 +557,35 @@ public class ConnectRequestHandler
}
}
- void
+ private void
+ waitBatchRequestInProgress()
+ {
+ //
+ // This is similar to a mutex lock in that the stream is
+ // only "locked" while the request is in progress.
+ //
+ boolean interrupted = false;
+ while(_batchRequestInProgress)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
+ }
+ //
+ // Restore the interrupted flag if we were interrupted.
+ //
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void
flushRequestsWithException()
{
for(Request request : _requests)
diff --git a/java/src/IceInternal/ConnectionRequestHandler.java b/java/src/IceInternal/ConnectionRequestHandler.java
index 8d44be04e5b..4dcf9db63e8 100644
--- a/java/src/IceInternal/ConnectionRequestHandler.java
+++ b/java/src/IceInternal/ConnectionRequestHandler.java
@@ -13,54 +13,51 @@ public class ConnectionRequestHandler implements RequestHandler
{
@Override
public void
- prepareBatchRequest(BasicStream out)
- throws RetryException
- {
+ prepareBatchRequest(BasicStream out) throws RetryException {
_connection.prepareBatchRequest(out);
}
@Override
public void
- finishBatchRequest(BasicStream out)
- {
+ finishBatchRequest(BasicStream out) {
_connection.finishBatchRequest(out, _compress);
}
@Override
public void
- abortBatchRequest()
- {
+ abortBatchRequest() {
_connection.abortBatchRequest();
}
@Override
public boolean
sendRequest(OutgoingMessageCallback out)
- throws RetryException
- {
- return out.send(_connection, _compress, _response) && !_response; // Finished if sent and no response
+ throws RetryException {
+ //
+ // Finished if sent and no response.
+ //
+ return out.send(_connection, _compress, _response) && !_response;
}
@Override
public int
sendAsyncRequest(OutgoingAsyncMessageCallback out)
- throws RetryException
- {
+ throws RetryException {
return out.__send(_connection, _compress, _response);
}
@Override
- public void
- requestTimedOut(OutgoingMessageCallback out)
+ public boolean
+ requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex)
{
- _connection.requestTimedOut(out);
+ return _connection.requestCanceled(out, ex);
}
@Override
- public void
- asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
+ public boolean
+ asyncRequestCanceled(OutgoingAsyncMessageCallback outgoingAsync, Ice.LocalException ex)
{
- _connection.asyncRequestTimedOut(outAsync);
+ return _connection.asyncRequestCanceled(outgoingAsync, ex);
}
@Override
@@ -72,35 +69,20 @@ public class ConnectionRequestHandler implements RequestHandler
@Override
public Ice.ConnectionI
- getConnection(boolean wait)
+ getConnection()
{
return _connection;
}
- public
- ConnectionRequestHandler(Reference ref, Ice.ObjectPrx proxy)
+ @Override
+ public Ice.ConnectionI
+ waitForConnection()
{
- _reference = ref;
- _response = _reference.getMode() == Reference.ModeTwoway;
-
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- _connection = _reference.getConnection(compress);
- _compress = compress.value;
-
- //
- // If this proxy is for a non-local object, and we are using a router, then
- // add this proxy to the router info object.
- //
- IceInternal.RouterInfo ri = _reference.getRouterInfo();
- if(ri != null)
- {
- ri.addProxy(proxy);
- }
+ return _connection;
}
- public
- ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection, boolean compress)
- {
+ public ConnectionRequestHandler(Reference ref, Ice.ConnectionI connection,
+ boolean compress) {
_reference = ref;
_response = _reference.getMode() == Reference.ModeTwoway;
_connection = connection;
@@ -111,4 +93,5 @@ public class ConnectionRequestHandler implements RequestHandler
private final boolean _response;
private final Ice.ConnectionI _connection;
private final boolean _compress;
+
}
diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java
index 66a75956a3d..ee49e23ddf2 100644
--- a/java/src/IceInternal/EndpointHostResolver.java
+++ b/java/src/IceInternal/EndpointHostResolver.java
@@ -18,13 +18,10 @@ public class EndpointHostResolver
_preferIPv6 = instance.preferIPv6();
try
{
- _thread = new HelperThread();
+ _threadName = Util.createThreadName(_instance.initializationData().properties, "Ice.HostResolver");
+ _executor = java.util.concurrent.Executors.newFixedThreadPool(1,
+ Util.createThreadFactory(_instance.initializationData().properties, _threadName));
updateObserver();
- if(_instance.initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0)
- {
- _thread.setPriority(Util.getThreadPriorityProperty(_instance.initializationData().properties, "Ice"));
- }
- _thread.start();
}
catch(RuntimeException ex)
{
@@ -92,8 +89,8 @@ public class EndpointHostResolver
return connectors;
}
- synchronized public void resolve(String host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint,
- EndpointI_connectors callback)
+ synchronized public void resolve(final String host, final int port, final Ice.EndpointSelectionType selType, final IPEndpointI endpoint,
+ final EndpointI_connectors callback)
{
//
// TODO: Optimize to avoid the lookup if the given host is a textual IPv4 or IPv6
@@ -103,134 +100,105 @@ public class EndpointHostResolver
assert(!_destroyed);
- ResolveEntry entry = new ResolveEntry();
- entry.host = host;
- entry.port = port;
- entry.selType = selType;
- entry.endpoint = endpoint;
- entry.callback = callback;
-
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
- if(obsv != null)
+ final Ice.Instrumentation.ThreadObserver threadObserver = _observer;
+ final Ice.Instrumentation.Observer observer = getObserver(endpoint);
+ if(observer != null)
{
- entry.observer = obsv.getEndpointLookupObserver(endpoint);
- if(entry.observer != null)
- {
- entry.observer.attach();
- }
+ observer.attach();
}
- _queue.add(entry);
- notify();
- }
-
- synchronized public void destroy()
- {
- assert(!_destroyed);
- _destroyed = true;
- notify();
- }
-
- public void joinWithThread()
- {
- if(_thread != null)
- {
- try
- {
- _thread.join();
- }
- catch(InterruptedException ex)
- {
- }
- if(_observer != null)
+ _executor.execute(new Runnable()
{
- _observer.detach();
- }
- }
- }
-
- public void run()
- {
- while(true)
- {
- ResolveEntry r;
- Ice.Instrumentation.ThreadObserver threadObserver;
- synchronized(this)
- {
- while(!_destroyed && _queue.isEmpty())
+ @Override
+ public void run()
{
+ synchronized(EndpointHostResolver.this)
+ {
+ if(_destroyed)
+ {
+ Ice.CommunicatorDestroyedException ex = new Ice.CommunicatorDestroyedException();
+ if(observer != null)
+ {
+ observer.failed(ex.ice_name());
+ observer.detach();
+ }
+ callback.exception(ex);
+ return;
+ }
+ }
+
try
{
- wait();
+ if(threadObserver != null)
+ {
+ threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle,
+ Ice.Instrumentation.ThreadState.ThreadStateInUseForOther);
+ }
+
+ NetworkProxy networkProxy = _instance.networkProxy();
+ if(networkProxy != null)
+ {
+ networkProxy = networkProxy.resolveHost();
+ }
+
+ callback.connectors(endpoint.connectors(Network.getAddresses(host,
+ port,
+ _protocol,
+ selType,
+ _preferIPv6),
+ networkProxy));
+
+ if(threadObserver != null)
+ {
+ threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForOther,
+ Ice.Instrumentation.ThreadState.ThreadStateIdle);
+ }
+
+ if(observer != null)
+ {
+ observer.detach();
+ }
}
- catch(java.lang.InterruptedException ex)
+ catch(Ice.LocalException ex)
{
+ if(observer != null)
+ {
+ observer.failed(ex.ice_name());
+ observer.detach();
+ }
+ callback.exception(ex);
}
}
+ });
+ }
- if(_destroyed)
- {
- break;
- }
-
- r = _queue.removeFirst();
- threadObserver = _observer;
- }
-
- try
- {
- if(threadObserver != null)
- {
- threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle,
- Ice.Instrumentation.ThreadState.ThreadStateInUseForOther);
- }
-
- NetworkProxy networkProxy = _instance.networkProxy();
- if(networkProxy != null)
- {
- networkProxy = networkProxy.resolveHost();
- }
-
- r.callback.connectors(r.endpoint.connectors(Network.getAddresses(r.host,
- r.port,
- _protocol,
- r.selType,
- _preferIPv6),
- networkProxy));
+ synchronized public void destroy()
+ {
+ assert(!_destroyed);
+ _destroyed = true;
- if(threadObserver != null)
- {
- threadObserver.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForOther,
- Ice.Instrumentation.ThreadState.ThreadStateIdle);
- }
+ //
+ // Shutdown the executor. No new tasks will be accepted.
+ // Existing tasks will execute.
+ //
+ _executor.shutdown();
+ }
- if(r.observer != null)
- {
- r.observer.detach();
- }
- }
- catch(Ice.LocalException ex)
- {
- if(r.observer != null)
- {
- r.observer.failed(ex.ice_name());
- r.observer.detach();
- }
- r.callback.exception(ex);
- }
+ public void joinWithThread()
+ throws InterruptedException
+ {
+ // Wait for the executor to terminate.
+ try
+ {
+ _executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
}
-
- for(ResolveEntry entry : _queue)
+ finally
{
- Ice.CommunicatorDestroyedException ex = new Ice.CommunicatorDestroyedException();
- if(entry.observer != null)
+ if(_observer != null)
{
- entry.observer.failed(ex.ice_name());
- entry.observer.detach();
+ _observer.detach();
}
- entry.callback.exception(ex);
}
- _queue.clear();
}
synchronized public void updateObserver()
@@ -238,8 +206,7 @@ public class EndpointHostResolver
Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
if(obsv != null)
{
- _observer = obsv.getThreadObserver("Communicator",
- _thread.getName(),
+ _observer = obsv.getThreadObserver("Communicator", _threadName,
Ice.Instrumentation.ThreadState.ThreadStateIdle,
_observer);
if(_observer != null)
@@ -249,49 +216,22 @@ public class EndpointHostResolver
}
}
- static class ResolveEntry
+ private Ice.Instrumentation.Observer
+ getObserver(IPEndpointI endpoint)
{
- String host;
- int port;
- Ice.EndpointSelectionType selType;
- IPEndpointI endpoint;
- EndpointI_connectors callback;
- Ice.Instrumentation.Observer observer;
+ Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
+ if(obsv != null)
+ {
+ return obsv.getEndpointLookupObserver(endpoint);
+ }
+ return null;
}
private final Instance _instance;
private final int _protocol;
private final boolean _preferIPv6;
private boolean _destroyed;
- private java.util.LinkedList<ResolveEntry> _queue = new java.util.LinkedList<ResolveEntry>();
private Ice.Instrumentation.ThreadObserver _observer;
-
- private final class HelperThread extends Thread
- {
- HelperThread()
- {
- String threadName = _instance.initializationData().properties.getProperty("Ice.ProgramName");
- if(threadName.length() > 0)
- {
- threadName += "-";
- }
- setName(threadName + "Ice.HostResolver");
- }
-
- @Override
- public void run()
- {
- try
- {
- EndpointHostResolver.this.run();
- }
- catch(java.lang.Exception ex)
- {
- String s = "exception in endpoint host resolver thread " + getName() + ":\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
- }
- }
- }
-
- private HelperThread _thread;
+ private String _threadName;
+ private java.util.concurrent.ExecutorService _executor;
}
diff --git a/java/src/IceInternal/FixedReference.java b/java/src/IceInternal/FixedReference.java
index 9df5d5f4b68..c930fe6156c 100644
--- a/java/src/IceInternal/FixedReference.java
+++ b/java/src/IceInternal/FixedReference.java
@@ -210,78 +210,70 @@ public class FixedReference extends Reference
}
@Override
- public Ice.ConnectionI
- getConnection(Ice.BooleanHolder compress)
+ public void
+ getConnection(GetConnectionCallback callback)
{
- switch(getMode())
+ try
{
- case Reference.ModeTwoway:
- case Reference.ModeOneway:
- case Reference.ModeBatchOneway:
+ Ice.BooleanHolder compress = new Ice.BooleanHolder();
+ switch(getMode())
{
- if(_fixedConnection.endpoint().datagram())
+ case Reference.ModeTwoway:
+ case Reference.ModeOneway:
+ case Reference.ModeBatchOneway:
{
- throw new Ice.NoEndpointException("");
+ if(_fixedConnection.endpoint().datagram())
+ {
+ throw new Ice.NoEndpointException("");
+ }
+ break;
}
- break;
- }
- case Reference.ModeDatagram:
- case Reference.ModeBatchDatagram:
- {
- if(!_fixedConnection.endpoint().datagram())
+ case Reference.ModeDatagram:
+ case Reference.ModeBatchDatagram:
{
- throw new Ice.NoEndpointException("");
+ if(!_fixedConnection.endpoint().datagram())
+ {
+ throw new Ice.NoEndpointException("");
+ }
+ break;
}
- break;
}
- }
-
- //
- // If a secure connection is requested or secure overrides is set,
- // check if the connection is secure.
- //
- boolean secure;
- DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
- if(defaultsAndOverrides.overrideSecure)
- {
- secure = defaultsAndOverrides.overrideSecureValue;
- }
- else
- {
- secure = getSecure();
- }
- if(secure && !_fixedConnection.endpoint().secure())
- {
- throw new Ice.NoEndpointException("");
- }
- _fixedConnection.throwException(); // Throw in case our connection is already destroyed.
+ //
+ // If a secure connection is requested or secure overrides is set,
+ // check if the connection is secure.
+ //
+ boolean secure;
+ DefaultsAndOverrides defaultsAndOverrides = getInstance().defaultsAndOverrides();
+ if(defaultsAndOverrides.overrideSecure)
+ {
+ secure = defaultsAndOverrides.overrideSecureValue;
+ }
+ else
+ {
+ secure = getSecure();
+ }
+ if(secure && !_fixedConnection.endpoint().secure())
+ {
+ throw new Ice.NoEndpointException("");
+ }
- if(defaultsAndOverrides.overrideCompress)
- {
- compress.value = defaultsAndOverrides.overrideCompressValue;
- }
- else if(_overrideCompress)
- {
- compress.value = _compress;
- }
- else
- {
- compress.value = _fixedConnection.endpoint().compress();
- }
- return _fixedConnection;
- }
+ _fixedConnection.throwException(); // Throw in case our connection is already destroyed.
- @Override
- public void
- getConnection(GetConnectionCallback callback)
- {
- try
- {
- Ice.BooleanHolder compress = new Ice.BooleanHolder();
- Ice.ConnectionI connection = getConnection(compress);
- callback.setConnection(connection, compress.value);
+ if(defaultsAndOverrides.overrideCompress)
+ {
+ compress.value = defaultsAndOverrides.overrideCompressValue;
+ }
+ else if(_overrideCompress)
+ {
+ compress.value = _compress;
+ }
+ else
+ {
+ compress.value = _fixedConnection.endpoint().compress();
+ }
+ callback.setConnection(_fixedConnection, compress.value);
}
catch(Ice.LocalException ex)
{
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 683d619875c..e0413fc1b5d 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -40,6 +40,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public void
waitUntilHolding()
+ throws InterruptedException
{
java.util.LinkedList<Ice.ConnectionI> connections;
@@ -51,13 +52,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
//
while(_state < StateHolding)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
//
@@ -78,6 +73,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
public void
waitUntilFinished()
+ throws InterruptedException
{
java.util.LinkedList<Ice.ConnectionI> connections = null;
@@ -89,13 +85,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
//
while(_state != StateFinished)
{
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
+ wait();
}
//
@@ -114,7 +104,20 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
for(Ice.ConnectionI connection : connections)
{
- connection.waitUntilFinished();
+ try
+ {
+ connection.waitUntilFinished();
+ }
+ catch(InterruptedException e)
+ {
+ //
+ // Force close all of the connections.
+ //
+ for(Ice.ConnectionI c : connections)
+ {
+ c.close(true);
+ }
+ }
}
}
@@ -375,7 +378,7 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
Ice.ConnectionI connection =
new Ice.ConnectionI(_adapter.getCommunicator(), _instance, null, _transceiver, null, _endpoint,
_adapter);
- connection.start(null);
+ connection.startAndWait();
_connections.add(connection);
}
else
@@ -425,6 +428,10 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice
{
throw (Ice.LocalException)ex;
}
+ else if(ex instanceof InterruptedException)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
else
{
throw new Ice.SyscallException(ex);
diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java
index 8eb6de0f210..fbe2f3333fb 100644
--- a/java/src/IceInternal/Instance.java
+++ b/java/src/IceInternal/Instance.java
@@ -9,28 +9,27 @@
package IceInternal;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
public final class Instance
{
private class ObserverUpdaterI implements Ice.Instrumentation.ObserverUpdater
{
- ObserverUpdaterI(Instance instance)
- {
- _instance = instance;
- }
-
- @Override public void
+ @Override
+ public void
updateConnectionObservers()
{
- _instance.updateConnectionObservers();
+ Instance.this.updateConnectionObservers();
}
- @Override public void
+ @Override
+ public void
updateThreadObservers()
{
- _instance.updateThreadObservers();
+ Instance.this.updateThreadObservers();
}
-
- final private Instance _instance;
}
public Ice.InitializationData
@@ -603,6 +602,18 @@ public final class Instance
return _useApplicationClassLoader;
}
+ public boolean
+ queueRequests()
+ {
+ return _hasQueueExecutor;
+ }
+
+ public ExecutorService
+ getQueueExecutor()
+ {
+ return _queueExecutor;
+ }
+
//
// Only for use by Ice.CommunicatorI
//
@@ -735,7 +746,6 @@ public final class Instance
}
}
- _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2);
_implicitContext = Ice.ImplicitContextI.create(_initData.properties.getProperty("Ice.ImplicitContext"));
@@ -834,6 +844,22 @@ public final class Instance
{
_observer = _initData.observer;
}
+
+ if(_initData.properties.getPropertyAsInt("Ice.BackgroundIO") > 0)
+ {
+ _hasQueueExecutor = true;
+ _queueExecutor = Executors.newFixedThreadPool(1,
+ Util.createThreadFactory(_initData.properties,
+ Util.createThreadName(_initData.properties, "Ice.BackgroundIO")));
+ //
+ // If background IO is enabled message buffers cannot be cached.
+ //
+ _cacheMessageBuffers = 0;
+ }
+ else
+ {
+ _cacheMessageBuffers = _initData.properties.getPropertyAsIntWithDefault("Ice.CacheMessageBuffers", 2);
+ }
}
catch(Ice.LocalException ex)
{
@@ -889,7 +915,7 @@ public final class Instance
//
if(_observer != null)
{
- _observer.setObserverUpdater(new ObserverUpdaterI(this));
+ _observer.setObserverUpdater(new ObserverUpdaterI());
}
//
@@ -899,29 +925,9 @@ public final class Instance
{
java.util.concurrent.ScheduledThreadPoolExecutor executor =
new java.util.concurrent.ScheduledThreadPoolExecutor(1,
- new java.util.concurrent.ThreadFactory()
- {
- @Override
- public Thread newThread(Runnable r)
- {
- Thread t = new Thread(r);
- if(initializationData().properties.getProperty("Ice.ThreadPriority").length() > 0)
- {
- final int priority = Util.getThreadPriorityProperty(
- initializationData().properties, "Ice");
- t.setPriority(priority);
- }
-
- String threadName = initializationData().properties.getProperty("Ice.ProgramName");
- if(threadName.length() > 0)
- {
- threadName += "-";
- }
- t.setName(threadName + "Ice.Timer");
+ Util.createThreadFactory(_initData.properties,
+ Util.createThreadName(_initData.properties, "Ice.Timer")));
- return t;
- }
- });
executor.setRemoveOnCancelPolicy(true);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
_timer = executor;
@@ -1014,6 +1020,7 @@ public final class Instance
_state = StateDestroyInProgress;
}
+
if(_objectAdapterFactory != null)
{
_objectAdapterFactory.shutdown();
@@ -1031,7 +1038,19 @@ public final class Instance
if(_outgoingConnectionFactory != null)
{
- _outgoingConnectionFactory.waitUntilFinished();
+ try
+ {
+ _outgoingConnectionFactory.waitUntilFinished();
+ }
+ catch (InterruptedException e)
+ {
+ //
+ // Restore the interrupt, otherwise the instance will be
+ // left in an undefined state. The thread joins below will
+ // interrupt which is fine.
+ //
+ Thread.currentThread().interrupt();
+ }
}
if(_retryQueue != null)
@@ -1132,20 +1151,41 @@ public final class Instance
_state = StateDestroyed;
}
- //
- // Join with threads outside the synchronization.
- //
- if(clientThreadPool != null)
+ try
{
- clientThreadPool.joinWithAllThreads();
+ //
+ // Join with threads outside the synchronization.
+ //
+ if(clientThreadPool != null)
+ {
+ clientThreadPool.joinWithAllThreads();
+ }
+ if(serverThreadPool != null)
+ {
+ serverThreadPool.joinWithAllThreads();
+ }
+ if(endpointHostResolver != null)
+ {
+ endpointHostResolver.joinWithThread();
+ }
}
- if(serverThreadPool != null)
+ catch(InterruptedException ex)
{
- serverThreadPool.joinWithAllThreads();
+ throw new Ice.OperationInterruptedException();
}
- if(endpointHostResolver != null)
+
+ if(_queueExecutor != null)
{
- endpointHostResolver.joinWithThread();
+ _queueExecutor.shutdown();
+ try
+ {
+ _queueExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ _queueExecutor = null;
}
if(_initData.properties.getPropertyAsInt("Ice.Warn.UnusedProperties") > 0)
@@ -1288,4 +1328,6 @@ public final class Instance
final private boolean _useApplicationClassLoader;
private static boolean _oneOffDone = false;
+ private boolean _hasQueueExecutor = false;
+ private ExecutorService _queueExecutor;
}
diff --git a/java/src/IceInternal/LocatorInfo.java b/java/src/IceInternal/LocatorInfo.java
index 5afb2a3170a..7f043465c71 100644
--- a/java/src/IceInternal/LocatorInfo.java
+++ b/java/src/IceInternal/LocatorInfo.java
@@ -117,66 +117,6 @@ public final class LocatorInfo
}
}
- synchronized EndpointI[]
- getEndpoints(Reference ref, Reference wellKnownRef, int ttl, Ice.BooleanHolder cached)
- {
- if(!_response || _exception == null)
- {
- if(wellKnownRef != null) // This request is to resolve the endpoints of a cached well-known object ref
- {
- _wellKnownRefs.add(wellKnownRef);
- }
- if(!_sent)
- {
- _sent = true;
- send();
- }
-
- while(!_response && _exception == null)
- {
- try
- {
- wait();
- }
- catch(java.lang.InterruptedException ex)
- {
- }
- }
- }
-
- if(_exception != null)
- {
- _locatorInfo.getEndpointsException(ref, _exception); // This throws.
- }
-
- assert(_response);
- EndpointI[] endpoints = null;
- if(_proxy != null)
- {
- Reference r = ((Ice.ObjectPrxHelperBase)_proxy).__reference();
- if(!r.isIndirect())
- {
- endpoints = r.getEndpoints();
- }
- else if(ref.isWellKnown() && !r.isWellKnown())
- {
- //
- // We're resolving the endpoints of a well-known object and the proxy returned
- // by the locator is an indirect proxy. We now need to resolve the endpoints
- // of this indirect proxy.
- //
- return _locatorInfo.getEndpoints(r, ref, ttl, cached);
- }
- }
-
- cached.value = false;
- if(_ref.getInstance().traceLevels().location >= 1)
- {
- _locatorInfo.getEndpointsTrace(ref, endpoints, false);
- }
- return endpoints == null ? new EndpointI[0] : endpoints;
- }
-
Request(LocatorInfo locatorInfo, Reference ref)
{
_locatorInfo = locatorInfo;
@@ -397,67 +337,6 @@ public final class LocatorInfo
}
}
- public EndpointI[]
- getEndpoints(Reference ref, int ttl, Ice.BooleanHolder cached)
- {
- return getEndpoints(ref, null, ttl, cached);
- }
-
- public EndpointI[]
- getEndpoints(Reference ref, Reference wellKnownRef, int ttl, Ice.BooleanHolder cached)
- {
- assert(ref.isIndirect());
- EndpointI[] endpoints = null;
- cached.value = false;
- if(!ref.isWellKnown())
- {
- endpoints = _table.getAdapterEndpoints(ref.getAdapterId(), ttl, cached);
- if(!cached.value)
- {
- if(_background && endpoints != null)
- {
- getAdapterRequest(ref).addCallback(ref, wellKnownRef, ttl, null);
- }
- else
- {
- return getAdapterRequest(ref).getEndpoints(ref, wellKnownRef, ttl, cached);
- }
- }
- }
- else
- {
- Reference r = _table.getObjectReference(ref.getIdentity(), ttl, cached);
- if(!cached.value)
- {
- if(_background && r != null)
- {
- getObjectRequest(ref).addCallback(ref, null, ttl, null);
- }
- else
- {
- return getObjectRequest(ref).getEndpoints(ref, null, ttl, cached);
- }
- }
-
- if(!r.isIndirect())
- {
- endpoints = r.getEndpoints();
- }
- else if(!r.isWellKnown())
- {
- return getEndpoints(r, ref, ttl, cached);
- }
- }
-
- assert(endpoints != null);
- cached.value = true;
- if(ref.getInstance().traceLevels().location >= 1)
- {
- getEndpointsTrace(ref, endpoints, true);
- }
- return endpoints;
- }
-
public void
getEndpoints(Reference ref, int ttl, GetEndpointsCallback callback)
{
diff --git a/java/src/IceInternal/ObjectAdapterFactory.java b/java/src/IceInternal/ObjectAdapterFactory.java
index 308be141f5b..5b9688a70d7 100644
--- a/java/src/IceInternal/ObjectAdapterFactory.java
+++ b/java/src/IceInternal/ObjectAdapterFactory.java
@@ -26,12 +26,7 @@ public final class ObjectAdapterFactory
return;
}
- _instance = null;
- _communicator = null;
-
adapters = new java.util.LinkedList<Ice.ObjectAdapterI>(_adapters);
-
- notifyAll();
}
//
@@ -42,8 +37,16 @@ public final class ObjectAdapterFactory
{
adapter.deactivate();
}
+
+ synchronized(this)
+ {
+ _instance = null;
+ _communicator = null;
+ notifyAll();
+ }
}
+
public void
waitForShutdown()
{
@@ -61,6 +64,7 @@ public final class ObjectAdapterFactory
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index c4f79392e36..333671c8b86 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -97,11 +97,43 @@ public final class Outgoing implements OutgoingMessageCallback
_exception = null;
_sent = false;
- _handler = _proxy.__getRequestHandler(false);
-
- if(_handler.sendRequest(this)) // Request sent and no response expected, we're done.
+ _handler = _proxy.__getRequestHandler();
+ try
+ {
+ if(_handler.sendRequest(this)) // Request sent and no response expected, we're done.
+ {
+ return true;
+ }
+ }
+ catch(Ice.OperationInterruptedException ex)
{
- return true;
+ if(_handler.requestCanceled(this, new Ice.OperationInterruptedException()))
+ {
+ //
+ // Wait for the exception to propagate. It's possible the request handler ignores
+ // the timeout if there was a failure shortly before requestTimedOut got called.
+ // In this case, the exception should be set on the Outgoing.
+ //
+ synchronized(this)
+ {
+ boolean interrupted = false;
+ while(_exception == null)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex2)
+ {
+ interrupted = true;
+ }
+ }
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
boolean timedOut = false;
@@ -124,6 +156,7 @@ public final class Outgoing implements OutgoingMessageCallback
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
@@ -142,6 +175,7 @@ public final class Outgoing implements OutgoingMessageCallback
}
catch(InterruptedException ex)
{
+ throw new Ice.OperationInterruptedException();
}
}
}
@@ -149,23 +183,30 @@ public final class Outgoing implements OutgoingMessageCallback
if(timedOut)
{
- _handler.requestTimedOut(this);
-
- //
- // Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
- // In this case, the exception should be set on the Outgoing.
- //
- synchronized(this)
+ if(_handler.requestCanceled(this, new Ice.InvocationTimeoutException()))
{
- while(_exception == null)
+ //
+ // Wait for the exception to propagate. It's possible the request handler ignores
+ // the timeout if there was a failure shortly before requestTimedOut got called.
+ // In this case, the exception should be set on the Outgoing.
+ //
+ synchronized(this)
{
- try
+ boolean interrupted = false;
+ while(_exception == null)
{
- wait();
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ interrupted = true;
+ }
}
- catch(InterruptedException ex)
+ if(interrupted)
{
+ Thread.currentThread().interrupt();
}
}
}
@@ -203,6 +244,7 @@ public final class Outgoing implements OutgoingMessageCallback
}
catch(InterruptedException exi)
{
+ throw new Ice.OperationInterruptedException();
}
}
}
@@ -584,7 +626,7 @@ public final class Outgoing implements OutgoingMessageCallback
{
try
{
- _handler = _proxy.__getRequestHandler(true);
+ _handler = _proxy.__getRequestHandler();
_handler.prepareBatchRequest(_os);
break;
}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 9095fdb94c5..2451e7dc99b 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -198,7 +198,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
@Override
public void
- __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection)
+ __dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
threadPool.dispatch(
new DispatchWorkItem(connection)
@@ -207,7 +207,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
public void
run()
{
- OutgoingAsync.this.__finished(new Ice.InvocationTimeoutException());
+ OutgoingAsync.this.__finished(ex);
}
});
}
@@ -389,10 +389,10 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
while(true)
{
+ _handler = _proxy.__getRequestHandler();
try
{
_sent = false;
- _handler = _proxy.__getRequestHandler(true);
int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
@@ -431,6 +431,15 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
break;
}
+ catch(Ice.OperationInterruptedException ex)
+ {
+ //
+ // Clear the request handler, and cancel the outgoing request.
+ //
+ _proxy.__setRequestHandler(_handler, null);
+ _handler.asyncRequestCanceled(this, ex);
+ break;
+ }
catch(RetryException ex)
{
_proxy.__setRequestHandler(_handler, null); // Clear request handler and retry.
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 269ba2a0123..6c2259b35be 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -43,7 +43,7 @@ public interface OutgoingAsyncMessageCallback
void __finished(Ice.Exception ex);
//
- // Helper to dispatch invocation timeout.
+ // Helper to dispatch the cancellation exception.
//
- void __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection);
+ void __dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
}
diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java
index 63ebc0bd919..4a893444be8 100644
--- a/java/src/IceInternal/OutgoingConnectionFactory.java
+++ b/java/src/IceInternal/OutgoingConnectionFactory.java
@@ -81,227 +81,94 @@ public final class OutgoingConnectionFactory
}
}
+ // Called from Instance.destroy().
public void
waitUntilFinished()
+ throws InterruptedException
{
- java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null;
-
- synchronized(this)
+ try
{
- //
- // First we wait until the factory is destroyed. We also
- // wait until there are no pending connections
- // anymore. Only then we can be sure the _connections
- // contains all connections.
- //
- while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
+ java.util.Map<Connector, java.util.List<Ice.ConnectionI> > connections = null;
+ synchronized(this)
{
- try
+ //
+ // First we wait until the factory is destroyed. We also
+ // wait until there are no pending connections
+ // anymore. Only then we can be sure the _connections
+ // contains all connections.
+ //
+ while(!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0)
{
wait();
}
- catch(InterruptedException ex)
- {
- }
- }
-
- //
- // We want to wait until all connections are finished outside the
- // thread synchronization.
- //
- connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections);
- }
- //
- // Now we wait until the destruction of each connection is finished.
- //
- for(java.util.List<Ice.ConnectionI> connectionList : connections.values())
- {
- for(Ice.ConnectionI connection : connectionList)
- {
- connection.waitUntilFinished();
- }
- }
-
- synchronized(this)
- {
- // Ensure all the connections are finished and reapable at this point.
- java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
- if(cons != null)
- {
- int size = 0;
- for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
- {
- size += connectionList.size();
- }
- assert(cons.size() == size);
- _connections.clear();
- _connectionsByEndpoint.clear();
- }
- else
- {
- assert(_connections.isEmpty());
- assert(_connectionsByEndpoint.isEmpty());
+ //
+ // We want to wait until all connections are finished outside the
+ // thread synchronization.
+ //
+ connections = new java.util.HashMap<Connector, java.util.List<Ice.ConnectionI> >(_connections);
}
- _monitor.destroy();
- }
- }
-
- public Ice.ConnectionI
- create(EndpointI[] endpts, boolean hasMore, Ice.EndpointSelectionType selType, Ice.BooleanHolder compress)
- {
- assert(endpts.length > 0);
-
- //
- // Apply the overrides.
- //
- java.util.List<EndpointI> endpoints = applyOverrides(endpts);
-
- //
- // Try to find a connection to one of the given endpoints.
- //
- Ice.ConnectionI connection = findConnectionByEndpoint(endpoints, compress);
- if(connection != null)
- {
- return connection;
- }
-
- Ice.LocalException exception = null;
-
- //
- // If we didn't find a connection with the endpoints, we create the connectors
- // for the endpoints.
- //
- java.util.List<ConnectorInfo> connectors = new java.util.ArrayList<ConnectorInfo>();
- java.util.Iterator<EndpointI> p = endpoints.iterator();
- while(p.hasNext())
- {
- EndpointI endpoint = p.next();
//
- // Create connectors for the endpoint.
+ // Now we wait until the destruction of each connection is finished.
//
- try
+ for(java.util.List<Ice.ConnectionI> connectionList : connections.values())
{
- java.util.List<Connector> cons = endpoint.connectors(selType);
- assert(cons.size() > 0);
- for(Connector c : cons)
- {
- connectors.add(new ConnectorInfo(c, endpoint));
- }
- }
- catch(Ice.LocalException ex)
- {
- exception = ex;
- handleException(exception, hasMore || p.hasNext());
- }
- }
-
- if(connectors.isEmpty())
- {
- assert(exception != null);
- throw exception;
- }
-
- //
- // Try to get a connection to one of the connectors. A null result indicates that no
- // connection was found and that we should try to establish the connection (and that
- // the connectors were added to _pending to prevent other threads from establishing
- // the connection).
- //
- connection = getConnection(connectors, null, compress);
- if(connection != null)
- {
- return connection;
- }
-
- //
- // Try to establish the connection to the connectors.
- //
- DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver();
- java.util.Iterator<ConnectorInfo> q = connectors.iterator();
- ConnectorInfo ci = null;
- while(q.hasNext())
- {
- ci = q.next();
-
- Ice.Instrumentation.Observer observer = null;
- if(obsv != null)
- {
- observer = obsv.getConnectionEstablishmentObserver(ci.endpoint, ci.connector.toString());
- if(observer != null)
+ for(Ice.ConnectionI connection : connectionList)
{
- observer.attach();
+ try
+ {
+ connection.waitUntilFinished();
+ }
+ catch(InterruptedException e)
+ {
+ //
+ // Force close all of the connections.
+ //
+ for(java.util.List<Ice.ConnectionI> l : connections.values())
+ {
+ for(Ice.ConnectionI c : l)
+ {
+ c.close(true);
+ }
+ }
+ throw e;
+ }
}
}
- try
+ synchronized(this)
{
- connection = createConnection(ci.connector.connect(), ci);
- connection.start(null);
-
- if(observer != null)
+ // Ensure all the connections are finished and reapable at this point.
+ java.util.List<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
+ if(cons != null)
{
- observer.detach();
- }
-
- if(defaultsAndOverrides.overrideCompress)
- {
- compress.value = defaultsAndOverrides.overrideCompressValue;
+ int size = 0;
+ for(java.util.List<Ice.ConnectionI> connectionList : _connections.values())
+ {
+ size += connectionList.size();
+ }
+ assert(cons.size() == size);
+ _connections.clear();
+ _connectionsByEndpoint.clear();
}
else
{
- compress.value = ci.endpoint.compress();
+ assert(_connections.isEmpty());
+ assert(_connectionsByEndpoint.isEmpty());
}
- connection.activate();
- break;
- }
- catch(Ice.CommunicatorDestroyedException ex)
- {
- if(observer != null)
- {
- observer.failed(ex.ice_name());
- observer.detach();
- }
- exception = ex;
- handleConnectionException(exception, hasMore || p.hasNext());
- connection = null;
- break; // No need to continue
- }
- catch(Ice.LocalException ex)
- {
- if(observer != null)
- {
- observer.failed(ex.ice_name());
- observer.detach();
- }
- exception = ex;
- handleConnectionException(exception, hasMore || p.hasNext());
- connection = null;
+ _monitor.destroy();
}
}
-
- //
- // Finish creating the connection (this removes the connectors from the _pending
- // list and notifies any waiting threads).
- //
- if(connection != null)
+ catch(InterruptedException ex)
{
- finishGetConnection(connectors, ci, connection, null);
- }
- else
- {
- finishGetConnection(connectors, exception, null);
- }
-
- if(connection == null)
- {
- assert(exception != null);
- throw exception;
+ // Here wait() or waitUntilFinished() were interrupted. Clear the connections
+ // and such and continue along.
+ _connections.clear();
+ _connectionsByEndpoint.clear();
+ _monitor.destroy();
+ throw ex;
}
-
- return connection;
}
public void
@@ -611,6 +478,7 @@ public final class OutgoingConnectionFactory
private Ice.ConnectionI
getConnection(java.util.List<ConnectorInfo> connectors, ConnectCallback cb, Ice.BooleanHolder compress)
{
+ assert(cb != null);
synchronized(this)
{
if(_destroyed)
@@ -655,26 +523,7 @@ public final class OutgoingConnectionFactory
if(addToPending(cb, connectors))
{
- //
- // If a callback is not specified we wait until another thread notifies us about a
- // change to the pending list. Otherwise, if a callback is provided we're done:
- // when the pending list changes the callback will be notified and will try to
- // get the connection again.
- //
- if(cb == null)
- {
- try
- {
- wait();
- }
- catch(InterruptedException ex)
- {
- }
- }
- else
- {
- return null;
- }
+ return null;
}
else
{
@@ -1104,7 +953,7 @@ public final class OutgoingConnectionFactory
}
}
- public void
+ void
setConnection(Ice.ConnectionI connection, boolean compress)
{
//
@@ -1115,7 +964,7 @@ public final class OutgoingConnectionFactory
_factory.decPendingConnectCount(); // Must be called last.
}
- public void
+ void
setException(Ice.LocalException ex)
{
//
@@ -1125,13 +974,13 @@ public final class OutgoingConnectionFactory
_factory.decPendingConnectCount(); // Must be called last.
}
- public boolean
+ boolean
hasConnector(ConnectorInfo ci)
{
return _connectors.contains(ci);
}
- public boolean
+ boolean
removeConnectors(java.util.List<ConnectorInfo> connectors)
{
_connectors.removeAll(connectors);
@@ -1139,13 +988,13 @@ public final class OutgoingConnectionFactory
return _connectors.isEmpty();
}
- public void
+ void
removeFromPending()
{
_factory.removeFromPending(this, _connectors);
}
- void
+ private void
getConnectors()
{
try
@@ -1166,7 +1015,7 @@ public final class OutgoingConnectionFactory
nextEndpoint();
}
- void
+ private void
nextEndpoint()
{
try
@@ -1181,7 +1030,7 @@ public final class OutgoingConnectionFactory
}
}
- void
+ private void
getConnection()
{
try
@@ -1196,7 +1045,7 @@ public final class OutgoingConnectionFactory
{
//
// A null return value from getConnection indicates that the connection
- // is being established and that everthing has been done to ensure that
+ // is being established and that everything has been done to ensure that
// the callback will be notified when the connection establishment is
// done.
//
@@ -1213,7 +1062,7 @@ public final class OutgoingConnectionFactory
}
}
- void
+ private void
nextConnector()
{
Ice.ConnectionI connection = null;
diff --git a/java/src/IceInternal/PropertyNames.java b/java/src/IceInternal/PropertyNames.java
index ce5eee21f7c..f27b671e3bc 100644
--- a/java/src/IceInternal/PropertyNames.java
+++ b/java/src/IceInternal/PropertyNames.java
@@ -70,6 +70,7 @@ public final class PropertyNames
new Property("Ice\\.Admin\\.Facets", false, null),
new Property("Ice\\.Admin\\.InstanceName", false, null),
new Property("Ice\\.Admin\\.ServerId", false, null),
+ new Property("Ice\\.BackgroundIO", false, null),
new Property("Ice\\.BackgroundLocatorCacheUpdates", false, null),
new Property("Ice\\.BatchAutoFlush", false, null),
new Property("Ice\\.ChangeUser", false, null),
diff --git a/java/src/IceInternal/ProxyBatchOutgoingAsync.java b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
index 15c4b26500f..4f24a1919e6 100644
--- a/java/src/IceInternal/ProxyBatchOutgoingAsync.java
+++ b/java/src/IceInternal/ProxyBatchOutgoingAsync.java
@@ -25,7 +25,7 @@ public class ProxyBatchOutgoingAsync extends BatchOutgoingAsync
RequestHandler handler = null;
try
{
- handler = _proxy.__getRequestHandler(true);
+ handler = _proxy.__getRequestHandler();
int status = handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
{
diff --git a/java/src/IceInternal/ProxyFactory.java b/java/src/IceInternal/ProxyFactory.java
index c0b2d60e150..54b43d7ab29 100644
--- a/java/src/IceInternal/ProxyFactory.java
+++ b/java/src/IceInternal/ProxyFactory.java
@@ -9,6 +9,8 @@
package IceInternal;
+import Ice.OperationInterruptedException;
+
public final class ProxyFactory
{
public Ice.ObjectPrx
@@ -211,6 +213,14 @@ public final class ProxyFactory
throw ex;
}
+ //
+ // Don't retry on OperationInterruptedException.
+ //
+ if(ex instanceof OperationInterruptedException)
+ {
+ throw ex;
+ }
+
++cnt;
assert(cnt > 0);
diff --git a/java/src/IceInternal/QueueRequestHandler.java b/java/src/IceInternal/QueueRequestHandler.java
new file mode 100644
index 00000000000..6c2a5b55f2c
--- /dev/null
+++ b/java/src/IceInternal/QueueRequestHandler.java
@@ -0,0 +1,357 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+
+import Ice.CommunicatorDestroyedException;
+import Ice.ConnectionI;
+
+public class QueueRequestHandler implements RequestHandler
+{
+ public
+ QueueRequestHandler(Instance instance, RequestHandler delegate) {
+ _executor = instance.getQueueExecutor();
+ if(_executor == null)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ assert(delegate != null);
+ _delegate = delegate;
+ }
+
+ @Override
+ public void
+ prepareBatchRequest(final BasicStream out) throws RetryException
+ {
+ try
+ {
+ Future<Void> future = _executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws RetryException
+ {
+ _delegate.prepareBatchRequest(out);
+ return null;
+ }
+ });
+
+ future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RetryException ex)
+ {
+ throw ex;
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+
+ @Override
+ public void
+ finishBatchRequest(final BasicStream out)
+ {
+ try
+ {
+ Future<Void> future = _executor.submit(new Callable<Void>() {
+ @Override
+ public Void call()
+ {
+ _delegate.finishBatchRequest(out);
+ return null;
+ }
+ });
+ future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+
+ @Override
+ public void
+ abortBatchRequest()
+ {
+ try
+ {
+ Future<Void> future = _executor.submit(new Callable<Void>() {
+ @Override
+ public Void call()
+ {
+ _delegate.abortBatchRequest();
+ return null;
+ }
+ });
+ future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ }
+
+ @Override
+ public boolean
+ sendRequest(final OutgoingMessageCallback out) throws RetryException
+ {
+ try
+ {
+ Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws RetryException
+ {
+ return _delegate.sendRequest(out);
+ }
+ });
+ return future.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RetryException ex)
+ {
+ throw ex;
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int
+ sendAsyncRequest(final OutgoingAsyncMessageCallback out) throws RetryException
+ {
+ try
+ {
+ Future<Integer> future = _executor.submit(new Callable<Integer>() {
+ @Override
+ public Integer call() throws RetryException
+ {
+ return _delegate.sendAsyncRequest(out);
+ }
+ });
+ return future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RetryException ex)
+ {
+ throw ex;
+ }
+ catch(RuntimeException ex)
+ {
+ throw ex;
+ }
+ catch(Throwable ex)
+ {
+ assert(false);
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean
+ requestCanceled(final OutgoingMessageCallback out, final Ice.LocalException ex)
+ {
+ try
+ {
+ Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call()
+ {
+ return _delegate.requestCanceled(out, ex);
+ }
+ });
+ return future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException exc)
+ {
+ throw exc;
+ }
+ catch(Throwable exc)
+ {
+ assert(false);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean
+ asyncRequestCanceled(final OutgoingAsyncMessageCallback outAsync, final Ice.LocalException ex)
+ {
+ try
+ {
+ Future<Boolean> future = _executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call()
+ {
+ return _delegate.asyncRequestCanceled(outAsync, ex);
+ }
+ });
+ return future.get();
+ }
+ catch(RejectedExecutionException e)
+ {
+ throw new CommunicatorDestroyedException();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
+ catch (ExecutionException e)
+ {
+ try
+ {
+ throw e.getCause();
+ }
+ catch(RuntimeException exc)
+ {
+ throw exc;
+ }
+ catch(Throwable exc)
+ {
+ assert(false);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Reference
+ getReference()
+ {
+ return _delegate.getReference();
+ }
+
+ @Override
+ public ConnectionI
+ getConnection()
+ {
+ return _delegate.getConnection();
+ }
+
+ @Override
+ public ConnectionI
+ waitForConnection() throws InterruptedException
+ {
+ return _delegate.waitForConnection();
+ }
+
+ private final RequestHandler _delegate;
+ private final ExecutorService _executor;
+}
diff --git a/java/src/IceInternal/Reference.java b/java/src/IceInternal/Reference.java
index f0f49a75260..e4789875723 100644
--- a/java/src/IceInternal/Reference.java
+++ b/java/src/IceInternal/Reference.java
@@ -412,7 +412,6 @@ public abstract class Reference implements Cloneable
//
public abstract java.util.Map<String, String> toProperty(String prefix);
- public abstract Ice.ConnectionI getConnection(Ice.BooleanHolder comp);
public abstract void getConnection(GetConnectionCallback callback);
@Override
diff --git a/java/src/IceInternal/RequestHandler.java b/java/src/IceInternal/RequestHandler.java
index e09c6019932..894c986a84d 100644
--- a/java/src/IceInternal/RequestHandler.java
+++ b/java/src/IceInternal/RequestHandler.java
@@ -22,10 +22,13 @@ public interface RequestHandler
int sendAsyncRequest(OutgoingAsyncMessageCallback out)
throws RetryException;
- void requestTimedOut(OutgoingMessageCallback out);
- void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync);
+ boolean requestCanceled(OutgoingMessageCallback out, Ice.LocalException ex);
+ boolean asyncRequestCanceled(OutgoingAsyncMessageCallback outAsync, Ice.LocalException ex);
Reference getReference();
- Ice.ConnectionI getConnection(boolean wait);
+ Ice.ConnectionI getConnection();
+ Ice.ConnectionI waitForConnection()
+ throws InterruptedException;
+
}
diff --git a/java/src/IceInternal/RoutableReference.java b/java/src/IceInternal/RoutableReference.java
index 705aebc6475..8e89f69cd15 100644
--- a/java/src/IceInternal/RoutableReference.java
+++ b/java/src/IceInternal/RoutableReference.java
@@ -496,72 +496,6 @@ public class RoutableReference extends Reference
}
@Override
- public Ice.ConnectionI
- getConnection(Ice.BooleanHolder comp)
- {
- if(_routerInfo != null)
- {
- //
- // If we route, we send everything to the router's client
- // proxy endpoints.
- //
- EndpointI[] endpts = _routerInfo.getClientEndpoints();
- if(endpts.length > 0)
- {
- applyOverrides(endpts);
- return createConnection(endpts, comp);
- }
- }
-
- if(_endpoints.length > 0)
- {
- return createConnection(_endpoints, comp);
- }
-
- while(true)
- {
- Ice.BooleanHolder cached = new Ice.BooleanHolder(false);
- EndpointI[] endpts = null;
- if(_locatorInfo != null)
- {
- endpts = _locatorInfo.getEndpoints(this, _locatorCacheTimeout, cached);
- applyOverrides(endpts);
- }
-
- if(endpts == null || endpts.length == 0)
- {
- throw new Ice.NoEndpointException(toString());
- }
-
- try
- {
- return createConnection(endpts, comp);
- }
- catch(Ice.NoEndpointException ex)
- {
- throw ex; // No need to retry if there's no endpoints.
- }
- catch(Ice.LocalException ex)
- {
- assert(_locatorInfo != null);
- _locatorInfo.clearCache(this);
- if(cached.value)
- {
- TraceLevels traceLevels = getInstance().traceLevels();
- if(traceLevels.retry >= 2)
- {
- String s = "connection to cached endpoints failed\n" +
- "removing endpoints from cache and trying one more time\n" + ex;
- getInstance().initializationData().logger.trace(traceLevels.retryCat, s);
- }
- continue; // Try again if the endpoints were cached.
- }
- throw ex;
- }
- }
- }
-
- @Override
public void
getConnection(final GetConnectionCallback callback)
{
@@ -861,77 +795,6 @@ public class RoutableReference extends Reference
return endpoints.toArray(new EndpointI[endpoints.size()]);
}
- protected Ice.ConnectionI
- createConnection(EndpointI[] allEndpoints, Ice.BooleanHolder compress)
- {
- EndpointI[] endpoints = filterEndpoints(allEndpoints);
- if(endpoints.length == 0)
- {
- throw new Ice.NoEndpointException(toString());
- }
-
- //
- // Finally, create the connection.
- //
- OutgoingConnectionFactory factory = getInstance().outgoingConnectionFactory();
- Ice.ConnectionI connection = null;
- if(getCacheConnection() || endpoints.length == 1)
- {
- //
- // Get an existing connection or create one if there's no
- // existing connection to one of the given endpoints.
- //
- connection = factory.create(endpoints, false, getEndpointSelection(), compress);
- }
- else
- {
- //
- // Go through the list of endpoints and try to create the
- // connection until it succeeds. This is different from just
- // calling create() with the given endpoints since this might
- // create a new connection even if there's an existing
- // connection for one of the endpoints.
- //
-
- Ice.LocalException exception = null;
- EndpointI[] endpoint = new EndpointI[1];
- for(int i = 0; i < endpoints.length; ++i)
- {
- try
- {
- endpoint[0] = endpoints[i];
- final boolean more = i != endpoints.length - 1;
- connection = factory.create(endpoint, more, getEndpointSelection(), compress);
- break;
- }
- catch(Ice.LocalException ex)
- {
- exception = ex;
- }
- }
-
- if(connection == null)
- {
- assert(exception != null);
- throw exception;
- }
- }
-
- assert(connection != null);
-
- //
- // If we have a router, set the object adapter for this router
- // (if any) to the new connection, so that callbacks from the
- // router can be received over this new connection.
- //
- if(_routerInfo != null && _routerInfo.getAdapter() != null)
- {
- connection.setAdapter(_routerInfo.getAdapter());
- }
-
- return connection;
- }
-
protected void
createConnection(EndpointI[] allEndpoints, final GetConnectionCallback callback)
{
@@ -993,7 +856,7 @@ public class RoutableReference extends Reference
new OutgoingConnectionFactory.CreateConnectionCallback()
{
@Override
- public void
+ public void
setConnection(Ice.ConnectionI connection, boolean compress)
{
//
@@ -1009,7 +872,7 @@ public class RoutableReference extends Reference
}
@Override
- public void
+ public void
setException(final Ice.LocalException ex)
{
if(_exception == null)
diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java
index 55c7582fbf5..9b0835daad7 100644
--- a/java/src/IceInternal/Selector.java
+++ b/java/src/IceInternal/Selector.java
@@ -209,8 +209,11 @@ public final class Selector
{
Thread.sleep(1);
}
- catch(java.lang.InterruptedException ex)
+ catch(InterruptedException ex)
{
+ //
+ // Eat the InterruptedException (as we do in ThreadPool.promoteFollower).
+ //
}
if(++_spuriousWakeUp > 100)
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index bdaa5bca9a1..135299ebbc7 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -154,7 +154,6 @@ final class TcpTransceiver implements Transceiver
}
@Override
- @SuppressWarnings("deprecation")
public int write(Buffer buf)
{
final int size = buf.b.limit();
@@ -240,7 +239,6 @@ final class TcpTransceiver implements Transceiver
}
@Override
- @SuppressWarnings("deprecation")
public int read(Buffer buf, Ice.BooleanHolder moreData)
{
int packetSize = buf.b.remaining();
@@ -254,8 +252,8 @@ final class TcpTransceiver implements Transceiver
try
{
assert(_fd != null);
- int ret = _fd.read(buf.b);
+ int ret = _fd.read(buf.b);
if(ret == -1)
{
throw new Ice.ConnectionLostException();
@@ -330,7 +328,6 @@ final class TcpTransceiver implements Transceiver
}
}
- @SuppressWarnings("deprecation")
TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd, NetworkProxy proxy,
java.net.InetSocketAddress addr)
{
@@ -357,7 +354,6 @@ final class TcpTransceiver implements Transceiver
}
}
- @SuppressWarnings("deprecation")
TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd)
{
_instance = instance;
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index e9554519655..0e57a6a61c7 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -57,7 +57,14 @@ public final class ThreadPool
{
// No call to ioCompleted, this shouldn't block (and we don't want to cause
// a new thread to be started).
- _thread.join();
+ try
+ {
+ _thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore.
+ }
}
private final EventHandlerThread _thread;
@@ -183,7 +190,7 @@ public final class ThreadPool
_hasPriority = hasPriority;
_priority = priority;
- _workQueue = new ThreadPoolWorkQueue(this, _instance, _selector);
+ _workQueue = new ThreadPoolWorkQueue(_instance, this, _selector);
_nextHandler = _handlers.iterator();
@@ -216,7 +223,14 @@ public final class ThreadPool
_instance.initializationData().logger.error(s);
destroy();
- joinWithAllThreads();
+ try
+ {
+ joinWithAllThreads();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Ice.OperationInterruptedException();
+ }
throw ex;
}
}
@@ -350,6 +364,7 @@ public final class ThreadPool
public void
joinWithAllThreads()
+ throws InterruptedException
{
//
// _threads is immutable after destroy() has been called,
@@ -363,6 +378,11 @@ public final class ThreadPool
}
//
+ // TODO: MJN: InterruptedException leads to a leak as the
+ // work queue and selector are not destroyed?
+ //
+
+ //
// Destroy the selector
//
_workQueue.close();
@@ -692,36 +712,50 @@ public final class ThreadPool
//
while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0))
{
- try
+ if(_threadIdleTime > 0)
{
- if(_threadIdleTime > 0)
+ long before = IceInternal.Time.currentMonotonicTimeMillis();
+ boolean interrupted = false;
+ try
{
- long before = IceInternal.Time.currentMonotonicTimeMillis();
+ //
+ // If the wait is interrupted then we'll let the thread die as if it timed out.
+ //
wait(_threadIdleTime * 1000);
- if(IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ if(interrupted || IceInternal.Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000)
+ {
+ if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
+ (!_nextHandler.hasNext() && _inUseIO > 0)))
{
- if(!_destroyed && (!_promote || _inUseIO == _sizeIO ||
- (!_nextHandler.hasNext() && _inUseIO > 0)))
+ if(_instance.traceLevels().threadPool >= 1)
{
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
- assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
- _threads.remove(current._thread);
- _workQueue.queue(new JoinThreadWorkItem(current._thread));
- return true;
+ String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
}
+ assert(_threads.size() > 1); // Can only be called by a waiting follower thread.
+ _threads.remove(current._thread);
+ _workQueue.queue(new JoinThreadWorkItem(current._thread));
+ return true;
}
}
- else
+ }
+ else
+ {
+ try
{
wait();
}
- }
- catch(InterruptedException ex)
- {
+ catch (InterruptedException e)
+ {
+ //
+ // Eat the InterruptedException.
+ //
+ }
}
}
current._leader = true; // The current thread has become the leader.
@@ -777,18 +811,9 @@ public final class ThreadPool
public void
join()
+ throws InterruptedException
{
- while(true)
- {
- try
- {
- _thread.join();
- break;
- }
- catch(InterruptedException ex)
- {
- }
- }
+ _thread.join();
}
public void
diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java
index 3cf5bc4430e..c46fe75fd3a 100644
--- a/java/src/IceInternal/ThreadPoolWorkQueue.java
+++ b/java/src/IceInternal/ThreadPoolWorkQueue.java
@@ -9,12 +9,14 @@
package IceInternal;
+import java.util.concurrent.ExecutorService;
+
final class ThreadPoolWorkQueue extends EventHandler
{
- ThreadPoolWorkQueue(ThreadPool threadPool, Instance instance, Selector selector)
+ ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector)
{
+ _executor = instance.getQueueExecutor();
_threadPool = threadPool;
- _instance = instance;
_selector = selector;
_destroyed = false;
@@ -89,6 +91,7 @@ final class ThreadPoolWorkQueue extends EventHandler
{
throw new Ice.CommunicatorDestroyedException();
}
+ assert(item != null);
_workItems.add(item);
postMessage();
}
@@ -115,6 +118,7 @@ final class ThreadPoolWorkQueue extends EventHandler
if(!_workItems.isEmpty())
{
workItem = _workItems.removeFirst();
+ assert(workItem != null);
}
else
{
@@ -158,6 +162,25 @@ final class ThreadPoolWorkQueue extends EventHandler
public void
postMessage()
{
+ if(_executor != null)
+ {
+ _executor.submit(new Runnable() {
+
+ @Override
+ public void run()
+ {
+ postMessageInternal();
+ }
+ });
+ }
+ else {
+ postMessageInternal();
+ }
+ }
+
+ private void
+ postMessageInternal()
+ {
java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1);
buf.put(0, (byte)0);
while(buf.hasRemaining())
@@ -166,6 +189,13 @@ final class ThreadPoolWorkQueue extends EventHandler
{
_fdIntrWrite.write(buf);
}
+ //
+ // This is thrown if the thread is interrupted.
+ //
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ break;
+ }
catch(java.io.IOException ex)
{
throw new Ice.SocketException(ex);
@@ -174,7 +204,6 @@ final class ThreadPoolWorkQueue extends EventHandler
}
private final ThreadPool _threadPool;
- private final Instance _instance;
private final Selector _selector;
boolean _destroyed;
@@ -182,4 +211,5 @@ final class ThreadPoolWorkQueue extends EventHandler
private java.nio.channels.WritableByteChannel _fdIntrWrite;
private java.util.LinkedList<ThreadPoolWorkItem> _workItems = new java.util.LinkedList<ThreadPoolWorkItem>();
+ private ExecutorService _executor;
}
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index ff4917d33c1..aaca6911829 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -58,7 +58,6 @@ final class UdpTransceiver implements Transceiver
}
@Override
- @SuppressWarnings("deprecation")
public int write(Buffer buf)
{
//
@@ -135,7 +134,6 @@ final class UdpTransceiver implements Transceiver
}
@Override
- @SuppressWarnings("deprecation")
public int read(Buffer buf, Ice.BooleanHolder moreData)
{
if(!buf.b.hasRemaining())
@@ -306,7 +304,6 @@ final class UdpTransceiver implements Transceiver
//
// Only for use by UdpEndpoint
//
- @SuppressWarnings("deprecation")
UdpTransceiver(ProtocolInstance instance, java.net.InetSocketAddress addr, java.net.InetSocketAddress sourceAddr,
String mcastInterface, int mcastTtl)
{
@@ -346,7 +343,6 @@ final class UdpTransceiver implements Transceiver
//
// Only for use by UdpEndpoint
//
- @SuppressWarnings("deprecation")
UdpTransceiver(ProtocolInstance instance, String host, int port, String mcastInterface, boolean connect)
{
_instance = instance;
diff --git a/java/src/IceInternal/Util.java b/java/src/IceInternal/Util.java
index 4bd8ef592e2..647262fe90b 100644
--- a/java/src/IceInternal/Util.java
+++ b/java/src/IceInternal/Util.java
@@ -9,8 +9,41 @@
package IceInternal;
+import java.util.concurrent.ThreadFactory;
+
public final class Util
{
+ static String
+ createThreadName(final Ice.Properties properties, final String name) {
+ String threadName = properties.getProperty("Ice.ProgramName");
+ if(threadName.length() > 0)
+ {
+ threadName += "-";
+ }
+
+ threadName = threadName + name;
+ return threadName;
+ }
+
+ static ThreadFactory
+ createThreadFactory(final Ice.Properties properties, final String name) {
+ return new java.util.concurrent.ThreadFactory()
+ {
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r);
+ t.setName(name);
+
+ if(properties.getProperty("Ice.ThreadPriority").length() > 0)
+ {
+ t.setPriority(Util.getThreadPriorityProperty(properties, "Ice"));
+ }
+ return t;
+ }
+ };
+ }
+
public static Instance
getInstance(Ice.Communicator communicator)
{
diff --git a/java/test/Ice/ami/Server.java b/java/test/Ice/ami/Server.java
index 187fbf817ef..601cbb71186 100644
--- a/java/test/Ice/ami/Server.java
+++ b/java/test/Ice/ami/Server.java
@@ -14,8 +14,6 @@ public class Server extends test.Util.Application
public int
run(String[] args)
{
- Ice.Communicator communicator = communicator();
-
Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TestAdapter");
Ice.ObjectAdapter adapter2 = communicator().createObjectAdapter("ControllerAdapter");
diff --git a/java/test/Ice/interrupt/AllTests.java b/java/test/Ice/interrupt/AllTests.java
new file mode 100644
index 00000000000..6da0470ffdc
--- /dev/null
+++ b/java/test/Ice/interrupt/AllTests.java
@@ -0,0 +1,444 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import test.Ice.interrupt.Test.Callback_TestIntf_op;
+import test.Ice.interrupt.Test.Callback_TestIntf_sleep;
+import test.Ice.interrupt.Test.CannotInterruptException;
+import test.Ice.interrupt.Test.TestIntfControllerPrx;
+import test.Ice.interrupt.Test.TestIntfControllerPrxHelper;
+import test.Ice.interrupt.Test.TestIntfPrx;
+import test.Ice.interrupt.Test.TestIntfPrxHelper;
+import Ice.CommunicatorDestroyedException;
+import Ice.LocalException;
+import Ice.OperationInterruptedException;
+import Ice.UserException;
+import IceInternal.Time;
+
+public class AllTests
+{
+ private static class CallbackBase
+ {
+ CallbackBase()
+ {
+ _called = false;
+ }
+
+ public synchronized void check()
+ {
+ while(!_called)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _called = false;
+ }
+
+ public synchronized void called()
+ {
+ assert(!_called);
+ _called = true;
+ notify();
+ }
+
+ private boolean _called;
+ }
+
+ private static void
+ test(boolean b)
+ {
+ if(!b)
+ {
+ throw new RuntimeException();
+ }
+ }
+
+ public static void
+ allTests(test.Util.Application app, Ice.Communicator communicator, PrintWriter out)
+ throws InterruptedException
+ {
+ String sref = "test:default -p 12010";
+ Ice.ObjectPrx obj = communicator.stringToProxy(sref);
+ test(obj != null);
+
+ TestIntfPrx p = TestIntfPrxHelper.uncheckedCast(obj);
+
+ sref = "testController:tcp -p 12011";
+ obj = communicator.stringToProxy(sref);
+ test(obj != null);
+
+ TestIntfControllerPrx testController = TestIntfControllerPrxHelper.uncheckedCast(obj);
+
+ out.print("testing client interrupt... ");
+ out.flush();
+ {
+ final Thread mainThread = Thread.currentThread();
+ mainThread.interrupt();
+ try
+ {
+ p.op();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+
+ final CallbackBase cb = new CallbackBase();
+ mainThread.interrupt();
+ p.begin_sleep(500, new Callback_TestIntf_sleep()
+ {
+
+ @Override
+ public void response()
+ {
+ test(false);
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(ex instanceof OperationInterruptedException);
+ cb.called();
+ }
+
+ @Override
+ public void exception(UserException ex)
+ {
+ test(false);
+ }
+ });
+ cb.check();
+
+ ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(1);
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ mainThread.interrupt();
+ }
+ });
+
+ try
+ {
+ test(!mainThread.isInterrupted());
+ p.sleep(2000);
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+ catch(test.Ice.interrupt.Test.InterruptedException e)
+ {
+ test(false);
+ }
+
+
+ try
+ {
+ Ice.AsyncResult r = p.begin_op();
+ Thread.currentThread().interrupt();
+ r.waitForCompleted();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+
+ try
+ {
+ Ice.AsyncResult r = p.begin_op();
+ Thread.currentThread().interrupt();
+ p.end_op(r);
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+
+ //
+ // Test interrupt of waitForSent. Here hold the adapter and send a large payload. The
+ // thread is interrupted in 500ms which should result in a operation interrupted exception.
+ //
+ executor.submit(new Runnable() {
+ @Override
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ mainThread.interrupt();
+ }
+ });
+
+ testController.holdAdapter();
+ Ice.AsyncResult r = null;
+ try
+ {
+ byte[] seq = new byte[128 * 1024];
+ r = p.begin_opWithPayload(seq);
+ r.waitForSent();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+ //
+ // Resume the adapter.
+ //
+ testController.resumeAdapter();
+
+ r.waitForSent();
+ r.waitForCompleted();
+ p.end_opWithPayload(r);
+
+ //
+ // The executor is all done.
+ //
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ out.println("ok");
+
+ out.print("testing Communicator.destroy interrupt... ");
+ out.flush();
+ {
+ //
+ // Check that CommunicatorDestroyedException is raised directly.
+ //
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = communicator.getProperties()._clone();
+ Ice.Communicator ic = app.initialize(initData);
+
+ Thread.currentThread().interrupt();
+ try
+ {
+ ic.destroy();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException ex)
+ {
+ // Expected
+ }
+
+ ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(2);
+
+ ic = app.initialize(initData);
+ Ice.ObjectPrx o = ic.stringToProxy(p.toString());
+
+ final Thread[] thread = new Thread[1];
+
+ final CallbackBase cb = new CallbackBase();
+ final TestIntfPrx p2 = TestIntfPrxHelper.checkedCast(o);
+ final CountDownLatch waitSignal = new CountDownLatch(1);
+ p2.begin_op(new Callback_TestIntf_op()
+ {
+ @Override
+ public void response()
+ {
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch(InterruptedException e1)
+ {
+ test(false);
+ }
+ thread[0] = Thread.currentThread();
+ waitSignal.countDown();
+ try
+ {
+ Thread.sleep(10000);
+ test(false);
+ }
+ catch(InterruptedException e)
+ {
+ // Expected
+ }
+ cb.called();
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(false);
+
+ }
+ });
+ executor.submit(new Runnable() {
+ @Override
+ public void run()
+ {
+ try
+ {
+ waitSignal.await();
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ thread[0].interrupt();
+ }
+ });
+
+ ic.destroy();
+
+ cb.check();
+
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ out.println("ok");
+
+ out.print("testing server interrupt... ");
+ out.flush();
+ {
+ final CallbackBase cb = new CallbackBase();
+ p.begin_sleep(2000, new Callback_TestIntf_sleep()
+ {
+ @Override
+ public void response()
+ {
+ test(false);
+ }
+
+ @Override
+ public void exception(LocalException ex)
+ {
+ test(false);
+ }
+
+ @Override
+ public void exception(UserException ex)
+ {
+ test(ex instanceof test.Ice.interrupt.Test.InterruptedException);
+ cb.called();
+ }
+ });
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ try
+ {
+ testController.interrupt();
+ }
+ catch(CannotInterruptException e)
+ {
+ test(false);
+ }
+ cb.check();
+ }
+ out.println("ok");
+
+ out.print("testing wait methods... ");
+ out.flush();
+ {
+ final Thread mainThread = Thread.currentThread();
+ ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(1);
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = communicator.getProperties()._clone();
+ initData.properties.setProperty("ClientTestAdapter.Endpoints", "default -p 12030");
+ Ice.Communicator ic = app.initialize(initData);
+ final Ice.ObjectAdapter adapter = ic.createObjectAdapter("ClientTestAdapter");
+ adapter.activate();
+
+ Runnable interruptMainThread = new Runnable() {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch(InterruptedException e)
+ {
+ test(false);
+ }
+ mainThread.interrupt();
+ }
+ };
+ executor.execute(interruptMainThread);
+ try
+ {
+ adapter.waitForHold();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException e)
+ {
+ // Expected.
+ }
+
+ executor.execute(interruptMainThread);
+ try
+ {
+ adapter.waitForDeactivate();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException e)
+ {
+ // Expected.
+ }
+
+ executor.execute(interruptMainThread);
+ try
+ {
+ ic.waitForShutdown();
+ test(false);
+ }
+ catch(Ice.OperationInterruptedException e)
+ {
+ // Expected.
+ }
+
+ ic.destroy();
+
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+ out.println("ok");
+
+ p.shutdown();
+ }
+}
diff --git a/java/test/Ice/interrupt/Client.java b/java/test/Ice/interrupt/Client.java
new file mode 100644
index 00000000000..247e69d035e
--- /dev/null
+++ b/java/test/Ice/interrupt/Client.java
@@ -0,0 +1,46 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+public class Client extends test.Util.Application
+{
+ @Override
+ public int run(String[] args)
+ {
+ try
+ {
+ AllTests.allTests(this, communicator(), getWriter());
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException();
+ }
+ return 0;
+ }
+
+ @Override
+ protected Ice.InitializationData getInitData(Ice.StringSeqHolder argsH)
+ {
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = Ice.Util.createProperties(argsH);
+ initData.properties.setProperty("Ice.Package.Test", "test.Ice.interrupt");
+ initData.properties.setProperty("Ice.BackgroundIO", "1");
+ return initData;
+ }
+
+ public static void main(String[] args)
+ {
+ Client app = new Client();
+ int result = app.main("Client", args);
+ System.gc();
+ System.exit(result);
+ }
+}
diff --git a/java/test/Ice/interrupt/Server.java b/java/test/Ice/interrupt/Server.java
new file mode 100644
index 00000000000..6bb587abfe1
--- /dev/null
+++ b/java/test/Ice/interrupt/Server.java
@@ -0,0 +1,51 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+public class Server extends test.Util.Application
+{
+ @Override
+ public int
+ run(String[] args)
+ {
+ Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TestAdapter");
+ Ice.ObjectAdapter adapter2 = communicator().createObjectAdapter("ControllerAdapter");
+
+ TestControllerI controller = new TestControllerI(adapter);
+ adapter.add(new TestI(controller), communicator().stringToIdentity("test"));
+ adapter.activate();
+ adapter2.add(controller, communicator().stringToIdentity("testController"));
+ adapter2.activate();
+
+ return WAIT;
+ }
+
+ @Override
+ protected Ice.InitializationData getInitData(Ice.StringSeqHolder argsH)
+ {
+ Ice.InitializationData initData = new Ice.InitializationData();
+ initData.properties = Ice.Util.createProperties(argsH);
+ initData.properties.setProperty("Ice.Package.Test", "test.Ice.interrupt");
+ initData.properties.setProperty("Ice.BackgroundIO", "1");
+ initData.properties.setProperty("TestAdapter.Endpoints", "default -p 12010");
+ initData.properties.setProperty("ControllerAdapter.Endpoints", "tcp -p 12011");
+ initData.properties.setProperty("ControllerAdapter.ThreadPool.Size", "1");
+ return initData;
+ }
+
+ public static void
+ main(String[] args)
+ {
+ Server app = new Server();
+ int result = app.main("Server", args);
+ System.gc();
+ System.exit(result);
+ }
+}
diff --git a/java/test/Ice/interrupt/Test.ice b/java/test/Ice/interrupt/Test.ice
new file mode 100644
index 00000000000..1c636847e15
--- /dev/null
+++ b/java/test/Ice/interrupt/Test.ice
@@ -0,0 +1,43 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#pragma once
+
+#include <Ice/BuiltinSequences.ice>
+
+[["java:package:test.Ice.interrupt"]]
+module Test
+{
+
+exception InterruptedException
+{
+};
+
+interface TestIntf
+{
+ void op();
+ void sleep(int to)
+ throws InterruptedException;
+ void opWithPayload(Ice::ByteSeq seq);
+ void shutdown();
+};
+
+exception CannotInterruptException
+{
+};
+
+interface TestIntfController
+{
+ void holdAdapter();
+ void resumeAdapter();
+ void interrupt()
+ throws CannotInterruptException;
+};
+
+};
diff --git a/java/test/Ice/interrupt/TestControllerI.java b/java/test/Ice/interrupt/TestControllerI.java
new file mode 100644
index 00000000000..c3b9088393f
--- /dev/null
+++ b/java/test/Ice/interrupt/TestControllerI.java
@@ -0,0 +1,69 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class TestControllerI extends test.Ice.interrupt.Test._TestIntfControllerDisp
+{
+ synchronized void
+ addUpcallThread()
+ {
+ _threads.add(Thread.currentThread());
+ }
+
+ synchronized void
+ removeUpcallThread()
+ {
+ _threads.remove(Thread.currentThread());
+ //
+ // Clear the interrupted state after removing the thread.
+ //
+ Thread.interrupted();
+ }
+
+ @Override
+ synchronized public void
+ interrupt(Ice.Current __current)
+ throws test.Ice.interrupt.Test.CannotInterruptException
+ {
+ if(_threads.isEmpty())
+ {
+ throw new test.Ice.interrupt.Test.CannotInterruptException();
+ }
+ for(Thread t : _threads)
+ {
+ t.interrupt();
+ }
+ }
+
+ @Override
+ public void
+ holdAdapter(Ice.Current current)
+ {
+ _adapter.hold();
+ }
+
+ @Override
+ public void
+ resumeAdapter(Ice.Current current)
+ {
+ _adapter.activate();
+ }
+
+ TestControllerI(Ice.ObjectAdapter adapter)
+ {
+ _adapter = adapter;
+ }
+
+ final private Ice.ObjectAdapter _adapter;
+ final private List<Thread> _threads = new ArrayList<Thread>();
+}
diff --git a/java/test/Ice/interrupt/TestI.java b/java/test/Ice/interrupt/TestI.java
new file mode 100644
index 00000000000..cc267571c6f
--- /dev/null
+++ b/java/test/Ice/interrupt/TestI.java
@@ -0,0 +1,68 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package test.Ice.interrupt;
+
+public class TestI extends test.Ice.interrupt.Test._TestIntfDisp
+{
+ private static void
+ test(boolean b)
+ {
+ if(!b)
+ {
+ throw new RuntimeException();
+ }
+ }
+
+ TestI(TestControllerI controller)
+ {
+ _controller = controller;
+ }
+
+ @Override
+ public void
+ op(Ice.Current current)
+ {
+ }
+
+ @Override
+ public void
+ sleep(int to, Ice.Current current)
+ throws test.Ice.interrupt.Test.InterruptedException
+ {
+ _controller.addUpcallThread();
+ try
+ {
+ Thread.sleep(to);
+ }
+ catch(InterruptedException ex)
+ {
+ throw new test.Ice.interrupt.Test.InterruptedException();
+ }
+ finally
+ {
+ _controller.removeUpcallThread();
+ }
+ }
+
+ @Override
+ public void
+ opWithPayload(byte[] seq, Ice.Current current)
+ {
+ }
+
+ @Override
+ public void
+ shutdown(Ice.Current current)
+ {
+ current.adapter.getCommunicator().shutdown();
+ }
+
+ private TestControllerI _controller;
+}
diff --git a/java/test/Ice/interrupt/run.py b/java/test/Ice/interrupt/run.py
new file mode 100644
index 00000000000..3cbc4137c5e
--- /dev/null
+++ b/java/test/Ice/interrupt/run.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+# **********************************************************************
+#
+# Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+import os, sys
+
+path = [ ".", "..", "../..", "../../..", "../../../.." ]
+head = os.path.dirname(sys.argv[0])
+if len(head) > 0:
+ path = [os.path.join(head, p) for p in path]
+path = [os.path.abspath(p) for p in path if os.path.exists(os.path.join(p, "scripts", "TestUtil.py")) ]
+if len(path) == 0:
+ raise RuntimeError("can't find toplevel directory!")
+sys.path.append(os.path.join(path[0], "scripts"))
+import TestUtil
+
+print("tests with regular server.")
+TestUtil.clientServerTest()
diff --git a/slice/Ice/LocalException.ice b/slice/Ice/LocalException.ice
index 7cb7403311c..249a92cc0c2 100644
--- a/slice/Ice/LocalException.ice
+++ b/slice/Ice/LocalException.ice
@@ -579,6 +579,16 @@ local exception DNSException
/**
*
+ * This exception indicates a request was interrupted.
+ *
+ **/
+["cpp:ice_print"]
+local exception OperationInterruptedException
+{
+};
+
+/**
+ *
* This exception indicates a timeout condition.
*
**/