summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/OutgoingAsync.h1
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp14
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp28
-rw-r--r--cpp/src/Ice/ThreadPool.cpp10
-rw-r--r--cpp/src/Ice/ThreadPool.h9
-rw-r--r--cpp/test/Ice/dispatcher/AllTests.cpp21
-rw-r--r--cpp/test/Ice/dispatcher/Test.ice1
-rw-r--r--cpp/test/Ice/dispatcher/TestI.cpp7
-rw-r--r--cpp/test/Ice/dispatcher/TestI.h1
-rw-r--r--cs/src/Ice/CollocatedRequestHandler.cs16
-rw-r--r--cs/src/Ice/ConnectRequestHandler.cs48
-rw-r--r--cs/src/Ice/ConnectionI.cs128
-rw-r--r--cs/src/Ice/OutgoingAsync.cs181
-rw-r--r--cs/src/Ice/RequestHandler.cs1
-rw-r--r--cs/src/Ice/ThreadPool.cs6
-rw-r--r--cs/test/Ice/dispatcher/AllTests.cs21
-rw-r--r--cs/test/Ice/dispatcher/Test.ice1
-rw-r--r--cs/test/Ice/dispatcher/TestI.cs6
-rw-r--r--java/src/Ice/AsyncResult.java16
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java1
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java8
-rw-r--r--java/src/IceInternal/DispatchWorkItem.java13
-rw-r--r--java/src/IceInternal/OutgoingAsync.java1
-rw-r--r--java/test/Ice/dispatcher/AllTests.java62
-rw-r--r--java/test/Ice/dispatcher/Test.ice1
-rw-r--r--java/test/Ice/dispatcher/TestI.java12
26 files changed, 405 insertions, 209 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index 96b5d96bd2d..360ddf18acb 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -159,6 +159,7 @@ protected:
const CommunicatorPtr _communicator;
const IceInternal::InstancePtr _instance;
const std::string& _operation;
+ Ice::ConnectionPtr _cachedConnection;
const IceInternal::CallbackBasePtr _callback;
const LocalObjectPtr _cookie;
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 7ff4609d924..c3b4a4b160d 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -29,7 +29,8 @@ class FlushRequestsWithException : public DispatchWorkItem
{
public:
- FlushRequestsWithException(const ConnectRequestHandlerPtr& handler) : _handler(handler)
+ FlushRequestsWithException(const Ice::ConnectionPtr& connection, const ConnectRequestHandlerPtr& handler) :
+ DispatchWorkItem(connection), _handler(handler)
{
}
@@ -48,7 +49,8 @@ class FlushSentRequests : public DispatchWorkItem
{
public:
- FlushSentRequests(const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : _callbacks(callbacks)
+ FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) :
+ DispatchWorkItem(connection), _callbacks(callbacks)
{
}
@@ -367,7 +369,7 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
//
if(!_requests.empty())
{
- _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this));
}
notifyAll();
@@ -483,19 +485,19 @@ ConnectRequestHandler::flushRequests()
Lock sync(*this);
assert(!_exception.get() && !_requests.empty());
_exception.reset(ex.get()->ice_clone());
- _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this));
}
catch(const Ice::LocalException& ex)
{
Lock sync(*this);
assert(!_exception.get() && !_requests.empty());
_exception.reset(ex.ice_clone());
- _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(this));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushRequestsWithException(_connection, this));
}
if(!sentCallbacks.empty())
{
- _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(sentCallbacks));
+ _reference->getInstance()->clientThreadPool()->execute(new FlushSentRequests(_connection, sentCallbacks));
}
//
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 3fb2c8ed08b..8b12805ba4f 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -51,8 +51,8 @@ class AsynchronousException : public DispatchWorkItem
{
public:
- AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) :
- _result(result), _exception(ex.ice_clone())
+ AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, const Ice::Exception& ex) :
+ DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone())
{
}
@@ -72,7 +72,8 @@ class AsynchronousSent : public DispatchWorkItem
{
public:
- AsynchronousSent(const Ice::AsyncResultPtr& result) : _result(result)
+ AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _result(result)
{
}
@@ -91,8 +92,8 @@ class AsynchronousTimeout : public DispatchWorkItem
{
public:
- AsynchronousTimeout(const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) :
- _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
+ AsynchronousTimeout(const Ice::ConnectionPtr& connection, const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) :
+ DispatchWorkItem(connection), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
{
assert(_outAsync);
}
@@ -268,7 +269,7 @@ Ice::AsyncResult::__invokeSentAsync()
//
try
{
- _instance->clientThreadPool()->execute(new AsynchronousSent(this));
+ _instance->clientThreadPool()->execute(new AsynchronousSent(_cachedConnection, this));
}
catch(const Ice::CommunicatorDestroyedException&)
{
@@ -300,7 +301,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
// CommunicatorDestroyedException is the only exception that can propagate directly
// from this method.
//
- _instance->clientThreadPool()->execute(new AsynchronousException(this, ex));
+ _instance->clientThreadPool()->execute(new AsynchronousException(_cachedConnection, this, ex));
}
void
@@ -342,7 +343,16 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
if(handler)
{
- _instance->clientThreadPool()->execute(new AsynchronousTimeout(handler, this));
+ Ice::ConnectionPtr connection;
+ try
+ {
+ connection = handler->getConnection(false);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
+ }
+ _instance->clientThreadPool()->execute(new AsynchronousTimeout(connection, handler, this));
}
}
@@ -507,6 +517,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
AsyncStatus
IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
{
+ _cachedConnection = connection;
return connection->sendAsyncRequest(this, compress, response);
}
@@ -838,6 +849,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu
AsyncStatus
IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool)
{
+ _cachedConnection = connection;
return connection->flushAsyncBatchRequests(this);
}
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index d7884f54964..d7c26aced63 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -141,6 +141,14 @@ class ThreadPoolDestroyedException
}
+
+IceInternal::DispatchWorkItem::DispatchWorkItem() {
+}
+
+IceInternal::DispatchWorkItem::DispatchWorkItem(const Ice::ConnectionPtr& connection) : _connection(connection) {
+
+}
+
void
IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
{
@@ -150,7 +158,7 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
{
try
{
- dispatcher->dispatch(this, 0);
+ dispatcher->dispatch(this, _connection);
}
catch(const std::exception& ex)
{
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 6d2c9d283e5..f4df601605c 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -78,7 +78,6 @@ public:
}
void finish(const EventHandlerPtr&);
void execute(const ThreadPoolWorkItemPtr&);
-
void joinWithAllThreads();
std::string prefix() const;
@@ -184,9 +183,15 @@ public:
class DispatchWorkItem : public ThreadPoolWorkItem, public Ice::DispatcherCall
{
-private:
+public:
+
+ DispatchWorkItem();
+ DispatchWorkItem(const Ice::ConnectionPtr& connection);
+ private:
+
virtual void execute(ThreadPoolCurrent&);
+ Ice::ConnectionPtr _connection;
};
class ThreadPoolWorkQueue : public EventHandler, public IceUtil::Mutex
diff --git a/cpp/test/Ice/dispatcher/AllTests.cpp b/cpp/test/Ice/dispatcher/AllTests.cpp
index 7e82afcf37d..a4a7cc25c5c 100644
--- a/cpp/test/Ice/dispatcher/AllTests.cpp
+++ b/cpp/test/Ice/dispatcher/AllTests.cpp
@@ -52,6 +52,18 @@ public:
called();
}
+ void responseEx()
+ {
+ test(false);
+ }
+
+ void exceptionEx(const ::Ice::Exception& ex)
+ {
+ test(dynamic_cast<const Ice::InvocationTimeoutException*>(&ex));
+ test(Dispatcher::isDispatcherThread());
+ called();
+ }
+
void
payload()
{
@@ -123,6 +135,15 @@ allTests(const Ice::CommunicatorPtr& communicator)
i->begin_op(callback);
cb->check();
+ {
+ //
+ // Expect InvocationTimeoutException.
+ //
+ Test::TestIntfPrx to = p->ice_invocationTimeout(250);
+ to->begin_sleep(500, Test::newCallback_TestIntf_sleep(cb, &Callback::responseEx, &Callback::exceptionEx));
+ cb->check();
+ }
+
testController->holdAdapter();
Test::Callback_TestIntf_opWithPayloadPtr callback2 =
diff --git a/cpp/test/Ice/dispatcher/Test.ice b/cpp/test/Ice/dispatcher/Test.ice
index 857acae24a9..0379b4c7f4c 100644
--- a/cpp/test/Ice/dispatcher/Test.ice
+++ b/cpp/test/Ice/dispatcher/Test.ice
@@ -18,6 +18,7 @@ module Test
interface TestIntf
{
void op();
+ void sleep(int to);
void opWithPayload(Ice::ByteSeq seq);
void shutdown();
};
diff --git a/cpp/test/Ice/dispatcher/TestI.cpp b/cpp/test/Ice/dispatcher/TestI.cpp
index 2a56d659f82..9140fc55fb0 100644
--- a/cpp/test/Ice/dispatcher/TestI.cpp
+++ b/cpp/test/Ice/dispatcher/TestI.cpp
@@ -11,6 +11,7 @@
#include <Ice/Ice.h>
#include <Dispatcher.h>
#include <TestCommon.h>
+#include <IceUtil/Thread.h>
using namespace std;
@@ -21,6 +22,12 @@ TestIntfI::op(const Ice::Current&)
}
void
+TestIntfI::sleep(Ice::Int to, const Ice::Current&)
+{
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(to));
+}
+
+void
TestIntfI::opWithPayload(const Ice::ByteSeq&, const Ice::Current&)
{
test(Dispatcher::isDispatcherThread());
diff --git a/cpp/test/Ice/dispatcher/TestI.h b/cpp/test/Ice/dispatcher/TestI.h
index 99c64dd20d1..228a2842753 100644
--- a/cpp/test/Ice/dispatcher/TestI.h
+++ b/cpp/test/Ice/dispatcher/TestI.h
@@ -20,6 +20,7 @@ class TestIntfI : virtual public Test::TestIntf
public:
virtual void op(const Ice::Current&);
+ virtual void sleep(Ice::Int, const Ice::Current&);
virtual void opWithPayload(const Ice::ByteSeq&, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
};
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs
index b87c5ff7cec..0f4bd0a7187 100644
--- a/cs/src/Ice/CollocatedRequestHandler.cs
+++ b/cs/src/Ice/CollocatedRequestHandler.cs
@@ -97,10 +97,10 @@ namespace IceInternal
stream.swap(_batchStream);
_adapter.getThreadPool().dispatch(
- delegate()
+ () =>
{
invokeAll(stream, 0, invokeNum, true);
- });
+ }, null);
//
// Reset the batch.
@@ -275,7 +275,7 @@ namespace IceInternal
{
invokeAll(@out.ostr(), requestId, 1, false);
}
- });
+ }, null);
}
else
{
@@ -313,7 +313,7 @@ namespace IceInternal
{
invokeAll(outAsync.ostr__, requestId, 1, false);
}
- });
+ }, null);
sentCallback = null;
return false;
}
@@ -359,13 +359,13 @@ namespace IceInternal
if(_reference.getInvocationTimeout() > 0)
{
_adapter.getThreadPool().dispatch(
- delegate()
+ () =>
{
if(sent(@out))
{
invokeAll(@out.ostr(), 0, invokeNum, true);
}
- });
+ }, null);
}
else
{
@@ -417,13 +417,13 @@ namespace IceInternal
if(invokeNum > 0)
{
_adapter.getThreadPool().dispatch(
- delegate()
+ () =>
{
if(sentAsync(outAsync))
{
invokeAll(outAsync.ostr__, 0, invokeNum, true);
}
- });
+ }, null);
sentCallback = null;
return false;
}
diff --git a/cs/src/Ice/ConnectRequestHandler.cs b/cs/src/Ice/ConnectRequestHandler.cs
index 61e0034d3a4..5bc9f8ceeb9 100644
--- a/cs/src/Ice/ConnectRequestHandler.cs
+++ b/cs/src/Ice/ConnectRequestHandler.cs
@@ -372,10 +372,11 @@ namespace IceInternal
//
if(_requests.Count > 0)
{
- _reference.getInstance().clientThreadPool().dispatch(delegate()
- {
- flushRequestsWithException();
- });
+ _reference.getInstance().clientThreadPool().dispatch(
+ () =>
+ {
+ flushRequestsWithException();
+ }, _connection);
}
_m.NotifyAll();
@@ -518,10 +519,11 @@ namespace IceInternal
{
Debug.Assert(_exception == null && _requests.Count > 0);
_exception = ex.get();
- _reference.getInstance().clientThreadPool().dispatch(delegate()
- {
- flushRequestsWithException();
- });
+ _reference.getInstance().clientThreadPool().dispatch(
+ () =>
+ {
+ flushRequestsWithException();
+ }, _connection);
}
finally
{
@@ -535,10 +537,11 @@ namespace IceInternal
{
Debug.Assert(_exception == null && _requests.Count > 0);
_exception = ex;
- _reference.getInstance().clientThreadPool().dispatch(delegate()
- {
- flushRequestsWithException();
- });
+ _reference.getInstance().clientThreadPool().dispatch(
+ () =>
+ {
+ flushRequestsWithException();
+ }, _connection);
}
finally
{
@@ -549,16 +552,17 @@ namespace IceInternal
if(sentCallbacks.Count > 0)
{
Instance instance = _reference.getInstance();
- instance.clientThreadPool().dispatch(delegate()
- {
- foreach(Request r in sentCallbacks)
- {
- if(r.outAsync != null)
- {
- r.outAsync.invokeSent__(r.sentCallback);
- }
- }
- });
+ instance.clientThreadPool().dispatch(
+ () =>
+ {
+ foreach(Request r in sentCallbacks)
+ {
+ if(r.outAsync != null)
+ {
+ r.outAsync.invokeSent__(r.sentCallback);
+ }
+ }
+ }, _connection);
}
//
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index 382ef7b95f2..851c58aac90 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -1036,8 +1036,8 @@ namespace Ice
_m.Lock();
try
{
- LinkedListNode<OutgoingMessage> p = _sendStreams.First;
- while(p != null)
+ LinkedListNode<OutgoingMessage> p;
+ for(p = _sendStreams.First; p != null; p = p.Next)
{
OutgoingMessage o = p.Value;
if(o.@out == @out)
@@ -1051,18 +1051,14 @@ namespace Ice
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- if(p == _sendStreams.First)
- {
- o.timedOut();
- }
- else
+ bool isSent = o.timedOut();
+ if(p != _sendStreams.First)
{
_sendStreams.Remove(p);
}
- o.finished(new InvocationTimeoutException());
+ @out.finished(new InvocationTimeoutException(), isSent);
return; // We're done.
}
- p = p.Next;
}
if(@out is IceInternal.Outgoing)
@@ -1087,11 +1083,14 @@ namespace Ice
public void asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync)
{
+ bool isSent = false;
+ bool finished = false;
+
_m.Lock();
try
{
- LinkedListNode<OutgoingMessage> p = _sendStreams.First;
- while(p != null)
+ LinkedListNode<OutgoingMessage> p;
+ for(p = _sendStreams.First; p != null; p = p.Next)
{
OutgoingMessage o = p.Value;
if(o.outAsync == outAsync)
@@ -1105,30 +1104,28 @@ namespace Ice
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- if(p == _sendStreams.First)
- {
- o.timedOut();
- }
- else
+ isSent = o.timedOut();
+ if(o != _sendStreams.First.Value)
{
_sendStreams.Remove(p);
}
- o.finished(new InvocationTimeoutException());
- return; // We're done.
+ finished = true;
+ break; // We're done.
}
- p = p.Next;
}
- if(outAsync is IceInternal.OutgoingAsync)
+ if(!finished && outAsync is IceInternal.OutgoingAsync)
{
IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync;
foreach(KeyValuePair<int, IceInternal.OutgoingAsync> kvp in _asyncRequests)
{
if(kvp.Value == o)
{
- o.finished__(new InvocationTimeoutException(), true);
+
+ finished = true;
+ isSent = true;
_asyncRequests.Remove(kvp.Key);
- return; // We're done.
+ break; // We're done.
}
}
}
@@ -1137,6 +1134,14 @@ namespace Ice
{
_m.Unlock();
}
+
+
+ if(finished)
+ {
+ // asyncRequestTimedOut is called from the dispatch thread so this is
+ // safe.
+ outAsync.finished__(new InvocationTimeoutException(), isSent);
+ }
}
public void sendResponse(int requestId, IceInternal.BasicStream os, byte compressFlag)
@@ -1631,32 +1636,12 @@ namespace Ice
// of the message must be taken care of by the Ice thread pool.
//
IceInternal.ThreadPoolCurrent c = current;
- _threadPool.execute(
- delegate()
+ _threadPool.execute(() =>
{
- if(_dispatcher != null)
- {
- try
+ callOnDispatcher(() =>
{
- _dispatcher(delegate()
- {
- dispatch(startCB, sentCBs, info);
- },
- this);
- }
- catch(System.Exception ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault(
- "Ice.Warn.Dispatch", 1) > 1)
- {
- warning("dispatch exception", ex);
- }
- }
- }
- else
- {
- dispatch(startCB, sentCBs, info);
- }
+ dispatch(startCB, sentCBs, info);
+ });
msg.destroy(ref c);
});
}
@@ -1810,29 +1795,37 @@ namespace Ice
// non-blocking activity of the connection from these threads, the dispatching
// of the message must be taken care of by the Ice thread pool.
//
- _threadPool.execute(
- delegate()
+ _threadPool.execute(() =>
{
- if(_dispatcher == null)
- {
- finish();
- }
- else
+ callOnDispatcher(finish);
+ });
+ }
+
+ #if COMPACT
+ public void callOnDispatcher(Ice.VoidAction action)
+#else
+ private void callOnDispatcher(System.Action action)
+#endif
+ {
+ if(_dispatcher != null)
+ {
+ try
+ {
+ _dispatcher(action, this);
+ }
+ catch(System.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault(
+ "Ice.Warn.Dispatch", 1) > 1)
{
- try
- {
- _dispatcher(finish, this);
- }
- catch(System.Exception ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault(
- "Ice.Warn.Dispatch", 1) > 1)
- {
- warning("dispatch exception", ex);
- }
- }
+ warning("dispatch exception", ex);
}
- });
+ }
+ }
+ else
+ {
+ action();
+ }
}
private void finish()
@@ -3290,11 +3283,12 @@ namespace Ice
this.isSent = false;
}
- internal void timedOut()
+ internal bool timedOut()
{
Debug.Assert((@out != null || outAsync != null) && !isSent);
@out = null;
outAsync = null;
+ return isSent;
}
internal void adopt()
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs
index e06d04e389f..2c95043612b 100644
--- a/cs/src/Ice/OutgoingAsync.cs
+++ b/cs/src/Ice/OutgoingAsync.cs
@@ -320,17 +320,17 @@ namespace IceInternal
}
else
{
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- sentCallback_(this);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ sentCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, cachedConnection_);
}
return this;
}
@@ -348,10 +348,10 @@ namespace IceInternal
{
throw new System.ArgumentException("sent callback already set");
}
- sentCallback_ = delegate(Ice.AsyncResult result)
- {
- cb(result.sentSynchronously());
- };
+ sentCallback_ = (Ice.AsyncResult result) =>
+ {
+ cb(result.sentSynchronously());
+ };
if((state_ & Sent) == 0)
{
return this;
@@ -375,17 +375,17 @@ namespace IceInternal
}
else
{
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- cb(false);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ cb(false);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, cachedConnection_);
}
return this;
}
@@ -410,17 +410,17 @@ namespace IceInternal
monitor_.Unlock();
}
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- cb(this);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ cb(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, cachedConnection_);
return this;
}
@@ -449,17 +449,17 @@ namespace IceInternal
monitor_.Unlock();
}
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- completedCallback_(this);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, cachedConnection_);
return this;
}
@@ -525,10 +525,10 @@ namespace IceInternal
//
try
{
- instance_.clientThreadPool().dispatch(delegate()
- {
- invokeException__(ex);
- });
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ invokeException__(ex);
+ }, cachedConnection_);
}
catch(Ice.CommunicatorDestroyedException)
{
@@ -578,11 +578,11 @@ namespace IceInternal
{
try
{
- instance_.clientThreadPool().dispatch(delegate()
- {
- invokeSent__(callback);
- });
- }
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ invokeSent__(callback);
+ }, cachedConnection_);
+ }
catch(Ice.CommunicatorDestroyedException)
{
}
@@ -794,12 +794,20 @@ namespace IceInternal
if(handler != null)
{
+ Ice.ConnectionI con = null;
+ try
+ {
+ con = handler.getConnection(false);
+ }
+ catch(Ice.LocalException)
+ {
+ // Ignore.
+ }
IceInternal.OutgoingAsyncMessageCallback outAsync = (IceInternal.OutgoingAsyncMessageCallback)this;
- instance_.clientThreadPool().execute(
- delegate()
+ instance_.clientThreadPool().dispatch(() =>
{
handler.asyncRequestTimedOut(outAsync);
- });
+ }, con);
}
}
@@ -814,6 +822,7 @@ namespace IceInternal
protected Ice.Communicator communicator_;
protected IceInternal.Instance instance_;
protected string operation_;
+ protected Ice.Connection cachedConnection_;
protected readonly IceUtilInternal.Monitor monitor_ = new IceUtilInternal.Monitor();
protected IceInternal.BasicStream is_;
@@ -932,6 +941,8 @@ namespace IceInternal
public bool send__(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCB)
{
+ // Store away the connection for passing to the dispatcher.
+ cachedConnection_ = connection;
return connection.sendAsyncRequest(this, compress, response, out sentCB);
}
@@ -1411,17 +1422,19 @@ namespace IceInternal
monitor_.Unlock();
}
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- completedCallback_(this);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
+
+ instance_.clientThreadPool().dispatch(
+ () =>
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, null);
return this;
}
@@ -1451,18 +1464,18 @@ namespace IceInternal
monitor_.Unlock();
}
- instance_.clientThreadPool().dispatch(delegate()
- {
- try
- {
- completedCallback_(this);
- }
- catch(System.Exception ex)
- {
- warning__(ex);
- }
- });
- return this;
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning__(ex);
+ }
+ }, null);
+ return this;
}
new public Ice.AsyncResult<T> whenSent(Ice.SentCallback cb)
@@ -1542,6 +1555,8 @@ namespace IceInternal
public bool send__(Ice.ConnectionI connection, bool compress, bool response, out Ice.AsyncCallback sentCallback)
{
+ // Store away the connection for passing to the dispatcher.
+ cachedConnection_ = connection;
return connection.flushAsyncBatchRequests(this, out sentCallback);
}
diff --git a/cs/src/Ice/RequestHandler.cs b/cs/src/Ice/RequestHandler.cs
index 1ef49ac8532..6b690a3f5d9 100644
--- a/cs/src/Ice/RequestHandler.cs
+++ b/cs/src/Ice/RequestHandler.cs
@@ -22,6 +22,7 @@ namespace IceInternal
bool sendAsyncRequest(OutgoingAsyncMessageCallback @out, out Ice.AsyncCallback cb);
void requestTimedOut(OutgoingMessageCallback @out);
+ // Must be called from the dispatcher thread.
void asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync);
Reference getReference();
diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs
index 84d1861e3cb..ad3869566ac 100644
--- a/cs/src/Ice/ThreadPool.cs
+++ b/cs/src/Ice/ThreadPool.cs
@@ -373,16 +373,16 @@ namespace IceInternal
}
#if COMPACT
- public void dispatch(Ice.VoidAction call)
+ public void dispatch(Ice.VoidAction call, Ice.Connection con)
#else
- public void dispatch(System.Action call)
+ public void dispatch(System.Action call, Ice.Connection con)
#endif
{
if(_dispatcher != null)
{
try
{
- _dispatcher(call, null);
+ _dispatcher(call, con);
}
catch(System.Exception ex)
{
diff --git a/cs/test/Ice/dispatcher/AllTests.cs b/cs/test/Ice/dispatcher/AllTests.cs
index 1fa3a2eee5b..a5cfe2f1b1d 100644
--- a/cs/test/Ice/dispatcher/AllTests.cs
+++ b/cs/test/Ice/dispatcher/AllTests.cs
@@ -65,6 +65,7 @@ public class AllTests : TestCommon.TestApp
called();
}
+
public void payload()
{
test(Dispatcher.isDispatcherThread());
@@ -80,7 +81,7 @@ public class AllTests : TestCommon.TestApp
test(sentSynchronously || Dispatcher.isDispatcherThread());
}
- protected void called()
+ public void called()
{
_m.Lock();
try
@@ -140,6 +141,24 @@ public class AllTests : TestCommon.TestApp
i.begin_op().whenCompleted(cb.exception);
cb.check();
+ //
+ // Expect InvocationTimeoutException.
+ //
+ {
+ Test.TestIntfPrx to = Test.TestIntfPrxHelper.uncheckedCast(p.ice_invocationTimeout(250));
+ to.begin_sleep(500).whenCompleted(
+ () =>
+ {
+ test(false);
+ },
+ (Ice.Exception ex) => {
+ test(ex is Ice.InvocationTimeoutException);
+ test(Dispatcher.isDispatcherThread());
+ cb.called();
+ });
+ cb.check();
+ }
+
testController.holdAdapter();
Test.Callback_TestIntf_opWithPayload resp = cb.payload;
Ice.ExceptionCallback excb = cb.ignoreEx;
diff --git a/cs/test/Ice/dispatcher/Test.ice b/cs/test/Ice/dispatcher/Test.ice
index 852bc74314c..2fbd29f2a6d 100644
--- a/cs/test/Ice/dispatcher/Test.ice
+++ b/cs/test/Ice/dispatcher/Test.ice
@@ -17,6 +17,7 @@ module Test
interface TestIntf
{
void op();
+ void sleep(int to);
void opWithPayload(Ice::ByteSeq seq);
void shutdown();
};
diff --git a/cs/test/Ice/dispatcher/TestI.cs b/cs/test/Ice/dispatcher/TestI.cs
index 42740e2f9fd..56765b15b1a 100644
--- a/cs/test/Ice/dispatcher/TestI.cs
+++ b/cs/test/Ice/dispatcher/TestI.cs
@@ -8,6 +8,7 @@
// **********************************************************************
using Test;
+using System.Threading;
public class TestI : TestIntfDisp_
{
@@ -35,6 +36,11 @@ public class TestI : TestIntfDisp_
test(Dispatcher.isDispatcherThread());
}
+ public override void sleep(int to, Ice.Current current)
+ {
+ Thread.Sleep(to);
+ }
+
override public void
shutdown(Ice.Current current)
{
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index 706616fe32f..d3146d1533f 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -252,7 +252,7 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem()
+ _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection)
{
public void
run()
@@ -370,7 +370,7 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem()
+ _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection)
{
public void
run()
@@ -489,8 +489,17 @@ public class AsyncResult
if(handler != null)
{
final IceInternal.RequestHandler h = handler;
+ Ice.Connection connection = null;
+ try
+ {
+ connection = handler.getConnection(false);
+ }
+ catch(Ice.LocalException e)
+ {
+ // Ignore.
+ }
_instance.clientThreadPool().execute(
- new IceInternal.DispatchWorkItem()
+ new IceInternal.DispatchWorkItem(connection)
{
public void
run()
@@ -519,6 +528,7 @@ public class AsyncResult
protected Communicator _communicator;
protected IceInternal.Instance _instance;
protected String _operation;
+ protected Ice.Connection _cachedConnection;
protected java.lang.Object _monitor = new java.lang.Object();
protected IceInternal.BasicStream _is;
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index ad289e019fb..2b08cb9aa04 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -19,6 +19,7 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
public int
__send(Ice.ConnectionI connection, boolean compress, boolean response)
{
+ _cachedConnection = connection;
return connection.flushAsyncBatchRequests(this);
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 8b2f346a997..0505688cdda 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -333,7 +333,7 @@ public class ConnectRequestHandler
//
if(!_requests.isEmpty())
{
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
{
public void
run()
@@ -487,7 +487,7 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
{
public void
run()
@@ -503,7 +503,7 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex;
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
{
public void
run()
@@ -517,7 +517,7 @@ public class ConnectRequestHandler
if(!sentCallbacks.isEmpty())
{
_reference.getInstance().clientThreadPool().execute(
- new DispatchWorkItem()
+ new DispatchWorkItem(_connection)
{
public void
run()
diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java
index 84a9c438453..a7c5ef7fa57 100644
--- a/java/src/IceInternal/DispatchWorkItem.java
+++ b/java/src/IceInternal/DispatchWorkItem.java
@@ -17,6 +17,15 @@ package IceInternal;
//
abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable
{
+ public DispatchWorkItem()
+ {
+ }
+
+ public DispatchWorkItem(Ice.Connection connection)
+ {
+ _connection = connection;
+ }
+
final public void execute(ThreadPoolCurrent current)
{
Instance instance = current.stream.instance();
@@ -25,7 +34,7 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable
{
try
{
- dispatcher.dispatch(this, null);
+ dispatcher.dispatch(this, _connection);
}
catch(java.lang.Exception ex)
{
@@ -45,4 +54,6 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable
this.run();
}
}
+
+ private Ice.Connection _connection;
}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 9d18c397d6a..38e2eea29d7 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -103,6 +103,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
__send(Ice.ConnectionI connection, boolean compress, boolean response)
throws RetryException
{
+ _cachedConnection = connection;
return connection.sendAsyncRequest(this, compress, response);
}
diff --git a/java/test/Ice/dispatcher/AllTests.java b/java/test/Ice/dispatcher/AllTests.java
index 2007bf8514b..843b93d70b4 100644
--- a/java/test/Ice/dispatcher/AllTests.java
+++ b/java/test/Ice/dispatcher/AllTests.java
@@ -17,6 +17,7 @@ import test.Ice.dispatcher.Test.TestIntfControllerPrx;
import test.Ice.dispatcher.Test.TestIntfControllerPrxHelper;
import test.Ice.dispatcher.Test.Callback_TestIntf_op;
import test.Ice.dispatcher.Test.Callback_TestIntf_opWithPayload;
+import test.Ice.dispatcher.Test.Callback_TestIntf_sleep;
public class AllTests
{
@@ -69,6 +70,12 @@ public class AllTests
Ice.ObjectPrx obj = communicator.stringToProxy(sref);
test(obj != null);
+ int mult = 1;
+ if(!communicator.getProperties().getPropertyWithDefault("Ice.Default.Protocol", "tcp").equals("tcp"))
+ {
+ mult = 4;
+ }
+
TestIntfPrx p = TestIntfPrxHelper.uncheckedCast(obj);
sref = "testController:tcp -p 12011";
@@ -121,6 +128,61 @@ public class AllTests
i.begin_op(cb);
cb.check();
+ {
+ //
+ // Expect InvocationTimeoutException.
+ //
+ TestIntfPrx to = TestIntfPrxHelper.uncheckedCast(p.ice_invocationTimeout(250));
+ class Callback_TestIntf_sleepImpl extends Callback_TestIntf_sleep
+ {
+ public void
+ response()
+ {
+ test(false);
+ }
+
+ public void
+ exception(Ice.LocalException ex)
+ {
+ test(ex instanceof Ice.InvocationTimeoutException);
+ test(dispatcher.isDispatcherThread());
+ called();
+ }
+
+ public void
+ sent(boolean sentSynchronously)
+ {
+ test(sentSynchronously || dispatcher.isDispatcherThread());
+ }
+
+ public synchronized void check()
+ {
+ while(!_called)
+ {
+ try
+ {
+ wait();
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ _called = false;
+ }
+ private synchronized void called()
+ {
+ assert(!_called);
+ _called = true;
+ notify();
+ }
+ private boolean _called;
+ };
+ Callback_TestIntf_sleepImpl callback = new Callback_TestIntf_sleepImpl();
+ to.begin_sleep(500 * mult, callback);
+ callback.check();
+ }
+
testController.holdAdapter();
Callback_TestIntf_opWithPayload callback = new Callback_TestIntf_opWithPayload()
{
diff --git a/java/test/Ice/dispatcher/Test.ice b/java/test/Ice/dispatcher/Test.ice
index 25f412335f8..bbeabc2210a 100644
--- a/java/test/Ice/dispatcher/Test.ice
+++ b/java/test/Ice/dispatcher/Test.ice
@@ -18,6 +18,7 @@ module Test
interface TestIntf
{
void op();
+ void sleep(int to);
void opWithPayload(Ice::ByteSeq seq);
void shutdown();
};
diff --git a/java/test/Ice/dispatcher/TestI.java b/java/test/Ice/dispatcher/TestI.java
index 9b2681c1ca7..a3e0e717e88 100644
--- a/java/test/Ice/dispatcher/TestI.java
+++ b/java/test/Ice/dispatcher/TestI.java
@@ -34,6 +34,18 @@ public class TestI extends _TestIntfDisp
}
public void
+ sleep(int to, Ice.Current current)
+ {
+ try
+ {
+ Thread.currentThread().sleep(to);
+ }
+ catch(InterruptedException ex)
+ {
+ }
+ }
+
+ public void
opWithPayload(byte[] seq, Ice.Current current)
{
test(_dispatcher.isDispatcherThread());