summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Ice/OutgoingAsync.h6
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp109
-rw-r--r--cpp/src/Ice/ConnectionI.h2
-rw-r--r--cpp/src/Ice/Instance.cpp8
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp83
6 files changed, 136 insertions, 76 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index 13185c386b4..b7a14f2dd39 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -113,6 +113,7 @@ public:
bool __wait();
void __throwUserException();
virtual void __invokeExceptionAsync(const Exception&);
+ void __invokeCompleted();
static void __check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&);
static void __check(const AsyncResultPtr&, const Connection*, const ::std::string&);
@@ -143,7 +144,6 @@ protected:
const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
void __invokeSentAsync();
- void __invokeCompleted();
void runTimerTask(); // Implementation of TimerTask::runTimerTask()
@@ -257,7 +257,7 @@ public:
virtual void __finished(const Ice::Exception&);
virtual void __invokeExceptionAsync(const Ice::Exception&);
- void __finished();
+ bool __finished();
bool __invoke(bool);
BasicStream* __startWriteParams(Ice::FormatType format)
@@ -297,7 +297,7 @@ protected:
private:
- bool handleException(const Ice::Exception&);
+ void handleException(const Ice::Exception&);
RequestHandlerPtr _handler;
Ice::EncodingVersion _encoding;
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 452511591a5..95c151ff6e2 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -527,9 +527,9 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
}
}
- if(outAsync)
+ if(outAsync && outAsync->__finished())
{
- outAsync->__finished();
+ outAsync->__invokeCompleted();
}
_adapter->decDirectCount();
}
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index b42b8558a7f..fcf05ef9db0 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
ConnectionCallbackPtr heartbeatCallback;
+ int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
@@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
swap(_startCallback, startCB);
if(startCB)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
servantManager,
adapter,
outAsync,
- heartbeatCallback));
+ heartbeatCallback,
+ dispatchCount));
}
if(readyOp & SocketOperationWrite)
@@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs));
if(!sentCBs.empty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
@@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
if(!readyOp)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity != IceUtil::Time())
+ {
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ io.completed();
}
catch(const DatagramLimitException&) // Expected.
{
@@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
return;
}
- if(_acmLastActivity != IceUtil::Time())
- {
- _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
- }
- io.completed();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
@@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1876,7 +1887,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
if(startCB)
{
startCB->connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
#else
p->outAsync->__invokeSent();
#endif
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->__finished();
- ++count;
+ outAsync->__invokeCompleted();
+ ++dispatchedCount;
}
if(heartbeatCallback)
@@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
Error out(_instance->initializationData().logger);
out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish()
}
if(message->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
_sendStreams.pop_front();
}
@@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
_observer.finishWrite(_writeStream);
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ SocketOperation op = _transceiver->closing(true, *_exception.get());
+ if(op)
+ {
+ return op;
+ }
+ }
}
catch(const Ice::LocalException& ex)
{
setState(StateClosed, ex);
- return SocketOperationNone;
- }
-
- //
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- SocketOperation op = _transceiver->closing(true, *_exception.get());
- if(op)
- {
- return op;
- }
}
-
return SocketOperationNone;
}
@@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback)
+ OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
invokeNum = 1;
servantManager = _servantManager;
adapter = _adapter;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}
@@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
servantManager = _servantManager;
adapter = _adapter;
- _dispatchCount += invokeNum;
+ dispatchCount += invokeNum;
}
break;
}
@@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
+ else if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
else
{
- ++_dispatchCount;
+ outAsync = 0;
}
#else
- ++_dispatchCount;
+ if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
+ else
+ {
+ outAsync = 0;
+ }
#endif
notifyAll(); // Notify threads blocked in close(false)
}
@@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(_callback)
{
heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index b7254df51ba..d9ff91c5f53 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -268,7 +268,7 @@ private:
#endif
IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&,
IceInternal::ServantManagerPtr&, ObjectAdapterPtr&,
- IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&);
+ IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&, int&);
void invokeAll(IceInternal::BasicStream&, Int, Int, Byte,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index a223683cc6b..f4b0d89c90c 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -1531,6 +1531,7 @@ IceInternal::Instance::destroy()
ThreadPoolPtr serverThreadPool;
ThreadPoolPtr clientThreadPool;
EndpointHostResolverPtr endpointHostResolver;
+ IceUtil::TimerPtr timer;
{
IceUtil::RecMutex::Lock sync(*this);
@@ -1557,8 +1558,7 @@ IceInternal::Instance::destroy()
if(_timer)
{
- _timer->destroy();
- _timer = 0;
+ std::swap(_timer, timer);
}
if(_servantFactoryManager)
@@ -1610,6 +1610,10 @@ IceInternal::Instance::destroy()
//
// Join with the thread pool threads outside the synchronization.
//
+ if(timer)
+ {
+ timer->destroy();
+ }
if(clientThreadPool)
{
clientThreadPool->joinWithAllThreads();
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index c9d0ac28234..509ba18ad06 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -608,12 +608,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
//
try
{
- if(!handleException(exc)) // This will throw if the invocation can't be retried.
- {
- return; // Can't be retried immediately.
- }
-
- __invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(const Ice::Exception& ex)
{
@@ -640,9 +635,14 @@ IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
AsyncResult::__invokeExceptionAsync(ex);
}
-void
+bool
IceInternal::OutgoingAsync::__finished()
{
+ //
+ // NOTE: this method is called from ConnectionI.parseMessage
+ // with the connection locked. Therefore, it must not invoke
+ // any user callbacks.
+ //
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
Ice::Byte replyStatus;
@@ -789,15 +789,43 @@ IceInternal::OutgoingAsync::__finished()
_state |= OK;
}
_monitor.notifyAll();
+
+ if(!_callback)
+ {
+ _observer.detach();
+ return false;
+ }
+ return true;
}
- catch(const LocalException& ex)
+ catch(const LocalException& exc)
{
- __finished(ex);
- return;
- }
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return false; // Invocation will be retried.
+ }
+ catch(const Ice::Exception& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _exception.reset(ex.ice_clone());
+ _monitor.notifyAll();
- assert(replyStatus == replyOK || replyStatus == replyUserException);
- __invokeCompleted();
+ if(!_callback)
+ {
+ _observer.detach();
+ return false;
+ }
+ return true;
+ }
+ }
}
bool
@@ -851,39 +879,38 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
}
}
}
- break;
}
catch(const RetryException&)
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ continue;
}
catch(const Ice::Exception& ex)
{
- if(!handleException(ex)) // This will throw if the invocation can't be retried.
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return _sentSynchronously;
}
-bool
+void
IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
{
try
{
int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
_observer.retried(); // Invocation is being retried.
- if(interval > 0)
- {
- _instance->retryQueue()->add(this, interval);
- return false; // Don't retry immediately, the retry queue will take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ _instance->retryQueue()->add(this, interval);
}
catch(const Ice::Exception& ex)
{