diff options
author | Matthew Newhook <matthew@zeroc.com> | 2014-07-10 17:17:55 -0230 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2014-07-10 17:17:55 -0230 |
commit | 9326917abfa8ebd9815e192db2dd57cd8e85179d (patch) | |
tree | 26045ee4f12e9eab55cbeaefb6a3855ac16d6f50 | |
parent | Partial fix for ICE-5548: better C++11 detection (diff) | |
download | ice-9326917abfa8ebd9815e192db2dd57cd8e85179d.tar.bz2 ice-9326917abfa8ebd9815e192db2dd57cd8e85179d.tar.xz ice-9326917abfa8ebd9815e192db2dd57cd8e85179d.zip |
ICE-5573 - .NET TimeoutException ignores Dispatcher
- Added test for C++, C# & Java to the dispatcher test
to ensure that the invocation timeout is throwin
in the correct thread.
- Pass the connection to the dispatcher invocation in
more cases.
26 files changed, 405 insertions, 209 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index 96b5d96bd2d..360ddf18acb 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -159,6 +159,7 @@ protected: const CommunicatorPtr _communicator; const IceInternal::InstancePtr _instance; const std::string& _operation; + Ice::ConnectionPtr _cachedConnection; const IceInternal::CallbackBasePtr _callback; const LocalObjectPtr _cookie; diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 7ff4609d924..c3b4a4b160d 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -29,7 +29,8 @@ class FlushRequestsWithException : public DispatchWorkItem { public: - FlushRequestsWithException(const ConnectRequestHandlerPtr& handler) : _handler(handler) + FlushRequestsWithException(const Ice::ConnectionPtr& connection, const ConnectRequestHandlerPtr& handler) : + DispatchWorkItem(connection), _handler(handler) { } @@ -48,7 +49,8 @@ class FlushSentRequests : public DispatchWorkItem { public: - FlushSentRequests(const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : _callbacks(callbacks) + FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : + DispatchWorkItem(connection), _callbacks(callbacks) { } @@ -367,7 +369,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex) // if(!_requests.empty()) { - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this)); } notifyAll(); @@ -483,19 +485,19 @@ ConnectRequestHandler::flushRequests() Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.get()->ice_clone()); - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this)); } catch(const Ice::LocalException& ex) { Lock sync(*this); assert(!_exception.get() && !_requests.empty()); _exception.reset(ex.ice_clone()); - _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this)); + _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this)); } if(!sentCallbacks.empty()) { - _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(sentCallbacks)); + _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(_connection, sentCallbacks)); } // diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 3fb2c8ed08b..8b12805ba4f 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -51,8 +51,8 @@ class AsynchronousException : public DispatchWorkItem { public: - AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) : - _result(result), _exception(ex.ice_clone()) + AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, const Ice::Exception& ex) : + DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone()) { } @@ -72,7 +72,8 @@ class AsynchronousSent : public DispatchWorkItem { public: - AsynchronousSent(const Ice::AsyncResultPtr& result) : _result(result) + AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) : + DispatchWorkItem(connection), _result(result) { } @@ -91,8 +92,8 @@ class AsynchronousTimeout : public DispatchWorkItem { public: - AsynchronousTimeout(const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) : - _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) + AsynchronousTimeout(const Ice::ConnectionPtr& connection, const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) : + DispatchWorkItem(connection), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result)) { assert(_outAsync); } @@ -268,7 +269,7 @@ Ice::AsyncResult::__invokeSentAsync() // try { - _instance->clientThreadPool()->execute(new AsynchronousSent(this)); + _instance->clientThreadPool()->execute(new AsynchronousSent(_cachedConnection, this)); } catch(const Ice::CommunicatorDestroyedException&) { @@ -300,7 +301,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) // CommunicatorDestroyedException is the only exception that can propagate directly // from this method. // - _instance->clientThreadPool()->execute(new AsynchronousException(this, ex)); + _instance->clientThreadPool()->execute(new AsynchronousException(_cachedConnection, this, ex)); } void @@ -342,7 +343,16 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() if(handler) { - _instance->clientThreadPool()->execute(new AsynchronousTimeout(handler, this)); + Ice::ConnectionPtr connection; + try + { + connection = handler->getConnection(false); + } + catch(const Ice::LocalException&) + { + // Ignore. + } + _instance->clientThreadPool()->execute(new AsynchronousTimeout(connection, handler, this)); } } @@ -507,6 +517,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod AsyncStatus IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response) { + _cachedConnection = connection; return connection->sendAsyncRequest(this, compress, response); } @@ -838,6 +849,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu AsyncStatus IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) { + _cachedConnection = connection; return connection->flushAsyncBatchRequests(this); } diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index d7884f54964..d7c26aced63 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -141,6 +141,14 @@ class ThreadPoolDestroyedException } + +IceInternal::DispatchWorkItem::DispatchWorkItem() { +} + +IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection) { + +} + void IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) { @@ -150,7 +158,7 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current) { try { - dispatcher->dispatch(this, 0); + dispatcher->dispatch(this, _connection); } catch(const std::exception& ex) { diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 6d2c9d283e5..f4df601605c 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -78,7 +78,6 @@ public: } void finish(const EventHandlerPtr&); void execute(const ThreadPoolWorkItemPtr&); - void joinWithAllThreads(); std::string prefix() const; @@ -184,9 +183,15 @@ public: class DispatchWorkItem : public ThreadPoolWorkItem, public Ice::DispatcherCall { -private: +public: + + DispatchWorkItem(); + DispatchWorkItem(const Ice::ConnectionPtr& connection); + private: + virtual void execute(ThreadPoolCurrent&); + Ice::ConnectionPtr _connection; }; class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex diff --git a/cpp/test/Ice/dispatcher/AllTests.cpp b/cpp/test/Ice/dispatcher/AllTests.cpp index 7e82afcf37d..a4a7cc25c5c 100644 --- a/cpp/test/Ice/dispatcher/AllTests.cpp +++ b/cpp/test/Ice/dispatcher/AllTests.cpp @@ -52,6 +52,18 @@ public: called(); } + void responseEx() + { + test(false); + } + + void exceptionEx(const ::Ice::Exception& ex) + { + test(dynamic_cast<const Ice::InvocationTimeoutException*>(&ex)); + test(Dispatcher::isDispatcherThread()); + called(); + } + void payload() { @@ -123,6 +135,15 @@ allTests(const Ice::CommunicatorPtr& communicator) i->begin_op(callback); cb->check(); + { + // + // Expect InvocationTimeoutException. + // + Test::TestIntfPrx to = p->ice_invocationTimeout(250); + to->begin_sleep(500, Test::newCallback_TestIntf_sleep(cb, &Callback::responseEx, &Callback::exceptionEx)); + cb->check(); + } + testController->holdAdapter(); Test::Callback_TestIntf_opWithPayloadPtr callback2 = diff --git a/cpp/test/Ice/dispatcher/Test.ice b/cpp/test/Ice/dispatcher/Test.ice index 857acae24a9..0379b4c7f4c 100644 --- a/cpp/test/Ice/dispatcher/Test.ice +++ b/cpp/test/Ice/dispatcher/Test.ice @@ -18,6 +18,7 @@ module Test interface TestIntf { void op(); + void sleep(int to); void opWithPayload(Ice::ByteSeq seq); void shutdown(); }; diff --git a/cpp/test/Ice/dispatcher/TestI.cpp b/cpp/test/Ice/dispatcher/TestI.cpp index 2a56d659f82..9140fc55fb0 100644 --- a/cpp/test/Ice/dispatcher/TestI.cpp +++ b/cpp/test/Ice/dispatcher/TestI.cpp @@ -11,6 +11,7 @@ #include <Ice/Ice.h> #include <Dispatcher.h> #include <TestCommon.h> +#include <IceUtil/Thread.h> using namespace std; @@ -21,6 +22,12 @@ TestIntfI::op(const Ice::Current&) } void +TestIntfI::sleep(Ice::Int to, const Ice::Current&) +{ + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(to)); +} + +void TestIntfI::opWithPayload(const Ice::ByteSeq&, const Ice::Current&) { test(Dispatcher::isDispatcherThread()); diff --git a/cpp/test/Ice/dispatcher/TestI.h b/cpp/test/Ice/dispatcher/TestI.h index 99c64dd20d1..228a2842753 100644 --- a/cpp/test/Ice/dispatcher/TestI.h +++ b/cpp/test/Ice/dispatcher/TestI.h @@ -20,6 +20,7 @@ class TestIntfI : virtual public Test::TestIntf public: virtual void op(const Ice::Current&); + virtual void sleep(Ice::Int, const Ice::Current&); virtual void opWithPayload(const Ice::ByteSeq&, const Ice::Current&); virtual void shutdown(const Ice::Current&); }; diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index b87c5ff7cec..0f4bd0a7187 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -97,10 +97,10 @@ namespace IceInternal stream.swap(_batchStream); _adapter.getThreadPool().dispatch( - delegate() + () => { invokeAll(stream, 0, invokeNum, true); - }); + }, null); // // Reset the batch. @@ -275,7 +275,7 @@ namespace IceInternal { invokeAll(@out.ostr(), requestId, 1, false); } - }); + }, null); } else { @@ -313,7 +313,7 @@ namespace IceInternal { invokeAll(outAsync.ostr__, requestId, 1, false); } - }); + }, null); sentCallback = null; return false; } @@ -359,13 +359,13 @@ namespace IceInternal if(_reference.getInvocationTimeout() > 0) { _adapter.getThreadPool().dispatch( - delegate() + () => { if(sent(@out)) { invokeAll(@out.ostr(), 0, invokeNum, true); } - }); + }, null); } else { @@ -417,13 +417,13 @@ namespace IceInternal if(invokeNum > 0) { _adapter.getThreadPool().dispatch( - delegate() + () => { if(sentAsync(outAsync)) { invokeAll(outAsync.ostr__, 0, invokeNum, true); } - }); + }, null); sentCallback = null; return false; } diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs index 61e0034d3a4..5bc9f8ceeb9 100644 --- a/cs/src/Ice/ConnectRequestHandler.cs +++ b/cs/src/Ice/ConnectRequestHandler.cs @@ -372,10 +372,11 @@ namespace IceInternal // if(_requests.Count > 0) { - _reference.getInstance().clientThreadPool().dispatch(delegate() - { - flushRequestsWithException(); - }); + _reference.getInstance().clientThreadPool().dispatch( + () => + { + flushRequestsWithException(); + }, _connection); } _m.NotifyAll(); @@ -518,10 +519,11 @@ namespace IceInternal { Debug.Assert(_exception == null && _requests.Count > 0); _exception = ex.get(); - _reference.getInstance().clientThreadPool().dispatch(delegate() - { - flushRequestsWithException(); - }); + _reference.getInstance().clientThreadPool().dispatch( + () => + { + flushRequestsWithException(); + }, _connection); } finally { @@ -535,10 +537,11 @@ namespace IceInternal { Debug.Assert(_exception == null && _requests.Count > 0); _exception = ex; - _reference.getInstance().clientThreadPool().dispatch(delegate() - { - flushRequestsWithException(); - }); + _reference.getInstance().clientThreadPool().dispatch( + () => + { + flushRequestsWithException(); + }, _connection); } finally { @@ -549,16 +552,17 @@ namespace IceInternal if(sentCallbacks.Count > 0) { Instance instance = _reference.getInstance(); - instance.clientThreadPool().dispatch(delegate() - { - foreach(Request r in sentCallbacks) - { - if(r.outAsync != null) - { - r.outAsync.invokeSent__(r.sentCallback); - } - } - }); + instance.clientThreadPool().dispatch( + () => + { + foreach(Request r in sentCallbacks) + { + if(r.outAsync != null) + { + r.outAsync.invokeSent__(r.sentCallback); + } + } + }, _connection); } // diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 382ef7b95f2..851c58aac90 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -1036,8 +1036,8 @@ namespace Ice _m.Lock(); try { - LinkedListNode<OutgoingMessage> p = _sendStreams.First; - while(p != null) + LinkedListNode<OutgoingMessage> p; + for(p = _sendStreams.First; p != null; p = p.Next) { OutgoingMessage o = p.Value; if(o.@out == @out) @@ -1051,18 +1051,14 @@ namespace Ice // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(p == _sendStreams.First) - { - o.timedOut(); - } - else + bool isSent = o.timedOut(); + if(p != _sendStreams.First) { _sendStreams.Remove(p); } - o.finished(new InvocationTimeoutException()); + @out.finished(new InvocationTimeoutException(), isSent); return; // We're done. } - p = p.Next; } if(@out is IceInternal.Outgoing) @@ -1087,11 +1083,14 @@ namespace Ice public void asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync) { + bool isSent = false; + bool finished = false; + _m.Lock(); try { - LinkedListNode<OutgoingMessage> p = _sendStreams.First; - while(p != null) + LinkedListNode<OutgoingMessage> p; + for(p = _sendStreams.First; p != null; p = p.Next) { OutgoingMessage o = p.Value; if(o.outAsync == outAsync) @@ -1105,30 +1104,28 @@ namespace Ice // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // - if(p == _sendStreams.First) - { - o.timedOut(); - } - else + isSent = o.timedOut(); + if(o != _sendStreams.First.Value) { _sendStreams.Remove(p); } - o.finished(new InvocationTimeoutException()); - return; // We're done. + finished = true; + break; // We're done. } - p = p.Next; } - if(outAsync is IceInternal.OutgoingAsync) + if(!finished && outAsync is IceInternal.OutgoingAsync) { IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync; foreach(KeyValuePair<int, IceInternal.OutgoingAsync> kvp in _asyncRequests) { if(kvp.Value == o) { - o.finished__(new InvocationTimeoutException(), true); + + finished = true; + isSent = true; _asyncRequests.Remove(kvp.Key); - return; // We're done. + break; // We're done. } } } @@ -1137,6 +1134,14 @@ namespace Ice { _m.Unlock(); } + + + if(finished) + { + // asyncRequestTimedOut is called from the dispatch thread so this is + // safe. + outAsync.finished__(new InvocationTimeoutException(), isSent); + } } public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag) @@ -1631,32 +1636,12 @@ namespace Ice // of the message must be taken care of by the Ice thread pool. // IceInternal.ThreadPoolCurrent c = current; - _threadPool.execute( - delegate() + _threadPool.execute(() => { - if(_dispatcher != null) - { - try + callOnDispatcher(() => { - _dispatcher(delegate() - { - dispatch(startCB, sentCBs, info); - }, - this); - } - catch(System.Exception ex) - { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault( - "Ice.Warn.Dispatch", 1) > 1) - { - warning("dispatch exception", ex); - } - } - } - else - { - dispatch(startCB, sentCBs, info); - } + dispatch(startCB, sentCBs, info); + }); msg.destroy(ref c); }); } @@ -1810,29 +1795,37 @@ namespace Ice // non-blocking activity of the connection from these threads, the dispatching // of the message must be taken care of by the Ice thread pool. // - _threadPool.execute( - delegate() + _threadPool.execute(() => { - if(_dispatcher == null) - { - finish(); - } - else + callOnDispatcher(finish); + }); + } + + #if COMPACT + public void callOnDispatcher(Ice.VoidAction action) +#else + private void callOnDispatcher(System.Action action) +#endif + { + if(_dispatcher != null) + { + try + { + _dispatcher(action, this); + } + catch(System.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault( + "Ice.Warn.Dispatch", 1) > 1) { - try - { - _dispatcher(finish, this); - } - catch(System.Exception ex) - { - if(_instance.initializationData().properties.getPropertyAsIntWithDefault( - "Ice.Warn.Dispatch", 1) > 1) - { - warning("dispatch exception", ex); - } - } + warning("dispatch exception", ex); } - }); + } + } + else + { + action(); + } } private void finish() @@ -3290,11 +3283,12 @@ namespace Ice this.isSent = false; } - internal void timedOut() + internal bool timedOut() { Debug.Assert((@out != null || outAsync != null) && !isSent); @out = null; outAsync = null; + return isSent; } internal void adopt() diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index e06d04e389f..2c95043612b 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -320,17 +320,17 @@ namespace IceInternal } else { - instance_.clientThreadPool().dispatch(delegate() - { - try - { - sentCallback_(this); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); + instance_.clientThreadPool().dispatch(() => + { + try + { + sentCallback_(this); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, cachedConnection_); } return this; } @@ -348,10 +348,10 @@ namespace IceInternal { throw new System.ArgumentException("sent callback already set"); } - sentCallback_ = delegate(Ice.AsyncResult result) - { - cb(result.sentSynchronously()); - }; + sentCallback_ = (Ice.AsyncResult result) => + { + cb(result.sentSynchronously()); + }; if((state_ & Sent) == 0) { return this; @@ -375,17 +375,17 @@ namespace IceInternal } else { - instance_.clientThreadPool().dispatch(delegate() - { - try - { - cb(false); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); + instance_.clientThreadPool().dispatch(() => + { + try + { + cb(false); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, cachedConnection_); } return this; } @@ -410,17 +410,17 @@ namespace IceInternal monitor_.Unlock(); } - instance_.clientThreadPool().dispatch(delegate() - { - try - { - cb(this); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); + instance_.clientThreadPool().dispatch(() => + { + try + { + cb(this); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, cachedConnection_); return this; } @@ -449,17 +449,17 @@ namespace IceInternal monitor_.Unlock(); } - instance_.clientThreadPool().dispatch(delegate() - { - try - { - completedCallback_(this); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); + instance_.clientThreadPool().dispatch(() => + { + try + { + completedCallback_(this); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, cachedConnection_); return this; } @@ -525,10 +525,10 @@ namespace IceInternal // try { - instance_.clientThreadPool().dispatch(delegate() - { - invokeException__(ex); - }); + instance_.clientThreadPool().dispatch(() => + { + invokeException__(ex); + }, cachedConnection_); } catch(Ice.CommunicatorDestroyedException) { @@ -578,11 +578,11 @@ namespace IceInternal { try { - instance_.clientThreadPool().dispatch(delegate() - { - invokeSent__(callback); - }); - } + instance_.clientThreadPool().dispatch(() => + { + invokeSent__(callback); + }, cachedConnection_); + } catch(Ice.CommunicatorDestroyedException) { } @@ -794,12 +794,20 @@ namespace IceInternal if(handler != null) { + Ice.ConnectionI con = null; + try + { + con = handler.getConnection(false); + } + catch(Ice.LocalException) + { + // Ignore. + } IceInternal.OutgoingAsyncMessageCallback outAsync = (IceInternal.OutgoingAsyncMessageCallback)this; - instance_.clientThreadPool().execute( - delegate() + instance_.clientThreadPool().dispatch(() => { handler.asyncRequestTimedOut(outAsync); - }); + }, con); } } @@ -814,6 +822,7 @@ namespace IceInternal protected Ice.Communicator communicator_; protected IceInternal.Instance instance_; protected string operation_; + protected Ice.Connection cachedConnection_; protected readonly IceUtilInternal.Monitor monitor_ = new IceUtilInternal.Monitor(); protected IceInternal.BasicStream is_; @@ -932,6 +941,8 @@ namespace IceInternal public bool send__(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB) { + // Store away the connection for passing to the dispatcher. + cachedConnection_ = connection; return connection.sendAsyncRequest(this, compress, response, out sentCB); } @@ -1411,17 +1422,19 @@ namespace IceInternal monitor_.Unlock(); } - instance_.clientThreadPool().dispatch(delegate() - { - try - { - completedCallback_(this); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); + + instance_.clientThreadPool().dispatch( + () => + { + try + { + completedCallback_(this); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, null); return this; } @@ -1451,18 +1464,18 @@ namespace IceInternal monitor_.Unlock(); } - instance_.clientThreadPool().dispatch(delegate() - { - try - { - completedCallback_(this); - } - catch(System.Exception ex) - { - warning__(ex); - } - }); - return this; + instance_.clientThreadPool().dispatch(() => + { + try + { + completedCallback_(this); + } + catch(System.Exception ex) + { + warning__(ex); + } + }, null); + return this; } new public Ice.AsyncResult<T> whenSent(Ice.SentCallback cb) @@ -1542,6 +1555,8 @@ namespace IceInternal public bool send__(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCallback) { + // Store away the connection for passing to the dispatcher. + cachedConnection_ = connection; return connection.flushAsyncBatchRequests(this, out sentCallback); } diff --git a/cs/src/Ice/RequestHandler.cs b/cs/src/Ice/RequestHandler.cs index 1ef49ac8532..6b690a3f5d9 100644 --- a/cs/src/Ice/RequestHandler.cs +++ b/cs/src/Ice/RequestHandler.cs @@ -22,6 +22,7 @@ namespace IceInternal bool sendAsyncRequest(OutgoingAsyncMessageCallback @out, out Ice.AsyncCallback cb); void requestTimedOut(OutgoingMessageCallback @out); + // Must be called from the dispatcher thread. void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync); Reference getReference(); diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs index 84d1861e3cb..ad3869566ac 100644 --- a/cs/src/Ice/ThreadPool.cs +++ b/cs/src/Ice/ThreadPool.cs @@ -373,16 +373,16 @@ namespace IceInternal } #if COMPACT - public void dispatch(Ice.VoidAction call) + public void dispatch(Ice.VoidAction call, Ice.Connection con) #else - public void dispatch(System.Action call) + public void dispatch(System.Action call, Ice.Connection con) #endif { if(_dispatcher != null) { try { - _dispatcher(call, null); + _dispatcher(call, con); } catch(System.Exception ex) { diff --git a/cs/test/Ice/dispatcher/AllTests.cs b/cs/test/Ice/dispatcher/AllTests.cs index 1fa3a2eee5b..a5cfe2f1b1d 100644 --- a/cs/test/Ice/dispatcher/AllTests.cs +++ b/cs/test/Ice/dispatcher/AllTests.cs @@ -65,6 +65,7 @@ public class AllTests : TestCommon.TestApp called(); } + public void payload() { test(Dispatcher.isDispatcherThread()); @@ -80,7 +81,7 @@ public class AllTests : TestCommon.TestApp test(sentSynchronously || Dispatcher.isDispatcherThread()); } - protected void called() + public void called() { _m.Lock(); try @@ -140,6 +141,24 @@ public class AllTests : TestCommon.TestApp i.begin_op().whenCompleted(cb.exception); cb.check(); + // + // Expect InvocationTimeoutException. + // + { + Test.TestIntfPrx to = Test.TestIntfPrxHelper.uncheckedCast(p.ice_invocationTimeout(250)); + to.begin_sleep(500).whenCompleted( + () => + { + test(false); + }, + (Ice.Exception ex) => { + test(ex is Ice.InvocationTimeoutException); + test(Dispatcher.isDispatcherThread()); + cb.called(); + }); + cb.check(); + } + testController.holdAdapter(); Test.Callback_TestIntf_opWithPayload resp = cb.payload; Ice.ExceptionCallback excb = cb.ignoreEx; diff --git a/cs/test/Ice/dispatcher/Test.ice b/cs/test/Ice/dispatcher/Test.ice index 852bc74314c..2fbd29f2a6d 100644 --- a/cs/test/Ice/dispatcher/Test.ice +++ b/cs/test/Ice/dispatcher/Test.ice @@ -17,6 +17,7 @@ module Test interface TestIntf { void op(); + void sleep(int to); void opWithPayload(Ice::ByteSeq seq); void shutdown(); }; diff --git a/cs/test/Ice/dispatcher/TestI.cs b/cs/test/Ice/dispatcher/TestI.cs index 42740e2f9fd..56765b15b1a 100644 --- a/cs/test/Ice/dispatcher/TestI.cs +++ b/cs/test/Ice/dispatcher/TestI.cs @@ -8,6 +8,7 @@ // ********************************************************************** using Test; +using System.Threading; public class TestI : TestIntfDisp_ { @@ -35,6 +36,11 @@ public class TestI : TestIntfDisp_ test(Dispatcher.isDispatcherThread()); } + public override void sleep(int to, Ice.Current current) + { + Thread.Sleep(to); + } + override public void shutdown(Ice.Current current) { diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 706616fe32f..d3146d1533f 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -252,7 +252,7 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem() + _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection) { public void run() @@ -370,7 +370,7 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem() + _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection) { public void run() @@ -489,8 +489,17 @@ public class AsyncResult if(handler != null) { final IceInternal.RequestHandler h = handler; + Ice.Connection connection = null; + try + { + connection = handler.getConnection(false); + } + catch(Ice.LocalException e) + { + // Ignore. + } _instance.clientThreadPool().execute( - new IceInternal.DispatchWorkItem() + new IceInternal.DispatchWorkItem(connection) { public void run() @@ -519,6 +528,7 @@ public class AsyncResult protected Communicator _communicator; protected IceInternal.Instance _instance; protected String _operation; + protected Ice.Connection _cachedConnection; protected java.lang.Object _monitor = new java.lang.Object(); protected IceInternal.BasicStream _is; diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java index ad289e019fb..2b08cb9aa04 100644 --- a/java/src/IceInternal/BatchOutgoingAsync.java +++ b/java/src/IceInternal/BatchOutgoingAsync.java @@ -19,6 +19,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync public int __send(Ice.ConnectionI connection, boolean compress, boolean response) { + _cachedConnection = connection; return connection.flushAsyncBatchRequests(this); } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 8b2f346a997..0505688cdda 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -333,7 +333,7 @@ public class ConnectRequestHandler // if(!_requests.isEmpty()) { - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) { public void run() @@ -487,7 +487,7 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex.get(); - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) { public void run() @@ -503,7 +503,7 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex; - _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection) { public void run() @@ -517,7 +517,7 @@ public class ConnectRequestHandler if(!sentCallbacks.isEmpty()) { _reference.getInstance().clientThreadPool().execute( - new DispatchWorkItem() + new DispatchWorkItem(_connection) { public void run() diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java index 84a9c438453..a7c5ef7fa57 100644 --- a/java/src/IceInternal/DispatchWorkItem.java +++ b/java/src/IceInternal/DispatchWorkItem.java @@ -17,6 +17,15 @@ package IceInternal; // abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable { + public DispatchWorkItem() + { + } + + public DispatchWorkItem(Ice.Connection connection) + { + _connection = connection; + } + final public void execute(ThreadPoolCurrent current) { Instance instance = current.stream.instance(); @@ -25,7 +34,7 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable { try { - dispatcher.dispatch(this, null); + dispatcher.dispatch(this, _connection); } catch(java.lang.Exception ex) { @@ -45,4 +54,6 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable this.run(); } } + + private Ice.Connection _connection; } diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 9d18c397d6a..38e2eea29d7 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -103,6 +103,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa __send(Ice.ConnectionI connection, boolean compress, boolean response) throws RetryException { + _cachedConnection = connection; return connection.sendAsyncRequest(this, compress, response); } diff --git a/java/test/Ice/dispatcher/AllTests.java b/java/test/Ice/dispatcher/AllTests.java index 2007bf8514b..843b93d70b4 100644 --- a/java/test/Ice/dispatcher/AllTests.java +++ b/java/test/Ice/dispatcher/AllTests.java @@ -17,6 +17,7 @@ import test.Ice.dispatcher.Test.TestIntfControllerPrx; import test.Ice.dispatcher.Test.TestIntfControllerPrxHelper; import test.Ice.dispatcher.Test.Callback_TestIntf_op; import test.Ice.dispatcher.Test.Callback_TestIntf_opWithPayload; +import test.Ice.dispatcher.Test.Callback_TestIntf_sleep; public class AllTests { @@ -69,6 +70,12 @@ public class AllTests Ice.ObjectPrx obj = communicator.stringToProxy(sref); test(obj != null); + int mult = 1; + if(!communicator.getProperties().getPropertyWithDefault("Ice.Default.Protocol", "tcp").equals("tcp")) + { + mult = 4; + } + TestIntfPrx p = TestIntfPrxHelper.uncheckedCast(obj); sref = "testController:tcp -p 12011"; @@ -121,6 +128,61 @@ public class AllTests i.begin_op(cb); cb.check(); + { + // + // Expect InvocationTimeoutException. + // + TestIntfPrx to = TestIntfPrxHelper.uncheckedCast(p.ice_invocationTimeout(250)); + class Callback_TestIntf_sleepImpl extends Callback_TestIntf_sleep + { + public void + response() + { + test(false); + } + + public void + exception(Ice.LocalException ex) + { + test(ex instanceof Ice.InvocationTimeoutException); + test(dispatcher.isDispatcherThread()); + called(); + } + + public void + sent(boolean sentSynchronously) + { + test(sentSynchronously || dispatcher.isDispatcherThread()); + } + + public synchronized void check() + { + while(!_called) + { + try + { + wait(); + } + catch(InterruptedException ex) + { + } + } + + _called = false; + } + private synchronized void called() + { + assert(!_called); + _called = true; + notify(); + } + private boolean _called; + }; + Callback_TestIntf_sleepImpl callback = new Callback_TestIntf_sleepImpl(); + to.begin_sleep(500 * mult, callback); + callback.check(); + } + testController.holdAdapter(); Callback_TestIntf_opWithPayload callback = new Callback_TestIntf_opWithPayload() { diff --git a/java/test/Ice/dispatcher/Test.ice b/java/test/Ice/dispatcher/Test.ice index 25f412335f8..bbeabc2210a 100644 --- a/java/test/Ice/dispatcher/Test.ice +++ b/java/test/Ice/dispatcher/Test.ice @@ -18,6 +18,7 @@ module Test interface TestIntf { void op(); + void sleep(int to); void opWithPayload(Ice::ByteSeq seq); void shutdown(); }; diff --git a/java/test/Ice/dispatcher/TestI.java b/java/test/Ice/dispatcher/TestI.java index 9b2681c1ca7..a3e0e717e88 100644 --- a/java/test/Ice/dispatcher/TestI.java +++ b/java/test/Ice/dispatcher/TestI.java @@ -34,6 +34,18 @@ public class TestI extends _TestIntfDisp } public void + sleep(int to, Ice.Current current) + { + try + { + Thread.currentThread().sleep(to); + } + catch(InterruptedException ex) + { + } + } + + public void opWithPayload(byte[] seq, Ice.Current current) { test(_dispatcher.isDispatcherThread()); |