diff options
author | Jose <jose@zeroc.com> | 2017-05-04 18:57:41 +0200 |
---|---|---|
committer | Jose <jose@zeroc.com> | 2017-05-04 18:57:41 +0200 |
commit | 7fd2631fd3ddaf794e997e55f74efe8d12679bb6 (patch) | |
tree | dc02e82cd7ebf99bc4ceabb746bfae83402cc1ce | |
parent | Removed optional semicolons after braces (diff) | |
download | ice-7fd2631fd3ddaf794e997e55f74efe8d12679bb6.tar.bz2 ice-7fd2631fd3ddaf794e997e55f74efe8d12679bb6.tar.xz ice-7fd2631fd3ddaf794e997e55f74efe8d12679bb6.zip |
Fixes ice_scheduler/ice_executor:
- ice_scheduler always honors the TaskContinuationOptions.ExecuteSynchronously.
- ice_executor always queue the continuation with the Ice thread pool
-rw-r--r-- | csharp/src/Ice/ThreadPool.cs | 11 | ||||
-rw-r--r-- | csharp/test/Ice/ami/AllTests.cs | 41 | ||||
-rw-r--r-- | csharp/test/Ice/ami/Client.cs | 5 | ||||
-rw-r--r-- | csharp/test/Ice/ami/Collocated.cs | 6 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/com/zeroc/IceInternal/ThreadPool.java | 20 |
5 files changed, 58 insertions, 25 deletions
diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs index 4d466590988..ba46dd08df4 100644 --- a/csharp/src/Ice/ThreadPool.cs +++ b/csharp/src/Ice/ThreadPool.cs @@ -12,6 +12,7 @@ namespace IceInternal using System.Collections.Generic; using System.Diagnostics; using System.Threading; + using System.Threading.Tasks; public delegate void ThreadPoolWorkItem(); public delegate void AsyncCallback(object state); @@ -493,13 +494,10 @@ namespace IceInternal protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { - if(_dispatcher == null && !taskWasPreviouslyQueued) + if(!taskWasPreviouslyQueued) { - if(_threads.Find(t => t.getThread().ManagedThreadId.Equals(Thread.CurrentThread.ManagedThreadId)) != null) - { - dispatchFromThisThread(() => { TryExecuteTask(task); }, null); - return true; - } + dispatchFromThisThread(() => { TryExecuteTask(task); }, null); + return true; } return false; } @@ -912,5 +910,4 @@ namespace IceInternal private Queue<ThreadPoolWorkItem> _workItems; } - } diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs index 86281cd6976..2f5d3e12473 100644 --- a/csharp/test/Ice/ami/AllTests.cs +++ b/csharp/test/Ice/ami/AllTests.cs @@ -3764,6 +3764,47 @@ public class AllTests : TestCommon.AllTests test(Thread.CurrentThread.Name.Contains("Ice.ThreadPool.Client")); }, p.ice_scheduler()).Wait(); + { + TaskCompletionSource<int> s1 = new TaskCompletionSource<int>(); + TaskCompletionSource<int> s2 = new TaskCompletionSource<int>(); + Task t1 = s1.Task; + Task t2 = s2.Task; + Task t3 = null; + Task t4 = null; + p.ice_pingAsync().ContinueWith( + (t) => + { + test(Thread.CurrentThread.Name.Contains("Ice.ThreadPool.Client")); + // + // t1 Continuation run in the thread that completes it. + // + var id = Thread.CurrentThread.ManagedThreadId; + t3 = t1.ContinueWith(prev => + { + test(id == Thread.CurrentThread.ManagedThreadId); + }, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + p.ice_scheduler()); + s1.SetResult(1); + + // + // t2 completed from the main thread + // + t4 = t2.ContinueWith(prev => + { + test(id != Thread.CurrentThread.ManagedThreadId); + test(Thread.CurrentThread.Name == null || + !Thread.CurrentThread.Name.Contains("Ice.ThreadPool.Client")); + }, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + p.ice_scheduler()); + }, p.ice_scheduler()).Wait(); + s2.SetResult(1); + Task.WaitAll(t1, t2, t3, t4); + } + if(!collocated) { communicator.getProperties().setProperty("ReplyAdapter.Endpoints", "tcp"); diff --git a/csharp/test/Ice/ami/Client.cs b/csharp/test/Ice/ami/Client.cs index 35ff7e352ff..1408a9a7dc2 100644 --- a/csharp/test/Ice/ami/Client.cs +++ b/csharp/test/Ice/ami/Client.cs @@ -29,6 +29,11 @@ public class Client : TestCommon.Application Ice.InitializationData initData = base.getInitData(ref args); initData.properties.setProperty("Ice.Warn.AMICallback", "0"); initData.properties.setProperty("Ice.Warn.Connections", "0"); + // + // We use a client thread pool with more than one thread to test + // that task inlining works. + // + initData.properties.setProperty("Ice.ThreadPool.Client.Size", "5"); // // Limit the send buffer size, this test relies on the socket diff --git a/csharp/test/Ice/ami/Collocated.cs b/csharp/test/Ice/ami/Collocated.cs index 516f8b1df76..0fad64a3189 100644 --- a/csharp/test/Ice/ami/Collocated.cs +++ b/csharp/test/Ice/ami/Collocated.cs @@ -23,6 +23,7 @@ public class Collocated : TestCommon.Application communicator().getProperties().setProperty("TestAdapter.Endpoints", getTestEndpoint(0)); communicator().getProperties().setProperty("ControllerAdapter.Endpoints", getTestEndpoint(1)); communicator().getProperties().setProperty("ControllerAdapter.ThreadPool.Size", "1"); + Ice.ObjectAdapter adapter = communicator().createObjectAdapter("TestAdapter"); Ice.ObjectAdapter adapter2 = communicator().createObjectAdapter("ControllerAdapter"); @@ -45,6 +46,11 @@ public class Collocated : TestCommon.Application // send() blocking after sending a given amount of data. // initData.properties.setProperty("Ice.TCP.SndSize", "50000"); + // + // We use a client thread pool with more than one thread to test + // that task inlining works. + // + initData.properties.setProperty("Ice.ThreadPool.Client.Size", "5"); return initData; } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ThreadPool.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ThreadPool.java index 3e37179c86b..dcd0672d43b 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/ThreadPool.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/ThreadPool.java @@ -387,16 +387,12 @@ public final class ThreadPool implements java.util.concurrent.Executor } // - // Implment execute method from java.util.concurrent.Executor interface + // Implement execute method from java.util.concurrent.Executor interface // @Override public void execute(Runnable command) { - long id = Thread.currentThread().getId(); - if(_dispatcher != null || - _threads.stream().filter(t -> t.getThread().getId() == id).findAny().isPresent()) - { - dispatchFromThisThread(new com.zeroc.IceInternal.DispatchWorkItem() + dispatch(new com.zeroc.IceInternal.DispatchWorkItem() { @Override public void run() @@ -404,18 +400,6 @@ public final class ThreadPool implements java.util.concurrent.Executor command.run(); } }); - } - else - { - dispatch(new com.zeroc.IceInternal.DispatchWorkItem() - { - @Override - public void run() - { - command.run(); - } - }); - } } private void |