summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/OutgoingAsync.h24
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp54
-rw-r--r--cpp/src/Ice/RetryQueue.cpp17
-rw-r--r--cpp/src/Ice/RetryQueue.h18
-rw-r--r--cpp/src/slice2java/Gen.cpp4
-rw-r--r--cs/src/Ice/OutgoingAsync.cs178
-rw-r--r--cs/src/Ice/RetryQueue.cs17
-rw-r--r--java/src/Ice/CommunicatorI.java4
-rw-r--r--java/src/Ice/ConnectionI.java4
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java28
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java9
-rw-r--r--java/src/IceInternal/CommunicatorBatchOutgoingAsync.java2
-rw-r--r--java/src/IceInternal/GetConnectionOutgoingAsync.java51
-rw-r--r--java/src/IceInternal/OutgoingAsync.java26
-rw-r--r--java/src/IceInternal/OutgoingAsyncBase.java (renamed from java/src/IceInternal/AsyncResultI.java)20
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java5
-rw-r--r--java/src/IceInternal/RetryQueue.java2
-rw-r--r--java/src/IceInternal/RetryTask.java15
18 files changed, 377 insertions, 101 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index 507ead6df62..a6718de1fe6 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -231,6 +231,11 @@ public:
virtual void __finished(const Ice::Exception&) = 0;
//
+ // Called by the retry queue to process retry.
+ //
+ virtual void __processRetry(bool destroyed) = 0;
+
+ //
// Helper to dispatch invocation timeout.
//
void __dispatchInvocationTimeout(const ThreadPoolPtr&, const Ice::ConnectionPtr&);
@@ -255,6 +260,7 @@ public:
virtual bool __sent();
virtual void __invokeSent();
virtual void __finished(const Ice::Exception&);
+ virtual void __processRetry(bool);
virtual void __invokeExceptionAsync(const Ice::Exception&);
bool __finished();
@@ -294,8 +300,6 @@ public:
protected:
Ice::ObjectPrx _proxy;
- RequestHandlerPtr _handler;
- int _cnt;
private:
@@ -304,6 +308,8 @@ private:
Ice::EncodingVersion _encoding;
+ RequestHandlerPtr _handler;
+ int _cnt;
bool _sent;
Ice::OperationMode _mode;
};
@@ -320,6 +326,7 @@ public:
virtual bool __sent();
virtual void __invokeSent();
virtual void __finished(const Ice::Exception&);
+ virtual void __processRetry(bool);
};
class ICE_API ProxyBatchOutgoingAsync : public BatchOutgoingAsync
@@ -375,7 +382,7 @@ private:
int _useCount;
};
-class ICE_API GetConnectionOutgoingAsync : public OutgoingAsync
+class ICE_API GetConnectionOutgoingAsync : public OutgoingAsyncMessageCallback, public Ice::AsyncResult
{
public:
@@ -384,15 +391,26 @@ public:
void __invoke();
+ virtual Ice::ObjectPrx
+ getProxy() const
+ {
+ return _proxy;
+ }
+
virtual AsyncStatus __send(const Ice::ConnectionIPtr&, bool, bool);
virtual AsyncStatus __invokeCollocated(CollocatedRequestHandler*);
virtual bool __sent();
virtual void __invokeSent();
virtual void __finished(const Ice::Exception&);
+ virtual void __processRetry(bool);
private:
void handleException(const Ice::Exception&);
+
+ Ice::ObjectPrx _proxy;
+ RequestHandlerPtr _handler;
+ int _cnt;
};
//
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index cd3442113f8..c03cdd5a441 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -618,6 +618,26 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
}
void
+IceInternal::OutgoingAsync::__processRetry(bool destroyed)
+{
+ if(destroyed)
+ {
+ __invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__));
+ }
+ else
+ {
+ try
+ {
+ __invoke(false);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __invokeExceptionAsync(ex);
+ }
+ }
+}
+
+void
IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
{
if((_state & Done) == 0 && _handler)
@@ -986,6 +1006,12 @@ IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc)
__invokeException(exc);
}
+void
+IceInternal::BatchOutgoingAsync::__processRetry(bool destroyed)
+{
+ // Does not support retry
+}
+
IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
const std::string& operation,
const CallbackBasePtr& delegate,
@@ -1208,13 +1234,15 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
}
}
-IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& proxy,
+IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx,
const std::string& operation,
const CallbackBasePtr& delegate,
const Ice::LocalObjectPtr& cookie) :
- OutgoingAsync(proxy, operation, delegate, cookie)
+ AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie),
+ _proxy(prx),
+ _cnt(0)
{
- _observer.attach(proxy.get(), operation, 0);
+ _observer.attach(prx.get(), operation, 0);
}
void
@@ -1285,6 +1313,26 @@ IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc)
}
void
+IceInternal::GetConnectionOutgoingAsync::__processRetry(bool destroyed)
+{
+ if(destroyed)
+ {
+ __invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__));
+ }
+ else
+ {
+ try
+ {
+ __invoke();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __invokeExceptionAsync(ex);
+ }
+ }
+}
+
+void
IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc)
{
try
diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp
index e10c09b1396..730b15ba73d 100644
--- a/cpp/src/Ice/RetryQueue.cpp
+++ b/cpp/src/Ice/RetryQueue.cpp
@@ -18,7 +18,7 @@ using namespace IceInternal;
IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; }
-IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncPtr& outAsync) :
+IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const OutgoingAsyncMessageCallbackPtr& outAsync) :
_queue(queue), _outAsync(outAsync)
{
}
@@ -28,21 +28,14 @@ IceInternal::RetryTask::runTimerTask()
{
if(_queue->remove(this))
{
- try
- {
- _outAsync->__invoke(false);
- }
- catch(const Ice::LocalException& ex)
- {
- _outAsync->__invokeExceptionAsync(ex);
- }
+ _outAsync->__processRetry(false);
}
}
void
IceInternal::RetryTask::destroy()
{
- _outAsync->__invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__));
+ _outAsync->__processRetry(true);
}
bool
@@ -56,7 +49,7 @@ IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(ins
}
void
-IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval)
+IceInternal::RetryQueue::add(const OutgoingAsyncMessageCallbackPtr& out, int interval)
{
Lock sync(*this);
RetryTaskPtr task = new RetryTask(this, out);
@@ -66,7 +59,7 @@ IceInternal::RetryQueue::add(const OutgoingAsyncPtr& out, int interval)
}
catch(const IceUtil::IllegalArgumentException&) // Expected if the communicator destroyed the timer.
{
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
}
_requests.insert(task);
}
diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h
index 2270363c841..4339a7765b2 100644
--- a/cpp/src/Ice/RetryQueue.h
+++ b/cpp/src/Ice/RetryQueue.h
@@ -23,18 +23,18 @@ namespace IceInternal
class RetryTask : public IceUtil::TimerTask
{
public:
-
- RetryTask(const RetryQueuePtr&, const OutgoingAsyncPtr&);
-
+
+ RetryTask(const RetryQueuePtr&, const OutgoingAsyncMessageCallbackPtr&);
+
virtual void runTimerTask();
void destroy();
-
+
bool operator<(const RetryTask&) const;
-
+
private:
-
+
const RetryQueuePtr _queue;
- const OutgoingAsyncPtr _outAsync;
+ const OutgoingAsyncMessageCallbackPtr _outAsync;
};
typedef IceUtil::Handle<RetryTask> RetryTaskPtr;
@@ -43,8 +43,8 @@ class RetryQueue : public IceUtil::Shared, public IceUtil::Mutex
public:
RetryQueue(const InstancePtr&);
-
- void add(const OutgoingAsyncPtr&, int);
+
+ void add(const OutgoingAsyncMessageCallbackPtr&, int);
void destroy();
private:
diff --git a/cpp/src/slice2java/Gen.cpp b/cpp/src/slice2java/Gen.cpp
index 9de67726eb9..9889c96a214 100644
--- a/cpp/src/slice2java/Gen.cpp
+++ b/cpp/src/slice2java/Gen.cpp
@@ -4740,8 +4740,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p)
out << sb;
if(op->returnsData())
{
- out << nl << "IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;";
- out << nl << "IceInternal.AsyncResultI.check(__result, this, __" << op->name() << "_name);";
+ out << nl << "IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;";
+ out << nl << "IceInternal.OutgoingAsyncBase.check(__result, this, __" << op->name() << "_name);";
out << nl << "try";
out << sb;
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs
index 0f84936de22..c1c119d45db 100644
--- a/cs/src/Ice/OutgoingAsync.cs
+++ b/cs/src/Ice/OutgoingAsync.cs
@@ -125,6 +125,11 @@ namespace IceInternal
void finished(Ice.Exception ex);
//
+ // Called by the retry queue to process retry.
+ //
+ void processRetry(bool destroyed);
+
+ //
// Helper to dispatch invocation timeout.
//
void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
@@ -1019,6 +1024,25 @@ namespace IceInternal
}
}
+ public virtual void processRetry(bool destroyed)
+ {
+ if(destroyed)
+ {
+ invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ }
+ else
+ {
+ try
+ {
+ invoke(false);
+ }
+ catch(Ice.LocalException ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+ }
+
public void
dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
@@ -1458,9 +1482,9 @@ namespace IceInternal
}
protected Ice.ObjectPrxHelperBase proxy_;
- protected RequestHandler handler_;
- protected int cnt_;
+ private RequestHandler handler_;
+ private int cnt_;
private Ice.EncodingVersion _encoding;
private Ice.OperationMode _mode;
private bool _sent;
@@ -1634,14 +1658,18 @@ namespace IceInternal
private ProxyOnewayCallback<T> _completed;
}
- public class GetConnectionOutgoingAsync : TwowayOutgoingAsync<Ice.Callback_Object_ice_getConnection>
+ public class GetConnectionOutgoingAsync : OutgoingAsyncBase, OutgoingAsyncMessageCallback,
+ Ice.AsyncResult<Ice.Callback_Object_ice_getConnection>
{
- public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase proxy, string operation,
+ public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, string operation,
ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> cb,
object cookie) :
- base(proxy, operation, cb, cookie)
+ base(prx.ice_getCommunicator(), prx.reference__().getInstance(), operation, cookie)
{
- observer_ = ObserverHelper.get(proxy, operation);
+ proxy_ = prx;
+ completed_ = cb;
+ cnt_ = 0;
+ observer_ = ObserverHelper.get(prx, operation);
}
public void invoke()
@@ -1666,22 +1694,22 @@ namespace IceInternal
}
}
- public override bool send(Ice.ConnectionI connection, bool compress, bool response,
- out Ice.AsyncCallback sentCallback)
+ public bool send(Ice.ConnectionI connection, bool compress, bool response,
+ out Ice.AsyncCallback sentCallback)
{
sent();
sentCallback = null;
return false;
}
- public override bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
+ public bool invokeCollocated(CollocatedRequestHandler handler, out Ice.AsyncCallback sentCallback)
{
sent();
sentCallback = null;
return false;
}
- public override Ice.AsyncCallback sent()
+ public Ice.AsyncCallback sent()
{
lock(monitor_)
{
@@ -1692,12 +1720,12 @@ namespace IceInternal
return null;
}
- public override void invokeSent(Ice.AsyncCallback cb)
+ new public void invokeSent(Ice.AsyncCallback cb)
{
// No sent callback
}
- public override void finished(Ice.Exception exc)
+ public void finished(Ice.Exception exc)
{
try
{
@@ -1709,6 +1737,120 @@ namespace IceInternal
}
}
+ public void processRetry(bool destroyed)
+ {
+ if(destroyed)
+ {
+ invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ }
+ else
+ {
+ try
+ {
+ invoke();
+ }
+ catch(Ice.LocalException ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+ }
+
+ public void
+ dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
+ {
+ GetConnectionOutgoingAsync self = this;
+ threadPool.dispatch(() =>
+ {
+ self.finished(ex);
+ }, connection);
+ }
+
+ new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection>
+ whenCompleted(Ice.ExceptionCallback excb)
+ {
+ lock(monitor_)
+ {
+ if(excb == null)
+ {
+ throw new System.ArgumentException("callback is null");
+ }
+ setCompletedCallback(getCompletedCallback());
+ exceptionCallback_ = excb;
+ if((state_ & StateDone) == 0)
+ {
+ return this;
+ }
+ }
+
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning(ex);
+ }
+ }, null);
+ return this;
+ }
+
+ virtual public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection>
+ whenCompleted(Ice.Callback_Object_ice_getConnection cb, Ice.ExceptionCallback excb)
+ {
+ lock(monitor_)
+ {
+ if(cb == null && excb == null)
+ {
+ throw new System.ArgumentException("callback is null");
+ }
+ setCompletedCallback(getCompletedCallback());
+ responseCallback_ = cb;
+ exceptionCallback_ = excb;
+ if((state_ & StateDone) == 0)
+ {
+ return this;
+ }
+ }
+
+ instance_.clientThreadPool().dispatch(() =>
+ {
+ try
+ {
+ completedCallback_(this);
+ }
+ catch(System.Exception ex)
+ {
+ warning(ex);
+ }
+ }, null);
+ return this;
+ }
+
+ new public Ice.AsyncResult<Ice.Callback_Object_ice_getConnection> whenSent(Ice.SentCallback cb)
+ {
+ // Sent callback not supported
+ Debug.Assert(false);
+ return this;
+ }
+
+ public override Ice.ObjectPrx getProxy()
+ {
+ return proxy_;
+ }
+
+ protected override Ice.AsyncCallback getCompletedCallback()
+ {
+ return completed__;
+ }
+
+ private void completed__(Ice.AsyncResult result)
+ {
+ completed_(this, responseCallback_, exceptionCallback_);
+ }
+
private void handleException(Ice.Exception exc)
{
try
@@ -1729,6 +1871,13 @@ namespace IceInternal
throw ex;
}
}
+
+ private Ice.ObjectPrxHelperBase proxy_;
+ private ProxyTwowayCallback<Ice.Callback_Object_ice_getConnection> completed_;
+ private int cnt_;
+
+ private Ice.Callback_Object_ice_getConnection responseCallback_ = null;
+ private RequestHandler handler_ = null;
}
public class BatchOutgoingAsync : OutgoingAsyncBase, OutgoingAsyncMessageCallback, TimerTask
@@ -1809,6 +1958,11 @@ namespace IceInternal
invokeException(exc);
}
+ public virtual void processRetry(bool destroyed)
+ {
+ // Does not support retry
+ }
+
public void
dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
diff --git a/cs/src/Ice/RetryQueue.cs b/cs/src/Ice/RetryQueue.cs
index ad424a52da1..f97b7431091 100644
--- a/cs/src/Ice/RetryQueue.cs
+++ b/cs/src/Ice/RetryQueue.cs
@@ -13,7 +13,7 @@ namespace IceInternal
public class RetryTask : TimerTask
{
- public RetryTask(RetryQueue retryQueue, OutgoingAsync outAsync)
+ public RetryTask(RetryQueue retryQueue, OutgoingAsyncMessageCallback outAsync)
{
_retryQueue = retryQueue;
_outAsync = outAsync;
@@ -23,24 +23,17 @@ namespace IceInternal
{
if(_retryQueue.remove(this))
{
- try
- {
- _outAsync.invoke(false);
- }
- catch(Ice.LocalException ex)
- {
- _outAsync.invokeExceptionAsync(ex);
- }
+ _outAsync.processRetry(false);
}
}
public void destroy()
{
- _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ _outAsync.processRetry(true);
}
private RetryQueue _retryQueue;
- private OutgoingAsync _outAsync;
+ private OutgoingAsyncMessageCallback _outAsync;
}
public class RetryQueue
@@ -50,7 +43,7 @@ namespace IceInternal
_instance = instance;
}
- public void add(OutgoingAsync outAsync, int interval)
+ public void add(OutgoingAsyncMessageCallback outAsync, int interval)
{
lock(this)
{
diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java
index 121d82d7f2b..ed517e4ae9f 100644
--- a/java/src/Ice/CommunicatorI.java
+++ b/java/src/Ice/CommunicatorI.java
@@ -283,8 +283,8 @@ public final class CommunicatorI implements Communicator
public void
end_flushBatchRequests(AsyncResult r)
{
- IceInternal.AsyncResultI ri = (IceInternal.AsyncResultI)r;
- IceInternal.AsyncResultI.check(ri, this, __flushBatchRequests_name);
+ IceInternal.OutgoingAsyncBase ri = (IceInternal.OutgoingAsyncBase)r;
+ IceInternal.OutgoingAsyncBase.check(ri, this, __flushBatchRequests_name);
ri.__wait();
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index cb3383bfb56..e3c555f6174 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -651,8 +651,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
@Override
public void end_flushBatchRequests(AsyncResult ir)
{
- IceInternal.AsyncResultI r = (IceInternal.AsyncResultI) ir;
- IceInternal.AsyncResultI.check(r, this, __flushBatchRequests_name);
+ IceInternal.OutgoingAsyncBase r = (IceInternal.OutgoingAsyncBase) ir;
+ IceInternal.OutgoingAsyncBase.check(r, this, __flushBatchRequests_name);
r.__wait();
}
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index 72d3d99f150..9387bb4a6ba 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -305,8 +305,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final boolean
end_ice_isA(AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_isA_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_isA_name);
try
{
if(!__result.__wait())
@@ -797,8 +797,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final String[]
end_ice_ids(AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_ids_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_ids_name);
try
{
if(!__result.__wait())
@@ -1073,8 +1073,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final String
end_ice_id(AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_id_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_id_name);
try
{
if(!__result.__wait())
@@ -1468,8 +1468,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final boolean
end_ice_invoke(ByteSeqHolder outParams, AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI) __iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_invoke_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase) __iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_invoke_name);
try
{
boolean ok = __result.__wait();
@@ -2488,8 +2488,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public Ice.Connection
end_ice_getConnection(AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_getConnection_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_getConnection_name);
__result.__wait();
return ice_getCachedConnection();
}
@@ -2654,8 +2654,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public void
end_ice_flushBatchRequests(AsyncResult __iresult)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
- IceInternal.AsyncResultI.check(__result, this, __ice_flushBatchRequests_name);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, __ice_flushBatchRequests_name);
__result.__wait();
}
@@ -2780,8 +2780,8 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
public final void
__end(AsyncResult __iresult, String operation)
{
- IceInternal.AsyncResultI __result = (IceInternal.AsyncResultI)__iresult;
- IceInternal.AsyncResultI.check(__result, this, operation);
+ IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;
+ IceInternal.OutgoingAsyncBase.check(__result, this, operation);
try
{
boolean ok = __result.__wait();
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index d5953310639..e78879afa6a 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class BatchOutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback
+public class BatchOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
{
BatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation, CallbackBase callback)
{
@@ -96,6 +96,13 @@ public class BatchOutgoingAsync extends AsyncResultI implements OutgoingAsyncMes
@Override
public void
+ processRetry(boolean destroyed)
+ {
+ // Does not implement retry
+ }
+
+ @Override
+ public void
dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
threadPool.dispatch(new DispatchWorkItem(connection)
diff --git a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
index 6ebd6fb3a3e..d5c6189064a 100644
--- a/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
+++ b/java/src/IceInternal/CommunicatorBatchOutgoingAsync.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class CommunicatorBatchOutgoingAsync extends IceInternal.AsyncResultI
+public class CommunicatorBatchOutgoingAsync extends OutgoingAsyncBase
{
public CommunicatorBatchOutgoingAsync(Ice.Communicator communicator, Instance instance, String operation,
CallbackBase callback)
diff --git a/java/src/IceInternal/GetConnectionOutgoingAsync.java b/java/src/IceInternal/GetConnectionOutgoingAsync.java
index 95878b90709..75ef2d9b60f 100644
--- a/java/src/IceInternal/GetConnectionOutgoingAsync.java
+++ b/java/src/IceInternal/GetConnectionOutgoingAsync.java
@@ -9,11 +9,13 @@
package IceInternal;
-public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync
+public class GetConnectionOutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
{
- public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase callback)
+ public GetConnectionOutgoingAsync(Ice.ObjectPrxHelperBase prx, String operation, CallbackBase cb)
{
- super(prx, operation, callback);
+ super(prx.ice_getCommunicator(), ((Ice.ObjectPrxHelperBase) prx).__reference().getInstance(), operation, cb);
+ _proxy = (Ice.ObjectPrxHelperBase) prx;
+ _cnt = 0;
_observer = ObserverHelper.get(prx, operation);
}
@@ -39,6 +41,12 @@ public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync
}
@Override
+ public Ice.ObjectPrx getProxy()
+ {
+ return _proxy;
+ }
+
+ @Override
public int send(Ice.ConnectionI conection, boolean compress, boolean response)
throws RetryException
{
@@ -84,6 +92,39 @@ public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync
}
}
+ @Override
+ public void processRetry(boolean destroyed)
+ {
+ if(destroyed)
+ {
+ invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ }
+ else
+ {
+ try
+ {
+ __invoke();
+ }
+ catch(Ice.LocalException ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+ }
+
+ @Override
+ public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
+ {
+ threadPool.dispatch(new DispatchWorkItem(connection)
+ {
+ @Override
+ public void run()
+ {
+ GetConnectionOutgoingAsync.this.finished(ex);
+ }
+ });
+ }
+
private void handleException(Ice.Exception exc)
{
try
@@ -105,4 +146,8 @@ public class GetConnectionOutgoingAsync extends IceInternal.OutgoingAsync
throw ex;
}
}
+
+ private Ice.ObjectPrxHelperBase _proxy;
+ private RequestHandler _handler = null;
+ private int _cnt;
}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 992b9ca6265..94e3a8ae2b4 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -9,7 +9,7 @@
package IceInternal;
-public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageCallback
+public class OutgoingAsync extends OutgoingAsyncBase implements OutgoingAsyncMessageCallback
{
public OutgoingAsync(Ice.ObjectPrx prx, String operation, CallbackBase cb)
{
@@ -245,6 +245,26 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
@Override
+ public void processRetry(boolean destroyed)
+ {
+ if(destroyed)
+ {
+ invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ }
+ else
+ {
+ try
+ {
+ invoke(false);
+ }
+ catch(Ice.LocalException ex)
+ {
+ invokeExceptionAsync(ex);
+ }
+ }
+ }
+
+ @Override
public void dispatchInvocationCancel(final Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection)
{
threadPool.dispatch(new DispatchWorkItem(connection)
@@ -654,9 +674,9 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
protected Ice.ObjectPrxHelperBase _proxy;
- protected RequestHandler _handler;
- protected int _cnt;
+ private RequestHandler _handler;
+ private int _cnt;
private Ice.EncodingVersion _encoding;
private Ice.OperationMode _mode;
private boolean _sent;
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/OutgoingAsyncBase.java
index e2ca0b7c466..8be0b3072bd 100644
--- a/java/src/IceInternal/AsyncResultI.java
+++ b/java/src/IceInternal/OutgoingAsyncBase.java
@@ -21,10 +21,10 @@ import Ice.UserException;
* With this object, an application can obtain several attributes of the
* invocation and discover its outcome.
**/
-public class AsyncResultI implements Ice.AsyncResult
+public class OutgoingAsyncBase implements Ice.AsyncResult
{
- protected AsyncResultI(Communicator communicator, IceInternal.Instance instance, String op,
- IceInternal.CallbackBase del)
+ protected OutgoingAsyncBase(Communicator communicator, IceInternal.Instance instance, String op,
+ IceInternal.CallbackBase del)
{
_communicator = communicator;
_instance = instance;
@@ -35,9 +35,9 @@ public class AsyncResultI implements Ice.AsyncResult
_exception = null;
_callback = del;
}
-
- protected AsyncResultI(Communicator communicator, Instance instance, String op, CallbackBase del, BasicStream is,
- BasicStream os)
+
+ protected OutgoingAsyncBase(Communicator communicator, Instance instance, String op, CallbackBase del,
+ BasicStream is, BasicStream os)
{
_communicator = communicator;
_instance = instance;
@@ -455,7 +455,7 @@ public class AsyncResultI implements Ice.AsyncResult
public void cacheMessageBuffers()
{
}
-
+
public final void invokeCompleted()
{
//
@@ -493,7 +493,7 @@ public class AsyncResultI implements Ice.AsyncResult
Thread.currentThread().setContextClassLoader(null);
}
}
-
+
cacheMessageBuffers();
}
@@ -520,7 +520,7 @@ public class AsyncResultI implements Ice.AsyncResult
new Ice.InvocationTimeoutException());
}
}
-
+
private static void check(AsyncResult r, String operation)
{
if(r == null)
@@ -548,7 +548,7 @@ public class AsyncResultI implements Ice.AsyncResult
String s = "error raised by AMI callback:\n" + IceInternal.Ex.toString(error);
_instance.initializationData().logger.error(s);
}
-
+
protected Communicator _communicator;
protected IceInternal.Instance _instance;
protected String _operation;
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 95c5fd2bb45..9c069918933 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -44,6 +44,11 @@ public interface OutgoingAsyncMessageCallback
void finished(Ice.Exception ex);
//
+ // Called by the retry queue to process retry.
+ //
+ void processRetry(boolean destroyed);
+
+ //
// Helper to dispatch the cancellation exception.
//
void dispatchInvocationCancel(Ice.LocalException ex, ThreadPool threadPool, Ice.Connection connection);
diff --git a/java/src/IceInternal/RetryQueue.java b/java/src/IceInternal/RetryQueue.java
index 6a6de427dfe..023992f31f9 100644
--- a/java/src/IceInternal/RetryQueue.java
+++ b/java/src/IceInternal/RetryQueue.java
@@ -17,7 +17,7 @@ public class RetryQueue
}
synchronized public void
- add(OutgoingAsync outAsync, int interval)
+ add(OutgoingAsyncMessageCallback outAsync, int interval)
{
RetryTask task = new RetryTask(this, outAsync);
task.setFuture(_instance.timer().schedule(task, interval, java.util.concurrent.TimeUnit.MILLISECONDS));
diff --git a/java/src/IceInternal/RetryTask.java b/java/src/IceInternal/RetryTask.java
index e4f3a6c26fe..2c991f65819 100644
--- a/java/src/IceInternal/RetryTask.java
+++ b/java/src/IceInternal/RetryTask.java
@@ -11,7 +11,7 @@ package IceInternal;
class RetryTask implements Runnable
{
- RetryTask(RetryQueue queue, OutgoingAsync outAsync)
+ RetryTask(RetryQueue queue, OutgoingAsyncMessageCallback outAsync)
{
_queue = queue;
_outAsync = outAsync;
@@ -23,14 +23,7 @@ class RetryTask implements Runnable
{
if(_queue.remove(this))
{
- try
- {
- _outAsync.invoke(false);
- }
- catch(Ice.LocalException ex)
- {
- _outAsync.invokeExceptionAsync(ex);
- }
+ _outAsync.processRetry(false);
}
}
@@ -38,7 +31,7 @@ class RetryTask implements Runnable
destroy()
{
_future.cancel(false);
- _outAsync.invokeExceptionAsync(new Ice.CommunicatorDestroyedException());
+ _outAsync.processRetry(true);
}
public void setFuture(java.util.concurrent.Future<?> future)
@@ -47,6 +40,6 @@ class RetryTask implements Runnable
}
private final RetryQueue _queue;
- private final OutgoingAsync _outAsync;
+ private final OutgoingAsyncMessageCallback _outAsync;
private java.util.concurrent.Future<?> _future;
}