diff options
author | Benoit Foucher <benoit@zeroc.com> | 2024-06-18 08:19:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-18 08:19:36 +0200 |
commit | b08306ef3bf16a7ecadf558375b2eb4f14336047 (patch) | |
tree | e8ac785d69e8f2baa6f93ed4736f596ae9c53eb8 | |
parent | Fix bogus call to ERR_get_error - Fix #2153 (#2231) (diff) | |
download | ice-b08306ef3bf16a7ecadf558375b2eb4f14336047.tar.bz2 ice-b08306ef3bf16a7ecadf558375b2eb4f14336047.tar.xz ice-b08306ef3bf16a7ecadf558375b2eb4f14336047.zip |
Back pressure fix for 3.7 (#2270)
-rw-r--r-- | CHANGELOG-3.7.md | 7 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 4 | ||||
-rw-r--r-- | cpp/test/Ice/ami/AllTests.cpp | 24 | ||||
-rw-r--r-- | csharp/src/Ice/AsyncIOThread.cs | 182 | ||||
-rw-r--r-- | csharp/src/Ice/ConnectionFactory.cs | 204 | ||||
-rw-r--r-- | csharp/src/Ice/ConnectionI.cs | 495 | ||||
-rw-r--r-- | csharp/src/Ice/EventHandler.cs | 2 | ||||
-rw-r--r-- | csharp/src/Ice/Instance.cs | 33 | ||||
-rw-r--r-- | csharp/src/Ice/ThreadPool.cs | 401 | ||||
-rw-r--r-- | csharp/test/Ice/ami/AllTests.cs | 31 | ||||
-rw-r--r-- | csharp/test/Ice/metrics/AllTests.cs | 4 | ||||
-rw-r--r-- | java/test/src/main/java/test/Ice/ami/AllTests.java | 41 |
12 files changed, 659 insertions, 769 deletions
diff --git a/CHANGELOG-3.7.md b/CHANGELOG-3.7.md index bb25067386a..5022971c014 100644 --- a/CHANGELOG-3.7.md +++ b/CHANGELOG-3.7.md @@ -10,6 +10,7 @@ particular aspect of Ice. - [Changes since Ice 3.7.10](#changes-since-ice-3710) - [C++ Changes](#c-changes) + - [C# Changes](#c-changes) - [Changes in Ice 3.7.10](#changes-in-ice-3710) - [C++ Changes](#c-changes-1) - [C# Changes](#c-changes-2) @@ -102,6 +103,12 @@ particular aspect of Ice. - Fix bug with the Ice.ServerIdleTime affecting Apple platforms. (https://github.com/zeroc-ice/ice/issues/2025) +## C# Changes + +- Fixed bug in the Ice for C# connection code. A connection no longer reads messages off the network when its + associated thread-pool has no thread available to process these messages. As a result, back-pressure now works in + Ice for C# just like in other language mappings. + # Changes in Ice 3.7.10 These are the major changes since Ice 3.7.9. diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 5a56c74f4c4..434dc728a9e 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -778,7 +778,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) // If there are no more ready handlers and there are still threads busy performing // IO, we give up leadership and promote another follower (which will perform the // select() only once all the IOs are completed). Otherwise, if there are no more - // threads peforming IOs, it's time to do another select(). + // threads performing IOs, it's time to do another select(). // if(_inUseIO > 0) { @@ -915,7 +915,7 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. + current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. current._thread->setState(ICE_ENUM(ThreadState, ThreadStateInUseForUser)); diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index a39f3dd4f3a..d5bc48233c0 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -7,7 +7,10 @@ #include <TestHelper.h> #include <Test.h> + #ifdef ICE_CPP11_MAPPING +# include <chrono> +# include <future> # include <thread> #endif @@ -2482,6 +2485,27 @@ allTests(Test::TestHelper* helper, bool collocated) if(p->ice_getConnection()) { + cout << "testing back pressure... " << flush; + { + // Keep the 3 server thread pool threads busy. + auto sleep1Future = p->sleepAsync(1000); + auto sleep2Future = p->sleepAsync(1000); + auto sleep3Future = p->sleepAsync(1000); + + auto onewayProxy = Ice::uncheckedCast<Test::TestIntfPrx>(p->ice_oneway()); + + // Sending should block because the TCP send/receive buffer size on the server is set to 50KB. + Ice::ByteSeq seq; + seq.resize(768 * 1024); + auto future = onewayProxy->opWithPayloadAsync(seq); + + test(future.wait_for(200ms) == future_status::timeout && sleep1Future.wait_for(0s) != future_status::ready); + sleep1Future.wait(); + sleep2Future.wait(); + sleep3Future.wait(); + } + cout << "ok" << endl; + cout << "testing bidir... " << flush; auto adapter = communicator->createObjectAdapter(""); auto replyI = make_shared<PingReplyI>(); diff --git a/csharp/src/Ice/AsyncIOThread.cs b/csharp/src/Ice/AsyncIOThread.cs deleted file mode 100644 index bca757b110f..00000000000 --- a/csharp/src/Ice/AsyncIOThread.cs +++ /dev/null @@ -1,182 +0,0 @@ -// -// Copyright (c) ZeroC, Inc. All rights reserved. -// - -namespace IceInternal -{ - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; - - public class AsyncIOThread - { - internal AsyncIOThread(Instance instance) - { - _instance = instance; - - _thread = new HelperThread(this); - updateObserver(); - _thread.Start(Util.stringToThreadPriority( - instance.initializationData().properties.getProperty("Ice.ThreadPriority"))); - } - - public void - updateObserver() - { - lock(this) - { - Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; - if(obsv != null) - { - _observer = obsv.getThreadObserver("Communicator", - _thread.getName(), - Ice.Instrumentation.ThreadState.ThreadStateIdle, - _observer); - if(_observer != null) - { - _observer.attach(); - } - } - } - } - - public void queue(ThreadPoolWorkItem callback) - { - lock(this) - { - Debug.Assert(!_destroyed); - _queue.AddLast(callback); - Monitor.Pulse(this); - } - } - - public void destroy() - { - lock(this) - { - Debug.Assert(!_destroyed); - _destroyed = true; - Monitor.Pulse(this); - } - } - - public void joinWithThread() - { - if(_thread != null) - { - _thread.Join(); - } - } - - public void run() - { - LinkedList<ThreadPoolWorkItem> queue = new LinkedList<ThreadPoolWorkItem>(); - bool inUse = false; - while(true) - { - lock(this) - { - if(_observer != null && inUse) - { - _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO, - Ice.Instrumentation.ThreadState.ThreadStateIdle); - inUse = false; - } - - if(_destroyed && _queue.Count == 0) - { - break; - } - - while(!_destroyed && _queue.Count == 0) - { - Monitor.Wait(this); - } - - LinkedList<ThreadPoolWorkItem> tmp = queue; - queue = _queue; - _queue = tmp; - - if(_observer != null) - { - _observer.stateChanged(Ice.Instrumentation.ThreadState.ThreadStateIdle, - Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); - inUse = true; - } - } - - foreach(ThreadPoolWorkItem cb in queue) - { - try - { - cb(); - } - catch(Ice.LocalException ex) - { - string s = "exception in asynchronous IO thread:\n" + ex; - _instance.initializationData().logger.error(s); - } - catch(System.Exception ex) - { - string s = "unknown exception in asynchronous IO thread:\n" + ex; - _instance.initializationData().logger.error(s); - } - } - queue.Clear(); - } - - if(_observer != null) - { - _observer.detach(); - } - } - - private Instance _instance; - private bool _destroyed; - private LinkedList<ThreadPoolWorkItem> _queue = new LinkedList<ThreadPoolWorkItem>(); - private Ice.Instrumentation.ThreadObserver _observer; - - private sealed class HelperThread - { - internal HelperThread(AsyncIOThread asyncIOThread) - { - _asyncIOThread = asyncIOThread; - _name = _asyncIOThread._instance.initializationData().properties.getProperty("Ice.ProgramName"); - if(_name.Length > 0) - { - _name += "-"; - } - _name += "Ice.AsyncIOThread"; - } - - public void Join() - { - _thread.Join(); - } - - public string getName() - { - return _name; - } - - public void Start(ThreadPriority priority) - { - _thread = new Thread(new ThreadStart(Run)); - _thread.IsBackground = true; - _thread.Name = _name; - _thread.Start(); - } - - public void Run() - { - _asyncIOThread.run(); - } - - private AsyncIOThread _asyncIOThread; - private string _name; - private Thread _thread; - } - - private HelperThread _thread; - } -} diff --git a/csharp/src/Ice/ConnectionFactory.cs b/csharp/src/Ice/ConnectionFactory.cs index 45e6947dbda..20c9e4dbe25 100644 --- a/csharp/src/Ice/ConnectionFactory.cs +++ b/csharp/src/Ice/ConnectionFactory.cs @@ -8,6 +8,7 @@ namespace IceInternal using System.Collections.Generic; using System.Diagnostics; using System.Text; + using System.Threading.Tasks; public class MultiDictionary<K, V> : Dictionary<K, ICollection<V>> { @@ -1336,7 +1337,7 @@ namespace IceInternal // // Operations from EventHandler. // - public override bool startAsync(int operation, AsyncCallback callback, ref bool completedSynchronously) + public override bool startAsync(int operation, AsyncCallback completedCallback) { if(_state >= StateClosed) { @@ -1344,15 +1345,33 @@ namespace IceInternal } Debug.Assert(_acceptor != null); - try - { - completedSynchronously = _acceptor.startAccept(callback, this); - } - catch(Ice.LocalException ex) - { - _acceptorException = ex; - completedSynchronously = true; - } + + // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the + // Ice thread pool thread is terminated. + Task.Run(() => { + lock (this) + { + if(_state >= StateClosed) + { + completedCallback(this); + return; + } + + try + { + if(_acceptor.startAccept(completedCallback, this)) + { + completedCallback(this); + } + } + catch(Ice.LocalException ex) + { + _acceptorException = ex; + completedCallback(this); + } + } + }); + return true; } @@ -1386,115 +1405,116 @@ namespace IceInternal { Ice.ConnectionI connection = null; - ThreadPoolMessage msg = new ThreadPoolMessage(this); - - lock(this) + using(ThreadPoolMessage msg = new ThreadPoolMessage(current, this)) { - if(!msg.startIOScope(ref current)) + lock(this) { - return; - } - - try - { - if(_state >= StateClosed) - { - return; - } - else if(_state == StateHolding) + if(!msg.startIOScope()) { return; } - // - // Reap closed connections - // - ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); - if(cons != null) + try { - foreach(Ice.ConnectionI c in cons) + if(_state >= StateClosed) { - _connections.Remove(c); + return; + } + else if(_state == StateHolding) + { + return; } - } - - if(!_acceptorStarted) - { - return; - } - // - // Now accept a new connection. - // - Transceiver transceiver = null; - try - { - transceiver = _acceptor.accept(); + // + // Reap closed connections + // + ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); + if(cons != null) + { + foreach(Ice.ConnectionI c in cons) + { + _connections.Remove(c); + } + } - if(_instance.traceLevels().network >= 2) + if(!_acceptorStarted) { - StringBuilder s = new StringBuilder("trying to accept "); - s.Append(_endpoint.protocol()); - s.Append(" connection\n"); - s.Append(transceiver.ToString()); - _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); + return; } - } - catch(Ice.SocketException ex) - { - if(Network.noMoreFds(ex.InnerException)) + + // + // Now accept a new connection. + // + Transceiver transceiver = null; + try { - string s = "can't accept more connections:\n" + ex + '\n' + _acceptor.ToString(); - _instance.initializationData().logger.error(s); - Debug.Assert(_acceptorStarted); - _acceptorStarted = false; - _adapter.getThreadPool().finish(this); - closeAcceptor(); + transceiver = _acceptor.accept(); + + if(_instance.traceLevels().network >= 2) + { + StringBuilder s = new StringBuilder("trying to accept "); + s.Append(_endpoint.protocol()); + s.Append(" connection\n"); + s.Append(transceiver.ToString()); + _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString()); + } } + catch(Ice.SocketException ex) + { + if(Network.noMoreFds(ex.InnerException)) + { + string s = "can't accept more connections:\n" + ex + '\n' + _acceptor.ToString(); + _instance.initializationData().logger.error(s); + Debug.Assert(_acceptorStarted); + _acceptorStarted = false; + _adapter.getThreadPool().finish(this); + closeAcceptor(); + } - // Ignore socket exceptions. - return; - } - catch(Ice.LocalException ex) - { - // Warn about other Ice local exceptions. - if(_warn) + // Ignore socket exceptions. + return; + } + catch(Ice.LocalException ex) { - warning(ex); + // Warn about other Ice local exceptions. + if(_warn) + { + warning(ex); + } + return; } - return; - } - Debug.Assert(transceiver != null); + Debug.Assert(transceiver != null); - try - { - connection = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, _monitor, transceiver, - null, _endpoint, _adapter); - } - catch(Ice.LocalException ex) - { try { - transceiver.close(); + connection = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, _monitor, transceiver, + null, _endpoint, _adapter); } - catch(Ice.LocalException) + catch(Ice.LocalException ex) { - // Ignore - } + try + { + transceiver.close(); + } + catch(Ice.LocalException) + { + // Ignore + } - if(_warn) - { - warning(ex); + if(_warn) + { + warning(ex); + } + return; } - return; - } - _connections.Add(connection); - } - finally - { - msg.finishIOScope(ref current); + _connections.Add(connection); + } + finally + { + msg.finishIOScope(); + } } } diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 81c4b9f6f98..8c7b0c9fa11 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -566,16 +566,17 @@ namespace Ice if(callback != null) { _threadPool.dispatch(() => - { - try - { - callback(this); - } - catch(System.Exception ex) { - _logger.error("connection callback exception:\n" + ex + '\n' + _desc); - } - } , this); + try + { + callback(this); + } + catch(System.Exception ex) + { + _logger.error("connection callback exception:\n" + ex + '\n' + _desc); + } + }, + this); } } else @@ -1035,45 +1036,67 @@ namespace Ice // // Operations from EventHandler // - public override bool startAsync(int operation, IceInternal.AsyncCallback cb, ref bool completedSynchronously) + public override bool startAsync(int operation, IceInternal.AsyncCallback completedCallback) { if(_state >= StateClosed) { return false; } - try - { - if((operation & SocketOperation.Write) != 0) + // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the + // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted + // error if started from a thread which is later terminated). + Task.Run(() => { + lock(this) { - if(_observer != null) + if(_state >= StateClosed) { - observerStartWrite(_writeStream.getBuffer()); + completedCallback(this); + return; } - bool completed; - completedSynchronously = _transceiver.startWrite(_writeStream.getBuffer(), cb, this, out completed); - if(completed && _sendStreams.Count > 0) + try { - // The whole message is written, assume it's sent now for at-most-once semantics. - _sendStreams.First.Value.isSent = true; + if((operation & SocketOperation.Write) != 0) + { + if(_observer != null) + { + observerStartWrite(_writeStream.getBuffer()); + } + + bool completed; + if (_transceiver.startWrite(_writeStream.getBuffer(), completedCallback, this, out completed)) + { + // If the write completed immediately and the buffer + if (completed && _sendStreams.Count > 0) + { + // The whole message is written, assume it's sent now for at-most-once semantics. + _sendStreams.First.Value.isSent = true; + } + completedCallback(this); + } + } + else if((operation & SocketOperation.Read) != 0) + { + if(_observer != null && !_readHeader) + { + observerStartRead(_readStream.getBuffer()); + } + + if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this)) + { + completedCallback(this); + } + } } - } - else if((operation & SocketOperation.Read) != 0) - { - if(_observer != null && !_readHeader) + catch(LocalException ex) { - observerStartRead(_readStream.getBuffer()); + setState(StateClosed, ex); + completedCallback(this); } - - completedSynchronously = _transceiver.startRead(_readStream.getBuffer(), cb, this); } - } - catch(LocalException ex) - { - setState(StateClosed, ex); - return false; - } + }); + return true; } @@ -1152,274 +1175,276 @@ namespace Ice MessageInfo info = new MessageInfo(); int dispatchCount = 0; - ThreadPoolMessage msg = new ThreadPoolMessage(this); - try + using(ThreadPoolMessage msg = new ThreadPoolMessage(current, this)) { lock(this) { - if(!msg.startIOScope(ref current)) - { - return; - } - - if(_state >= StateClosed) - { - return; - } - - int readyOp = current.operation; try { - unscheduleTimeout(current.operation); - - int writeOp = SocketOperation.None; - int readOp = SocketOperation.None; - if((readyOp & SocketOperation.Write) != 0) + if(!msg.startIOScope()) { - if(_observer != null) - { - observerStartWrite(_writeStream.getBuffer()); - } - writeOp = write(_writeStream.getBuffer()); - if(_observer != null && (writeOp & SocketOperation.Write) == 0) - { - observerFinishWrite(_writeStream.getBuffer()); - } + // No read/write IO is ready for the transceiver. This can call startAsync + return; } - while((readyOp & SocketOperation.Read) != 0) + if(_state >= StateClosed) { - IceInternal.Buffer buf = _readStream.getBuffer(); + return; + } - if(_observer != null && !_readHeader) - { - observerStartRead(buf); - } + int readyOp = current.operation; + try + { + unscheduleTimeout(current.operation); - readOp = read(buf); - if((readOp & SocketOperation.Read) != 0) - { - break; - } - if(_observer != null && !_readHeader) + int writeOp = SocketOperation.None; + int readOp = SocketOperation.None; + if((readyOp & SocketOperation.Write) != 0) { - Debug.Assert(!buf.b.hasRemaining()); - observerFinishRead(buf); + if(_observer != null) + { + observerStartWrite(_writeStream.getBuffer()); + } + writeOp = write(_writeStream.getBuffer()); + if(_observer != null && (writeOp & SocketOperation.Write) == 0) + { + observerFinishWrite(_writeStream.getBuffer()); + } } - if(_readHeader) // Read header if necessary. + while((readyOp & SocketOperation.Read) != 0) { - _readHeader = false; + IceInternal.Buffer buf = _readStream.getBuffer(); - if(_observer != null) + if(_observer != null && !_readHeader) { - _observer.receivedBytes(Protocol.headerSize); + observerStartRead(buf); } - // - // Connection is validated on first message. This is only used by - // setState() to check wether or not we can print a connection - // warning (a client might close the connection forcefully if the - // connection isn't validated, we don't want to print a warning - // in this case). - // - _validated = true; + readOp = read(buf); + if((readOp & SocketOperation.Read) != 0) + { + break; + } + if(_observer != null && !_readHeader) + { + Debug.Assert(!buf.b.hasRemaining()); + observerFinishRead(buf); + } - int pos = _readStream.pos(); - if(pos < Protocol.headerSize) + if(_readHeader) // Read header if necessary. { + _readHeader = false; + + if(_observer != null) + { + _observer.receivedBytes(Protocol.headerSize); + } + // - // This situation is possible for small UDP packets. + // Connection is validated on first message. This is only used by + // setState() to check wether or not we can print a connection + // warning (a client might close the connection forcefully if the + // connection isn't validated, we don't want to print a warning + // in this case). // - throw new IllegalMessageSizeException(); + _validated = true; + + int pos = _readStream.pos(); + if(pos < Protocol.headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw new IllegalMessageSizeException(); + } + + _readStream.pos(0); + byte[] m = new byte[4]; + m[0] = _readStream.readByte(); + m[1] = _readStream.readByte(); + m[2] = _readStream.readByte(); + m[3] = _readStream.readByte(); + if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || + m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) + { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } + + ProtocolVersion pv = new ProtocolVersion(); + pv.ice_readMembers(_readStream); + Protocol.checkSupportedProtocol(pv); + EncodingVersion ev = new EncodingVersion(); + ev.ice_readMembers(_readStream); + Protocol.checkSupportedProtocolEncoding(ev); + + _readStream.readByte(); // messageType + _readStream.readByte(); // compress + int size = _readStream.readInt(); + if(size < Protocol.headerSize) + { + throw new IllegalMessageSizeException(); + } + + if(size > _messageSizeMax) + { + Ex.throwMemoryLimitException(size, _messageSizeMax); + } + if(size > _readStream.size()) + { + _readStream.resize(size); + } + _readStream.pos(pos); } - _readStream.pos(0); - byte[] m = new byte[4]; - m[0] = _readStream.readByte(); - m[1] = _readStream.readByte(); - m[2] = _readStream.readByte(); - m[3] = _readStream.readByte(); - if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || - m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) + if(buf.b.hasRemaining()) { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; + if(_endpoint.datagram()) + { + throw new DatagramLimitException(); // The message was truncated. + } + continue; } + break; + } - ProtocolVersion pv = new ProtocolVersion(); - pv.ice_readMembers(_readStream); - Protocol.checkSupportedProtocol(pv); - EncodingVersion ev = new EncodingVersion(); - ev.ice_readMembers(_readStream); - Protocol.checkSupportedProtocolEncoding(ev); - - _readStream.readByte(); // messageType - _readStream.readByte(); // compress - int size = _readStream.readInt(); - if(size < Protocol.headerSize) - { - throw new IllegalMessageSizeException(); - } + int newOp = readOp | writeOp; + readyOp &= ~newOp; + Debug.Assert(readyOp != 0 || newOp != 0); - if(size > _messageSizeMax) + if(_state <= StateNotValidated) + { + if(newOp != 0) { - Ex.throwMemoryLimitException(size, _messageSizeMax); + // + // Wait for all the transceiver conditions to be + // satisfied before continuing. + // + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); + return; } - if(size > _readStream.size()) + + if(_state == StateNotInitialized && !initialize(current.operation)) { - _readStream.resize(size); + return; } - _readStream.pos(pos); - } - if(buf.b.hasRemaining()) - { - if(_endpoint.datagram()) + if(_state <= StateNotValidated && !validate(current.operation)) { - throw new DatagramLimitException(); // The message was truncated. + return; } - continue; - } - break; - } - int newOp = readOp | writeOp; - readyOp &= ~newOp; - Debug.Assert(readyOp != 0 || newOp != 0); + _threadPool.unregister(this, current.operation); - if(_state <= StateNotValidated) - { - if(newOp != 0) - { // - // Wait for all the transceiver conditions to be - // satisfied before continuing. + // We start out in holding state. // - scheduleTimeout(newOp); - _threadPool.update(this, current.operation, newOp); - return; - } - - if(_state == StateNotInitialized && !initialize(current.operation)) - { - return; + setState(StateHolding); + if(_startCallback != null) + { + startCB = _startCallback; + _startCallback = null; + if(startCB != null) + { + ++dispatchCount; + } + } } - - if(_state <= StateNotValidated && !validate(current.operation)) + else { - return; - } + Debug.Assert(_state <= StateClosingPending); - _threadPool.unregister(this, current.operation); - - // - // We start out in holding state. - // - setState(StateHolding); - if(_startCallback != null) - { - startCB = _startCallback; - _startCallback = null; - if(startCB != null) + // + // We parse messages first, if we receive a close + // connection message we won't send more messages. + // + if((readyOp & SocketOperation.Read) != 0) { - ++dispatchCount; + newOp |= parseMessage(ref info); + dispatchCount += info.messageDispatchCount; } - } - } - else - { - Debug.Assert(_state <= StateClosingPending); - // - // We parse messages first, if we receive a close - // connection message we won't send more messages. - // - if((readyOp & SocketOperation.Read) != 0) - { - newOp |= parseMessage(ref info); - dispatchCount += info.messageDispatchCount; - } + if((readyOp & SocketOperation.Write) != 0) + { + newOp |= sendNextMessage(out sentCBs); + if(sentCBs != null) + { + ++dispatchCount; + } + } - if((readyOp & SocketOperation.Write) != 0) - { - newOp |= sendNextMessage(out sentCBs); - if(sentCBs != null) + if(_state < StateClosed) { - ++dispatchCount; + scheduleTimeout(newOp); + _threadPool.update(this, current.operation, newOp); } } - if(_state < StateClosed) + if(_acmLastActivity > -1) { - scheduleTimeout(newOp); - _threadPool.update(this, current.operation, newOp); + _acmLastActivity = Time.currentMonotonicTimeMillis(); } - } - if(_acmLastActivity > -1) - { - _acmLastActivity = Time.currentMonotonicTimeMillis(); - } - - if(dispatchCount == 0) - { - return; // Nothing to dispatch we're done! - } + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } - _dispatchCount += dispatchCount; + _dispatchCount += dispatchCount; - msg.completed(ref current); - } - catch(DatagramLimitException) // Expected. - { - if(_warnUdp) - { - _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos())); + msg.ioCompleted(); } - _readStream.resize(Protocol.headerSize); - _readStream.pos(0); - _readHeader = true; - return; - } - catch(SocketException ex) - { - setState(StateClosed, ex); - return; - } - catch(LocalException ex) - { - if(_endpoint.datagram()) + catch(DatagramLimitException) // Expected. { - if(_warn) + if(_warnUdp) { - _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc)); + _logger.warning(string.Format("maximum datagram size of {0} exceeded", _readStream.pos())); } _readStream.resize(Protocol.headerSize); _readStream.pos(0); _readHeader = true; + return; } - else + catch(SocketException ex) { setState(StateClosed, ex); + return; } - return; + catch(LocalException ex) + { + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning(string.Format("datagram connection exception:\n{0}\n{1}", ex, _desc)); + } + _readStream.resize(Protocol.headerSize); + _readStream.pos(0); + _readHeader = true; + } + else + { + setState(StateClosed, ex); + } + return; + } + } + finally + { + msg.finishIOScope(); + } + } - ThreadPoolCurrent c = current; - _threadPool.dispatch(() => + _threadPool.dispatchFromThisThread(() => { dispatch(startCB, sentCBs, info); - msg.destroy(ref c); - }, this); - } + }, + this); } - finally - { - msg.finishIOScope(ref current); - } - } private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info) @@ -1546,7 +1571,7 @@ namespace Ice // // If there are no callbacks to call, we don't call ioCompleted() since we're not going // to call code that will potentially block (this avoids promoting a new leader and - // unecessary thread creation, especially if this is called on shutdown). + // unnecessary thread creation, especially if this is called on shutdown). // if(_startCallback == null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback == null && _heartbeatCallback == null) @@ -1555,13 +1580,9 @@ namespace Ice return; } - // - // Unlike C++/Java, this method is called from an IO thread of the .NET thread - // pool of from the communicator async IO thread. While it's fine to handle the - // non-blocking activity of the connection from these threads, the dispatching - // of the message must be taken care of by the Ice thread pool. - // - _threadPool.dispatch(finish, this); + current.ioCompleted(); + + _threadPool.dispatchFromThisThread(finish, this); } private void finish() diff --git a/csharp/src/Ice/EventHandler.cs b/csharp/src/Ice/EventHandler.cs index c5560b631ee..c4d5d4d82a7 100644 --- a/csharp/src/Ice/EventHandler.cs +++ b/csharp/src/Ice/EventHandler.cs @@ -10,7 +10,7 @@ public abstract class EventHandler // // Called to start a new asynchronous read or write operation. // - abstract public bool startAsync(int op, AsyncCallback cb, ref bool completedSynchronously); + abstract public bool startAsync(int op, AsyncCallback cb); abstract public bool finishAsync(int op); diff --git a/csharp/src/Ice/Instance.cs b/csharp/src/Ice/Instance.cs index 16054521058..e164d799061 100644 --- a/csharp/src/Ice/Instance.cs +++ b/csharp/src/Ice/Instance.cs @@ -228,25 +228,6 @@ namespace IceInternal } } - public AsyncIOThread - asyncIOThread() - { - lock(this) - { - if(_state == StateDestroyed) - { - throw new Ice.CommunicatorDestroyedException(); - } - - if(_asyncIOThread == null) // Lazy initialization. - { - _asyncIOThread = new AsyncIOThread(this); - } - - return _asyncIOThread; - } - } - public EndpointHostResolver endpointHostResolver() { lock(this) @@ -1285,10 +1266,6 @@ namespace IceInternal { _clientThreadPool.destroy(); } - if(_asyncIOThread != null) - { - _asyncIOThread.destroy(); - } if(_endpointHostResolver != null) { _endpointHostResolver.destroy(); @@ -1309,10 +1286,6 @@ namespace IceInternal { _serverThreadPool.joinWithAllThreads(); } - if(_asyncIOThread != null) - { - _asyncIOThread.joinWithThread(); - } if(_endpointHostResolver != null) { _endpointHostResolver.joinWithThread(); @@ -1373,7 +1346,6 @@ namespace IceInternal _serverThreadPool = null; _clientThreadPool = null; - _asyncIOThread = null; _endpointHostResolver = null; _timer = null; @@ -1503,10 +1475,6 @@ namespace IceInternal { _endpointHostResolver.updateObserver(); } - if(_asyncIOThread != null) - { - _asyncIOThread.updateObserver(); - } if(_timer != null) { _timer.updateObserver(_initData.observer); @@ -1638,7 +1606,6 @@ namespace IceInternal private NetworkProxy _networkProxy; private ThreadPool _clientThreadPool; private ThreadPool _serverThreadPool; - private AsyncIOThread _asyncIOThread; private EndpointHostResolver _endpointHostResolver; private Timer _timer; private RetryQueue _retryQueue; diff --git a/csharp/src/Ice/ThreadPool.cs b/csharp/src/Ice/ThreadPool.cs index cad6db97da2..c113ec1ab57 100644 --- a/csharp/src/Ice/ThreadPool.cs +++ b/csharp/src/Ice/ThreadPool.cs @@ -4,12 +4,14 @@ namespace IceInternal { + using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; - public delegate void ThreadPoolWorkItem(); + public delegate void ThreadPoolWorkItem(ThreadPoolCurrent current); + public delegate void AsyncCallback(object state); // @@ -30,7 +32,7 @@ namespace IceInternal // Dispatch the continuation on the thread pool if this isn't called // already from a thread pool thread. We don't use the dispatcher // for the continuations, the dispatcher is only used when the - // call is initialy invoked (e.g.: a servant dispatch after being + // call is initially invoked (e.g.: a servant dispatch after being // received is dispatched using the dispatcher which might dispatch // the call on the UI thread which will then use its own synchronization // context to execute continuations). @@ -38,7 +40,7 @@ namespace IceInternal var ctx = Current as ThreadPoolSynchronizationContext; if(ctx != this) { - _threadPool.dispatch(() => { d(state); }, null, false); + _threadPool.dispatch(() => { d(state); }, null); } else { @@ -54,100 +56,98 @@ namespace IceInternal private ThreadPool _threadPool; } - internal struct ThreadPoolMessage + + internal class ThreadPoolMessage : IDisposable { - public ThreadPoolMessage(object mutex) + public ThreadPoolMessage(ThreadPoolCurrent current, object mutex) { + _current = current; _mutex = mutex; _finish = false; _finishWithIO = false; } - public bool startIOScope(ref ThreadPoolCurrent current) + public bool startIOScope() { // This must be called with the handler locked. - _finishWithIO = current.startMessage(); + _finishWithIO = _current.startMessage(); return _finishWithIO; } - public void finishIOScope(ref ThreadPoolCurrent current) + public void finishIOScope() { if(_finishWithIO) { - lock(_mutex) - { - current.finishMessage(true); - } + // This must be called with the handler locked. + _current.finishMessage(); } } - public void completed(ref ThreadPoolCurrent current) + public void ioCompleted() { // // Call finishMessage once IO is completed only if serialization is not enabled. // Otherwise, finishMessage will be called when the event handler is done with - // the message (it will be called from destroy below). + // the message (it will be called from Dispose below). // Debug.Assert(_finishWithIO); - if(current.ioCompleted()) + if(_current.ioCompleted()) { _finishWithIO = false; _finish = true; } } - public void destroy(ref ThreadPoolCurrent current) + public void Dispose() { if(_finish) { // - // A ThreadPoolMessage instance must be created outside the synchronization - // of the event handler. We need to lock the event handler here to call - // finishMessage. + // A ThreadPoolMessage instance must be created outside the synchronization of the event handler. We + // need to lock the event handler here to call finishMessage. // lock(_mutex) { - current.finishMessage(false); - Debug.Assert(!current.completedSynchronously); + _current.finishMessage(); } } } + private ThreadPoolCurrent _current; private object _mutex; private bool _finish; private bool _finishWithIO; } - public struct ThreadPoolCurrent + public class ThreadPoolCurrent { - public ThreadPoolCurrent(ThreadPool threadPool, EventHandler handler, int op) + internal ThreadPoolCurrent(ThreadPool threadPool, ThreadPool.WorkerThread thread) { _threadPool = threadPool; - _handler = handler; - operation = op; - completedSynchronously = false; + _thread = thread; } - public readonly int operation; - public bool completedSynchronously; + public int operation; public bool ioCompleted() { - return _threadPool.serialize(); + return _threadPool.ioCompleted(this); } public bool startMessage() { - return _threadPool.startMessage(ref this); + return _threadPool.startMessage(this); } - public void finishMessage(bool fromIOThread) + public void finishMessage() { - _threadPool.finishMessage(ref this, fromIOThread); + _threadPool.finishMessage(this); } internal readonly ThreadPool _threadPool; - internal readonly EventHandler _handler; + internal readonly ThreadPool.WorkerThread _thread; + internal bool _ioCompleted; + internal EventHandler _handler; } public sealed class ThreadPool : System.Threading.Tasks.TaskScheduler @@ -251,7 +251,7 @@ namespace IceInternal _threads = new List<WorkerThread>(); for(int i = 0; i < _size; ++i) { - WorkerThread thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); + var thread = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); thread.start(_priority); _threads.Add(thread); } @@ -329,18 +329,12 @@ namespace IceInternal if((add & SocketOperation.Read) != 0 && (handler._pending & SocketOperation.Read) == 0) { handler._pending |= SocketOperation.Read; - executeNonBlocking(() => - { - messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Read)); - }); + queueReadyForIOHandler(handler, SocketOperation.Read); } else if((add & SocketOperation.Write) != 0 && (handler._pending & SocketOperation.Write) == 0) { handler._pending |= SocketOperation.Write; - executeNonBlocking(() => - { - messageCallback(new ThreadPoolCurrent(this, handler, SocketOperation.Write)); - }); + queueReadyForIOHandler(handler, SocketOperation.Write); } } } @@ -363,11 +357,13 @@ namespace IceInternal // if(handler._pending == 0) { - executeNonBlocking(() => - { - ThreadPoolCurrent current = new ThreadPoolCurrent(this, handler, SocketOperation.None); - handler.finished(ref current); - }); + _workItems.Enqueue(current => + { + current.operation = SocketOperation.None; + current._handler = handler; + handler.finished(ref current); + }); + Monitor.Pulse(this); } else { @@ -376,13 +372,13 @@ namespace IceInternal } } - public void dispatchFromThisThread(System.Action call, Ice.Connection con) + public void dispatchFromThisThread(System.Action call, Ice.Connection connection) { if(_dispatcher != null) { try { - _dispatcher(call, con); + _dispatcher(call, connection); } catch(System.Exception ex) { @@ -399,7 +395,7 @@ namespace IceInternal } } - public void dispatch(System.Action call, Ice.Connection con, bool useDispatcher = true) + public void dispatch(Action workItem, Ice.Connection connection) { lock(this) { @@ -407,63 +403,21 @@ namespace IceInternal { throw new Ice.CommunicatorDestroyedException(); } - - if(useDispatcher) - { - _workItems.Enqueue(() => { dispatchFromThisThread(call, con); }); - } - else - { - _workItems.Enqueue(() => { call(); }); - } - Monitor.Pulse(this); - - // - // If this is a dynamic thread pool which can still grow and if all threads are - // currently busy dispatching or about to dispatch, we spawn a new thread to - // execute this new work item right away. - // - if(_threads.Count < _sizeMax && - (_inUse + _workItems.Count) > _threads.Count && - !_destroyed) - { - if(_instance.traceLevels().threadPool >= 1) - { - string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } - - try + _workItems.Enqueue(current => { - WorkerThread t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); - t.start(_priority); - _threads.Add(t); - } - catch(System.Exception ex) - { - string s = "cannot create thread for `" + _prefix + "':\n" + ex; - _instance.initializationData().logger.error(s); - } - } - } - } - - public void executeNonBlocking(ThreadPoolWorkItem workItem) - { - lock(this) - { - Debug.Assert(!_destroyed); - _instance.asyncIOThread().queue(workItem); + current.ioCompleted(); + dispatchFromThisThread(workItem, connection); + }); + Monitor.Pulse(this); } } public void joinWithAllThreads() { // - // _threads is immutable after destroy() has been called, - // therefore no synchronization is needed. (Synchronization - // wouldn't be possible here anyway, because otherwise the - // other threads would never terminate.) + // _threads is immutable after destroy() has been called, therefore no synchronization is needed. + // (Synchronization wouldn't be possible here anyway, because otherwise the other threads would never + // terminate.) // Debug.Assert(_destroyed); foreach(WorkerThread thread in _threads) @@ -484,7 +438,7 @@ namespace IceInternal protected sealed override void QueueTask(System.Threading.Tasks.Task task) { - dispatch(() => { TryExecuteTask(task); }, null, _dispatcher != null); + dispatch(() => { TryExecuteTask(task); }, null); } protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) @@ -507,25 +461,38 @@ namespace IceInternal return new System.Threading.Tasks.Task[0]; } - private void run(WorkerThread thread) + private void queueReadyForIOHandler(EventHandler handler, int operation) { - ThreadPoolWorkItem workItem = null; - while(true) + lock(this) { - lock(this) - { - if(workItem != null) + Debug.Assert(!_destroyed); + _workItems.Enqueue(current => { - Debug.Assert(_inUse > 0); - --_inUse; - if(_workItems.Count == 0) + current._handler = handler; + current.operation = operation; + try { - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); + current._handler.message(ref current); } - } - - workItem = null; + catch(System.Exception ex) + { + string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + + current._handler.ToString(); + _instance.initializationData().logger.error(s); + } + }); + Monitor.Pulse(this); + } + } + private void run(WorkerThread thread) + { + var current = new ThreadPoolCurrent(this, thread); + while(true) + { + ThreadPoolWorkItem workItem = null; + lock(this) + { while(_workItems.Count == 0) { if(_destroyed) @@ -541,15 +508,8 @@ namespace IceInternal { return; } - else if(_serverIdleTime == 0 || _threads.Count > 1) + else if (_inUse < _threads.Count - 1) { - // - // If not the last thread or if server idle time isn't configured, - // we can exit. Unlike C++/Java, there's no need to have a thread - // always spawned in the thread pool because all the IO is done - // by the .NET thread pool threads. Instead, we'll just spawn a - // new thread when needed (i.e.: when a new work item is queued). - // if(_instance.traceLevels().threadPool >= 1) { string s = "shrinking " + _prefix + ": Size=" + (_threads.Count - 1); @@ -558,32 +518,39 @@ namespace IceInternal } _threads.Remove(thread); - _instance.asyncIOThread().queue(() => + _workItems.Enqueue(c => { + // No call to ioCompleted, this shouldn't block (and we don't want to cause + // a new thread to be started). thread.join(); }); + Monitor.Pulse(this); return; } - else + else if (_inUse > 0) { - Debug.Assert(_serverIdleTime > 0 && _inUse == 0 && _threads.Count == 1); - if(!Monitor.Wait(this, _serverIdleTime * 1000) && - _workItems.Count == 0) - { - if(!_destroyed) + // + // If this is the last idle thread but there are still other threads + // busy dispatching, we go back waiting with _threadIdleTime. We only + // wait with _serverIdleTime when there's only one thread left. + // + continue; + } + + Debug.Assert(_threads.Count == 1); + if(!Monitor.Wait(this, _serverIdleTime * 1000) && !_destroyed) + { + _workItems.Enqueue(c => { - _workItems.Enqueue(() => - { - try - { - _instance.objectAdapterFactory().shutdown(); - } - catch(Ice.CommunicatorDestroyedException) - { - } - }); - } - } + c.ioCompleted(); + try + { + _instance.objectAdapterFactory().shutdown(); + } + catch(Ice.CommunicatorDestroyedException) + { + } + }); } } } @@ -596,32 +563,78 @@ namespace IceInternal Debug.Assert(_workItems.Count > 0); workItem = _workItems.Dequeue(); + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); + current._ioCompleted = false; + } + + try + { + workItem(current); + } + catch(System.Exception ex) + { + string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n'; + _instance.initializationData().logger.error(s); + } + + lock (this) + { + if (_sizeMax > 1 && current._ioCompleted) + { + Debug.Assert(_inUse > 0); + --_inUse; + } + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); + } + } + } + + public bool ioCompleted(ThreadPoolCurrent current) + { + lock(this) + { + current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. + + current._thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); + + if(_sizeMax > 1) + { Debug.Assert(_inUse >= 0); ++_inUse; - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForUser); - if(_sizeMax > 1 && _inUse == _sizeWarn) { string s = "thread pool `" + _prefix + "' is running low on threads\n" + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; _instance.initializationData().logger.warning(s); } - } - try - { - workItem(); - } - catch(System.Exception ex) - { - string s = "exception in `" + _prefix + "' while calling on work item:\n" + ex + '\n'; - _instance.initializationData().logger.error(s); + if(!_destroyed && _inUse < _sizeMax && _inUse == _threads.Count) + { + if(_instance.traceLevels().threadPool >= 1) + { + string s = "growing " + _prefix + ": Size = " + (_threads.Count + 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } + + try + { + var t = new WorkerThread(this, _threadPrefix + "-" + _threadIndex++); + t.start(_priority); + _threads.Add(t); + } + catch(System.Exception ex) + { + string s = "cannot create thread for `" + _prefix + "':\n" + ex; + _instance.initializationData().logger.error(s); + } + } } } + return _serialize; } - public bool startMessage(ref ThreadPoolCurrent current) + public bool startMessage(ThreadPoolCurrent current) { Debug.Assert((current._handler._pending & current.operation) != 0); @@ -644,8 +657,7 @@ namespace IceInternal (current._handler._registered & current.operation) != 0) { Debug.Assert((current._handler._started & current.operation) == 0); - bool completed = false; - if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) + if(!current._handler.startAsync(current.operation, getCallback(current.operation))) { current._handler._pending &= ~current.operation; if(current._handler._pending == 0 && current._handler._finish) @@ -656,7 +668,6 @@ namespace IceInternal } else { - current.completedSynchronously = completed; current._handler._started |= current.operation; return false; } @@ -679,32 +690,19 @@ namespace IceInternal } } - public void finishMessage(ref ThreadPoolCurrent current, bool fromIOThread) + public void finishMessage(ThreadPoolCurrent current) { if((current._handler._registered & current.operation) != 0) { - if(fromIOThread) + Debug.Assert((current._handler._ready & current.operation) == 0); + if(!current._handler.startAsync(current.operation, getCallback(current.operation))) { - Debug.Assert((current._handler._ready & current.operation) == 0); - bool completed = false; - if(!current._handler.startAsync(current.operation, getCallback(current.operation), ref completed)) - { - current._handler._pending &= ~current.operation; - } - else - { - Debug.Assert((current._handler._pending & current.operation) != 0); - current.completedSynchronously = completed; - current._handler._started |= current.operation; - } + current._handler._pending &= ~current.operation; } else { - ThreadPoolCurrent c = current; - executeNonBlocking(() => - { - messageCallback(c); - }); + Debug.Assert((current._handler._pending & current.operation) != 0); + current._handler._started |= current.operation; } } else @@ -719,55 +717,13 @@ namespace IceInternal } } - public void asyncReadCallback(object state) - { - messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Read)); - } - - public void asyncWriteCallback(object state) - { - messageCallback(new ThreadPoolCurrent(this, (EventHandler)state, SocketOperation.Write)); - } - - public void messageCallback(ThreadPoolCurrent current) - { - try - { - do - { - current.completedSynchronously = false; - current._handler.message(ref current); - } - while(current.completedSynchronously); - } - catch(System.Exception ex) - { - string s = "exception in `" + _prefix + "':\n" + ex + "\nevent handler: " + current._handler.ToString(); - _instance.initializationData().logger.error(s); - } - } - private AsyncCallback getCallback(int operation) { - switch(operation) - { - case SocketOperation.Read: - return asyncReadCallback; - case SocketOperation.Write: - return asyncWriteCallback; - default: - Debug.Assert(false); - return null; - } + Debug.Assert(operation == SocketOperation.Read || operation == SocketOperation.Write); + return state => queueReadyForIOHandler((EventHandler)state, operation); } - private Instance _instance; - private System.Action<System.Action, Ice.Connection> _dispatcher; - private bool _destroyed; - private readonly string _prefix; - private readonly string _threadPrefix; - - private sealed class WorkerThread + internal sealed class WorkerThread { private ThreadPool _threadPool; private Ice.Instrumentation.ThreadObserver _observer; @@ -890,6 +846,11 @@ namespace IceInternal private Thread _thread; } + private Instance _instance; + private System.Action<System.Action, Ice.Connection> _dispatcher; + private bool _destroyed; + private readonly string _prefix; + private readonly string _threadPrefix; private readonly int _size; // Number of threads that are pre-created. private readonly int _sizeMax; // Maximum number of threads. private readonly int _sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed. diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs index de9d5452b4b..c8d75f64846 100644 --- a/csharp/test/Ice/ami/AllTests.cs +++ b/csharp/test/Ice/ami/AllTests.cs @@ -3842,6 +3842,37 @@ namespace Ice } output.WriteLine("ok"); + output.Write("testing back pressure... "); + output.Flush(); + { + // Keep the 3 server thread pool threads busy. + Task sleep1Task = p.sleepAsync(1000); + Task sleep2Task = p.sleepAsync(1000); + Task sleep3Task = p.sleepAsync(1000); + bool canceled = false; + using(var cts = new CancellationTokenSource(200)) + { + try + { + var onewayProxy = (Test.TestIntfPrx)p.ice_oneway(); + + // Sending should be canceled because the TCP send/receive buffer size on the server is set + // to 50KB. Note: we don't use the cancel parameter of the operation here because the + // cancellation doesn't cancel the operation whose payload is being sent. + onewayProxy.opWithPayloadAsync(new byte[768 * 1024]).Wait(cts.Token); + } + catch(OperationCanceledException) + { + canceled = true; + } + } + test(canceled && !sleep1Task.IsCompleted); + sleep1Task.Wait(); + sleep2Task.Wait(); + sleep3Task.Wait(); + } + output.WriteLine("ok"); + p.shutdown(); } } diff --git a/csharp/test/Ice/metrics/AllTests.cs b/csharp/test/Ice/metrics/AllTests.cs index 7daafb02afc..8eb0921b425 100644 --- a/csharp/test/Ice/metrics/AllTests.cs +++ b/csharp/test/Ice/metrics/AllTests.cs @@ -529,7 +529,7 @@ public class AllTests : Test.AllTests test(view["Connection"].Length == 1 && view["Connection"][0].current == 1 && view["Connection"][0].total == 1); } - test(view["Thread"].Length == 1 && view["Thread"][0].current == 5 && view["Thread"][0].total == 5); + test(view["Thread"].Length == 1 && view["Thread"][0].current == 4 && view["Thread"][0].total == 4); output.WriteLine("ok"); output.Write("testing group by id..."); @@ -548,7 +548,7 @@ public class AllTests : Test.AllTests waitForCurrent(serverMetrics, "View", "Dispatch", 0); view = clientMetrics.getMetricsView("View", out timestamp); - test(view["Thread"].Length == 5); + test(view["Thread"].Length == 4); if(!collocated) { test(view["Connection"].Length == 2); diff --git a/java/test/src/main/java/test/Ice/ami/AllTests.java b/java/test/src/main/java/test/Ice/ami/AllTests.java index 10fb6223cd7..a15bfb7cc78 100644 --- a/java/test/src/main/java/test/Ice/ami/AllTests.java +++ b/java/test/src/main/java/test/Ice/ami/AllTests.java @@ -9,9 +9,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import com.zeroc.Ice.InvocationFuture; import com.zeroc.Ice.Util; +import com.zeroc.IceInternal.Time; import com.zeroc.Ice.CompressBatch; import test.Ice.ami.Test.CloseMode; @@ -1103,6 +1106,44 @@ public class AllTests } out.println("ok"); + if (p.ice_getConnection() != null) + { + out.print("testing back pressure... "); + out.flush(); + try + { + // Keep the 3 server thread pool threads busy. + CompletableFuture<Void> sleep1Future = p.sleepAsync(1000); + CompletableFuture<Void> sleep2Future = p.sleepAsync(1000); + CompletableFuture<Void> sleep3Future = p.sleepAsync(1000); + TestIntfPrx onewayProxy = p.ice_oneway(); + + // Sending should block because the TCP send/receive buffer size on the server is set to 50KB. + CompletableFuture<Void> future = onewayProxy.opWithPayloadAsync(new byte[768 * 1024]); + boolean timeout = false; + try + { + future.get(200, TimeUnit.MILLISECONDS); + } + catch(TimeoutException ex) + { + timeout = true; + } + test(timeout); + test(!sleep1Future.isDone()); + + sleep1Future.get(); + sleep2Future.get(); + sleep3Future.get(); + } + catch(Exception ex) + { + out.println(ex.toString()); + test(false); + } + out.println("ok"); + } + executor.shutdown(); p.shutdown(); |