author    Benoit Foucher <benoit@zeroc.com>  2024-06-18 08:19:36 +0200
committer GitHub <noreply@github.com>        2024-06-18 08:19:36 +0200
commit    b08306ef3bf16a7ecadf558375b2eb4f14336047
tree      e8ac785d69e8f2baa6f93ed4736f596ae9c53eb8
parent    Fix bogus call to ERR_get_error - Fix #2153 (#2231)
Back pressure fix for 3.7 (#2270)
-rw-r--r--  CHANGELOG-3.7.md                                      7
-rw-r--r--  cpp/src/Ice/ThreadPool.cpp                            4
-rw-r--r--  cpp/test/Ice/ami/AllTests.cpp                        24
-rw-r--r--  csharp/src/Ice/AsyncIOThread.cs                     182
-rw-r--r--  csharp/src/Ice/ConnectionFactory.cs                 204
-rw-r--r--  csharp/src/Ice/ConnectionI.cs                       495
-rw-r--r--  csharp/src/Ice/EventHandler.cs                        2
-rw-r--r--  csharp/src/Ice/Instance.cs                           33
-rw-r--r--  csharp/src/Ice/ThreadPool.cs                        401
-rw-r--r--  csharp/test/Ice/ami/AllTests.cs                      31
-rw-r--r--  csharp/test/Ice/metrics/AllTests.cs                   4
-rw-r--r--  java/test/src/main/java/test/Ice/ami/AllTests.java   41
12 files changed, 659 insertions, 769 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md
index bb25067386a..5022971c014 100644
--- a/CHANGELOG-3.7.md
+++ b/CHANGELOG-3.7.md
@@ -10,6 +10,7 @@ particular aspect of Ice.
- [Changes since Ice 3.7.10](#changes-since-ice-3710)
- [C++ Changes](#c-changes)
+ - [C# Changes](#c-changes)
- [Changes in Ice 3.7.10](#changes-in-ice-3710)
- [C++ Changes](#c-changes-1)
- [C# Changes](#c-changes-2)
@@ -102,6 +103,12 @@ particular aspect of Ice.
- Fix bug with the Ice.ServerIdleTime affecting Apple platforms. (https://github.com/zeroc-ice/ice/issues/2025)
+## C# Changes
+
+- Fixed bug in the Ice for C# connection code. A connection no longer reads messages off the network when its
+ associated thread-pool has no thread available to process these messages. As a result, back-pressure now works in
+ Ice for C# just like in other language mappings.
+
# Changes in Ice 3.7.10
These are the major changes since Ice 3.7.9.
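
For reference, the sketch below condenses the new C#/C++/Java AMI tests added by this commit into one C# routine showing how the back-pressure behavior described in the changelog entry above can be observed. It assumes the test suite's `Test.TestIntfPrx` interface (`sleep` and `opWithPayload` operations), a server thread pool of three threads, and the roughly 50KB TCP send/receive buffers configured by the test server; it is not part of the committed diff.

```csharp
// Sketch only: condensed from the tests added in csharp/test/Ice/ami below.
using System;
using System.Threading.Tasks;

public static class BackPressureCheck
{
    public static void Run(Test.TestIntfPrx p)
    {
        // Keep the 3 server thread pool threads busy so no thread is available to
        // read and dispatch the next incoming message.
        Task sleep1 = p.sleepAsync(1000);
        Task sleep2 = p.sleepAsync(1000);
        Task sleep3 = p.sleepAsync(1000);

        var oneway = (Test.TestIntfPrx)p.ice_oneway();

        // With the fix, the server stops reading while its pool is busy, the small TCP
        // buffers fill up, and this large oneway payload cannot be fully sent yet.
        Task send = oneway.opWithPayloadAsync(new byte[768 * 1024]);

        bool stillSending = !send.Wait(200);
        if(!stillSending || sleep1.IsCompleted)
        {
            throw new Exception("back-pressure does not appear to be working");
        }

        // Let the busy calls finish; the pending send completes once the server resumes reading.
        Task.WaitAll(sleep1, sleep2, sleep3);
    }
}
```
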
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 5a56c74f4c4..434dc728a9e 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -778,7 +778,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// If there are no more ready handlers and there are still threads busy performing
// IO, we give up leadership and promote another follower (which will perform the
// select() only once all the IOs are completed). Otherwise, if there are no more
- // threads peforming IOs, it's time to do another select().
+ // threads performing IOs, it's time to do another select().
//
if(_inUseIO > 0)
{
@@ -915,7 +915,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called.
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
current._thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForUser));
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index a39f3dd4f3a..d5bc48233c0 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -7,7 +7,10 @@
#include <TestHelper.h>
#include <Test.h>
+
#ifdef ICE_CPP11_MAPPING
+# include <chrono>
+# include <future>
# include <thread>
#endif
@@ -2482,6 +2485,27 @@ allTests(Test::TestHelper* helper, bool collocated)
if(p->ice_getConnection())
{
+ cout << "testing back pressure... " << flush;
+ {
+ // Keep the 3 server thread pool threads busy.
+ auto sleep1Future = p->sleepAsync(1000);
+ auto sleep2Future = p->sleepAsync(1000);
+ auto sleep3Future = p->sleepAsync(1000);
+
+ auto onewayProxy = Ice::uncheckedCast<Test::TestIntfPrx>(p->ice_oneway());
+
+ // Sending should block because the TCP send/receive buffer size on the server is set to 50KB.
+ Ice::ByteSeq seq;
+ seq.resize(768 * 1024);
+ auto future = onewayProxy->opWithPayloadAsync(seq);
+
+ test(future.wait_for(200ms) == future_status::timeout && sleep1Future.wait_for(0s) != future_status::ready);
+ sleep1Future.wait();
+ sleep2Future.wait();
+ sleep3Future.wait();
+ }
+ cout << "ok" << endl;
+
cout << "testing bidir... " << flush;
auto adapter = communicator->createObjectAdapter("");
auto replyI = make_shared<PingReplyI>();
diff --git a/csharp/src/Ice/AsyncIOThread.cs b/csharp/src/Ice/AsyncIOThread.cs
deleted file mode 100644
index bca757b110f..00000000000
--- a/csharp/src/Ice/AsyncIOThread.cs
+++ /dev/null
@@ -1,182 +0,0 @@
-//
-// Copyright (c) ZeroC, Inc. All rights reserved.
-//
-
-namespace IceInternal
-{
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Threading;
-
- public class AsyncIOThread
- {
- internal AsyncIOThread(Instance instance)
- {
- _instance = instance;
-
- _thread = new HelperThread(this);
- updateObserver();
- _thread.Start(Util.stringToThreadPriority(
- instance.initializationData().properties.getProperty("Ice.ThreadPriority")));
- }
-
- public void
- updateObserver()
- {
- lock(this)
- {
- Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
- if(obsv != null)
- {
- _observer = obsv.getThreadObserver("Communicator",
- _thread.getName(),
- Ice.Instrumentation.ThreadState.ThreadStateIdle,
- _observer);
- if(_observer != null)
- {
- _observer.attach();
- }
- }
- }
- }
-
- public void queue(ThreadPoolWorkItem callback)
- {
- lock(this)
- {
- Debug.Assert(!_destroyed);
- _queue.AddLast(callback);
- Monitor.Pulse(this);
- }
- }
-
- public void destroy()
- {
- lock(this)
- {
- Debug.Assert(!_destroyed);
- _destroyed = true;
- Monitor.Pulse(this);
- }
- }
-
- public void joinWithThread()
- {
- if(_thread != null)
- {
- _thread.Join();
- }
- }
-
- public void run()
- {
- LinkedList<ThreadPoolWorkItem> queue = new LinkedList<ThreadPoolWorkItem>();
- bool inUse = false;
- while(true)
- {
- lock(this)
- {
- if(_observer != null && inUse)
- {
- _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO,
- Ice.Instrumentation.ThreadState.ThreadStateIdle);
- inUse = false;
- }
-
- if(_destroyed && _queue.Count == 0)
- {
- break;
- }
-
- while(!_destroyed && _queue.Count == 0)
- {
- Monitor.Wait(this);
- }
-
- LinkedList<ThreadPoolWorkItem> tmp = queue;
- queue = _queue;
- _queue = tmp;
-
- if(_observer != null)
- {
- _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle,
- Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
- inUse = true;
- }
- }
-
- foreach(ThreadPoolWorkItem cb in queue)
- {
- try
- {
- cb();
- }
- catch(Ice.LocalException ex)
- {
- string s = "exception in asynchronous IO thread:\n" + ex;
- _instance.initializationData().logger.error(s);
- }
- catch(System.Exception ex)
- {
- string s = "unknown exception in asynchronous IO thread:\n" + ex;
- _instance.initializationData().logger.error(s);
- }
- }
- queue.Clear();
- }
-
- if(_observer != null)
- {
- _observer.detach();
- }
- }
-
- private Instance _instance;
- private bool _destroyed;
- private LinkedList<ThreadPoolWorkItem> _queue = new LinkedList<ThreadPoolWorkItem>();
- private Ice.Instrumentation.ThreadObserver _observer;
-
- private sealed class HelperThread
- {
- internal HelperThread(AsyncIOThread asyncIOThread)
- {
- _asyncIOThread = asyncIOThread;
- _name = _asyncIOThread._instance.initializationData().properties.getProperty("Ice.ProgramName");
- if(_name.Length > 0)
- {
- _name += "-";
- }
- _name += "Ice.AsyncIOThread";
- }
-
- public void Join()
- {
- _thread.Join();
- }
-
- public string getName()
- {
- return _name;
- }
-
- public void Start(ThreadPriority priority)
- {
- _thread = new Thread(new ThreadStart(Run));
- _thread.IsBackground = true;
- _thread.Name = _name;
- _thread.Start();
- }
-
- public void Run()
- {
- _asyncIOThread.run();
- }
-
- private AsyncIOThread _asyncIOThread;
- private string _name;
- private Thread _thread;
- }
-
- private HelperThread _thread;
- }
-}
diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs
index 45e6947dbda..20c9e4dbe25 100644
--- a/csharp/src/Ice/ConnectionFactory.cs
+++ b/csharp/src/Ice/ConnectionFactory.cs
@@ -8,6 +8,7 @@ namespace IceInternal
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
+ using System.Threading.Tasks;
public class MultiDictionary<K, V> : Dictionary<K, ICollection<V>>
{
@@ -1336,7 +1337,7 @@ namespace IceInternal
//
// Operations from EventHandler.
//
- public override bool startAsync(int operation, AsyncCallback callback, ref bool completedSynchronously)
+ public override bool startAsync(int operation, AsyncCallback completedCallback)
{
if(_state >= StateClosed)
{
@@ -1344,15 +1345,33 @@ namespace IceInternal
}
Debug.Assert(_acceptor != null);
- try
- {
- completedSynchronously = _acceptor.startAccept(callback, this);
- }
- catch(Ice.LocalException ex)
- {
- _acceptorException = ex;
- completedSynchronously = true;
- }
+
+ // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
+ // Ice thread pool thread is terminated.
+ Task.Run(() => {
+ lock (this)
+ {
+ if(_state >= StateClosed)
+ {
+ completedCallback(this);
+ return;
+ }
+
+ try
+ {
+ if(_acceptor.startAccept(completedCallback, this))
+ {
+ completedCallback(this);
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ _acceptorException = ex;
+ completedCallback(this);
+ }
+ }
+ });
+
return true;
}
@@ -1386,115 +1405,116 @@ namespace IceInternal
{
Ice.ConnectionI connection = null;
- ThreadPoolMessage msg = new ThreadPoolMessage(this);
-
- lock(this)
+ using(ThreadPoolMessage msg = new ThreadPoolMessage(current, this))
{
- if(!msg.startIOScope(ref current))
+ lock(this)
{
- return;
- }
-
- try
- {
- if(_state >= StateClosed)
- {
- return;
- }
- else if(_state == StateHolding)
+ if(!msg.startIOScope())
{
return;
}
- //
- // Reap closed connections
- //
- ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
- if(cons != null)
+ try
{
- foreach(Ice.ConnectionI c in cons)
+ if(_state >= StateClosed)
{
- _connections.Remove(c);
+ return;
+ }
+ else if(_state == StateHolding)
+ {
+ return;
}
- }
-
- if(!_acceptorStarted)
- {
- return;
- }
- //
- // Now accept a new connection.
- //
- Transceiver transceiver = null;
- try
- {
- transceiver = _acceptor.accept();
+ //
+ // Reap closed connections
+ //
+ ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
+ if(cons != null)
+ {
+ foreach(Ice.ConnectionI c in cons)
+ {
+ _connections.Remove(c);
+ }
+ }
- if(_instance.traceLevels().network >= 2)
+ if(!_acceptorStarted)
{
- StringBuilder s = new StringBuilder("trying to accept ");
- s.Append(_endpoint.protocol());
- s.Append(" connection\n");
- s.Append(transceiver.ToString());
- _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
+ return;
}
- }
- catch(Ice.SocketException ex)
- {
- if(Network.noMoreFds(ex.InnerException))
+
+ //
+ // Now accept a new connection.
+ //
+ Transceiver transceiver = null;
+ try
{
- string s = "can't accept more connections:\n" + ex + '\n' + _acceptor.ToString();
- _instance.initializationData().logger.error(s);
- Debug.Assert(_acceptorStarted);
- _acceptorStarted = false;
- _adapter.getThreadPool().finish(this);
- closeAcceptor();
+ transceiver = _acceptor.accept();
+
+ if(_instance.traceLevels().network >= 2)
+ {
+ StringBuilder s = new StringBuilder("trying to accept ");
+ s.Append(_endpoint.protocol());
+ s.Append(" connection\n");
+ s.Append(transceiver.ToString());
+ _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
+ }
}
+ catch(Ice.SocketException ex)
+ {
+ if(Network.noMoreFds(ex.InnerException))
+ {
+ string s = "can't accept more connections:\n" + ex + '\n' + _acceptor.ToString();
+ _instance.initializationData().logger.error(s);
+ Debug.Assert(_acceptorStarted);
+ _acceptorStarted = false;
+ _adapter.getThreadPool().finish(this);
+ closeAcceptor();
+ }
- // Ignore socket exceptions.
- return;
- }
- catch(Ice.LocalException ex)
- {
- // Warn about other Ice local exceptions.
- if(_warn)
+ // Ignore socket exceptions.
+ return;
+ }
+ catch(Ice.LocalException ex)
{
- warning(ex);
+ // Warn about other Ice local exceptions.
+ if(_warn)
+ {
+ warning(ex);
+ }
+ return;
}
- return;
- }
- Debug.Assert(transceiver != null);
+ Debug.Assert(transceiver != null);
- try
- {
- connection = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, _monitor, transceiver,
- null, _endpoint, _adapter);
- }
- catch(Ice.LocalException ex)
- {
try
{
- transceiver.close();
+ connection = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, _monitor, transceiver,
+ null, _endpoint, _adapter);
}
- catch(Ice.LocalException)
+ catch(Ice.LocalException ex)
{
- // Ignore
- }
+ try
+ {
+ transceiver.close();
+ }
+ catch(Ice.LocalException)
+ {
+ // Ignore
+ }
- if(_warn)
- {
- warning(ex);
+ if(_warn)
+ {
+ warning(ex);
+ }
+ return;
}
- return;
- }
- _connections.Add(connection);
- }
- finally
- {
- msg.finishIOScope(ref current);
+ _connections.Add(connection);
+ }
+ finally
+ {
+ msg.finishIOScope();
+ }
}
}
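
The rewritten `startAsync` above (and the matching change in `ConnectionI.cs` below) now starts the socket operation from a `Task.Run` continuation, because a .NET socket accept/read/write started from a thread that later terminates can fail with `SocketError.OperationAborted`, and it reports synchronous completion by invoking the completion callback instead of through the old `ref bool completedSynchronously` parameter. A minimal sketch of that pattern follows; `closed()` and `startIO()` are hypothetical helpers standing in for the state check and for `_acceptor.startAccept` / `_transceiver.startRead` / `startWrite`, not Ice APIs.

```csharp
// Illustrative sketch of the new startAsync contract; not the actual Ice sources.
using System.Threading.Tasks;

public delegate void AsyncCallback(object state);

public abstract class AsyncStarterSketch
{
    public bool startAsync(int operation, AsyncCallback completedCallback)
    {
        // Start the IO from a .NET thread pool thread so it is not tied to the lifetime
        // of the (possibly short-lived) Ice thread pool thread that triggered it.
        Task.Run(() =>
        {
            lock(this)
            {
                if(closed())
                {
                    completedCallback(this);
                    return;
                }
                try
                {
                    // startIO returns true when the operation completed synchronously.
                    if(startIO(operation, completedCallback))
                    {
                        completedCallback(this);
                    }
                }
                catch(System.Exception)
                {
                    // A real handler records the exception for finishAsync()/message();
                    // the sketch only signals completion.
                    completedCallback(this);
                }
            }
        });
        return true; // The completion callback is always invoked exactly once.
    }

    protected abstract bool closed();
    protected abstract bool startIO(int operation, AsyncCallback callback);
}
```
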
diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs
index 81c4b9f6f98..8c7b0c9fa11 100644
--- a/csharp/src/Ice/ConnectionI.cs
+++ b/csharp/src/Ice/ConnectionI.cs
@@ -566,16 +566,17 @@ namespace Ice
if(callback != null)
{
_threadPool.dispatch(() =>
- {
- try
- {
- callback(this);
- }
- catch(System.Exception ex)
{
- _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
- }
- } , this);
+ try
+ {
+ callback(this);
+ }
+ catch(System.Exception ex)
+ {
+ _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
+ }
+ },
+ this);
}
}
else
@@ -1035,45 +1036,67 @@ namespace Ice
//
// Operations from EventHandler
//
- public override bool startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously)
+ public override bool startAsync(int operation, IceInternal.AsyncCallback completedCallback)
{
if(_state >= StateClosed)
{
return false;
}
- try
- {
- if((operation & SocketOperation.Write) != 0)
+ // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
+ // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted
+ // error if started from a thread which is later terminated).
+ Task.Run(() => {
+ lock(this)
{
- if(_observer != null)
+ if(_state >= StateClosed)
{
- observerStartWrite(_writeStream.getBuffer());
+ completedCallback(this);
+ return;
}
- bool completed;
- completedSynchronously = _transceiver.startWrite(_writeStream.getBuffer(), cb, this, out completed);
- if(completed && _sendStreams.Count > 0)
+ try
{
- // The whole message is written, assume it's sent now for at-most-once semantics.
- _sendStreams.First.Value.isSent = true;
+ if((operation & SocketOperation.Write) != 0)
+ {
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.getBuffer());
+ }
+
+ bool completed;
+ if (_transceiver.startWrite(_writeStream.getBuffer(), completedCallback, this, out completed))
+ {
+ // If the write completed immediately and the buffer
+ if (completed && _sendStreams.Count > 0)
+ {
+ // The whole message is written, assume it's sent now for at-most-once semantics.
+ _sendStreams.First.Value.isSent = true;
+ }
+ completedCallback(this);
+ }
+ }
+ else if((operation & SocketOperation.Read) != 0)
+ {
+ if(_observer != null && !_readHeader)
+ {
+ observerStartRead(_readStream.getBuffer());
+ }
+
+ if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
+ {
+ completedCallback(this);
+ }
+ }
}
- }
- else if((operation & SocketOperation.Read) != 0)
- {
- if(_observer != null && !_readHeader)
+ catch(LocalException ex)
{
- observerStartRead(_readStream.getBuffer());
+ setState(StateClosed, ex);
+ completedCallback(this);
}
-
- completedSynchronously = _transceiver.startRead(_readStream.getBuffer(), cb, this);
}
- }
- catch(LocalException ex)
- {
- setState(StateClosed, ex);
- return false;
- }
+ });
+
return true;
}
@@ -1152,274 +1175,276 @@ namespace Ice
MessageInfo info = new MessageInfo();
int dispatchCount = 0;
- ThreadPoolMessage msg = new ThreadPoolMessage(this);
- try
+ using(ThreadPoolMessage msg = new ThreadPoolMessage(current, this))
{
lock(this)
{
- if(!msg.startIOScope(ref current))
- {
- return;
- }
-
- if(_state >= StateClosed)
- {
- return;
- }
-
- int readyOp = current.operation;
try
{
- unscheduleTimeout(current.operation);
-
- int writeOp = SocketOperation.None;
- int readOp = SocketOperation.None;
- if((readyOp & SocketOperation.Write) != 0)
+ if(!msg.startIOScope())
{
- if(_observer != null)
- {
- observerStartWrite(_writeStream.getBuffer());
- }
- writeOp = write(_writeStream.getBuffer());
- if(_observer != null && (writeOp & SocketOperation.Write) == 0)
- {
- observerFinishWrite(_writeStream.getBuffer());
- }
+ // No read/write IO is ready for the transceiver. This can call startAsync
+ return;
}
- while((readyOp & SocketOperation.Read) != 0)
+ if(_state >= StateClosed)
{
- IceInternal.Buffer buf = _readStream.getBuffer();
+ return;
+ }
- if(_observer != null && !_readHeader)
- {
- observerStartRead(buf);
- }
+ int readyOp = current.operation;
+ try
+ {
+ unscheduleTimeout(current.operation);
- readOp = read(buf);
- if((readOp & SocketOperation.Read) != 0)
- {
- break;
- }
- if(_observer != null && !_readHeader)
+ int writeOp = SocketOperation.None;
+ int readOp = SocketOperation.None;
+ if((readyOp & SocketOperation.Write) != 0)
{
- Debug.Assert(!buf.b.hasRemaining());
- observerFinishRead(buf);
+ if(_observer != null)
+ {
+ observerStartWrite(_writeStream.getBuffer());
+ }
+ writeOp = write(_writeStream.getBuffer());
+ if(_observer != null && (writeOp & SocketOperation.Write) == 0)
+ {
+ observerFinishWrite(_writeStream.getBuffer());
+ }
}
- if(_readHeader) // Read header if necessary.
+ while((readyOp & SocketOperation.Read) != 0)
{
- _readHeader = false;
+ IceInternal.Buffer buf = _readStream.getBuffer();
- if(_observer != null)
+ if(_observer != null && !_readHeader)
{
- _observer.receivedBytes(Protocol.headerSize);
+ observerStartRead(buf);
}
- //
- // Connection is validated on first message. This is only used by
- // setState() to check wether or not we can print a connection
- // warning (a client might close the connection forcefully if the
- // connection isn't validated, we don't want to print a warning
- // in this case).
- //
- _validated = true;
+ readOp = read(buf);
+ if((readOp & SocketOperation.Read) != 0)
+ {
+ break;
+ }
+ if(_observer != null && !_readHeader)
+ {
+ Debug.Assert(!buf.b.hasRemaining());
+ observerFinishRead(buf);
+ }
- int pos = _readStream.pos();
- if(pos < Protocol.headerSize)
+ if(_readHeader) // Read header if necessary.
{
+ _readHeader = false;
+
+ if(_observer != null)
+ {
+ _observer.receivedBytes(Protocol.headerSize);
+ }
+
//
- // This situation is possible for small UDP packets.
+ // Connection is validated on first message. This is only used by
+ // setState() to check wether or not we can print a connection
+ // warning (a client might close the connection forcefully if the
+ // connection isn't validated, we don't want to print a warning
+ // in this case).
//
- throw new IllegalMessageSizeException();
+ _validated = true;
+
+ int pos = _readStream.pos();
+ if(pos < Protocol.headerSize)
+ {
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw new IllegalMessageSizeException();
+ }
+
+ _readStream.pos(0);
+ byte[] m = new byte[4];
+ m[0] = _readStream.readByte();
+ m[1] = _readStream.readByte();
+ m[2] = _readStream.readByte();
+ m[3] = _readStream.readByte();
+ if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
+ m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
+ {
+ BadMagicException ex = new BadMagicException();
+ ex.badMagic = m;
+ throw ex;
+ }
+
+ ProtocolVersion pv = new ProtocolVersion();
+ pv.ice_readMembers(_readStream);
+ Protocol.checkSupportedProtocol(pv);
+ EncodingVersion ev = new EncodingVersion();
+ ev.ice_readMembers(_readStream);
+ Protocol.checkSupportedProtocolEncoding(ev);
+
+ _readStream.readByte(); // messageType
+ _readStream.readByte(); // compress
+ int size = _readStream.readInt();
+ if(size < Protocol.headerSize)
+ {
+ throw new IllegalMessageSizeException();
+ }
+
+ if(size > _messageSizeMax)
+ {
+ Ex.throwMemoryLimitException(size, _messageSizeMax);
+ }
+ if(size > _readStream.size())
+ {
+ _readStream.resize(size);
+ }
+ _readStream.pos(pos);
}
- _readStream.pos(0);
- byte[] m = new byte[4];
- m[0] = _readStream.readByte();
- m[1] = _readStream.readByte();
- m[2] = _readStream.readByte();
- m[3] = _readStream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
- m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
+ if(buf.b.hasRemaining())
{
- BadMagicException ex = new BadMagicException();
- ex.badMagic = m;
- throw ex;
+ if(_endpoint.datagram())
+ {
+ throw new DatagramLimitException(); // The message was truncated.
+ }
+ continue;
}
+ break;
+ }
- ProtocolVersion pv = new ProtocolVersion();
- pv.ice_readMembers(_readStream);
- Protocol.checkSupportedProtocol(pv);
- EncodingVersion ev = new EncodingVersion();
- ev.ice_readMembers(_readStream);
- Protocol.checkSupportedProtocolEncoding(ev);
-
- _readStream.readByte(); // messageType
- _readStream.readByte(); // compress
- int size = _readStream.readInt();
- if(size < Protocol.headerSize)
- {
- throw new IllegalMessageSizeException();
- }
+ int newOp = readOp | writeOp;
+ readyOp &= ~newOp;
+ Debug.Assert(readyOp != 0 || newOp != 0);
- if(size > _messageSizeMax)
+ if(_state <= StateNotValidated)
+ {
+ if(newOp != 0)
{
- Ex.throwMemoryLimitException(size, _messageSizeMax);
+ //
+ // Wait for all the transceiver conditions to be
+ // satisfied before continuing.
+ //
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
+ return;
}
- if(size > _readStream.size())
+
+ if(_state == StateNotInitialized && !initialize(current.operation))
{
- _readStream.resize(size);
+ return;
}
- _readStream.pos(pos);
- }
- if(buf.b.hasRemaining())
- {
- if(_endpoint.datagram())
+ if(_state <= StateNotValidated && !validate(current.operation))
{
- throw new DatagramLimitException(); // The message was truncated.
+ return;
}
- continue;
- }
- break;
- }
- int newOp = readOp | writeOp;
- readyOp &= ~newOp;
- Debug.Assert(readyOp != 0 || newOp != 0);
+ _threadPool.unregister(this, current.operation);
- if(_state <= StateNotValidated)
- {
- if(newOp != 0)
- {
//
- // Wait for all the transceiver conditions to be
- // satisfied before continuing.
+ // We start out in holding state.
//
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
- return;
- }
-
- if(_state == StateNotInitialized && !initialize(current.operation))
- {
- return;
+ setState(StateHolding);
+ if(_startCallback != null)
+ {
+ startCB = _startCallback;
+ _startCallback = null;
+ if(startCB != null)
+ {
+ ++dispatchCount;
+ }
+ }
}
-
- if(_state <= StateNotValidated && !validate(current.operation))
+ else
{
- return;
- }
+ Debug.Assert(_state <= StateClosingPending);
- _threadPool.unregister(this, current.operation);
-
- //
- // We start out in holding state.
- //
- setState(StateHolding);
- if(_startCallback != null)
- {
- startCB = _startCallback;
- _startCallback = null;
- if(startCB != null)
+ //
+ // We parse messages first, if we receive a close
+ // connection message we won't send more messages.
+ //
+ if((readyOp & SocketOperation.Read) != 0)
{
- ++dispatchCount;
+ newOp |= parseMessage(ref info);
+ dispatchCount += info.messageDispatchCount;
}
- }
- }
- else
- {
- Debug.Assert(_state <= StateClosingPending);
- //
- // We parse messages first, if we receive a close
- // connection message we won't send more messages.
- //
- if((readyOp & SocketOperation.Read) != 0)
- {
- newOp |= parseMessage(ref info);
- dispatchCount += info.messageDispatchCount;
- }
+ if((readyOp & SocketOperation.Write) != 0)
+ {
+ newOp |= sendNextMessage(out sentCBs);
+ if(sentCBs != null)
+ {
+ ++dispatchCount;
+ }
+ }
- if((readyOp & SocketOperation.Write) != 0)
- {
- newOp |= sendNextMessage(out sentCBs);
- if(sentCBs != null)
+ if(_state < StateClosed)
{
- ++dispatchCount;
+ scheduleTimeout(newOp);
+ _threadPool.update(this, current.operation, newOp);
}
}
- if(_state < StateClosed)
+ if(_acmLastActivity > -1)
{
- scheduleTimeout(newOp);
- _threadPool.update(this, current.operation, newOp);
+ _acmLastActivity = Time.currentMonotonicTimeMillis();
}
- }
- if(_acmLastActivity > -1)
- {
- _acmLastActivity = Time.currentMonotonicTimeMillis();
- }
-
- if(dispatchCount == 0)
- {
- return; // Nothing to dispatch we're done!
- }
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
- _dispatchCount += dispatchCount;
+ _dispatchCount += dispatchCount;
- msg.completed(ref current);
- }
- catch(DatagramLimitException) // Expected.
- {
- if(_warnUdp)
- {
- _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos()));
+ msg.ioCompleted();
}
- _readStream.resize(Protocol.headerSize);
- _readStream.pos(0);
- _readHeader = true;
- return;
- }
- catch(SocketException ex)
- {
- setState(StateClosed, ex);
- return;
- }
- catch(LocalException ex)
- {
- if(_endpoint.datagram())
+ catch(DatagramLimitException) // Expected.
{
- if(_warn)
+ if(_warnUdp)
{
- _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc));
+ _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos()));
}
_readStream.resize(Protocol.headerSize);
_readStream.pos(0);
_readHeader = true;
+ return;
}
- else
+ catch(SocketException ex)
{
setState(StateClosed, ex);
+ return;
}
- return;
+ catch(LocalException ex)
+ {
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc));
+ }
+ _readStream.resize(Protocol.headerSize);
+ _readStream.pos(0);
+ _readHeader = true;
+ }
+ else
+ {
+ setState(StateClosed, ex);
+ }
+ return;
+ }
+
}
+ finally
+ {
+ msg.finishIOScope();
+ }
+ }
- ThreadPoolCurrent c = current;
- _threadPool.dispatch(() =>
+ _threadPool.dispatchFromThisThread(() =>
{
dispatch(startCB, sentCBs, info);
- msg.destroy(ref c);
- }, this);
- }
+ },
+ this);
}
- finally
- {
- msg.finishIOScope(ref current);
- }
-
}
private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
@@ -1546,7 +1571,7 @@ namespace Ice
//
// If there are no callbacks to call, we don't call ioCompleted() since we're not going
// to call code that will potentially block (this avoids promoting a new leader and
- // unecessary thread creation, especially if this is called on shutdown).
+ // unnecessary thread creation, especially if this is called on shutdown).
//
if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 &&
_closeCallback == null && _heartbeatCallback == null)
@@ -1555,13 +1580,9 @@ namespace Ice
return;
}
- //
- // Unlike C++/Java, this method is called from an IO thread of the .NET thread
- // pool of from the communicator async IO thread. While it's fine to handle the
- // non-blocking activity of the connection from these threads, the dispatching
- // of the message must be taken care of by the Ice thread pool.
- //
- _threadPool.dispatch(finish, this);
+ current.ioCompleted();
+
+ _threadPool.dispatchFromThisThread(finish, this);
}
private void finish()
diff --git a/csharp/src/Ice/EventHandler.cs b/csharp/src/Ice/EventHandler.cs
index c5560b631ee..c4d5d4d82a7 100644
--- a/csharp/src/Ice/EventHandler.cs
+++ b/csharp/src/Ice/EventHandler.cs
@@ -10,7 +10,7 @@ public abstract class EventHandler
//
// Called to start a new asynchronous read or write operation.
//
- abstract public bool startAsync(int op, AsyncCallback cb, ref bool completedSynchronously);
+ abstract public bool startAsync(int op, AsyncCallback cb);
abstract public bool finishAsync(int op);
diff --git a/csharp/src/Ice/Instance.cs b/csharp/src/Ice/Instance.cs
index 16054521058..e164d799061 100644
--- a/csharp/src/Ice/Instance.cs
+++ b/csharp/src/Ice/Instance.cs
@@ -228,25 +228,6 @@ namespace IceInternal
}
}
- public AsyncIOThread
- asyncIOThread()
- {
- lock(this)
- {
- if(_state == StateDestroyed)
- {
- throw new Ice.CommunicatorDestroyedException();
- }
-
- if(_asyncIOThread == null) // Lazy initialization.
- {
- _asyncIOThread = new AsyncIOThread(this);
- }
-
- return _asyncIOThread;
- }
- }
-
public EndpointHostResolver endpointHostResolver()
{
lock(this)
@@ -1285,10 +1266,6 @@ namespace IceInternal
{
_clientThreadPool.destroy();
}
- if(_asyncIOThread != null)
- {
- _asyncIOThread.destroy();
- }
if(_endpointHostResolver != null)
{
_endpointHostResolver.destroy();
@@ -1309,10 +1286,6 @@ namespace IceInternal
{
_serverThreadPool.joinWithAllThreads();
}
- if(_asyncIOThread != null)
- {
- _asyncIOThread.joinWithThread();
- }
if(_endpointHostResolver != null)
{
_endpointHostResolver.joinWithThread();
@@ -1373,7 +1346,6 @@ namespace IceInternal
_serverThreadPool = null;
_clientThreadPool = null;
- _asyncIOThread = null;
_endpointHostResolver = null;
_timer = null;
@@ -1503,10 +1475,6 @@ namespace IceInternal
{
_endpointHostResolver.updateObserver();
}
- if(_asyncIOThread != null)
- {
- _asyncIOThread.updateObserver();
- }
if(_timer != null)
{
_timer.updateObserver(_initData.observer);
@@ -1638,7 +1606,6 @@ namespace IceInternal
private NetworkProxy _networkProxy;
private ThreadPool _clientThreadPool;
private ThreadPool _serverThreadPool;
- private AsyncIOThread _asyncIOThread;
private EndpointHostResolver _endpointHostResolver;
private Timer _timer;
private RetryQueue _retryQueue;
diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs
index cad6db97da2..c113ec1ab57 100644
--- a/csharp/src/Ice/ThreadPool.cs
+++ b/csharp/src/Ice/ThreadPool.cs
@@ -4,12 +4,14 @@
namespace IceInternal
{
+ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
- public delegate void ThreadPoolWorkItem();
+ public delegate void ThreadPoolWorkItem(ThreadPoolCurrent current);
+
public delegate void AsyncCallback(object state);
//
@@ -30,7 +32,7 @@ namespace IceInternal
// Dispatch the continuation on the thread pool if this isn't called
// already from a thread pool thread. We don't use the dispatcher
// for the continuations, the dispatcher is only used when the
- // call is initialy invoked (e.g.: a servant dispatch after being
+ // call is initially invoked (e.g.: a servant dispatch after being
// received is dispatched using the dispatcher which might dispatch
// the call on the UI thread which will then use its own synchronization
// context to execute continuations).
@@ -38,7 +40,7 @@ namespace IceInternal
var ctx = Current as ThreadPoolSynchronizationContext;
if(ctx != this)
{
- _threadPool.dispatch(() => { d(state); }, null, false);
+ _threadPool.dispatch(() => { d(state); }, null);
}
else
{
@@ -54,100 +56,98 @@ namespace IceInternal
private ThreadPool _threadPool;
}
- internal struct ThreadPoolMessage
+
+ internal class ThreadPoolMessage : IDisposable
{
- public ThreadPoolMessage(object mutex)
+ public ThreadPoolMessage(ThreadPoolCurrent current, object mutex)
{
+ _current = current;
_mutex = mutex;
_finish = false;
_finishWithIO = false;
}
- public bool startIOScope(ref ThreadPoolCurrent current)
+ public bool startIOScope()
{
// This must be called with the handler locked.
- _finishWithIO = current.startMessage();
+ _finishWithIO = _current.startMessage();
return _finishWithIO;
}
- public void finishIOScope(ref ThreadPoolCurrent current)
+ public void finishIOScope()
{
if(_finishWithIO)
{
- lock(_mutex)
- {
- current.finishMessage(true);
- }
+ // This must be called with the handler locked.
+ _current.finishMessage();
}
}
- public void completed(ref ThreadPoolCurrent current)
+ public void ioCompleted()
{
//
// Call finishMessage once IO is completed only if serialization is not enabled.
// Otherwise, finishMessage will be called when the event handler is done with
- // the message (it will be called from destroy below).
+ // the message (it will be called from Dispose below).
//
Debug.Assert(_finishWithIO);
- if(current.ioCompleted())
+ if(_current.ioCompleted())
{
_finishWithIO = false;
_finish = true;
}
}
- public void destroy(ref ThreadPoolCurrent current)
+ public void Dispose()
{
if(_finish)
{
//
- // A ThreadPoolMessage instance must be created outside the synchronization
- // of the event handler. We need to lock the event handler here to call
- // finishMessage.
+ // A ThreadPoolMessage instance must be created outside the synchronization of the event handler. We
+ // need to lock the event handler here to call finishMessage.
//
lock(_mutex)
{
- current.finishMessage(false);
- Debug.Assert(!current.completedSynchronously);
+ _current.finishMessage();
}
}
}
+ private ThreadPoolCurrent _current;
private object _mutex;
private bool _finish;
private bool _finishWithIO;
}
- public struct ThreadPoolCurrent
+ public class ThreadPoolCurrent
{
- public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op)
+ internal ThreadPoolCurrent(ThreadPool threadPool, ThreadPool.WorkerThread thread)
{
_threadPool = threadPool;
- _handler = handler;
- operation = op;
- completedSynchronously = false;
+ _thread = thread;
}
- public readonly int operation;
- public bool completedSynchronously;
+ public int operation;
public bool ioCompleted()
{
- return _threadPool.serialize();
+ return _threadPool.ioCompleted(this);
}
public bool startMessage()
{
- return _threadPool.startMessage(ref this);
+ return _threadPool.startMessage(this);
}
- public void finishMessage(bool fromIOThread)
+ public void finishMessage()
{
- _threadPool.finishMessage(ref this, fromIOThread);
+ _threadPool.finishMessage(this);
}
internal readonly ThreadPool _threadPool;
- internal readonly EventHandler _handler;
+ internal readonly ThreadPool.WorkerThread _thread;
+ internal bool _ioCompleted;
+ internal EventHandler _handler;
}
public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler
@@ -251,7 +251,7 @@ namespace IceInternal
_threads = new List<WorkerThread>();
for(int i = 0; i < _size; ++i)
{
- WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+ var thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
thread.start(_priority);
_threads.Add(thread);
}
@@ -329,18 +329,12 @@ namespace IceInternal
if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0)
{
handler._pending |= SocketOperation.Read;
- executeNonBlocking(() =>
- {
- messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read));
- });
+ queueReadyForIOHandler(handler, SocketOperation.Read);
}
else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0)
{
handler._pending |= SocketOperation.Write;
- executeNonBlocking(() =>
- {
- messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write));
- });
+ queueReadyForIOHandler(handler, SocketOperation.Write);
}
}
}
@@ -363,11 +357,13 @@ namespace IceInternal
//
if(handler._pending == 0)
{
- executeNonBlocking(() =>
- {
- ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None);
- handler.finished(ref current);
- });
+ _workItems.Enqueue(current =>
+ {
+ current.operation = SocketOperation.None;
+ current._handler = handler;
+ handler.finished(ref current);
+ });
+ Monitor.Pulse(this);
}
else
{
@@ -376,13 +372,13 @@ namespace IceInternal
}
}
- public void dispatchFromThisThread(System.Action call, Ice.Connection con)
+ public void dispatchFromThisThread(System.Action call, Ice.Connection connection)
{
if(_dispatcher != null)
{
try
{
- _dispatcher(call, con);
+ _dispatcher(call, connection);
}
catch(System.Exception ex)
{
@@ -399,7 +395,7 @@ namespace IceInternal
}
}
- public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true)
+ public void dispatch(Action workItem, Ice.Connection connection)
{
lock(this)
{
@@ -407,63 +403,21 @@ namespace IceInternal
{
throw new Ice.CommunicatorDestroyedException();
}
-
- if(useDispatcher)
- {
- _workItems.Enqueue(() => { dispatchFromThisThread(call, con); });
- }
- else
- {
- _workItems.Enqueue(() => { call(); });
- }
- Monitor.Pulse(this);
-
- //
- // If this is a dynamic thread pool which can still grow and if all threads are
- // currently busy dispatching or about to dispatch, we spawn a new thread to
- // execute this new work item right away.
- //
- if(_threads.Count < _sizeMax &&
- (_inUse + _workItems.Count) > _threads.Count &&
- !_destroyed)
- {
- if(_instance.traceLevels().threadPool >= 1)
- {
- string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
-
- try
+ _workItems.Enqueue(current =>
{
- WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
- t.start(_priority);
- _threads.Add(t);
- }
- catch(System.Exception ex)
- {
- string s = "cannot create thread for `" + _prefix + "':\n" + ex;
- _instance.initializationData().logger.error(s);
- }
- }
- }
- }
-
- public void executeNonBlocking(ThreadPoolWorkItem workItem)
- {
- lock(this)
- {
- Debug.Assert(!_destroyed);
- _instance.asyncIOThread().queue(workItem);
+ current.ioCompleted();
+ dispatchFromThisThread(workItem, connection);
+ });
+ Monitor.Pulse(this);
}
}
public void joinWithAllThreads()
{
//
- // _threads is immutable after destroy() has been called,
- // therefore no synchronization is needed. (Synchronization
- // wouldn't be possible here anyway, because otherwise the
- // other threads would never terminate.)
+ // _threads is immutable after destroy() has been called, therefore no synchronization is needed.
+ // (Synchronization wouldn't be possible here anyway, because otherwise the other threads would never
+ // terminate.)
//
Debug.Assert(_destroyed);
foreach(WorkerThread thread in _threads)
@@ -484,7 +438,7 @@ namespace IceInternal
protected sealed override void QueueTask(System.Threading.Tasks.Task task)
{
- dispatch(() => { TryExecuteTask(task); }, null, _dispatcher != null);
+ dispatch(() => { TryExecuteTask(task); }, null);
}
protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)
@@ -507,25 +461,38 @@ namespace IceInternal
return new System.Threading.Tasks.Task[0];
}
- private void run(WorkerThread thread)
+ private void queueReadyForIOHandler(EventHandler handler, int operation)
{
- ThreadPoolWorkItem workItem = null;
- while(true)
+ lock(this)
{
- lock(this)
- {
- if(workItem != null)
+ Debug.Assert(!_destroyed);
+ _workItems.Enqueue(current =>
{
- Debug.Assert(_inUse > 0);
- --_inUse;
- if(_workItems.Count == 0)
+ current._handler = handler;
+ current.operation = operation;
+ try
{
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
+ current._handler.message(ref current);
}
- }
-
- workItem = null;
+ catch(System.Exception ex)
+ {
+ string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " +
+ current._handler.ToString();
+ _instance.initializationData().logger.error(s);
+ }
+ });
+ Monitor.Pulse(this);
+ }
+ }
+ private void run(WorkerThread thread)
+ {
+ var current = new ThreadPoolCurrent(this, thread);
+ while(true)
+ {
+ ThreadPoolWorkItem workItem = null;
+ lock(this)
+ {
while(_workItems.Count == 0)
{
if(_destroyed)
@@ -541,15 +508,8 @@ namespace IceInternal
{
return;
}
- else if(_serverIdleTime == 0 || _threads.Count > 1)
+ else if (_inUse < _threads.Count - 1)
{
- //
- // If not the last thread or if server idle time isn't configured,
- // we can exit. Unlike C++/Java, there's no need to have a thread
- // always spawned in the thread pool because all the IO is done
- // by the .NET thread pool threads. Instead, we'll just spawn a
- // new thread when needed (i.e.: when a new work item is queued).
- //
if(_instance.traceLevels().threadPool >= 1)
{
string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1);
@@ -558,32 +518,39 @@ namespace IceInternal
}
_threads.Remove(thread);
- _instance.asyncIOThread().queue(() =>
+ _workItems.Enqueue(c =>
{
+ // No call to ioCompleted, this shouldn't block (and we don't want to cause
+ // a new thread to be started).
thread.join();
});
+ Monitor.Pulse(this);
return;
}
- else
+ else if (_inUse > 0)
{
- Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1);
- if(!Monitor.Wait(this, _serverIdleTime * 1000) &&
- _workItems.Count == 0)
- {
- if(!_destroyed)
+ //
+ // If this is the last idle thread but there are still other threads
+ // busy dispatching, we go back waiting with _threadIdleTime. We only
+ // wait with _serverIdleTime when there's only one thread left.
+ //
+ continue;
+ }
+
+ Debug.Assert(_threads.Count == 1);
+ if(!Monitor.Wait(this, _serverIdleTime * 1000) && !_destroyed)
+ {
+ _workItems.Enqueue(c =>
{
- _workItems.Enqueue(() =>
- {
- try
- {
- _instance.objectAdapterFactory().shutdown();
- }
- catch(Ice.CommunicatorDestroyedException)
- {
- }
- });
- }
- }
+ c.ioCompleted();
+ try
+ {
+ _instance.objectAdapterFactory().shutdown();
+ }
+ catch(Ice.CommunicatorDestroyedException)
+ {
+ }
+ });
}
}
}
@@ -596,32 +563,78 @@ namespace IceInternal
Debug.Assert(_workItems.Count > 0);
workItem = _workItems.Dequeue();
+ current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
+ current._ioCompleted = false;
+ }
+
+ try
+ {
+ workItem(current);
+ }
+ catch(System.Exception ex)
+ {
+ string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
+ _instance.initializationData().logger.error(s);
+ }
+
+ lock (this)
+ {
+ if (_sizeMax > 1 && current._ioCompleted)
+ {
+ Debug.Assert(_inUse > 0);
+ --_inUse;
+ }
+ thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
+ }
+ }
+ }
+
+ public bool ioCompleted(ThreadPoolCurrent current)
+ {
+ lock(this)
+ {
+ current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
+
+ current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
+
+ if(_sizeMax > 1)
+ {
Debug.Assert(_inUse >= 0);
++_inUse;
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser);
-
if(_sizeMax > 1 && _inUse == _sizeWarn)
{
string s = "thread pool `" + _prefix + "' is running low on threads\n"
+ "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
_instance.initializationData().logger.warning(s);
}
- }
- try
- {
- workItem();
- }
- catch(System.Exception ex)
- {
- string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n';
- _instance.initializationData().logger.error(s);
+ if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.Count)
+ {
+ if(_instance.traceLevels().threadPool >= 1)
+ {
+ string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
+
+ try
+ {
+ var t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++);
+ t.start(_priority);
+ _threads.Add(t);
+ }
+ catch(System.Exception ex)
+ {
+ string s = "cannot create thread for `" + _prefix + "':\n" + ex;
+ _instance.initializationData().logger.error(s);
+ }
+ }
}
}
+ return _serialize;
}
- public bool startMessage(ref ThreadPoolCurrent current)
+ public bool startMessage(ThreadPoolCurrent current)
{
Debug.Assert((current._handler._pending & current.operation) != 0);
@@ -644,8 +657,7 @@ namespace IceInternal
(current._handler._registered & current.operation) != 0)
{
Debug.Assert((current._handler._started & current.operation) == 0);
- bool completed = false;
- if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
+ if(!current._handler.startAsync(current.operation, getCallback(current.operation)))
{
current._handler._pending &= ~current.operation;
if(current._handler._pending == 0 && current._handler._finish)
@@ -656,7 +668,6 @@ namespace IceInternal
}
else
{
- current.completedSynchronously = completed;
current._handler._started |= current.operation;
return false;
}
@@ -679,32 +690,19 @@ namespace IceInternal
}
}
- public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread)
+ public void finishMessage(ThreadPoolCurrent current)
{
if((current._handler._registered & current.operation) != 0)
{
- if(fromIOThread)
+ Debug.Assert((current._handler._ready & current.operation) == 0);
+ if(!current._handler.startAsync(current.operation, getCallback(current.operation)))
{
- Debug.Assert((current._handler._ready & current.operation) == 0);
- bool completed = false;
- if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed))
- {
- current._handler._pending &= ~current.operation;
- }
- else
- {
- Debug.Assert((current._handler._pending & current.operation) != 0);
- current.completedSynchronously = completed;
- current._handler._started |= current.operation;
- }
+ current._handler._pending &= ~current.operation;
}
else
{
- ThreadPoolCurrent c = current;
- executeNonBlocking(() =>
- {
- messageCallback(c);
- });
+ Debug.Assert((current._handler._pending & current.operation) != 0);
+ current._handler._started |= current.operation;
}
}
else
@@ -719,55 +717,13 @@ namespace IceInternal
}
}
- public void asyncReadCallback(object state)
- {
- messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read));
- }
-
- public void asyncWriteCallback(object state)
- {
- messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write));
- }
-
- public void messageCallback(ThreadPoolCurrent current)
- {
- try
- {
- do
- {
- current.completedSynchronously = false;
- current._handler.message(ref current);
- }
- while(current.completedSynchronously);
- }
- catch(System.Exception ex)
- {
- string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString();
- _instance.initializationData().logger.error(s);
- }
- }
-
private AsyncCallback getCallback(int operation)
{
- switch(operation)
- {
- case SocketOperation.Read:
- return asyncReadCallback;
- case SocketOperation.Write:
- return asyncWriteCallback;
- default:
- Debug.Assert(false);
- return null;
- }
+ Debug.Assert(operation == SocketOperation.Read || operation == SocketOperation.Write);
+ return state => queueReadyForIOHandler((EventHandler)state, operation);
}
- private Instance _instance;
- private System.Action<System.Action, Ice.Connection> _dispatcher;
- private bool _destroyed;
- private readonly string _prefix;
- private readonly string _threadPrefix;
-
- private sealed class WorkerThread
+ internal sealed class WorkerThread
{
private ThreadPool _threadPool;
private Ice.Instrumentation.ThreadObserver _observer;
@@ -890,6 +846,11 @@ namespace IceInternal
private Thread _thread;
}
+ private Instance _instance;
+ private System.Action<System.Action, Ice.Connection> _dispatcher;
+ private bool _destroyed;
+ private readonly string _prefix;
+ private readonly string _threadPrefix;
private readonly int _size; // Number of threads that are pre-created.
private readonly int _sizeMax; // Maximum number of threads.
private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
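
To make the back-pressure mechanism easier to follow, here is a condensed, illustrative mini thread pool (not the actual `ThreadPool` above): work items receive a per-thread object playing the role of `ThreadPoolCurrent`, a thread only counts as "in use" once the work item calls `ioCompleted()`, and the pool grows up to its maximum size at that point. Because the next socket read is only re-armed from a work item (`finishMessage` calling `startAsync`), no additional messages are read off the network while every thread is busy dispatching. Names and structure below are simplified assumptions, not Ice APIs.

```csharp
// Illustrative mini thread pool demonstrating the "grow on ioCompleted" behavior.
using System.Collections.Generic;
using System.Threading;

public sealed class MiniThreadPool
{
    public sealed class Current
    {
        internal MiniThreadPool Pool;
        internal bool IoCompleted;

        // A work item calls this before running potentially blocking user code.
        public void ioCompleted() => Pool.ioCompleted(this);
    }

    public delegate void WorkItem(Current current);

    private readonly object _mutex = new object();
    private readonly Queue<WorkItem> _workItems = new Queue<WorkItem>();
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly int _sizeMax;
    private int _inUse;

    public MiniThreadPool(int size, int sizeMax)
    {
        _sizeMax = sizeMax;
        for(int i = 0; i < size; ++i)
        {
            _threads.Add(new Thread(run) { IsBackground = true });
        }
        _threads.ForEach(t => t.Start());
    }

    public void dispatch(WorkItem item)
    {
        lock(_mutex)
        {
            _workItems.Enqueue(item);
            Monitor.Pulse(_mutex);
        }
    }

    private void ioCompleted(Current current)
    {
        lock(_mutex)
        {
            current.IoCompleted = true;
            ++_inUse;
            // Grow only when every thread is busy with user code and we're below the max.
            if(_inUse == _threads.Count && _threads.Count < _sizeMax)
            {
                var t = new Thread(run) { IsBackground = true };
                _threads.Add(t);
                t.Start();
            }
        }
    }

    private void run()
    {
        var current = new Current { Pool = this };
        while(true)
        {
            WorkItem item;
            lock(_mutex)
            {
                while(_workItems.Count == 0)
                {
                    Monitor.Wait(_mutex);
                }
                item = _workItems.Dequeue();
                current.IoCompleted = false;
            }
            try
            {
                item(current); // May call current.ioCompleted() and then block in user code.
            }
            finally
            {
                lock(_mutex)
                {
                    if(current.IoCompleted)
                    {
                        --_inUse;
                    }
                }
            }
        }
    }
}
```

In the real pool, a connection's "IO ready" handler is queued this way and only re-registers the socket for the next read after the message has been processed, so a saturated pool stops pulling data off the socket and the peer's sends eventually block on the filled TCP window.
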
diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs
index de9d5452b4b..c8d75f64846 100644
--- a/csharp/test/Ice/ami/AllTests.cs
+++ b/csharp/test/Ice/ami/AllTests.cs
@@ -3842,6 +3842,37 @@ namespace Ice
}
output.WriteLine("ok");
+ output.Write("testing back pressure... ");
+ output.Flush();
+ {
+ // Keep the 3 server thread pool threads busy.
+ Task sleep1Task = p.sleepAsync(1000);
+ Task sleep2Task = p.sleepAsync(1000);
+ Task sleep3Task = p.sleepAsync(1000);
+ bool canceled = false;
+ using(var cts = new CancellationTokenSource(200))
+ {
+ try
+ {
+ var onewayProxy = (Test.TestIntfPrx)p.ice_oneway();
+
+ // Sending should be canceled because the TCP send/receive buffer size on the server is set
+ // to 50KB. Note: we don't use the cancel parameter of the operation here because the
+ // cancellation doesn't cancel the operation whose payload is being sent.
+ onewayProxy.opWithPayloadAsync(new byte[768 * 1024]).Wait(cts.Token);
+ }
+ catch(OperationCanceledException)
+ {
+ canceled = true;
+ }
+ }
+ test(canceled && !sleep1Task.IsCompleted);
+ sleep1Task.Wait();
+ sleep2Task.Wait();
+ sleep3Task.Wait();
+ }
+ output.WriteLine("ok");
+
p.shutdown();
}
}
diff --git a/csharp/test/Ice/metrics/AllTests.cs b/csharp/test/Ice/metrics/AllTests.cs
index 7daafb02afc..8eb0921b425 100644
--- a/csharp/test/Ice/metrics/AllTests.cs
+++ b/csharp/test/Ice/metrics/AllTests.cs
@@ -529,7 +529,7 @@ public class AllTests : Test.AllTests
test(view["Connection"].Length == 1 && view["Connection"][0].current == 1 &&
view["Connection"][0].total == 1);
}
- test(view["Thread"].Length == 1 && view["Thread"][0].current == 5 && view["Thread"][0].total == 5);
+ test(view["Thread"].Length == 1 && view["Thread"][0].current == 4 && view["Thread"][0].total == 4);
output.WriteLine("ok");
output.Write("testing group by id...");
@@ -548,7 +548,7 @@ public class AllTests : Test.AllTests
waitForCurrent(serverMetrics, "View", "Dispatch", 0);
view = clientMetrics.getMetricsView("View", out timestamp);
- test(view["Thread"].Length == 5);
+ test(view["Thread"].Length == 4);
if(!collocated)
{
test(view["Connection"].Length == 2);
diff --git a/java/test/src/main/java/test/Ice/ami/AllTests.java b/java/test/src/main/java/test/Ice/ami/AllTests.java
index 10fb6223cd7..a15bfb7cc78 100644
--- a/java/test/src/main/java/test/Ice/ami/AllTests.java
+++ b/java/test/src/main/java/test/Ice/ami/AllTests.java
@@ -9,9 +9,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
import com.zeroc.Ice.InvocationFuture;
import com.zeroc.Ice.Util;
+import com.zeroc.IceInternal.Time;
import com.zeroc.Ice.CompressBatch;
import test.Ice.ami.Test.CloseMode;
@@ -1103,6 +1106,44 @@ public class AllTests
}
out.println("ok");
+ if (p.ice_getConnection() != null)
+ {
+ out.print("testing back pressure... ");
+ out.flush();
+ try
+ {
+ // Keep the 3 server thread pool threads busy.
+ CompletableFuture<Void> sleep1Future = p.sleepAsync(1000);
+ CompletableFuture<Void> sleep2Future = p.sleepAsync(1000);
+ CompletableFuture<Void> sleep3Future = p.sleepAsync(1000);
+ TestIntfPrx onewayProxy = p.ice_oneway();
+
+ // Sending should block because the TCP send/receive buffer size on the server is set to 50KB.
+ CompletableFuture<Void> future = onewayProxy.opWithPayloadAsync(new byte[768 * 1024]);
+ boolean timeout = false;
+ try
+ {
+ future.get(200, TimeUnit.MILLISECONDS);
+ }
+ catch(TimeoutException ex)
+ {
+ timeout = true;
+ }
+ test(timeout);
+ test(!sleep1Future.isDone());
+
+ sleep1Future.get();
+ sleep2Future.get();
+ sleep3Future.get();
+ }
+ catch(Exception ex)
+ {
+ out.println(ex.toString());
+ test(false);
+ }
+ out.println("ok");
+ }
+
executor.shutdown();
p.shutdown();