diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 176 |
1 files changed, 101 insertions, 75 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 1bdbeb12448..92dc4a1693d 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -242,7 +242,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } void -Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) +Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { assert((out || outAsync)); // Only requests can timeout. out = 0; @@ -253,7 +253,7 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) } else { - assert(!adopted && !stream); + assert(!adopted); } } @@ -273,25 +273,28 @@ Ice::ConnectionI::OutgoingMessage::sent() else if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - invokeSent = outAsync->__sent(); + invokeSent = outAsync->sent(); return invokeSent || receivedReply; #else - return outAsync->__sent(); + return outAsync->sent(); #endif } return false; } void -Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) +Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { if(out) { - out->finished(ex); + out->completed(ex); } else if(outAsync) { - outAsync->__finished(ex); + if(outAsync->completed(ex)) + { + outAsync->invokeCompleted(); + } } if(adopted) @@ -651,8 +654,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); // // Send the message. If it can't be sent without blocking the message is added @@ -685,7 +687,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { - BasicStream* os = out->__getOs(); + BasicStream* os = out->getOs(); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) @@ -731,8 +733,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); AsyncStatus status = AsyncStatusQueued; try @@ -747,6 +748,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _exception->ice_throw(); } + if(response || status & AsyncStatusQueued) + { + out->cancelable(this); // Notify the request that it's cancelable + } + if(response) { // @@ -961,7 +967,7 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name); + FlushBatch out(this, _instance.get(), __flushBatchRequests_name); out.invoke(); } @@ -986,9 +992,8 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR #ifdef ICE_CPP11 AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC @@ -1026,16 +1031,13 @@ Ice::ConnectionI::begin_flushBatchRequests( AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionBatchOutgoingAsyncPtr result = - new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie); - try - { - result->__invoke(); - } - catch(const LocalException& __ex) - { - result->__invokeExceptionAsync(__ex); - } + ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this, + _communicator, + _instance, + __flushBatchRequests_name, + cb, + cookie); + result->invoke(); return result; } @@ -1047,7 +1049,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } bool -Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) +Ice::ConnectionI::flushBatchRequests(OutgoingBase* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1075,12 +1077,10 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - _batchStream.swap(*out->os()); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); + // // Send the batch stream. // @@ -1109,7 +1109,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) } AsyncStatus -Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) +Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1125,7 +1125,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; - if(outAsync->__sent()) + if(outAsync->sent()) { status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); } @@ -1141,11 +1141,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif + _batchStream.swap(*outAsync->getOs()); - outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - - _batchStream.swap(*outAsync->__getOs()); + outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); // // Send the batch stream. @@ -1153,7 +1151,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) AsyncStatus status = AsyncStatusQueued; try { - OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0); + OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0); status = sendMessage(message); } catch(const Ice::LocalException& ex) @@ -1163,6 +1161,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + if(status & AsyncStatusQueued) + { + outAsync->cancelable(this); // Notify the request that it's cancelable. + } + // // Reset the batch stream. // @@ -1276,9 +1279,14 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) +Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->out == out) @@ -1302,16 +1310,15 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream. + o->canceled(true); // true = adopt the stream. } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); return; } } @@ -1321,8 +1328,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(_requestsHint != _requests.end() && _requestsHint->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(_requestsHint); _requestsHint = _requests.end(); } @@ -1332,8 +1338,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(p->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); assert(p != _requestsHint); _requests.erase(p); return; // We're done. @@ -1344,10 +1349,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } void -Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // + // NOTE: This isn't called from a thread pool thread. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->outAsync.get() == outAsync.get()) @@ -1365,25 +1378,29 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou _asyncRequests.erase(o->requestId); } } - + // // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream + o->canceled(true); // true = adopt the stream } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + sync.release(); + outAsync->invokeCompleted(); + } + return; } } - + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); if(o) { @@ -1393,19 +1410,25 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } - + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { if(p->second.get() == o.get()) { assert(p != _asyncRequestsHint); _asyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } } @@ -1972,18 +1995,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) if(p->invokeSent) { - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); } if(p->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } #else - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); #endif } ++dispatchedCount; @@ -1995,7 +2018,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); ++dispatchedCount; } @@ -2147,14 +2170,14 @@ Ice::ConnectionI::finish() { if(message->sent() && message->invokeSent) { - message->outAsync->__invokeSent(); + message->outAsync->invokeSent(); } if(message->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } _sendStreams.pop_front(); @@ -2164,7 +2187,7 @@ Ice::ConnectionI::finish() for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - o->finished(*_exception.get()); + o->completed(*_exception.get()); if(o->requestId) // Make sure finished isn't called twice. { if(o->out) @@ -2182,13 +2205,16 @@ Ice::ConnectionI::finish() for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get()); + p->second->completed(*_exception.get()); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get()); + if(q->second->completed(*_exception.get())) + { + q->second->invokeCompleted(); + } } _asyncRequests.clear(); @@ -3481,7 +3507,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(p != _requests.end()) { - p->second->finished(stream); + p->second->completed(stream); if(p == _requestsHint) { @@ -3508,7 +3534,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request _asyncRequests.erase(q); } - stream.swap(*outAsync->__getIs()); + stream.swap(*outAsync->getIs()); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // @@ -3522,7 +3548,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } - else if(outAsync->__finished()) + else if(outAsync->completed()) { ++dispatchCount; } @@ -3531,7 +3557,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request outAsync = 0; } #else - if(outAsync->__finished()) + if(outAsync->completed()) { ++dispatchCount; } |